From a2be5a566c270f2c030c41a2b4caedb5fe8d7612 Mon Sep 17 00:00:00 2001 From: ppom <> Date: Thu, 27 Apr 2023 10:42:19 +0200 Subject: [PATCH] Working log code Closes #7 #17 --- .gitignore | 1 + app/conf.go | 80 ++++++++++++++++++++++------------------ app/reaction.go | 36 +++++++++++++----- config/reaction.test.yml | 4 +- 4 files changed, 74 insertions(+), 47 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4d6e003 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/reaction.db diff --git a/app/conf.go b/app/conf.go index bda568f..e574f85 100644 --- a/app/conf.go +++ b/app/conf.go @@ -2,6 +2,7 @@ package app import ( "encoding/gob" + "errors" "fmt" "io" "log" @@ -55,10 +56,10 @@ type Action struct { } type LogEntry struct { - t time.Time - pattern string - stream, filter string - exec bool + T time.Time + Pattern string + Stream, Filter string + Exec bool } func (c *Conf) setup() { @@ -135,6 +136,9 @@ func (c *Conf) setup() { } action.afterDuration = afterDuration } + if filter.longuestActionDuration == nil || filter.longuestActionDuration.Milliseconds() < action.afterDuration.Milliseconds() { + filter.longuestActionDuration = &action.afterDuration + } } } } @@ -143,22 +147,40 @@ func (c *Conf) setup() { var DBname = "./reaction.db" var DBnewName = "./reaction.new.db" -func (c *Conf) updateFromDB() { +func (c *Conf) updateFromDB() *gob.Encoder { file, err := os.Open(DBname) if err != nil { - if err == os.ErrNotExist { + if errors.Is(err, os.ErrNotExist) { log.Printf("WARN: No DB found at %s\n", DBname) - return + + file, err := os.Create(DBname) + if err != nil { + log.Fatalln("Failed to create DB:", err) + } + return gob.NewEncoder(file) } log.Fatalln("Failed to open DB:", err) } - dec := gob.NewDecoder(&file) + dec := gob.NewDecoder(file) newfile, err := os.Create(DBnewName) if err != nil { - log.Fatalln("Failed to open DB:", err) + log.Fatalln("Failed to create new DB:", err) } - enc := gob.NewEncoder(&newfile) + enc := gob.NewEncoder(newfile) + + defer func() { + err := file.Close() + if err != nil { + log.Fatalln("ERRO: Failed to close old DB:", err) + } + + // It should be ok to rename an open file + err = os.Rename(DBnewName, DBname) + if err != nil { + log.Fatalln("ERRO: Failed to replace old DB with new one:", err) + } + }() // This extra code is made to warn only one time for each non-existant filter type SF struct{ s, f string } @@ -171,7 +193,7 @@ func (c *Conf) updateFromDB() { } } if malformedEntries > 0 { - log.Printf("WARN: %v malformed entry discarded from the DB\n", malformedEntries) + log.Printf("WARN: %v malformed entries discarded from the DB\n", malformedEntries) } }() @@ -191,52 +213,39 @@ func (c *Conf) updateFromDB() { err = dec.Decode(&entry) if err != nil { if err == io.EOF { - return + return enc } malformedEntries++ continue } // retrieve related filter - if s := c.Streams[entry.stream]; stream != nil { - filter = stream.Filters[entry.filter] + if stream := c.Streams[entry.Stream]; stream != nil { + filter = stream.Filters[entry.Filter] } if filter == nil { - discardedEntries[SF{entry.stream, entry.filter}] = true + discardedEntries[SF{entry.Stream, entry.Filter}] = true continue } // store matches - if !entry.exec && entry.t+filter.retryDuration > now { - filter.matches[entry.pattern] = append(f.matches[entry.pattern], entry.t) + if !entry.Exec && entry.T.Add(filter.retryDuration).Unix() > now.Unix() { + filter.matches[entry.Pattern] = append(filter.matches[entry.Pattern], entry.T) encodeOrFatal(entry) } // replay executions - if entry.exec && entry.t+filter.longuestActionDuration > now { - delete(filter.matches, match) - filter.execActions(match, now-entry.t) + if entry.Exec && entry.T.Add(*filter.longuestActionDuration).Unix() > now.Unix() { + delete(filter.matches, entry.Pattern) + filter.execActions(entry.Pattern, now.Sub(entry.T)) encodeOrFatal(entry) } } - - err = os.Rename(DBnewName, DBname) - if err != nil { - log.Fatalln("ERRO: Failed to replace old DB with new one:", err) - } } -func openDB() { - f, err := os.OpenFile(DBname, os.O_APPEND|os.O_WRONLY, 0600) - if err != nil { - log.Fatalln("Failed to open DB:", err) - } - return gob.NewEncoder(&f) -} - -func parseConf(filename string) *Conf { +func parseConf(filename string) (*Conf, *gob.Encoder) { data, err := os.ReadFile(filename) @@ -251,6 +260,7 @@ func parseConf(filename string) *Conf { } conf.setup() + enc := conf.updateFromDB() - return &conf + return &conf, enc } diff --git a/app/reaction.go b/app/reaction.go index 46f2703..a44a6c9 100644 --- a/app/reaction.go +++ b/app/reaction.go @@ -4,11 +4,13 @@ import ( "bufio" "encoding/gob" "flag" + "syscall" // "fmt" "log" "os" "os/exec" + "os/signal" "strings" "sync" "time" @@ -107,9 +109,9 @@ func (f *Filter) handle() chan *string { f.matches[match] = append(f.matches[match], time.Now()) if len(f.matches[match]) >= f.Retry { - entry.exec = true + entry.Exec = true delete(f.matches, match) - f.execActions(match, nil) + f.execActions(match, time.Duration(0)) } db.Encode(&entry) @@ -154,9 +156,12 @@ func (s *Stream) handle(signal chan *Stream) { var wgActions sync.WaitGroup -var db gob.Encoder +var db *gob.Encoder func Main() { + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + confFilename := flag.String("c", "", "configuration file. see an example at https://framagit.org/ppom/reaction/-/blob/main/reaction.yml") flag.Parse() @@ -165,22 +170,33 @@ func Main() { os.Exit(2) } - conf := parseConf(*confFilename) - - db = openDB() + conf, localdb := parseConf(*confFilename) + db = localdb endSignals := make(chan *Stream) + noStreamsInExecution := len(conf.Streams) for _, stream := range conf.Streams { go stream.handle(endSignals) } - for i := 0; i < len(conf.Streams); i++ { - finishedStream := <-endSignals - log.Printf("ERR %s stream finished", finishedStream.name) + for { + select { + case finishedStream := <-endSignals: + log.Printf("ERR %s stream finished", finishedStream.name) + noStreamsInExecution-- + if noStreamsInExecution == 0 { + quit() + } + case <-sigs: + log.Printf("Received SIGINT or SIGTERM, exiting") + quit() + } } +} +func quit() { + // TODO replace with advanced execution of all WIP actions wgActions.Wait() - os.Exit(3) } diff --git a/config/reaction.test.yml b/config/reaction.test.yml index 2cfbc5b..6109669 100644 --- a/config/reaction.test.yml +++ b/config/reaction.test.yml @@ -10,10 +10,10 @@ streams: regex: - found retry: 2 - retry-period: 5s + retry-period: 1m actions: damn: cmd: [ "echo", "" ] sleepdamn: cmd: [ "echo", "sleep", "" ] - after: 1s + after: 10s