Untested log code

This commit is contained in:
ppom 2023-04-26 17:18:55 +02:00
parent 83a29904d5
commit aeb9af37be
3 changed files with 124 additions and 126 deletions

View File

@ -1,7 +1,9 @@
package app
import (
"encoding/gob"
"fmt"
"io"
"log"
"os"
"regexp"
@ -36,7 +38,8 @@ type Filter struct {
RetryPeriod string `yaml:"retry-period"`
retryDuration time.Duration `yaml:"-"`
Actions map[string]*Action `yaml:"actions"`
Actions map[string]*Action `yaml:"actions"`
longuestActionDuration *time.Duration
matches map[string][]time.Time `yaml:"-"`
}
@ -51,6 +54,13 @@ type Action struct {
afterDuration time.Duration `yaml:"-"`
}
type LogEntry struct {
t time.Time
pattern string
stream, filter string
exec bool
}
func (c *Conf) setup() {
for patternName, pattern := range c.Patterns {
c.Patterns[patternName] = fmt.Sprintf("(?P<%s>%s)", patternName, pattern)
@ -130,6 +140,102 @@ func (c *Conf) setup() {
}
}
var DBname = "./reaction.db"
var DBnewName = "./reaction.new.db"
func (c *Conf) updateFromDB() {
file, err := os.Open(DBname)
if err != nil {
if err == os.ErrNotExist {
log.Printf("WARN: No DB found at %s\n", DBname)
return
}
log.Fatalln("Failed to open DB:", err)
}
dec := gob.NewDecoder(&file)
newfile, err := os.Create(DBnewName)
if err != nil {
log.Fatalln("Failed to open DB:", err)
}
enc := gob.NewEncoder(&newfile)
// This extra code is made to warn only one time for each non-existant filter
type SF struct{ s, f string }
discardedEntries := make(map[SF]bool)
malformedEntries := 0
defer func() {
for sf, t := range discardedEntries {
if t {
log.Printf("WARN: info discarded from the DB: stream/filter not found: %s.%s\n", sf.s, sf.f)
}
}
if malformedEntries > 0 {
log.Printf("WARN: %v malformed entry discarded from the DB\n", malformedEntries)
}
}()
encodeOrFatal := func(entry LogEntry) {
err = enc.Encode(entry)
if err != nil {
log.Fatalln("ERRO: couldn't write to new DB:", err)
}
}
now := time.Now()
for {
var entry LogEntry
var filter *Filter
// decode entry
err = dec.Decode(&entry)
if err != nil {
if err == io.EOF {
return
}
malformedEntries++
continue
}
// retrieve related filter
if s := c.Streams[entry.stream]; stream != nil {
filter = stream.Filters[entry.filter]
}
if filter == nil {
discardedEntries[SF{entry.stream, entry.filter}] = true
continue
}
// store matches
if !entry.exec && entry.t+filter.retryDuration > now {
filter.matches[entry.pattern] = append(f.matches[entry.pattern], entry.t)
encodeOrFatal(entry)
}
// replay executions
if entry.exec && entry.t+filter.longuestActionDuration > now {
delete(filter.matches, match)
filter.execActions(match, now-entry.t)
encodeOrFatal(entry)
}
}
err = os.Rename(DBnewName, DBname)
if err != nil {
log.Fatalln("ERRO: Failed to replace old DB with new one:", err)
}
}
func openDB() {
f, err := os.OpenFile(DBname, os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
log.Fatalln("Failed to open DB:", err)
}
return gob.NewEncoder(&f)
}
func parseConf(filename string) *Conf {
data, err := os.ReadFile(filename)

119
app/db.go
View File

@ -1,119 +0,0 @@
package app
import (
"fmt"
"log"
"runtime"
"time"
"github.com/bmatsuo/lmdb-go/lmdb"
)
func numberOfFilters(conf *Conf) int {
n := 0
for _, s := range conf.Streams {
n += len(s.Filters)
}
return n
}
type CmdTime struct {
cmd []string
t time.Time
}
// Remove Cmd if last of its set
type CmdExecuted struct {
filter *Filter
pattern *string
value CmdTime
err chan error
}
// Append Cmd set
type AppendCmd struct {
filter *Filter
pattern *string
value []CmdTime
err chan error
}
// Append match, remove old ones and check match number
type AppendMatch struct {
filter *Filter
pattern *string
t time.Time
ret chan struct {
shouldExec bool
err error
}
}
func databaseHandler(env *lmdb.Env, chCE chan CmdExecuted, chAC chan AppendCmd, chAM chan AppendMatch) {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
defer env.Close()
select {
case ce := <-chCE:
ce = ce
// TODO
case ac := <-chAC:
ac = ac
// TODO
case am := <-chAM:
am = am
// TODO
}
}
func initDatabase(conf *Conf) (chan CmdExecuted, chan AppendCmd, chan AppendMatch) {
env, err := lmdb.NewEnv()
if err != nil {
log.Fatalln("LMDB.NewEnv failed")
}
err = env.SetMapSize(1 << 30)
if err != nil {
log.Fatalln("LMDB.SetMapSize failed")
}
filterNumber := numberOfFilters(conf)
err = env.SetMaxDBs(filterNumber * 2)
if err != nil {
log.Fatalln("LMDB.SetMaxDBs failed")
}
matchDBs := make(map[*Filter]lmdb.DBI, filterNumber)
cmdDBs := make(map[*Filter]lmdb.DBI, filterNumber)
runtime.LockOSThread()
for _, stream := range conf.Streams {
for _, filter := range stream.Filters {
err = env.UpdateLocked(func(txn *lmdb.Txn) (err error) {
matchDBs[filter], err = txn.CreateDBI(fmt.Sprintln("%s.%s.match", stream.name, filter.name))
if err != nil {
return err
}
cmdDBs[filter], err = txn.CreateDBI(fmt.Sprintln("%s.%s.cmd", stream.name, filter.name))
return err
})
if err != nil {
log.Fatalln("LMDB.CreateDBI failed")
}
}
}
runtime.UnlockOSThread()
chCE := make(chan CmdExecuted)
chAC := make(chan AppendCmd)
chAM := make(chan AppendMatch)
go databaseHandler(env, chCE, chAC, chAM)
return chCE, chAC, chAM
}

View File

@ -2,6 +2,7 @@ package app
import (
"bufio"
"encoding/gob"
"flag"
// "fmt"
@ -54,17 +55,17 @@ func (f *Filter) match(line *string) string {
return ""
}
func (f *Filter) execActions(match string) {
func (f *Filter) execActions(match string, advance time.Duration) {
for _, a := range f.Actions {
wgActions.Add(1)
go a.exec(match)
go a.exec(match, advance)
}
}
func (a *Action) exec(match string) {
func (a *Action) exec(match string, advance time.Duration) {
defer wgActions.Done()
if a.afterDuration != 0 {
time.Sleep(a.afterDuration)
if a.afterDuration != 0 && a.afterDuration > advance {
time.Sleep(a.afterDuration - advance)
}
computedCommand := make([]string, 0, len(a.Cmd))
@ -99,13 +100,19 @@ func (f *Filter) handle() chan *string {
for line := range lines {
if match := f.match(line); match != "" {
entry := LogEntry{time.Now(), match, f.stream.name, f.name, false}
f.cleanOldMatches(match)
f.matches[match] = append(f.matches[match], time.Now())
if len(f.matches[match]) >= f.Retry {
f.execActions(match)
entry.exec = true
delete(f.matches, match)
f.execActions(match, nil)
}
db.Encode(&entry)
}
}
}()
@ -147,6 +154,8 @@ func (s *Stream) handle(signal chan *Stream) {
var wgActions sync.WaitGroup
var db gob.Encoder
func Main() {
confFilename := flag.String("c", "", "configuration file. see an example at https://framagit.org/ppom/reaction/-/blob/main/reaction.yml")
flag.Parse()
@ -158,6 +167,8 @@ func Main() {
conf := parseConf(*confFilename)
db = openDB()
endSignals := make(chan *Stream)
for _, stream := range conf.Streams {