Filters now run in own goroutine. Not run by Streams anymore.

This commit is contained in:
ppom 2023-03-25 18:04:44 +01:00
parent 487bcb5942
commit 95f90e8fce
3 changed files with 70 additions and 21 deletions

View File

@ -1,7 +1,6 @@
package main package reaction
import ( import (
// "flag"
"fmt" "fmt"
"log" "log"
"os" "os"
@ -37,6 +36,8 @@ type Filter struct {
retryDuration time.Duration retryDuration time.Duration
Actions map[string]*Action Actions map[string]*Action
matches map[string][]time.Time
} }
type Action struct { type Action struct {

View File

@ -1,18 +1,20 @@
package main package reaction
import ( import (
"bufio" "bufio"
"flag" "flag"
// "fmt"
"log" "log"
"os" "os"
"os/exec" "os/exec"
"strings" "strings"
"sync"
"time" "time"
) )
// Executes a command and channel-send its stdout // Executes a command and channel-send its stdout
func cmdStdout(commandline []string) chan string { func cmdStdout(commandline []string) chan *string {
lines := make(chan string) lines := make(chan *string)
go func() { go func() {
cmd := exec.Command(commandline[0], commandline[1:]...) cmd := exec.Command(commandline[0], commandline[1:]...)
@ -27,7 +29,8 @@ func cmdStdout(commandline []string) chan string {
scanner := bufio.NewScanner(stdout) scanner := bufio.NewScanner(stdout)
for scanner.Scan() { for scanner.Scan() {
lines <- scanner.Text() line := scanner.Text()
lines <- &line
} }
close(lines) close(lines)
}() }()
@ -36,14 +39,14 @@ func cmdStdout(commandline []string) chan string {
} }
// Whether one of the filter's regexes is matched on a line // Whether one of the filter's regexes is matched on a line
func (f *Filter) match(line string) string { func (f *Filter) match(line *string) string {
for _, regex := range f.compiledRegex { for _, regex := range f.compiledRegex {
if matches := regex.FindStringSubmatch(line); matches != nil { if matches := regex.FindStringSubmatch(*line); matches != nil {
match := matches[regex.SubexpIndex(f.patternName)] match := matches[regex.SubexpIndex(f.patternName)]
log.Printf("INFO %s.%s: match `%v`\n", f.stream.name, f.name, match) log.Printf("INFO %s.%s: match [%v]\n", f.stream.name, f.name, match)
return match return match
} }
} }
@ -52,11 +55,13 @@ func (f *Filter) match(line string) string {
func (f *Filter) execActions(match string) { func (f *Filter) execActions(match string) {
for _, a := range f.Actions { for _, a := range f.Actions {
wgActions.Add(1)
go a.exec(match) go a.exec(match)
} }
} }
func (a *Action) exec(match string) { func (a *Action) exec(match string) {
defer wgActions.Done()
if a.afterDuration != 0 { if a.afterDuration != 0 {
time.Sleep(a.afterDuration) time.Sleep(a.afterDuration)
} }
@ -75,21 +80,55 @@ func (a *Action) exec(match string) {
} }
} }
func (s *Stream) handle() { func (f *Filter) handle() chan *string {
lines := make(chan *string)
go func() {
for line := range lines {
if match := f.match(line); match != "" {
f.execActions(match)
}
}
}()
return lines
}
func multiplex(input chan *string, outputs []chan *string) {
var wg sync.WaitGroup
for item := range input {
for _, output := range outputs {
wg.Add(1)
go func(s *string) {
output <- s
wg.Done()
}(item)
}
}
for _, output := range outputs {
wg.Wait()
close(output)
}
}
func (s *Stream) handle(signal chan *Stream) {
log.Printf("INFO %s: start %s\n", s.name, s.Cmd) log.Printf("INFO %s: start %s\n", s.name, s.Cmd)
lines := cmdStdout(s.Cmd) lines := cmdStdout(s.Cmd)
for line := range lines { var filterInputs = make([]chan *string, 0, len(s.Filters))
for _, filter := range s.Filters { for _, filter := range s.Filters {
if match := filter.match(line); match != "" { filterInputs = append(filterInputs, filter.handle())
filter.execActions(match)
}
}
} }
multiplex(lines, filterInputs)
signal <- s
} }
func main() { var wgActions sync.WaitGroup
func Main() {
confFilename := flag.String("c", "", "configuration file. see an example at https://framagit.org/ppom/reaction/-/blob/main/reaction.yml") confFilename := flag.String("c", "", "configuration file. see an example at https://framagit.org/ppom/reaction/-/blob/main/reaction.yml")
flag.Parse() flag.Parse()
@ -100,9 +139,18 @@ func main() {
conf := parseConf(*confFilename) conf := parseConf(*confFilename)
endSignals := make(chan *Stream)
for _, stream := range conf.Streams { for _, stream := range conf.Streams {
go stream.handle() go stream.handle(endSignals)
} }
// Infinite wait
<-make(chan bool) for i := 0; i < len(conf.Streams); i++ {
finishedStream := <-endSignals
log.Printf("ERR %s stream finished", finishedStream.name)
}
wgActions.Wait()
os.Exit(3)
} }

View File

@ -8,7 +8,7 @@ patterns:
streams: streams:
tailDown: tailDown:
cmd: [ "tail", "-f", "/home/ao/DOWN" ] cmd: [ "tail", "/home/ao/DOWN" ]
filters: filters:
findIP: findIP:
regex: regex: