From 52556f69b9abe2a11ca81f0ad0c645a260900f09 Mon Sep 17 00:00:00 2001 From: ppom <> Date: Sat, 9 Sep 2023 23:38:53 +0200 Subject: [PATCH] one goroutine handles all matches it's not the filters that handles their matches anymore --- app/daemon.go | 116 ++++++++++++++++++++++++++++----------- app/persist.go | 24 +++----- app/pipe.go | 4 +- app/startup.go | 61 -------------------- app/types.go | 95 ++++++++++++++++++++++++++++++++ config/reaction.test.yml | 2 +- 6 files changed, 190 insertions(+), 112 deletions(-) create mode 100644 app/types.go diff --git a/app/daemon.go b/app/daemon.go index 87b01ce..7e76975 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -105,39 +105,74 @@ func (a *Action) exec(match string, advance time.Duration) { } } -func (f *Filter) cleanOldMatches(match string) { - now := time.Now() - newMatches := make([]time.Time, 0, len(f.matches[match])) - for _, old := range f.matches[match] { - if old.Add(f.retryDuration).After(now) { - newMatches = append(newMatches, old) +func MatchesManager() { + matches := make(MatchesMap) + var pf PF + var pft PFT + end := false + + for !end { + select { + case pf = <-cleanMatchesC: + delete(matches[pf.f], pf.p) + case pft, ok := <-startupMatchesC: + if !ok { + end = true + } else { + _ = matchesManagerHandleMatch(matches, pft) + } } } - f.matches[match] = newMatches -} -func (f *Filter) handle(line *string) { - if match := f.match(line); match != "" { + for { + select { + case pf = <-cleanMatchesC: + delete(matches[pf.f], pf.p) + case pft = <-matchesC: - entry := LogEntry{time.Now(), match, f.stream.name, f.name, false} + entry := LogEntry{pft.t, pft.p, pft.f.stream.name, pft.f.name, false} - if f.Retry > 1 { - f.cleanOldMatches(match) + entry.Exec = matchesManagerHandleMatch(matches, pft) - f.matches[match] = append(f.matches[match], time.Now()) + logsC <- entry } - - if f.Retry <= 1 || len(f.matches[match]) >= f.Retry { - entry.Exec = true - delete(f.matches, match) - f.execActions(match, time.Duration(0)) - } - - logs <- entry } } -func (s *Stream) handle(endedSignal chan *Stream) { +func matchesManagerHandleMatch(matches MatchesMap, pft PFT) bool { + var filter *Filter + var match string + var now time.Time + filter = pft.f + match = pft.p + now = pft.t + + if filter.Retry > 1 { + // make sure map exists + if matches[filter] == nil { + matches[filter] = make(map[string][]time.Time) + } + // clean old matches + newMatches := make([]time.Time, 0, len(matches[filter][match])) + for _, old := range matches[filter][match] { + if old.Add(filter.retryDuration).After(now) { + newMatches = append(newMatches, old) + } + } + // add new match + newMatches = append(newMatches, now) + matches[filter][match] = newMatches + } + + if filter.Retry <= 1 || len(matches[filter][match]) >= filter.Retry { + delete(matches[filter], match) + filter.execActions(match, time.Duration(0)) + return true + } + return false +} + +func StreamManager(s *Stream, endedSignal chan *Stream) { log.Printf("INFO %s: start %s\n", s.name, s.Cmd) lines := cmdStdout(s.Cmd) @@ -149,7 +184,9 @@ func (s *Stream) handle(endedSignal chan *Stream) { return } for _, filter := range s.Filters { - filter.handle(line) + if match := filter.match(line); match != "" { + matchesC <- PFT{match, filter, time.Now()} + } } case _, ok := <-stopStreams: if !ok { @@ -164,17 +201,34 @@ var stopStreams chan bool var actionStore ActionStore var wgActions sync.WaitGroup -var logs chan LogEntry -var flushes chan LogEntry +// MatchesManager → DatabaseManager +var logsC chan LogEntry +// SocketManager → DatabaseManager +var flushesC chan LogEntry + +// DatabaseManager → MatchesManager +var startupMatchesC chan PFT +// StreamManager → MatchesManager +var matchesC chan PFT +// StreamManager, DatabaseManager → MatchesManager +var cleanMatchesC chan PF +// MatchesManager → ExecsManager +var execsC chan PA func Daemon(confFilename string) { actionStore.store = make(ActionMap) conf := parseConf(confFilename) - logs = make(chan LogEntry) - flushes = make(chan LogEntry) + logsC = make(chan LogEntry) + flushesC = make(chan LogEntry) + matchesC = make(chan PFT) + startupMatchesC = make(chan PFT) + cleanMatchesC = make(chan PF) + execsC = make(chan PA) + go DatabaseManager(conf) + go MatchesManager() // Ready to start @@ -187,10 +241,10 @@ func Daemon(confFilename string) { nbStreamsInExecution := len(conf.Streams) for _, stream := range conf.Streams { - go stream.handle(endSignals) + go StreamManager(stream, endSignals) } - go ServeSocket() + go SocketManager() for { select { @@ -208,12 +262,12 @@ func Daemon(confFilename string) { } func quit() { + log.Println("INFO Quitting...") // stop all streams close(stopStreams) // stop all actions actionStore.Quit() // wait for them to complete - log.Println("INFO Waiting for actions to complete") wgActions.Wait() // delete pipe err := os.Remove(*SocketPath) diff --git a/app/persist.go b/app/persist.go index ed400eb..a76dc85 100644 --- a/app/persist.go +++ b/app/persist.go @@ -15,16 +15,6 @@ const ( flushDBName = "./reaction-flushes.db" ) -type ReadDB struct { - file *os.File - dec *gob.Decoder -} - -type WriteDB struct { - file *os.File - enc *gob.Encoder -} - func openDB(path string) (bool, *ReadDB) { file, err := os.Open(path) if err != nil { @@ -54,9 +44,9 @@ func (c *Conf) manageLogs(logDB *WriteDB, flushDB *WriteDB) { var cpt int for { select { - case entry := <-flushes: + case entry := <-flushesC: flushDB.enc.Encode(entry) - case entry := <-logs: + case entry := <-logsC: logDB.enc.Encode(entry) cpt++ // let's say 100 000 entries ~ 10 MB @@ -120,7 +110,6 @@ func (c *Conf) RotateDB(startup bool) (*WriteDB, *WriteDB) { func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.Encoder, startup bool) { // This extra code is made to warn only one time for each non-existant filter - type SF struct{ s, f string } discardedEntries := make(map[SF]int) malformedEntries := 0 defer func() { @@ -138,8 +127,6 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E var entry LogEntry var filter *Filter - type PSF struct{ p, s, f string } - // pattern, stream, fitler → last flush flushes := make(map[PSF]time.Time) for { @@ -204,7 +191,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E // store matches if !entry.Exec && entry.T.Add(filter.retryDuration).Unix() > now.Unix() { if startup { - filter.matches[entry.Pattern] = append(filter.matches[entry.Pattern], entry.T) + startupMatchesC <- PFT{entry.Pattern, filter, entry.T} } encodeOrFatal(logEnc, entry) @@ -213,13 +200,16 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E // replay executions if entry.Exec && entry.T.Add(*filter.longuestActionDuration).Unix() > now.Unix() { if startup { - delete(filter.matches, entry.Pattern) + cleanMatchesC <- PF{entry.Pattern, filter} filter.execActions(entry.Pattern, now.Sub(entry.T)) } encodeOrFatal(logEnc, entry) } } + if startup { + close(startupMatchesC) + } } func encodeOrFatal(enc *gob.Encoder, entry LogEntry) { diff --git a/app/pipe.go b/app/pipe.go index caa53f7..3434719 100644 --- a/app/pipe.go +++ b/app/pipe.go @@ -74,7 +74,7 @@ func (a *ActionStore) Flush(pattern string) int { } } delete(a.store, pattern) - flushes<-LogEntry{time.Now(), pattern, "", "", false} + flushesC <- LogEntry{time.Now(), pattern, "", "", false} return cpt } @@ -136,7 +136,7 @@ func createOpenSocket() net.Listener { } // Handle connections -func ServeSocket() { +func SocketManager() { ln := createOpenSocket() defer ln.Close() for { diff --git a/app/startup.go b/app/startup.go index 5ff165a..bcafd31 100644 --- a/app/startup.go +++ b/app/startup.go @@ -11,66 +11,6 @@ import ( "gopkg.in/yaml.v3" ) -type Conf struct { - Patterns map[string]*Pattern `yaml:"patterns"` - Streams map[string]*Stream `yaml:"streams"` -} - -type Pattern struct { - Regex string `yaml:"regex"` - Ignore []string `yaml:"ignore"` - - name string `yaml:"-"` - nameWithBraces string `yaml:"-"` -} - -// Stream, Filter & Action structures must never be copied. -// They're always referenced through pointers - -type Stream struct { - name string `yaml:"-"` - - Cmd []string `yaml:"cmd"` - Filters map[string]*Filter `yaml:"filters"` -} - -type Filter struct { - stream *Stream `yaml:"-"` - name string `yaml:"-"` - - Regex []string `yaml:"regex"` - compiledRegex []regexp.Regexp `yaml:"-"` - pattern *Pattern `yaml:"-"` - - Retry int `yaml:"retry"` - RetryPeriod string `yaml:"retry-period"` - retryDuration time.Duration `yaml:"-"` - - Actions map[string]*Action `yaml:"actions"` - longuestActionDuration *time.Duration - - matches map[string][]time.Time `yaml:"-"` -} - -type Action struct { - filter *Filter `yaml:"-"` - name string `yaml:"-"` - - Cmd []string `yaml:"cmd"` - - After string `yaml:"after"` - afterDuration time.Duration `yaml:"-"` - - OnExit bool `yaml:"onexit"` -} - -type LogEntry struct { - T time.Time - Pattern string - Stream, Filter string - Exec bool -} - func (c *Conf) setup() { for patternName := range c.Patterns { @@ -114,7 +54,6 @@ func (c *Conf) setup() { filter := stream.Filters[filterName] filter.stream = stream filter.name = filterName - filter.matches = make(map[string][]time.Time) if strings.Contains(filter.name, ".") { log.Fatalln("FATAL Bad configuration: character '.' is not allowed in filter names", filter.name) diff --git a/app/types.go b/app/types.go new file mode 100644 index 0000000..0929101 --- /dev/null +++ b/app/types.go @@ -0,0 +1,95 @@ +package app + +import ( + "encoding/gob" + "os" + "regexp" + "time" +) + +type Conf struct { + Patterns map[string]*Pattern `yaml:"patterns"` + Streams map[string]*Stream `yaml:"streams"` +} + +type Pattern struct { + Regex string `yaml:"regex"` + Ignore []string `yaml:"ignore"` + + name string `yaml:"-"` + nameWithBraces string `yaml:"-"` +} + +// Stream, Filter & Action structures must never be copied. +// They're always referenced through pointers + +type Stream struct { + name string `yaml:"-"` + + Cmd []string `yaml:"cmd"` + Filters map[string]*Filter `yaml:"filters"` +} + +type Filter struct { + stream *Stream `yaml:"-"` + name string `yaml:"-"` + + Regex []string `yaml:"regex"` + compiledRegex []regexp.Regexp `yaml:"-"` + pattern *Pattern `yaml:"-"` + + Retry int `yaml:"retry"` + RetryPeriod string `yaml:"retry-period"` + retryDuration time.Duration `yaml:"-"` + + Actions map[string]*Action `yaml:"actions"` + longuestActionDuration *time.Duration +} + +type Action struct { + filter *Filter `yaml:"-"` + name string `yaml:"-"` + + Cmd []string `yaml:"cmd"` + + After string `yaml:"after"` + afterDuration time.Duration `yaml:"-"` + + OnExit bool `yaml:"onexit"` +} + +type LogEntry struct { + T time.Time + Pattern string + Stream, Filter string + Exec bool +} + +type ReadDB struct { + file *os.File + dec *gob.Decoder +} + +type WriteDB struct { + file *os.File + enc *gob.Encoder +} + +type MatchesMap map[*Filter]map[string][]time.Time + +// Helper structs made to carry information across channels +type SF struct{ s, f string } +type PSF struct{ p, s, f string } +type PF struct { + p string + f *Filter +} +type PFT struct { + p string + f *Filter + t time.Time +} +type PA struct { + p string + a *Action +} diff --git a/config/reaction.test.yml b/config/reaction.test.yml index 0959005..b55e16d 100644 --- a/config/reaction.test.yml +++ b/config/reaction.test.yml @@ -5,7 +5,7 @@ patterns: streams: tailDown: - cmd: [ "sh", "-c", "sleep 6; echo found 1; echo found 2; sleep 10" ] + cmd: [ "sh", "-c", "sleep 2; echo found 1; echo found 2; sleep 2" ] filters: findIP: regex: