From 48f25170744d420bdbf5218ef6beef9e90bd826a Mon Sep 17 00:00:00 2001 From: ppom <> Date: Wed, 6 Sep 2023 02:00:33 +0200 Subject: [PATCH] database refactor fixes #21 --- app/daemon.go | 40 ++++----- app/persist.go | 175 +++++++++++++++++++++++++++++++++++++++ app/startup.go | 104 ----------------------- config/reaction.test.yml | 60 ++++++++++++-- 4 files changed, 249 insertions(+), 130 deletions(-) create mode 100644 app/persist.go diff --git a/app/daemon.go b/app/daemon.go index 3308bc6..50e4151 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -2,7 +2,6 @@ package app import ( "bufio" - "encoding/gob" "syscall" // "fmt" @@ -86,34 +85,33 @@ func sleep(d time.Duration) chan bool { func (a *Action) exec(match string, advance time.Duration) { defer wgActions.Done() - doExec := true // Wait for either end of sleep time, or actionStore requesting stop if a.afterDuration != 0 && a.afterDuration > advance { stopAction := actionStore.Register(a, match) select { case <-sleep(a.afterDuration - advance): - // no-op - case doExec = <-stopAction: - // no-op + // Let's not wait for the lock + go actionStore.Unregister(a, match, stopAction) + case doExec := <-stopAction: + // no need to unregister here + if !doExec { + return + } } - // Let's not wait for the lock - go actionStore.Unregister(a, match, stopAction) } - if doExec { - computedCommand := make([]string, 0, len(a.Cmd)) - for _, item := range a.Cmd { - computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match)) - } + computedCommand := make([]string, 0, len(a.Cmd)) + for _, item := range a.Cmd { + computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match)) + } - log.Printf("INFO %s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand) + log.Printf("INFO %s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand) - cmd := exec.Command(computedCommand[0], computedCommand[1:]...) + cmd := exec.Command(computedCommand[0], computedCommand[1:]...) - if ret := cmd.Run(); ret != nil { - log.Printf("ERROR %s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret) - } + if ret := cmd.Run(); ret != nil { + log.Printf("ERROR %s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret) } } @@ -149,7 +147,7 @@ func (f *Filter) handle() chan *string { f.execActions(match, time.Duration(0)) } - db.Encode(&entry) + logs <- &entry } } }() @@ -195,13 +193,14 @@ var stopStreams chan bool var actionStore ActionStore var wgActions sync.WaitGroup -var db *gob.Encoder +var logs chan *LogEntry func Daemon(confFilename string) { actionStore.store = make(ActionMap) conf := parseConf(confFilename) - db = conf.updateFromDB() + + logs = conf.DatabaseManager() // Ready to start @@ -240,6 +239,7 @@ func quit() { // stop all actions actionStore.Quit() // wait for them to complete + log.Println("INFO Waiting for actions to complete") wgActions.Wait() // delete pipe err := os.Remove(*SocketPath) diff --git a/app/persist.go b/app/persist.go new file mode 100644 index 0000000..a9f859e --- /dev/null +++ b/app/persist.go @@ -0,0 +1,175 @@ +package app + +import ( + "encoding/gob" + "errors" + "io" + "log" + "os" + "time" +) + +const ( + dbName = "./reaction.db" + dbNewName = "./reaction.new.db" +) + +type ReadDB struct { + file *os.File + dec *gob.Decoder +} + +type WriteDB struct { + file *os.File + enc *gob.Encoder +} + +func openDB(path string) (error, *ReadDB) { + file, err := os.Open(path) + if err != nil { + return err, nil + } + return nil, &ReadDB{file, gob.NewDecoder(file)} +} + +func createDB(path string) (error, *WriteDB) { + file, err := os.Create(path) + if err != nil { + return err, nil + } + return nil, &WriteDB{file, gob.NewEncoder(file)} +} + +func (c *Conf) DatabaseManager() chan *LogEntry { + logs := make(chan *LogEntry) + go func() { + db := c.RotateDB(true) + go c.manageLogs(logs, db) + }() + return logs +} + +func (c *Conf) manageLogs(logs <-chan *LogEntry, db *WriteDB) { + var cpt int + for { + db.enc.Encode(<-logs) + cpt++ + // let's say 100 000 entries ~ 10 MB + if cpt == 100_000 { + cpt = 0 + db.file.Close() + log.Printf("INFO Rotating database...") + db = c.RotateDB(false) + log.Printf("INFO Rotated database") + } + } +} + +func (c *Conf) RotateDB(startup bool) *WriteDB { + var ( + err error + enc *WriteDB + dec *ReadDB + ) + err, dec = openDB(dbName) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + log.Printf("WARN No DB found at %s. It's ok if this is the first time reaction is running.\n", dbName) + err, enc := createDB(dbName) + if err != nil { + log.Fatalln("FATAL Failed to create DB:", err) + } + return enc + } + log.Fatalln("FATAL Failed to open DB:", err) + } + + err, enc = createDB(dbNewName) + if err != nil { + log.Fatalln("FATAL Failed to create new DB:", err) + } + + rotateDB(c, dec.dec, enc.enc, startup) + + err = dec.file.Close() + if err != nil { + log.Fatalln("FATAL Failed to close old DB:", err) + } + + // It should be ok to rename an open file + err = os.Rename(dbNewName, dbName) + if err != nil { + log.Fatalln("FATAL Failed to replace old DB with new one:", err) + } + + return enc +} + +func rotateDB(c *Conf, dec *gob.Decoder, enc *gob.Encoder, startup bool) { + // This extra code is made to warn only one time for each non-existant filter + type SF struct{ s, f string } + discardedEntries := make(map[SF]int) + malformedEntries := 0 + defer func() { + for sf, t := range discardedEntries { + if t > 0 { + log.Printf("WARN info discarded %v times from the DB: stream/filter not found: %s.%s\n", t, sf.s, sf.f) + } + } + if malformedEntries > 0 { + log.Printf("WARN %v malformed entries discarded from the DB\n", malformedEntries) + } + }() + + encodeOrFatal := func(entry LogEntry) { + err := enc.Encode(entry) + if err != nil { + log.Fatalln("FATAL Failed to write to new DB:", err) + } + } + + var err error + now := time.Now() + for { + var entry LogEntry + var filter *Filter + + // decode entry + err = dec.Decode(&entry) + if err != nil { + if err == io.EOF { + return + } + malformedEntries++ + continue + } + + // retrieve related filter + if stream := c.Streams[entry.Stream]; stream != nil { + filter = stream.Filters[entry.Filter] + } + if filter == nil { + discardedEntries[SF{entry.Stream, entry.Filter}]++ + continue + } + + // store matches + if !entry.Exec && entry.T.Add(filter.retryDuration).Unix() > now.Unix() { + if startup { + filter.matches[entry.Pattern] = append(filter.matches[entry.Pattern], entry.T) + } + + encodeOrFatal(entry) + } + + // replay executions + if entry.Exec && entry.T.Add(*filter.longuestActionDuration).Unix() > now.Unix() { + if startup { + delete(filter.matches, entry.Pattern) + filter.execActions(entry.Pattern, now.Sub(entry.T)) + } + + encodeOrFatal(entry) + } + } +} diff --git a/app/startup.go b/app/startup.go index 1b525fb..0d28f80 100644 --- a/app/startup.go +++ b/app/startup.go @@ -1,10 +1,7 @@ package app import ( - "encoding/gob" - "errors" "fmt" - "io" "log" "os" "regexp" @@ -183,107 +180,6 @@ func (c *Conf) setup() { } } -var DBname = "./reaction.db" -var DBnewName = "./reaction.new.db" - -func (c *Conf) updateFromDB() *gob.Encoder { - file, err := os.Open(DBname) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - log.Printf("WARN No DB found at %s. It's ok if this is the first time reaction is running.\n", DBname) - - file, err := os.Create(DBname) - if err != nil { - log.Fatalln("FATAL Failed to create DB:", err) - } - return gob.NewEncoder(file) - } - log.Fatalln("FATAL Failed to open DB:", err) - } - dec := gob.NewDecoder(file) - - newfile, err := os.Create(DBnewName) - if err != nil { - log.Fatalln("FATAL Failed to create new DB:", err) - } - enc := gob.NewEncoder(newfile) - - defer func() { - err := file.Close() - if err != nil { - log.Fatalln("FATAL Failed to close old DB:", err) - } - - // It should be ok to rename an open file - err = os.Rename(DBnewName, DBname) - if err != nil { - log.Fatalln("FATAL Failed to replace old DB with new one:", err) - } - }() - - // This extra code is made to warn only one time for each non-existant filter - type SF struct{ s, f string } - discardedEntries := make(map[SF]bool) - malformedEntries := 0 - defer func() { - for sf, t := range discardedEntries { - if t { - log.Printf("WARN info discarded from the DB: stream/filter not found: %s.%s\n", sf.s, sf.f) - } - } - if malformedEntries > 0 { - log.Printf("WARN %v malformed entries discarded from the DB\n", malformedEntries) - } - }() - - encodeOrFatal := func(entry LogEntry) { - err = enc.Encode(entry) - if err != nil { - log.Fatalln("FATAL Failed to write to new DB:", err) - } - } - - now := time.Now() - for { - var entry LogEntry - var filter *Filter - - // decode entry - err = dec.Decode(&entry) - if err != nil { - if err == io.EOF { - return enc - } - malformedEntries++ - continue - } - - // retrieve related filter - if stream := c.Streams[entry.Stream]; stream != nil { - filter = stream.Filters[entry.Filter] - } - if filter == nil { - discardedEntries[SF{entry.Stream, entry.Filter}] = true - continue - } - - // store matches - if !entry.Exec && entry.T.Add(filter.retryDuration).Unix() > now.Unix() { - filter.matches[entry.Pattern] = append(filter.matches[entry.Pattern], entry.T) - - encodeOrFatal(entry) - } - - // replay executions - if entry.Exec && entry.T.Add(*filter.longuestActionDuration).Unix() > now.Unix() { - delete(filter.matches, entry.Pattern) - filter.execActions(entry.Pattern, now.Sub(entry.T)) - - encodeOrFatal(entry) - } - } -} - func parseConf(filename string) *Conf { data, err := os.ReadFile(filename) diff --git a/config/reaction.test.yml b/config/reaction.test.yml index 95d9000..d0a82f5 100644 --- a/config/reaction.test.yml +++ b/config/reaction.test.yml @@ -1,22 +1,70 @@ --- patterns: + num: + regex: '[0-9]+' ip: regex: '(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F:]{2,90})' ignore: - 1.0.0.1 streams: - tailDown: - cmd: [ "sh", "-c", "echo 'found 1.1.1.1' && sleep 2 && echo 'found 1.0.0.1' && sleep 10m" ] + tailDown1: + cmd: [ "sh", "-c", "sleep 2; seq 100010 | while read i; do echo found $(($i % 100)); done" ] filters: findIP: regex: - - '^found ' - retry: 2 + - '^found $' + retry: 10 retry-period: 1m actions: damn: - cmd: [ "echo", "" ] + cmd: [ "echo", "" ] undamn: - cmd: [ "echo", "undamn", "" ] + cmd: [ "echo", "undamn", "" ] after: 1m + onexit: false + tailDown2: + cmd: [ "sh", "-c", "sleep 2; seq 100010 | while read i; do echo prout $(($i % 100)); done" ] + filters: + findIP: + regex: + - '^prout $' + retry: 10 + retry-period: 1m + actions: + damn: + cmd: [ "echo", "" ] + undamn: + cmd: [ "echo", "undamn", "" ] + after: 1m + onexit: false + tailDown3: + cmd: [ "sh", "-c", "sleep 2; seq 100010 | while read i; do echo nanana $(($i % 100)); done" ] + filters: + findIP: + regex: + - '^nanana $' + retry: 4 + retry-period: 2m + actions: + damn: + cmd: [ "true" ] + undamn: + cmd: [ "true" ] + after: 1m + onexit: false + tailDown4: + cmd: [ "sh", "-c", "sleep 2; seq 100010 | while read i; do echo nanana $(($i % 100)); done" ] + filters: + findIP: + regex: + - '^nomatch $' + retry: 5 + retry-period: 2m + actions: + damn: + cmd: [ "echo", "" ] + undamn: + cmd: [ "echo", "undamn", "" ] + after: 1m + onexit: false