diff --git a/app/conf.go b/app/conf.go index 119e0fa..bda568f 100644 --- a/app/conf.go +++ b/app/conf.go @@ -1,7 +1,9 @@ package app import ( + "encoding/gob" "fmt" + "io" "log" "os" "regexp" @@ -36,7 +38,8 @@ type Filter struct { RetryPeriod string `yaml:"retry-period"` retryDuration time.Duration `yaml:"-"` - Actions map[string]*Action `yaml:"actions"` + Actions map[string]*Action `yaml:"actions"` + longuestActionDuration *time.Duration matches map[string][]time.Time `yaml:"-"` } @@ -51,6 +54,13 @@ type Action struct { afterDuration time.Duration `yaml:"-"` } +type LogEntry struct { + t time.Time + pattern string + stream, filter string + exec bool +} + func (c *Conf) setup() { for patternName, pattern := range c.Patterns { c.Patterns[patternName] = fmt.Sprintf("(?P<%s>%s)", patternName, pattern) @@ -130,6 +140,102 @@ func (c *Conf) setup() { } } +var DBname = "./reaction.db" +var DBnewName = "./reaction.new.db" + +func (c *Conf) updateFromDB() { + file, err := os.Open(DBname) + if err != nil { + if err == os.ErrNotExist { + log.Printf("WARN: No DB found at %s\n", DBname) + return + } + log.Fatalln("Failed to open DB:", err) + } + dec := gob.NewDecoder(&file) + + newfile, err := os.Create(DBnewName) + if err != nil { + log.Fatalln("Failed to open DB:", err) + } + enc := gob.NewEncoder(&newfile) + + // This extra code is made to warn only one time for each non-existant filter + type SF struct{ s, f string } + discardedEntries := make(map[SF]bool) + malformedEntries := 0 + defer func() { + for sf, t := range discardedEntries { + if t { + log.Printf("WARN: info discarded from the DB: stream/filter not found: %s.%s\n", sf.s, sf.f) + } + } + if malformedEntries > 0 { + log.Printf("WARN: %v malformed entry discarded from the DB\n", malformedEntries) + } + }() + + encodeOrFatal := func(entry LogEntry) { + err = enc.Encode(entry) + if err != nil { + log.Fatalln("ERRO: couldn't write to new DB:", err) + } + } + + now := time.Now() + for { + var entry LogEntry + var filter *Filter + + // decode entry + err = dec.Decode(&entry) + if err != nil { + if err == io.EOF { + return + } + malformedEntries++ + continue + } + + // retrieve related filter + if s := c.Streams[entry.stream]; stream != nil { + filter = stream.Filters[entry.filter] + } + if filter == nil { + discardedEntries[SF{entry.stream, entry.filter}] = true + continue + } + + // store matches + if !entry.exec && entry.t+filter.retryDuration > now { + filter.matches[entry.pattern] = append(f.matches[entry.pattern], entry.t) + + encodeOrFatal(entry) + } + + // replay executions + if entry.exec && entry.t+filter.longuestActionDuration > now { + delete(filter.matches, match) + filter.execActions(match, now-entry.t) + + encodeOrFatal(entry) + } + } + + err = os.Rename(DBnewName, DBname) + if err != nil { + log.Fatalln("ERRO: Failed to replace old DB with new one:", err) + } +} + +func openDB() { + f, err := os.OpenFile(DBname, os.O_APPEND|os.O_WRONLY, 0600) + if err != nil { + log.Fatalln("Failed to open DB:", err) + } + return gob.NewEncoder(&f) +} + func parseConf(filename string) *Conf { data, err := os.ReadFile(filename) diff --git a/app/db.go b/app/db.go deleted file mode 100644 index 28bfd18..0000000 --- a/app/db.go +++ /dev/null @@ -1,119 +0,0 @@ -package app - -import ( - "fmt" - "log" - "runtime" - "time" - - "github.com/bmatsuo/lmdb-go/lmdb" -) - -func numberOfFilters(conf *Conf) int { - n := 0 - for _, s := range conf.Streams { - n += len(s.Filters) - } - return n -} - -type CmdTime struct { - cmd []string - t time.Time -} - -// Remove Cmd if last of its set -type CmdExecuted struct { - filter *Filter - pattern *string - value CmdTime - err chan error -} - -// Append Cmd set -type AppendCmd struct { - filter *Filter - pattern *string - value []CmdTime - err chan error -} - -// Append match, remove old ones and check match number -type AppendMatch struct { - filter *Filter - pattern *string - t time.Time - ret chan struct { - shouldExec bool - err error - } -} - -func databaseHandler(env *lmdb.Env, chCE chan CmdExecuted, chAC chan AppendCmd, chAM chan AppendMatch) { - runtime.LockOSThread() - defer runtime.UnlockOSThread() - defer env.Close() - - select { - case ce := <-chCE: - ce = ce - // TODO - case ac := <-chAC: - ac = ac - // TODO - case am := <-chAM: - am = am - // TODO - } -} - -func initDatabase(conf *Conf) (chan CmdExecuted, chan AppendCmd, chan AppendMatch) { - env, err := lmdb.NewEnv() - if err != nil { - log.Fatalln("LMDB.NewEnv failed") - } - - err = env.SetMapSize(1 << 30) - if err != nil { - log.Fatalln("LMDB.SetMapSize failed") - } - - filterNumber := numberOfFilters(conf) - - err = env.SetMaxDBs(filterNumber * 2) - if err != nil { - log.Fatalln("LMDB.SetMaxDBs failed") - } - - matchDBs := make(map[*Filter]lmdb.DBI, filterNumber) - cmdDBs := make(map[*Filter]lmdb.DBI, filterNumber) - - runtime.LockOSThread() - - for _, stream := range conf.Streams { - for _, filter := range stream.Filters { - err = env.UpdateLocked(func(txn *lmdb.Txn) (err error) { - matchDBs[filter], err = txn.CreateDBI(fmt.Sprintln("%s.%s.match", stream.name, filter.name)) - if err != nil { - return err - } - - cmdDBs[filter], err = txn.CreateDBI(fmt.Sprintln("%s.%s.cmd", stream.name, filter.name)) - return err - }) - if err != nil { - log.Fatalln("LMDB.CreateDBI failed") - } - } - } - - runtime.UnlockOSThread() - - chCE := make(chan CmdExecuted) - chAC := make(chan AppendCmd) - chAM := make(chan AppendMatch) - - go databaseHandler(env, chCE, chAC, chAM) - - return chCE, chAC, chAM -} diff --git a/app/reaction.go b/app/reaction.go index 1fe65b8..46f2703 100644 --- a/app/reaction.go +++ b/app/reaction.go @@ -2,6 +2,7 @@ package app import ( "bufio" + "encoding/gob" "flag" // "fmt" @@ -54,17 +55,17 @@ func (f *Filter) match(line *string) string { return "" } -func (f *Filter) execActions(match string) { +func (f *Filter) execActions(match string, advance time.Duration) { for _, a := range f.Actions { wgActions.Add(1) - go a.exec(match) + go a.exec(match, advance) } } -func (a *Action) exec(match string) { +func (a *Action) exec(match string, advance time.Duration) { defer wgActions.Done() - if a.afterDuration != 0 { - time.Sleep(a.afterDuration) + if a.afterDuration != 0 && a.afterDuration > advance { + time.Sleep(a.afterDuration - advance) } computedCommand := make([]string, 0, len(a.Cmd)) @@ -99,13 +100,19 @@ func (f *Filter) handle() chan *string { for line := range lines { if match := f.match(line); match != "" { + entry := LogEntry{time.Now(), match, f.stream.name, f.name, false} + f.cleanOldMatches(match) f.matches[match] = append(f.matches[match], time.Now()) if len(f.matches[match]) >= f.Retry { - f.execActions(match) + entry.exec = true + delete(f.matches, match) + f.execActions(match, nil) } + + db.Encode(&entry) } } }() @@ -147,6 +154,8 @@ func (s *Stream) handle(signal chan *Stream) { var wgActions sync.WaitGroup +var db gob.Encoder + func Main() { confFilename := flag.String("c", "", "configuration file. see an example at https://framagit.org/ppom/reaction/-/blob/main/reaction.yml") flag.Parse() @@ -158,6 +167,8 @@ func Main() { conf := parseConf(*confFilename) + db = openDB() + endSignals := make(chan *Stream) for _, stream := range conf.Streams {