diff --git a/.gitignore b/.gitignore index 2154c2b..e0cc912 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ -/reaction.db -/result /reaction +/reaction*.db +/reaction*.sock +/result diff --git a/app/daemon.go b/app/daemon.go index 50e4151..618ab24 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -147,7 +147,7 @@ func (f *Filter) handle() chan *string { f.execActions(match, time.Duration(0)) } - logs <- &entry + logs <- entry } } }() @@ -193,14 +193,17 @@ var stopStreams chan bool var actionStore ActionStore var wgActions sync.WaitGroup -var logs chan *LogEntry +var logs chan LogEntry +var flushes chan LogEntry func Daemon(confFilename string) { actionStore.store = make(ActionMap) conf := parseConf(confFilename) - logs = conf.DatabaseManager() + logs = make(chan LogEntry) + flushes = make(chan LogEntry) + go conf.DatabaseManager() // Ready to start @@ -210,7 +213,7 @@ func Daemon(confFilename string) { stopStreams = make(chan bool) endSignals := make(chan *Stream) - noStreamsInExecution := len(conf.Streams) + nbStreamsInExecution := len(conf.Streams) for _, stream := range conf.Streams { go stream.handle(endSignals) @@ -222,8 +225,8 @@ func Daemon(confFilename string) { select { case finishedStream := <-endSignals: log.Printf("ERROR %s stream finished", finishedStream.name) - noStreamsInExecution-- - if noStreamsInExecution == 0 { + nbStreamsInExecution-- + if nbStreamsInExecution == 0 { quit() } case <-sigs: diff --git a/app/persist.go b/app/persist.go index a9f859e..aa46515 100644 --- a/app/persist.go +++ b/app/persist.go @@ -10,8 +10,9 @@ import ( ) const ( - dbName = "./reaction.db" - dbNewName = "./reaction.new.db" + logDBName = "./reaction-matches.db" + logDBNewName = "./reaction-matches.new.db" + flushDBName = "./reaction-flushes.db" ) type ReadDB struct { @@ -24,88 +25,102 @@ type WriteDB struct { enc *gob.Encoder } -func openDB(path string) (error, *ReadDB) { +func openDB(path string) (bool, *ReadDB) { file, err := os.Open(path) - if err != nil { - return err, nil - } - return nil, &ReadDB{file, gob.NewDecoder(file)} -} - -func createDB(path string) (error, *WriteDB) { - file, err := os.Create(path) - if err != nil { - return err, nil - } - return nil, &WriteDB{file, gob.NewEncoder(file)} -} - -func (c *Conf) DatabaseManager() chan *LogEntry { - logs := make(chan *LogEntry) - go func() { - db := c.RotateDB(true) - go c.manageLogs(logs, db) - }() - return logs -} - -func (c *Conf) manageLogs(logs <-chan *LogEntry, db *WriteDB) { - var cpt int - for { - db.enc.Encode(<-logs) - cpt++ - // let's say 100 000 entries ~ 10 MB - if cpt == 100_000 { - cpt = 0 - db.file.Close() - log.Printf("INFO Rotating database...") - db = c.RotateDB(false) - log.Printf("INFO Rotated database") - } - } -} - -func (c *Conf) RotateDB(startup bool) *WriteDB { - var ( - err error - enc *WriteDB - dec *ReadDB - ) - err, dec = openDB(dbName) if err != nil { if errors.Is(err, os.ErrNotExist) { - log.Printf("WARN No DB found at %s. It's ok if this is the first time reaction is running.\n", dbName) - err, enc := createDB(dbName) - if err != nil { - log.Fatalln("FATAL Failed to create DB:", err) - } - return enc + log.Printf("WARN No DB found at %s. It's ok if this is the first time reaction is running.\n", path) + return true, nil } log.Fatalln("FATAL Failed to open DB:", err) } + return false, &ReadDB{file, gob.NewDecoder(file)} +} - err, enc = createDB(dbNewName) +func createDB(path string) *WriteDB { + file, err := os.Create(path) if err != nil { - log.Fatalln("FATAL Failed to create new DB:", err) + log.Fatalln("FATAL Failed to create DB:", err) + } + return &WriteDB{file, gob.NewEncoder(file)} +} + +func (c *Conf) DatabaseManager() { + logDB, flushDB := c.RotateDB(true) + log.Print("DEBUG startup finished, managing logs!") + c.manageLogs(logDB, flushDB) +} + +func (c *Conf) manageLogs(logDB *WriteDB, flushDB *WriteDB) { + var cpt int + for { + select { + case entry := <-flushes: + flushDB.enc.Encode(entry) + case entry := <-logs: + logDB.enc.Encode(entry) + cpt++ + // let's say 100 000 entries ~ 10 MB + if cpt == 100_000 { + cpt = 0 + log.Printf("INFO Rotating database...") + logDB.file.Close() + flushDB.file.Close() + logDB, flushDB = c.RotateDB(false) + log.Printf("INFO Rotated database") + } + } + } +} + +func (c *Conf) RotateDB(startup bool) (*WriteDB, *WriteDB) { + var ( + doesntExist bool + err error + logReadDB *ReadDB + flushReadDB *ReadDB + logWriteDB *WriteDB + flushWriteDB *WriteDB + ) + doesntExist, logReadDB = openDB(logDBName) + if doesntExist { + return createDB(logDBName), createDB(flushDBName) + } + doesntExist, flushReadDB = openDB(flushDBName) + if doesntExist { + log.Println("WARN Strange! No flushes db, opening /dev/null instead") + doesntExist, flushReadDB = openDB("/dev/null") + if doesntExist { + log.Fatalln("Opening dummy /dev/null failed") + } } - rotateDB(c, dec.dec, enc.enc, startup) + logWriteDB = createDB(logDBNewName) - err = dec.file.Close() + rotateDB(c, logReadDB.dec, flushReadDB.dec, logWriteDB.enc, startup) + + err = logReadDB.file.Close() if err != nil { log.Fatalln("FATAL Failed to close old DB:", err) } // It should be ok to rename an open file - err = os.Rename(dbNewName, dbName) + err = os.Rename(logDBNewName, logDBName) if err != nil { log.Fatalln("FATAL Failed to replace old DB with new one:", err) } - return enc + err = os.Remove(flushDBName) + if err != nil { + log.Fatalln("FATAL Failed to delete old DB:", err) + } + + flushWriteDB = createDB(flushDBName) + return logWriteDB, flushWriteDB } -func rotateDB(c *Conf, dec *gob.Decoder, enc *gob.Encoder, startup bool) { +func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.Encoder, startup bool) { + log.Println("DEBUG rotating db") // This extra code is made to warn only one time for each non-existant filter type SF struct{ s, f string } discardedEntries := make(map[SF]int) @@ -113,32 +128,58 @@ func rotateDB(c *Conf, dec *gob.Decoder, enc *gob.Encoder, startup bool) { defer func() { for sf, t := range discardedEntries { if t > 0 { - log.Printf("WARN info discarded %v times from the DB: stream/filter not found: %s.%s\n", t, sf.s, sf.f) + log.Printf("WARN info discarded %v times from the DBs: stream/filter not found: %s.%s\n", t, sf.s, sf.f) } } if malformedEntries > 0 { - log.Printf("WARN %v malformed entries discarded from the DB\n", malformedEntries) + log.Printf("WARN %v malformed entries discarded from the DBs\n", malformedEntries) } }() - encodeOrFatal := func(entry LogEntry) { - err := enc.Encode(entry) - if err != nil { - log.Fatalln("FATAL Failed to write to new DB:", err) - } - } - var err error - now := time.Now() - for { - var entry LogEntry - var filter *Filter + var entry LogEntry + var filter *Filter + type PSF struct{ p, s, f string } + + // pattern, stream, fitler → last flush + flushes := make(map[PSF]time.Time) + for { // decode entry - err = dec.Decode(&entry) + err = flushDec.Decode(&entry) if err != nil { if err == io.EOF { - return + break + } + malformedEntries++ + continue + } + + // retrieve related filter + if entry.Stream != "" || entry.Filter != "" { + if stream := c.Streams[entry.Stream]; stream != nil { + filter = stream.Filters[entry.Filter] + } + if filter == nil { + discardedEntries[SF{entry.Stream, entry.Filter}]++ + continue + } + } + + // store + log.Printf("DEBUG got flush: %v", entry.Pattern) + flushes[PSF{entry.Pattern, entry.Stream, entry.Filter}] = entry.T + } + log.Printf("DEBUG flushes: %v", flushes) + + now := time.Now() + for { + + // decode entry + err = logDec.Decode(&entry) + if err != nil { + if err == io.EOF { + break } malformedEntries++ continue @@ -153,23 +194,44 @@ func rotateDB(c *Conf, dec *gob.Decoder, enc *gob.Encoder, startup bool) { continue } + // check if it hasn't been flushed, only for Exec:true for now + if entry.Exec { + lastGlobalFlush := flushes[PSF{entry.Pattern, "", ""}].Unix() + lastLocalFlush := flushes[PSF{entry.Pattern, entry.Stream, entry.Filter}].Unix() + entryTime := entry.T.Unix() + + if lastLocalFlush > entryTime || lastGlobalFlush > entryTime { + log.Printf("DEBUG got %v (exec:%v) but it has been flushed\n", entry.Pattern, entry.Exec) + continue + } + } + // store matches if !entry.Exec && entry.T.Add(filter.retryDuration).Unix() > now.Unix() { + log.Printf("DEBUG got match: %v\n", entry.Pattern) if startup { filter.matches[entry.Pattern] = append(filter.matches[entry.Pattern], entry.T) } - encodeOrFatal(entry) + encodeOrFatal(logEnc, entry) } // replay executions if entry.Exec && entry.T.Add(*filter.longuestActionDuration).Unix() > now.Unix() { + log.Printf("DEBUG got exec: %v\n", entry.Pattern) if startup { delete(filter.matches, entry.Pattern) filter.execActions(entry.Pattern, now.Sub(entry.T)) } - encodeOrFatal(entry) + encodeOrFatal(logEnc, entry) } } } + +func encodeOrFatal(enc *gob.Encoder, entry LogEntry) { + err := enc.Encode(entry) + if err != nil { + log.Fatalln("FATAL Failed to write to new DB:", err) + } +} diff --git a/app/pipe.go b/app/pipe.go index 0bc97ed..caa53f7 100644 --- a/app/pipe.go +++ b/app/pipe.go @@ -7,6 +7,7 @@ import ( "os" "path" "sync" + "time" "gopkg.in/yaml.v3" ) @@ -73,6 +74,7 @@ func (a *ActionStore) Flush(pattern string) int { } } delete(a.store, pattern) + flushes<-LogEntry{time.Now(), pattern, "", "", false} return cpt } diff --git a/config/reaction.test.yml b/config/reaction.test.yml index 02ea3de..0959005 100644 --- a/config/reaction.test.yml +++ b/config/reaction.test.yml @@ -5,13 +5,13 @@ patterns: streams: tailDown: - cmd: [ "sh", "-c", "sleep 2; echo found 1; echo found 2; sleep 1; echo found 1; sleep 2" ] + cmd: [ "sh", "-c", "sleep 6; echo found 1; echo found 2; sleep 10" ] filters: findIP: regex: - '^found $' retry: 2 - retry-period: 1m + retry-period: 2m actions: damn: cmd: [ "echo", "" ]