From 480aeb7f15332e85acadb900921f9798bb742c30 Mon Sep 17 00:00:00 2001 From: ppom <> Date: Sun, 1 Oct 2023 12:00:00 +0200 Subject: [PATCH] Reimplement flushes fix #33 fix #39 --- app/client.go | 16 ++++-- app/daemon.go | 146 +++++++++++++++++++++++++++++-------------------- app/persist.go | 17 +++--- app/pipe.go | 46 ++++++++++++++-- app/types.go | 11 +++- 5 files changed, 157 insertions(+), 79 deletions(-) diff --git a/app/client.go b/app/client.go index 5889a0e..53a3801 100644 --- a/app/client.go +++ b/app/client.go @@ -23,9 +23,10 @@ type Request struct { } type Response struct { - Err error - ClientStatus ClientStatus - Number int + Err error + ClientStatus ClientStatus + FlushedMatches map[string]map[string]int + FlushedActions map[string]map[string]map[string]int } func SendAndRetrieve(data Request) Response { @@ -111,7 +112,14 @@ func ClientFlush(pattern, streamfilter string) { log.Fatalln("Received error from daemon:", response.Err) os.Exit(1) } - fmt.Printf("flushed pattern %v times\n", response.Number) + text, err := yaml.Marshal(struct { + Matches map[string]map[string]int + Actions map[string]map[string]map[string]int + }{response.FlushedMatches, response.FlushedActions}) + if err != nil { + log.Fatalln("Failed to convert daemon binary response to text format:", err) + } + fmt.Println(string(text)) os.Exit(0) } diff --git a/app/daemon.go b/app/daemon.go index 92f83dc..37e2260 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -4,7 +4,6 @@ import ( "bufio" "syscall" - // "fmt" "log" "os" "os/exec" @@ -73,64 +72,79 @@ func (f *Filter) sendActions(match string, at time.Time) { } func (a *Action) exec(match string) { - defer wgActions.Done() + wgActions.Add(1) + go func() { + defer wgActions.Done() - computedCommand := make([]string, 0, len(a.Cmd)) - for _, item := range a.Cmd { - computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match)) - } + computedCommand := make([]string, 0, len(a.Cmd)) + for _, item := range a.Cmd { + computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match)) + } - log.Printf("INFO %s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand) + log.Printf("INFO %s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand) - cmd := exec.Command(computedCommand[0], computedCommand[1:]...) + cmd := exec.Command(computedCommand[0], computedCommand[1:]...) - if ret := cmd.Run(); ret != nil { - log.Printf("ERROR %s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret) - } + if ret := cmd.Run(); ret != nil { + log.Printf("ERROR %s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret) + } + }() } func ActionsManager() { pendingActionsC := make(chan PAT) - var ( - pat PAT - action *Action - match string - then time.Time - now time.Time - ) for { select { - case pat = <-actionsC: - match = pat.p - action = pat.a - then = pat.t - now = time.Now() - // check + case pat := <-actionsC: + pa := PA{pat.p, pat.a} + pattern, action, then := pat.p, pat.a, pat.t + now := time.Now() + // check if must be executed now if then.Compare(now) <= 0 { - wgActions.Add(1) - go action.exec(match) + action.exec(pattern) } else { actionsLock.Lock() - actions[PAT{match, action, then}] = struct{}{} + if actions[pa] == nil { + actions[pa] = make(map[time.Time]struct{}) + } + actions[PA{pattern, action}][then] = struct{}{} actionsLock.Unlock() - go func(pat PAT, now time.Time) { - time.Sleep(pat.t.Sub(now)) - pendingActionsC <- pat + go func(insidePat PAT, insideNow time.Time) { + time.Sleep(insidePat.t.Sub(insideNow)) + pendingActionsC <- insidePat }(pat, now) } - case pat = <-pendingActionsC: - match, action, then = pat.p, pat.a, pat.t + case pat := <-pendingActionsC: + pa := PA{pat.p, pat.a} + pattern, action, then := pat.p, pat.a, pat.t actionsLock.Lock() - delete(actions, PAT{match, action, then}) + if actions[pa] != nil { + if _, ok := actions[pa][then]; ok { + delete(actions[pa], then) + action.exec(pattern) + } + } actionsLock.Unlock() - wgActions.Add(1) - go action.exec(match) + action.exec(pattern) + case fo := <-flushToActionsC: + ret := make(map[*Action]int) + actionsLock.Lock() + for pa := range actions { + if pa.p == fo.p { + for range actions[pa] { + pa.a.exec(pa.p) + } + ret[pa.a] = len(actions[pa]) + delete(actions, pa) + } + } + actionsLock.Unlock() + fo.ret <- ret case _, _ = <-stopActions: actionsLock.Lock() for pat := range actions { if pat.a.OnExit { - wgActions.Add(1) - go action.exec(match) + pat.a.exec(pat.p) } } actionsLock.Unlock() @@ -141,44 +155,60 @@ func ActionsManager() { } func MatchesManager() { - var pf PF + var fo FlushMatchOrder var pft PFT end := false for !end { select { - case pf = <-flushToMatchesC: - delete(matches, pf) - case pft, ok := <-startupMatchesC: + case fo = <-flushToMatchesC: + matchesManagerHandleFlush(fo) + case fo, ok := <-startupMatchesC: if !ok { end = true } else { - _ = matchesManagerHandleMatch(pft) + _ = matchesManagerHandleMatch(fo) } } } for { select { - case pf = <-flushToMatchesC: - matchesLock.Lock() - delete(matches, pf) - matchesLock.Unlock() + case fo = <-flushToMatchesC: + matchesManagerHandleFlush(fo) case pft = <-matchesC: entry := LogEntry{pft.t, pft.p, pft.f.stream.name, pft.f.name, false} - matchesLock.Lock() entry.Exec = matchesManagerHandleMatch(pft) - matchesLock.Unlock() logsC <- entry } } } +func matchesManagerHandleFlush(fo FlushMatchOrder) { + ret := make(map[*Filter]int) + matchesLock.Lock() + for pf := range matches { + if fo.p == pf.p { + if fo.ret != nil { + ret[pf.f] = len(matches[pf]) + } + delete(matches, pf) + } + } + matchesLock.Unlock() + if fo.ret != nil { + fo.ret <- ret + } +} + func matchesManagerHandleMatch(pft PFT) bool { - filter, match, then := pft.f, pft.p, pft.t + matchesLock.Lock() + defer matchesLock.Unlock() + + filter, pattern, then := pft.f, pft.p, pft.t pf := PF{pft.p, pft.f} if filter.Retry > 1 { @@ -190,7 +220,7 @@ func matchesManagerHandleMatch(pft PFT) bool { matches[pf][then] = struct{}{} // remove match when expired go func(pf PF, then time.Time) { - time.Sleep(filter.retryDuration) + time.Sleep(then.Sub(time.Now()) + filter.retryDuration) matchesLock.Lock() if matches[pf] != nil { // FIXME replace this and all similar occurences @@ -203,7 +233,7 @@ func matchesManagerHandleMatch(pft PFT) bool { if filter.Retry <= 1 || len(matches[pf]) >= filter.Retry { delete(matches, pf) - filter.sendActions(match, then) + filter.sendActions(pattern, then) return true } return false @@ -244,7 +274,7 @@ var wgActions sync.WaitGroup var wgStreams sync.WaitGroup /* - + ↓ StreamManager onstartup:matches ↓ ↓ ↑ @@ -254,7 +284,7 @@ StreamManager onstartup:matches ↑ ↑ ↑ SocketManager →flushes→→→→→→→→→→·→→→→→→→→→→→→→→→→· ↑ - + */ // DatabaseManager → MatchesManager @@ -270,10 +300,10 @@ var logsC chan LogEntry var actionsC chan PAT // SocketManager, DatabaseManager → MatchesManager -var flushToMatchesC chan PF +var flushToMatchesC chan FlushMatchOrder // SocketManager → ActionsManager -var flushToActionsC chan PF +var flushToActionsC chan FlushActionOrder // SocketManager → DatabaseManager var flushToDatabaseC chan LogEntry @@ -285,8 +315,8 @@ func Daemon(confFilename string) { matchesC = make(chan PFT) logsC = make(chan LogEntry) actionsC = make(chan PAT) - flushToMatchesC = make(chan PF) - flushToActionsC = make(chan PF) + flushToMatchesC = make(chan FlushMatchOrder) + flushToActionsC = make(chan FlushActionOrder) flushToDatabaseC = make(chan LogEntry) stopActions = make(chan bool) stopStreams = make(chan bool) @@ -310,7 +340,7 @@ func Daemon(confFilename string) { go StreamManager(stream, endSignals) } - go SocketManager() + go SocketManager(conf.Streams) for { select { diff --git a/app/persist.go b/app/persist.go index ad0a990..3532f74 100644 --- a/app/persist.go +++ b/app/persist.go @@ -178,15 +178,12 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E continue } - // check if it hasn't been flushed, only for Exec:true for now - if entry.Exec { - lastGlobalFlush := flushes[PSF{entry.Pattern, "", ""}].Unix() - lastLocalFlush := flushes[PSF{entry.Pattern, entry.Stream, entry.Filter}].Unix() - entryTime := entry.T.Unix() - - if lastLocalFlush > entryTime || lastGlobalFlush > entryTime { - continue - } + // check if it hasn't been flushed + lastGlobalFlush := flushes[PSF{entry.Pattern, "", ""}].Unix() + lastLocalFlush := flushes[PSF{entry.Pattern, entry.Stream, entry.Filter}].Unix() + entryTime := entry.T.Unix() + if lastLocalFlush > entryTime || lastGlobalFlush > entryTime { + continue } // store matches @@ -201,7 +198,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E // replay executions if entry.Exec && entry.T.Add(*filter.longuestActionDuration).Unix() > now.Unix() { if startup { - flushToMatchesC <- PF{entry.Pattern, filter} + flushToMatchesC <- FlushMatchOrder{entry.Pattern, nil} filter.sendActions(entry.Pattern, entry.T) } diff --git a/app/pipe.go b/app/pipe.go index 9565188..6c2eac5 100644 --- a/app/pipe.go +++ b/app/pipe.go @@ -29,8 +29,8 @@ func genClientStatus() ClientStatus { actionsLock.Lock() // Painful data manipulation - for pat := range actions { - pattern, action, then := pat.p, pat.a, pat.t + for pa := range actions { + pattern, action := pa.p, pa.a if cs[action.filter.stream.name] == nil { cs[action.filter.stream.name] = make(map[string]MapPatternStatus) } @@ -44,12 +44,39 @@ func genClientStatus() ClientStatus { if ps.Actions == nil { ps.Actions = make(map[string][]string) } - ps.Actions[action.name] = append(ps.Actions[action.name], then.Format(time.DateTime)) + for then := range actions[pa] { + ps.Actions[action.name] = append(ps.Actions[action.name], then.Format(time.DateTime)) + } } actionsLock.Unlock() return cs } +func genFlushedMatches(og map[*Filter]int) map[string]map[string]int { + ret := make(map[string]map[string]int) + for filter, nb := range og { + if ret[filter.stream.name] == nil { + ret[filter.stream.name] = make(map[string]int) + } + ret[filter.stream.name][filter.name] = nb + } + return ret +} + +func genFlushedActions(og map[*Action]int) map[string]map[string]map[string]int { + ret := make(map[string]map[string]map[string]int) + for action, nb := range og { + if ret[action.filter.stream.name] == nil { + ret[action.filter.stream.name] = make(map[string]map[string]int) + } + if ret[action.filter.stream.name][action.filter.name] == nil { + ret[action.filter.stream.name][action.filter.name] = make(map[string]int) + } + ret[action.filter.stream.name][action.filter.name][action.name] = nb + } + return ret +} + func createOpenSocket() net.Listener { err := os.MkdirAll(path.Dir(*SocketPath), 0755) if err != nil { @@ -71,7 +98,7 @@ func createOpenSocket() net.Listener { } // Handle connections -func SocketManager() { +func SocketManager(streams map[string]*Stream) { ln := createOpenSocket() defer ln.Close() for { @@ -95,8 +122,15 @@ func SocketManager() { case Show: response.ClientStatus = genClientStatus() case Flush: - // FIXME reimplement flush - response.Number = 0 + le := LogEntry{time.Now(), request.Pattern, "", "", false} + matches := FlushMatchOrder{request.Pattern, make(chan map[*Filter]int)} + actions := FlushActionOrder{request.Pattern, make(chan map[*Action]int)} + flushToMatchesC <- matches + flushToActionsC <- actions + flushToDatabaseC <- le + + response.FlushedMatches = genFlushedMatches(<-matches.ret) + response.FlushedActions = genFlushedActions(<-actions.ret) default: log.Println("ERROR Invalid Message from cli: unrecognised Request type") return diff --git a/app/types.go b/app/types.go index f1a5af4..219ca0b 100644 --- a/app/types.go +++ b/app/types.go @@ -76,7 +76,7 @@ type WriteDB struct { } type MatchesMap map[PF]map[time.Time]struct{} -type ActionsMap map[PAT]struct{} +type ActionsMap map[PA]map[time.Time]struct{} // Helper structs made to carry information type SF struct{ s, f string } @@ -99,3 +99,12 @@ type PAT struct { a *Action t time.Time } + +type FlushMatchOrder struct { + p string + ret chan map[*Filter]int +} +type FlushActionOrder struct { + p string + ret chan map[*Action]int +}