From 95f90e8fce93635a037ddcec08a542a486525755 Mon Sep 17 00:00:00 2001 From: ppom <> Date: Sat, 25 Mar 2023 18:04:44 +0100 Subject: [PATCH] Filters now run in own goroutine. Not run by Streams anymore. --- conf.go | 5 ++- main.go => reaction.go | 84 +++++++++++++++++++++++++++++++++--------- reaction.yml | 2 +- 3 files changed, 70 insertions(+), 21 deletions(-) rename main.go => reaction.go (56%) diff --git a/conf.go b/conf.go index d9224d1..59372b1 100644 --- a/conf.go +++ b/conf.go @@ -1,7 +1,6 @@ -package main +package reaction import ( - // "flag" "fmt" "log" "os" @@ -37,6 +36,8 @@ type Filter struct { retryDuration time.Duration Actions map[string]*Action + + matches map[string][]time.Time } type Action struct { diff --git a/main.go b/reaction.go similarity index 56% rename from main.go rename to reaction.go index 5190a30..2836ede 100644 --- a/main.go +++ b/reaction.go @@ -1,18 +1,20 @@ -package main +package reaction import ( "bufio" "flag" + // "fmt" "log" "os" "os/exec" "strings" + "sync" "time" ) // Executes a command and channel-send its stdout -func cmdStdout(commandline []string) chan string { - lines := make(chan string) +func cmdStdout(commandline []string) chan *string { + lines := make(chan *string) go func() { cmd := exec.Command(commandline[0], commandline[1:]...) @@ -27,7 +29,8 @@ func cmdStdout(commandline []string) chan string { scanner := bufio.NewScanner(stdout) for scanner.Scan() { - lines <- scanner.Text() + line := scanner.Text() + lines <- &line } close(lines) }() @@ -36,14 +39,14 @@ func cmdStdout(commandline []string) chan string { } // Whether one of the filter's regexes is matched on a line -func (f *Filter) match(line string) string { +func (f *Filter) match(line *string) string { for _, regex := range f.compiledRegex { - if matches := regex.FindStringSubmatch(line); matches != nil { + if matches := regex.FindStringSubmatch(*line); matches != nil { match := matches[regex.SubexpIndex(f.patternName)] - log.Printf("INFO %s.%s: match `%v`\n", f.stream.name, f.name, match) + log.Printf("INFO %s.%s: match [%v]\n", f.stream.name, f.name, match) return match } } @@ -52,11 +55,13 @@ func (f *Filter) match(line string) string { func (f *Filter) execActions(match string) { for _, a := range f.Actions { + wgActions.Add(1) go a.exec(match) } } func (a *Action) exec(match string) { + defer wgActions.Done() if a.afterDuration != 0 { time.Sleep(a.afterDuration) } @@ -75,21 +80,55 @@ func (a *Action) exec(match string) { } } -func (s *Stream) handle() { +func (f *Filter) handle() chan *string { + lines := make(chan *string) + + go func() { + for line := range lines { + if match := f.match(line); match != "" { + f.execActions(match) + } + } + }() + + return lines +} + +func multiplex(input chan *string, outputs []chan *string) { + var wg sync.WaitGroup + for item := range input { + for _, output := range outputs { + wg.Add(1) + go func(s *string) { + output <- s + wg.Done() + }(item) + } + } + for _, output := range outputs { + wg.Wait() + close(output) + } +} + +func (s *Stream) handle(signal chan *Stream) { log.Printf("INFO %s: start %s\n", s.name, s.Cmd) lines := cmdStdout(s.Cmd) - for line := range lines { - for _, filter := range s.Filters { - if match := filter.match(line); match != "" { - filter.execActions(match) - } - } + var filterInputs = make([]chan *string, 0, len(s.Filters)) + for _, filter := range s.Filters { + filterInputs = append(filterInputs, filter.handle()) } + + multiplex(lines, filterInputs) + + signal <- s } -func main() { +var wgActions sync.WaitGroup + +func Main() { confFilename := flag.String("c", "", "configuration file. see an example at https://framagit.org/ppom/reaction/-/blob/main/reaction.yml") flag.Parse() @@ -100,9 +139,18 @@ func main() { conf := parseConf(*confFilename) + endSignals := make(chan *Stream) + for _, stream := range conf.Streams { - go stream.handle() + go stream.handle(endSignals) } - // Infinite wait - <-make(chan bool) + + for i := 0; i < len(conf.Streams); i++ { + finishedStream := <-endSignals + log.Printf("ERR %s stream finished", finishedStream.name) + } + + wgActions.Wait() + + os.Exit(3) } diff --git a/reaction.yml b/reaction.yml index 742d3b1..b076840 100644 --- a/reaction.yml +++ b/reaction.yml @@ -8,7 +8,7 @@ patterns: streams: tailDown: - cmd: [ "tail", "-f", "/home/ao/DOWN" ] + cmd: [ "tail", "/home/ao/DOWN" ] filters: findIP: regex: