Handle multi-pattern match on one line

This commit is contained in:
yo 2024-02-24 11:42:31 +01:00
parent 0b4030905b
commit e9461bb39b
9 changed files with 93 additions and 71 deletions

View File

@ -20,7 +20,7 @@ const (
type Request struct {
Request int
Pattern string
Pattern []string
}
type Response struct {
@ -85,7 +85,7 @@ func usage(err string) {
}
func ClientShow(format, stream, filter string, regex *regexp.Regexp) {
response := SendAndRetrieve(Request{Show, ""})
response := SendAndRetrieve(Request{Show, []string{""}})
if response.Err != nil {
logger.Fatalln("Received error from daemon:", response.Err)
}
@ -166,7 +166,7 @@ func ClientShow(format, stream, filter string, regex *regexp.Regexp) {
os.Exit(0)
}
func ClientFlush(pattern, streamfilter, format string) {
func ClientFlush(pattern []string, streamfilter, format string) {
response := SendAndRetrieve(Request{Flush, pattern})
if response.Err != nil {
logger.Fatalln("Received error from daemon:", response.Err)

View File

@ -13,6 +13,16 @@ import (
"framagit.org/ppom/reaction/logger"
)
// Compare content and ordering. Case sensitive.
func IsStringArrayEqual(one, two []string) bool {
for i, a := range one {
if a != two[i] {
return false
}
}
return true
}
// Executes a command and channel-send its stdout
func cmdStdout(commandline []string) chan *string {
lines := make(chan *string)
@ -77,44 +87,53 @@ func (p *Pattern) notAnIgnore(match *string) bool {
}
// Whether one of the filter's regexes is matched on a line
func (f *Filter) match(line *string) string {
func (f *Filter) match(line *string) []string {
var result []string
for _, regex := range f.compiledRegex {
if matches := regex.FindStringSubmatch(*line); matches != nil {
var pnames []string
for _, p := range f.pattern {
pnames = append(pnames, p.name)
}
if f.pattern != nil {
match := matches[regex.SubexpIndex(f.pattern.name)]
if f.pattern.notAnIgnore(&match) {
for _, p := range f.pattern {
match := matches[regex.SubexpIndex(p.name)]
if p.notAnIgnore(&match) {
logger.Printf(logger.INFO, "%s.%s: match [%v]\n", f.stream.name, f.name, match)
return match
result = append(result, match)
}
} else {
logger.Printf(logger.INFO, "%s.%s: match [.]\n", f.stream.name, f.name)
}
if f.pattern == nil {
// No pattern, so this match will never actually be used
return "."
return []string{"."}
}
}
}
return ""
return result
}
func (f *Filter) sendActions(match string, at time.Time) {
func (f *Filter) sendActions(match []string, at time.Time) {
for _, a := range f.Actions {
actionsC <- PAT{match, a, at.Add(a.afterDuration)}
}
}
func (a *Action) exec(match string) {
func (a *Action) exec(match []string) {
defer wgActions.Done()
var computedCommand []string
var cmdItem string
if a.filter.pattern != nil {
computedCommand = make([]string, 0, len(a.Cmd))
for _, item := range a.Cmd {
computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match))
cmdItem = strings.Clone(item)
for i, p := range a.filter.pattern {
cmdItem = strings.ReplaceAll(cmdItem, p.nameWithBraces, match[i])
}
computedCommand = append(computedCommand, cmdItem)
}
} else {
computedCommand = a.Cmd
@ -153,7 +172,7 @@ func ActionsManager(concurrency int) {
}
}()
}
execAction := func(a *Action, p string) {
execAction := func(a *Action, p []string) {
wgActions.Add(1)
execActionsC <- PA{p, a}
}
@ -171,10 +190,10 @@ func ActionsManager(concurrency int) {
execAction(action, pattern)
} else {
actionsLock.Lock()
if actions[pa] == nil {
actions[pa] = make(map[time.Time]struct{})
if actions[&pa] == nil {
actions[&pa] = make(map[time.Time]struct{})
}
actions[pa][then] = struct{}{}
actions[&pa][then] = struct{}{}
actionsLock.Unlock()
go func(insidePat PAT, insideNow time.Time) {
time.Sleep(insidePat.t.Sub(insideNow))
@ -185,8 +204,8 @@ func ActionsManager(concurrency int) {
pa := PA{pat.p, pat.a}
pattern, action, then := pat.p, pat.a, pat.t
actionsLock.Lock()
if actions[pa] != nil {
delete(actions[pa], then)
if actions[&pa] != nil {
delete(actions[&pa], then)
}
actionsLock.Unlock()
execAction(action, pattern)
@ -194,7 +213,7 @@ func ActionsManager(concurrency int) {
ret := make(ActionsMap)
actionsLock.Lock()
for pa := range actions {
if pa.p == fo.p {
if IsStringArrayEqual(pa.p, fo.p) {
for range actions[pa] {
execAction(pa.a, pa.p)
}
@ -257,7 +276,7 @@ func matchesManagerHandleFlush(fo FlushMatchOrder) {
ret := make(MatchesMap)
matchesLock.Lock()
for pf := range matches {
if fo.p == pf.p {
if IsStringArrayEqual(fo.p, pf.p) {
if fo.ret != nil {
ret[pf] = matches[pf]
}
@ -279,26 +298,26 @@ func matchesManagerHandleMatch(pft PFT) bool {
if filter.Retry > 1 {
// make sure map exists
if matches[pf] == nil {
matches[pf] = make(map[time.Time]struct{})
if matches[&pf] == nil {
matches[&pf] = make(map[time.Time]struct{})
}
// add new match
matches[pf][then] = struct{}{}
matches[&pf][then] = struct{}{}
// remove match when expired
go func(pf PF, then time.Time) {
time.Sleep(then.Sub(time.Now()) + filter.retryDuration)
matchesLock.Lock()
if matches[pf] != nil {
if matches[&pf] != nil {
// FIXME replace this and all similar occurences
// by clear() when switching to go 1.21
delete(matches[pf], then)
delete(matches[&pf], then)
}
matchesLock.Unlock()
}(pf, then)
}
if filter.Retry <= 1 || len(matches[pf]) >= filter.Retry {
delete(matches, pf)
if filter.Retry <= 1 || len(matches[&pf]) >= filter.Retry {
delete(matches, &pf)
filter.sendActions(pattern, then)
return true
}
@ -318,7 +337,7 @@ func StreamManager(s *Stream, endedSignal chan *Stream) {
return
}
for _, filter := range s.Filters {
if match := filter.match(line); match != "" {
if match := filter.match(line); len(match) > 0 {
matchesC <- PFT{match, filter, time.Now()}
}
}

View File

@ -103,6 +103,8 @@ func basicUsage() {
# remove currently active matches and run currently pending actions for the specified TARGET
# (then show flushed matches and actions)
# e.g. reaction flush 192.168.1.1
# Concatenate patterns with " / " if several patterns in TARGET
# e.g. reaction flush "192.168.1.1 / root"
# options:
-s/--socket SOCKET # path to the client-daemon communication socket
@ -209,7 +211,7 @@ func Main(version, commit string) {
logger.Fatalln("for now, -l/--limit is not supported")
os.Exit(1)
}
ClientFlush(f.Arg(0), *limit, *queryFormat)
ClientFlush(strings.Split(f.Arg(0), " / "), *limit, *queryFormat)
case "test-regex":
// socket not needed, no interaction with the daemon

View File

@ -134,7 +134,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E
}()
// pattern, stream, fitler → last flush
flushes := make(map[PSF]time.Time)
flushes := make(map[*PSF]time.Time)
for {
var entry LogEntry
var filter *Filter
@ -160,7 +160,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E
}
// store
flushes[PSF{entry.Pattern, entry.Stream, entry.Filter}] = entry.T
flushes[&PSF{entry.Pattern, entry.Stream, entry.Filter}] = entry.T
}
lastTimeCpt := int64(0)
@ -201,8 +201,8 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E
}
// check if it hasn't been flushed
lastGlobalFlush := flushes[PSF{entry.Pattern, "", ""}].Unix()
lastLocalFlush := flushes[PSF{entry.Pattern, entry.Stream, entry.Filter}].Unix()
lastGlobalFlush := flushes[&PSF{entry.Pattern, "", ""}].Unix()
lastLocalFlush := flushes[&PSF{entry.Pattern, entry.Stream, entry.Filter}].Unix()
entryTime := entry.T.Unix()
if lastLocalFlush > entryTime || lastGlobalFlush > entryTime {
continue

View File

@ -7,6 +7,7 @@ import (
"path"
"sync"
"time"
"strings"
"framagit.org/ppom/reaction/logger"
)
@ -24,7 +25,7 @@ func genClientStatus(local_actions ActionsMap, local_matches MatchesMap, local_a
if cs[filter.stream.name][filter.name] == nil {
cs[filter.stream.name][filter.name] = make(MapPatternStatus)
}
cs[filter.stream.name][filter.name][pattern] = &PatternStatus{len(times), nil}
cs[filter.stream.name][filter.name][strings.Join(pattern, " / ")] = &PatternStatus{len(times), nil}
}
local_matchesLock.Unlock()
@ -39,10 +40,10 @@ func genClientStatus(local_actions ActionsMap, local_matches MatchesMap, local_a
if cs[action.filter.stream.name][action.filter.name] == nil {
cs[action.filter.stream.name][action.filter.name] = make(MapPatternStatus)
}
if cs[action.filter.stream.name][action.filter.name][pattern] == nil {
cs[action.filter.stream.name][action.filter.name][pattern] = new(PatternStatus)
if cs[action.filter.stream.name][action.filter.name][strings.Join(pattern, " / ")] == nil {
cs[action.filter.stream.name][action.filter.name][strings.Join(pattern, " / ")] = new(PatternStatus)
}
ps := cs[action.filter.stream.name][action.filter.name][pattern]
ps := cs[action.filter.stream.name][action.filter.name][strings.Join(pattern, " / ")]
if ps.Actions == nil {
ps.Actions = make(map[string][]string)
}

View File

@ -13,6 +13,7 @@ import (
"framagit.org/ppom/reaction/logger"
"github.com/google/go-jsonnet"
"golang.org/x/exp/slices"
)
func (c *Conf) setup() {
@ -74,17 +75,17 @@ func (c *Conf) setup() {
filter.name = filterName
if strings.Contains(filter.name, ".") {
logger.Fatalf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.name)
logger.Fatalf(fmt.Sprintf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.name))
}
// Parse Duration
if filter.RetryPeriod == "" {
if filter.Retry > 1 {
logger.Fatalf("Bad configuration: retry but no retryperiod in %v.%v", stream.name, filter.name)
logger.Fatalf(fmt.Sprintf("Bad configuration: retry but no retryperiod in %v.%v", stream.name, filter.name))
}
} else {
retryDuration, err := time.ParseDuration(filter.RetryPeriod)
if err != nil {
logger.Fatalf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.name, filter.name, err)
logger.Fatalf(fmt.Sprintf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.name, filter.name, err))
}
filter.retryDuration = retryDuration
}
@ -95,27 +96,17 @@ func (c *Conf) setup() {
// Compute Regexes
// Look for Patterns inside Regexes
for _, regex := range filter.Regex {
for patternName, pattern := range c.Patterns {
for _, pattern := range c.Patterns {
if strings.Contains(regex, pattern.nameWithBraces) {
if filter.pattern == nil {
filter.pattern = pattern
} else if filter.pattern == pattern {
// no op
} else {
logger.Fatalf(
"Bad configuration: Can't mix different patterns (%s, %s) in same filter (%s.%s)\n",
filter.pattern.name, patternName, streamName, filterName,
)
if !slices.Contains(filter.pattern, pattern) {
filter.pattern = append(filter.pattern, pattern)
}
// FIXME should go in the `if filter.pattern == nil`?
regex = strings.Replace(regex, pattern.nameWithBraces, pattern.Regex, 1)
}
}
compiledRegex, err := regexp.Compile(regex)
if err != nil {
log.Fatalf("%vBad configuration: regex of filter %s.%s: %v", logger.FATAL, stream.name, filter.name, err)
log.Fatal(fmt.Sprintf("Bad configuration: regex of filter %s.%s: %v", stream.name, filter.name, err))
}
filter.compiledRegex = append(filter.compiledRegex, *compiledRegex)
}
@ -125,7 +116,7 @@ func (c *Conf) setup() {
}
for actionName := range filter.Actions {
action := filter.Actions[actionName]
action := filter.Actions[actionName]
action.filter = filter
action.name = actionName

View File

@ -42,7 +42,7 @@ type Filter struct {
Regex []string `json:"regex"`
compiledRegex []regexp.Regexp `json:"-"`
pattern *Pattern `json:"-"`
pattern []*Pattern `json:"-"`
Retry int `json:"retry"`
RetryPeriod string `json:"retryperiod"`
@ -67,7 +67,7 @@ type Action struct {
type LogEntry struct {
T time.Time
S int64
Pattern string
Pattern []string
Stream, Filter string
SF int
Exec bool
@ -82,37 +82,43 @@ type WriteDB struct {
file *os.File
enc *gob.Encoder
}
type MatchesMap map[PF]map[time.Time]struct{}
type ActionsMap map[PA]map[time.Time]struct{}
// https://stackoverflow.com/a/69691894
type MatchesMap map[*PF]map[time.Time]struct{}
type ActionsMap map[*PA]map[time.Time]struct{}
// Helper structs made to carry information
// Stream, Filter
type SF struct{ s, f string }
type PSF struct{ p, s, f string }
// Pattern, Stream, Filter
type PSF struct{
p []string
s string
f string
}
type PF struct {
p string
p []string
f *Filter
}
type PFT struct {
p string
p []string
f *Filter
t time.Time
}
type PA struct {
p string
p []string
a *Action
}
type PAT struct {
p string
p []string
a *Action
t time.Time
}
type FlushMatchOrder struct {
p string
p []string
ret chan MatchesMap
}
type FlushActionOrder struct {
p string
p []string
ret chan ActionsMap
}

1
go.mod
View File

@ -4,6 +4,7 @@ go 1.20
require (
github.com/google/go-jsonnet v0.20.0
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a
sigs.k8s.io/yaml v1.1.0
)

2
go.sum
View File

@ -1,6 +1,8 @@
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-jsonnet v0.20.0 h1:WG4TTSARuV7bSm4PMB4ohjxe33IHT5WVTrJSU33uT4g=
github.com/google/go-jsonnet v0.20.0/go.mod h1:VbgWF9JX7ztlv770x/TolZNGGFfiHEVx9G6ca2eUmeA=
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a h1:HinSgX1tJRX3KsL//Gxynpw5CTOAIPhgL4W8PNiIpVE=
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=