From 00f1647aa651b9d4b5df4562dda57ea21850c706 Mon Sep 17 00:00:00 2001 From: yo Date: Sat, 24 Feb 2024 11:01:50 +0100 Subject: [PATCH] Hanle multi-pattern match on a single line of log --- app/client.go | 6 ++-- app/daemon.go | 77 +++++++++++++++++++++++++++++++------------------- app/main.go | 4 ++- app/persist.go | 8 +++--- app/pipe.go | 9 +++--- app/startup.go | 27 ++++++------------ app/types.go | 30 ++++++++++++-------- go.mod | 1 + go.sum | 2 ++ 9 files changed, 93 insertions(+), 71 deletions(-) diff --git a/app/client.go b/app/client.go index 8be7bdb..01ece26 100644 --- a/app/client.go +++ b/app/client.go @@ -20,7 +20,7 @@ const ( type Request struct { Request int - Pattern string + Pattern []string } type Response struct { @@ -85,7 +85,7 @@ func usage(err string) { } func ClientShow(format, stream, filter string, regex *regexp.Regexp) { - response := SendAndRetrieve(Request{Show, ""}) + response := SendAndRetrieve(Request{Show, []string{""}}) if response.Err != nil { logger.Fatalln("Received error from daemon:", response.Err) } @@ -166,7 +166,7 @@ func ClientShow(format, stream, filter string, regex *regexp.Regexp) { os.Exit(0) } -func ClientFlush(pattern, streamfilter, format string) { +func ClientFlush(pattern []string, streamfilter, format string) { response := SendAndRetrieve(Request{Flush, pattern}) if response.Err != nil { logger.Fatalln("Received error from daemon:", response.Err) diff --git a/app/daemon.go b/app/daemon.go index f258a66..7ef4225 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -13,6 +13,16 @@ import ( "framagit.org/ppom/reaction/logger" ) +// Compare content and ordering. Case sensitive. +func IsStringArrayEqual(one, two []string) bool { + for i, a := range one { + if a != two[i] { + return false + } + } + return true +} + // Executes a command and channel-send its stdout func cmdStdout(commandline []string) chan *string { lines := make(chan *string) @@ -77,44 +87,53 @@ func (p *Pattern) notAnIgnore(match *string) bool { } // Whether one of the filter's regexes is matched on a line -func (f *Filter) match(line *string) string { +func (f *Filter) match(line *string) []string { + var result []string for _, regex := range f.compiledRegex { if matches := regex.FindStringSubmatch(*line); matches != nil { + var pnames []string + for _, p := range f.pattern { + pnames = append(pnames, p.name) + } - if f.pattern != nil { - match := matches[regex.SubexpIndex(f.pattern.name)] - - if f.pattern.notAnIgnore(&match) { + for _, p := range f.pattern { + match := matches[regex.SubexpIndex(p.name)] + if p.notAnIgnore(&match) { logger.Printf(logger.INFO, "%s.%s: match [%v]\n", f.stream.name, f.name, match) - return match + result = append(result, match) } - } else { - logger.Printf(logger.INFO, "%s.%s: match [.]\n", f.stream.name, f.name) + } + if f.pattern == nil { // No pattern, so this match will never actually be used - return "." + return []string{"."} } } } - return "" + return result } -func (f *Filter) sendActions(match string, at time.Time) { +func (f *Filter) sendActions(match []string, at time.Time) { for _, a := range f.Actions { actionsC <- PAT{match, a, at.Add(a.afterDuration)} } } -func (a *Action) exec(match string) { +func (a *Action) exec(match []string) { defer wgActions.Done() var computedCommand []string + var cmdItem string if a.filter.pattern != nil { computedCommand = make([]string, 0, len(a.Cmd)) for _, item := range a.Cmd { - computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match)) + cmdItem = strings.Clone(item) + for i, p := range a.filter.pattern { + cmdItem = strings.ReplaceAll(cmdItem, p.nameWithBraces, match[i]) + } + computedCommand = append(computedCommand, cmdItem) } } else { computedCommand = a.Cmd @@ -153,7 +172,7 @@ func ActionsManager(concurrency int) { } }() } - execAction := func(a *Action, p string) { + execAction := func(a *Action, p []string) { wgActions.Add(1) execActionsC <- PA{p, a} } @@ -171,10 +190,10 @@ func ActionsManager(concurrency int) { execAction(action, pattern) } else { actionsLock.Lock() - if actions[pa] == nil { - actions[pa] = make(map[time.Time]struct{}) + if actions[&pa] == nil { + actions[&pa] = make(map[time.Time]struct{}) } - actions[pa][then] = struct{}{} + actions[&pa][then] = struct{}{} actionsLock.Unlock() go func(insidePat PAT, insideNow time.Time) { time.Sleep(insidePat.t.Sub(insideNow)) @@ -185,8 +204,8 @@ func ActionsManager(concurrency int) { pa := PA{pat.p, pat.a} pattern, action, then := pat.p, pat.a, pat.t actionsLock.Lock() - if actions[pa] != nil { - delete(actions[pa], then) + if actions[&pa] != nil { + delete(actions[&pa], then) } actionsLock.Unlock() execAction(action, pattern) @@ -194,7 +213,7 @@ func ActionsManager(concurrency int) { ret := make(ActionsMap) actionsLock.Lock() for pa := range actions { - if pa.p == fo.p { + if IsStringArrayEqual(pa.p, fo.p) { for range actions[pa] { execAction(pa.a, pa.p) } @@ -257,7 +276,7 @@ func matchesManagerHandleFlush(fo FlushMatchOrder) { ret := make(MatchesMap) matchesLock.Lock() for pf := range matches { - if fo.p == pf.p { + if IsStringArrayEqual(fo.p, pf.p) { if fo.ret != nil { ret[pf] = matches[pf] } @@ -279,26 +298,26 @@ func matchesManagerHandleMatch(pft PFT) bool { if filter.Retry > 1 { // make sure map exists - if matches[pf] == nil { - matches[pf] = make(map[time.Time]struct{}) + if matches[&pf] == nil { + matches[&pf] = make(map[time.Time]struct{}) } // add new match - matches[pf][then] = struct{}{} + matches[&pf][then] = struct{}{} // remove match when expired go func(pf PF, then time.Time) { time.Sleep(then.Sub(time.Now()) + filter.retryDuration) matchesLock.Lock() - if matches[pf] != nil { + if matches[&pf] != nil { // FIXME replace this and all similar occurences // by clear() when switching to go 1.21 - delete(matches[pf], then) + delete(matches[&pf], then) } matchesLock.Unlock() }(pf, then) } - if filter.Retry <= 1 || len(matches[pf]) >= filter.Retry { - delete(matches, pf) + if filter.Retry <= 1 || len(matches[&pf]) >= filter.Retry { + delete(matches, &pf) filter.sendActions(pattern, then) return true } @@ -318,7 +337,7 @@ func StreamManager(s *Stream, endedSignal chan *Stream) { return } for _, filter := range s.Filters { - if match := filter.match(line); match != "" { + if match := filter.match(line); len(match) > 0 { matchesC <- PFT{match, filter, time.Now()} } } diff --git a/app/main.go b/app/main.go index 6c262f9..ae9fc4b 100644 --- a/app/main.go +++ b/app/main.go @@ -103,6 +103,8 @@ func basicUsage() { # remove currently active matches and run currently pending actions for the specified TARGET # (then show flushed matches and actions) # e.g. reaction flush 192.168.1.1 + # Concatenate patterns with " / " if several patterns in TARGET + # e.g. reaction flush "192.168.1.1 / root" # options: -s/--socket SOCKET # path to the client-daemon communication socket @@ -209,7 +211,7 @@ func Main(version, commit string) { logger.Fatalln("for now, -l/--limit is not supported") os.Exit(1) } - ClientFlush(f.Arg(0), *limit, *queryFormat) + ClientFlush(strings.Split(f.Arg(0), " / "), *limit, *queryFormat) case "test-regex": // socket not needed, no interaction with the daemon diff --git a/app/persist.go b/app/persist.go index 1e82a66..9b4f3c4 100644 --- a/app/persist.go +++ b/app/persist.go @@ -134,7 +134,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E }() // pattern, stream, fitler → last flush - flushes := make(map[PSF]time.Time) + flushes := make(map[*PSF]time.Time) for { var entry LogEntry var filter *Filter @@ -160,7 +160,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E } // store - flushes[PSF{entry.Pattern, entry.Stream, entry.Filter}] = entry.T + flushes[&PSF{entry.Pattern, entry.Stream, entry.Filter}] = entry.T } lastTimeCpt := int64(0) @@ -201,8 +201,8 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E } // check if it hasn't been flushed - lastGlobalFlush := flushes[PSF{entry.Pattern, "", ""}].Unix() - lastLocalFlush := flushes[PSF{entry.Pattern, entry.Stream, entry.Filter}].Unix() + lastGlobalFlush := flushes[&PSF{entry.Pattern, "", ""}].Unix() + lastLocalFlush := flushes[&PSF{entry.Pattern, entry.Stream, entry.Filter}].Unix() entryTime := entry.T.Unix() if lastLocalFlush > entryTime || lastGlobalFlush > entryTime { continue diff --git a/app/pipe.go b/app/pipe.go index c834bb6..08295f6 100644 --- a/app/pipe.go +++ b/app/pipe.go @@ -7,6 +7,7 @@ import ( "path" "sync" "time" + "strings" "framagit.org/ppom/reaction/logger" ) @@ -24,7 +25,7 @@ func genClientStatus(local_actions ActionsMap, local_matches MatchesMap, local_a if cs[filter.stream.name][filter.name] == nil { cs[filter.stream.name][filter.name] = make(MapPatternStatus) } - cs[filter.stream.name][filter.name][pattern] = &PatternStatus{len(times), nil} + cs[filter.stream.name][filter.name][strings.Join(pattern, " / ")] = &PatternStatus{len(times), nil} } local_matchesLock.Unlock() @@ -39,10 +40,10 @@ func genClientStatus(local_actions ActionsMap, local_matches MatchesMap, local_a if cs[action.filter.stream.name][action.filter.name] == nil { cs[action.filter.stream.name][action.filter.name] = make(MapPatternStatus) } - if cs[action.filter.stream.name][action.filter.name][pattern] == nil { - cs[action.filter.stream.name][action.filter.name][pattern] = new(PatternStatus) + if cs[action.filter.stream.name][action.filter.name][strings.Join(pattern, " / ")] == nil { + cs[action.filter.stream.name][action.filter.name][strings.Join(pattern, " / ")] = new(PatternStatus) } - ps := cs[action.filter.stream.name][action.filter.name][pattern] + ps := cs[action.filter.stream.name][action.filter.name][strings.Join(pattern, " / ")] if ps.Actions == nil { ps.Actions = make(map[string][]string) } diff --git a/app/startup.go b/app/startup.go index beeac0b..9e002c8 100644 --- a/app/startup.go +++ b/app/startup.go @@ -13,6 +13,7 @@ import ( "framagit.org/ppom/reaction/logger" "github.com/google/go-jsonnet" + "golang.org/x/exp/slices" ) func (c *Conf) setup() { @@ -74,17 +75,17 @@ func (c *Conf) setup() { filter.name = filterName if strings.Contains(filter.name, ".") { - logger.Fatalf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.name) + logger.Fatalf(fmt.Sprintf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.name)) } // Parse Duration if filter.RetryPeriod == "" { if filter.Retry > 1 { - logger.Fatalf("Bad configuration: retry but no retryperiod in %v.%v", stream.name, filter.name) + logger.Fatalf(fmt.Sprintf("Bad configuration: retry but no retryperiod in %v.%v", stream.name, filter.name)) } } else { retryDuration, err := time.ParseDuration(filter.RetryPeriod) if err != nil { - logger.Fatalf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.name, filter.name, err) + logger.Fatalf(fmt.Sprintf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.name, filter.name, err)) } filter.retryDuration = retryDuration } @@ -95,27 +96,17 @@ func (c *Conf) setup() { // Compute Regexes // Look for Patterns inside Regexes for _, regex := range filter.Regex { - for patternName, pattern := range c.Patterns { + for _, pattern := range c.Patterns { if strings.Contains(regex, pattern.nameWithBraces) { - - if filter.pattern == nil { - filter.pattern = pattern - } else if filter.pattern == pattern { - // no op - } else { - logger.Fatalf( - "Bad configuration: Can't mix different patterns (%s, %s) in same filter (%s.%s)\n", - filter.pattern.name, patternName, streamName, filterName, - ) + if !slices.Contains(filter.pattern, pattern) { + filter.pattern = append(filter.pattern, pattern) } - - // FIXME should go in the `if filter.pattern == nil`? regex = strings.Replace(regex, pattern.nameWithBraces, pattern.Regex, 1) } } compiledRegex, err := regexp.Compile(regex) if err != nil { - log.Fatalf("%vBad configuration: regex of filter %s.%s: %v", logger.FATAL, stream.name, filter.name, err) + log.Fatal(fmt.Sprintf("Bad configuration: regex of filter %s.%s: %v", stream.name, filter.name, err)) } filter.compiledRegex = append(filter.compiledRegex, *compiledRegex) } @@ -125,7 +116,7 @@ func (c *Conf) setup() { } for actionName := range filter.Actions { - action := filter.Actions[actionName] + action := filter.Actions[actionName] action.filter = filter action.name = actionName diff --git a/app/types.go b/app/types.go index 62072fb..caa7a5e 100644 --- a/app/types.go +++ b/app/types.go @@ -42,7 +42,7 @@ type Filter struct { Regex []string `json:"regex"` compiledRegex []regexp.Regexp `json:"-"` - pattern *Pattern `json:"-"` + pattern []*Pattern `json:"-"` Retry int `json:"retry"` RetryPeriod string `json:"retryperiod"` @@ -67,7 +67,7 @@ type Action struct { type LogEntry struct { T time.Time S int64 - Pattern string + Pattern []string Stream, Filter string SF int Exec bool @@ -82,37 +82,43 @@ type WriteDB struct { file *os.File enc *gob.Encoder } - -type MatchesMap map[PF]map[time.Time]struct{} -type ActionsMap map[PA]map[time.Time]struct{} +// https://stackoverflow.com/a/69691894 +type MatchesMap map[*PF]map[time.Time]struct{} +type ActionsMap map[*PA]map[time.Time]struct{} // Helper structs made to carry information +// Stream, Filter type SF struct{ s, f string } -type PSF struct{ p, s, f string } +// Pattern, Stream, Filter +type PSF struct{ + p []string + s string + f string +} type PF struct { - p string + p []string f *Filter } type PFT struct { - p string + p []string f *Filter t time.Time } type PA struct { - p string + p []string a *Action } type PAT struct { - p string + p []string a *Action t time.Time } type FlushMatchOrder struct { - p string + p []string ret chan MatchesMap } type FlushActionOrder struct { - p string + p []string ret chan ActionsMap } diff --git a/go.mod b/go.mod index f912c1d..3ce7a8e 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.20 require ( github.com/google/go-jsonnet v0.20.0 + golang.org/x/exp v0.0.0-20240213143201-ec583247a57a sigs.k8s.io/yaml v1.1.0 ) diff --git a/go.sum b/go.sum index fe8df87..f49911f 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-jsonnet v0.20.0 h1:WG4TTSARuV7bSm4PMB4ohjxe33IHT5WVTrJSU33uT4g= github.com/google/go-jsonnet v0.20.0/go.mod h1:VbgWF9JX7ztlv770x/TolZNGGFfiHEVx9G6ca2eUmeA= +golang.org/x/exp v0.0.0-20240213143201-ec583247a57a h1:HinSgX1tJRX3KsL//Gxynpw5CTOAIPhgL4W8PNiIpVE= +golang.org/x/exp v0.0.0-20240213143201-ec583247a57a/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo= gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=