From a119e0814b3b2504628e642919790a4de65cc239 Mon Sep 17 00:00:00 2001 From: ppom <> Date: Fri, 22 Sep 2023 18:09:31 +0200 Subject: [PATCH] one goroutine handles all actions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit pending actions are now data, they're not goroutines anymore ❤️ warn: cli currently doesn't work, but it's already a huge commit --- app/daemon.go | 140 ++++++++++++++++++++++++++++----------- app/persist.go | 8 +-- app/types.go | 11 ++- config/reaction.test.yml | 6 +- 4 files changed, 117 insertions(+), 48 deletions(-) diff --git a/app/daemon.go b/app/daemon.go index 7e76975..f4f5f66 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -66,31 +66,15 @@ func (f *Filter) match(line *string) string { return "" } -func (f *Filter) execActions(match string, advance time.Duration) { +func (f *Filter) sendActions(match string, at time.Time) { for _, a := range f.Actions { - wgActions.Add(1) - go a.exec(match, advance) + actionsC <- PAT{match, a, at.Add(a.afterDuration)} } } -func (a *Action) exec(match string, advance time.Duration) { +func (a *Action) exec(match string) { defer wgActions.Done() - // Wait for either end of sleep time, or actionStore requesting stop - if a.afterDuration != 0 && a.afterDuration > advance { - stopAction := actionStore.Register(a, match) - select { - case <-time.After(a.afterDuration - advance): - // Let's not wait for the lock - go actionStore.Unregister(a, match, stopAction) - case doExec := <-stopAction: - // no need to unregister here - if !doExec { - return - } - } - } - computedCommand := make([]string, 0, len(a.Cmd)) for _, item := range a.Cmd { computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match)) @@ -105,6 +89,75 @@ func (a *Action) exec(match string, advance time.Duration) { } } +func quasiBefore(then, now time.Time) bool { + // We won't complain if it's executed less than 1sec earlier + return then.Unix() <= now.Add(1*time.Second).Unix() +} + +func ActionsManager() { + actions := make(ActionsMap) + pendingActionsC := make(chan PAT) + var ( + pat PAT + action *Action + match string + then time.Time + now time.Time + ) + for { + select { + case pat = <-actionsC: + match = pat.p + action = pat.a + then = pat.t + now = time.Now() + // check + if quasiBefore(then, now) { + wgActions.Add(1) + go action.exec(match) + } else { + // make sure map exists + if actions[action] == nil { + actions[action] = make(PatternTimes) + } + // append() to nil is valid go + actions[action][match] = append(actions[action][match], then) + go func(pat PAT) { + log.Printf("DEBUG then: %v, now: %v, then.Sub(now): %v", then.String(), now.String(), then.Sub(now).String()) + time.Sleep(then.Sub(now)) + pendingActionsC <- pat + }(pat) + } + // FIXME convert to pendingActionsC to chan PA + // and forget about time checking + case pat = <-pendingActionsC: + match = pat.p + action = pat.a + then = pat.t + now = time.Now() + if quasiBefore(then, now) { + actions[action][match] = actions[action][match][1:] + wgActions.Add(1) + go action.exec(match) + } else { + // This should not happen + log.Fatalf("ERROR pendingActionsC then: %v << now %v\n", pat.t.String(), now) + } + case _, _ = <-stopActions: + for action := range actions { + if action.OnExit { + for match := range actions[action] { + wgActions.Add(1) + go action.exec(match) + } + } + } + wgActions.Done() + return + } + } +} + func MatchesManager() { matches := make(MatchesMap) var pf PF @@ -140,39 +193,35 @@ func MatchesManager() { } func matchesManagerHandleMatch(matches MatchesMap, pft PFT) bool { - var filter *Filter - var match string - var now time.Time - filter = pft.f - match = pft.p - now = pft.t + filter, match, then := pft.f, pft.p, pft.t if filter.Retry > 1 { // make sure map exists if matches[filter] == nil { - matches[filter] = make(map[string][]time.Time) + matches[filter] = make(PatternTimes) } // clean old matches newMatches := make([]time.Time, 0, len(matches[filter][match])) for _, old := range matches[filter][match] { - if old.Add(filter.retryDuration).After(now) { + if old.Add(filter.retryDuration).After(then) { newMatches = append(newMatches, old) } } // add new match - newMatches = append(newMatches, now) + newMatches = append(newMatches, then) matches[filter][match] = newMatches } if filter.Retry <= 1 || len(matches[filter][match]) >= filter.Retry { delete(matches[filter], match) - filter.execActions(match, time.Duration(0)) + filter.sendActions(match, then) return true } return false } func StreamManager(s *Stream, endedSignal chan *Stream) { + defer wgStreams.Done() log.Printf("INFO %s: start %s\n", s.name, s.Cmd) lines := cmdStdout(s.Cmd) @@ -188,32 +237,36 @@ func StreamManager(s *Stream, endedSignal chan *Stream) { matchesC <- PFT{match, filter, time.Now()} } } - case _, ok := <-stopStreams: - if !ok { - return - } + case _, _ = <-stopStreams: + return } } } var stopStreams chan bool +var stopActions chan bool var actionStore ActionStore var wgActions sync.WaitGroup +var wgStreams sync.WaitGroup // MatchesManager → DatabaseManager var logsC chan LogEntry + // SocketManager → DatabaseManager var flushesC chan LogEntry // DatabaseManager → MatchesManager var startupMatchesC chan PFT + // StreamManager → MatchesManager var matchesC chan PFT + // StreamManager, DatabaseManager → MatchesManager var cleanMatchesC chan PF + // MatchesManager → ExecsManager -var execsC chan PA +var actionsC chan PAT func Daemon(confFilename string) { actionStore.store = make(ActionMap) @@ -225,22 +278,24 @@ func Daemon(confFilename string) { matchesC = make(chan PFT) startupMatchesC = make(chan PFT) cleanMatchesC = make(chan PF) - execsC = make(chan PA) + actionsC = make(chan PAT) + stopActions = make(chan bool) + stopStreams = make(chan bool) go DatabaseManager(conf) go MatchesManager() + go ActionsManager() // Ready to start sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - stopStreams = make(chan bool) - endSignals := make(chan *Stream) nbStreamsInExecution := len(conf.Streams) for _, stream := range conf.Streams { + wgStreams.Add(1) go StreamManager(stream, endSignals) } @@ -262,12 +317,17 @@ func Daemon(confFilename string) { } func quit() { - log.Println("INFO Quitting...") - // stop all streams + // send stop to StreamManager·s close(stopStreams) + log.Println("INFO Waiting for Streams to finish...") + wgStreams.Wait() + // ActionsManager calls wgActions.Done() when it has launched all pending actions + wgActions.Add(1) + // send stop to ActionsManager + close(stopActions) // stop all actions actionStore.Quit() - // wait for them to complete + log.Println("INFO Waiting for Actions to finish...") wgActions.Wait() // delete pipe err := os.Remove(*SocketPath) diff --git a/app/persist.go b/app/persist.go index a76dc85..4f5ed69 100644 --- a/app/persist.go +++ b/app/persist.go @@ -63,6 +63,7 @@ func (c *Conf) manageLogs(logDB *WriteDB, flushDB *WriteDB) { } func (c *Conf) RotateDB(startup bool) (*WriteDB, *WriteDB) { + defer close(startupMatchesC) var ( doesntExist bool err error @@ -191,6 +192,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E // store matches if !entry.Exec && entry.T.Add(filter.retryDuration).Unix() > now.Unix() { if startup { + log.Println("DEBUG db send match") startupMatchesC <- PFT{entry.Pattern, filter, entry.T} } @@ -200,16 +202,14 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E // replay executions if entry.Exec && entry.T.Add(*filter.longuestActionDuration).Unix() > now.Unix() { if startup { + log.Println("DEBUG db send match") cleanMatchesC <- PF{entry.Pattern, filter} - filter.execActions(entry.Pattern, now.Sub(entry.T)) + filter.sendActions(entry.Pattern, entry.T) } encodeOrFatal(logEnc, entry) } } - if startup { - close(startupMatchesC) - } } func encodeOrFatal(enc *gob.Encoder, entry LogEntry) { diff --git a/app/types.go b/app/types.go index 0929101..5789ce0 100644 --- a/app/types.go +++ b/app/types.go @@ -75,7 +75,11 @@ type WriteDB struct { enc *gob.Encoder } -type MatchesMap map[*Filter]map[string][]time.Time +type PatternTimes map[string][]time.Time + +type MatchesMap map[*Filter]PatternTimes + +type ActionsMap map[*Action]PatternTimes // Helper structs made to carry information across channels type SF struct{ s, f string } @@ -93,3 +97,8 @@ type PA struct { p string a *Action } +type PAT struct { + p string + a *Action + t time.Time +} diff --git a/config/reaction.test.yml b/config/reaction.test.yml index b55e16d..fbfe1f2 100644 --- a/config/reaction.test.yml +++ b/config/reaction.test.yml @@ -5,17 +5,17 @@ patterns: streams: tailDown: - cmd: [ "sh", "-c", "sleep 2; echo found 1; echo found 2; sleep 2" ] + cmd: [ "sh", "-c", "sleep 0.5; echo found 1; sleep 1; echo found 1; sleep 10" ] filters: findIP: regex: - '^found $' retry: 2 - retry-period: 2m + retry-period: 1m actions: damn: cmd: [ "echo", "" ] undamn: cmd: [ "echo", "undamn", "" ] - after: 1m + after: 5s onexit: true