From 94d023e78c8dcc52c8d090eccd0d8e8cc1917b05 Mon Sep 17 00:00:00 2001 From: ppom <> Date: Fri, 24 Mar 2023 00:27:51 +0100 Subject: [PATCH] Close #1 #4 --- conf.go | 55 +++++++++++++------ main.go | 146 +++++++++++++++++++++++---------------------------- reaction.yml | 31 ++++++++--- 3 files changed, 127 insertions(+), 105 deletions(-) diff --git a/conf.go b/conf.go index 4293a31..78e97eb 100644 --- a/conf.go +++ b/conf.go @@ -2,23 +2,49 @@ package main import ( // "flag" + "fmt" "log" "os" + "regexp" "gopkg.in/yaml.v3" ) type Conf struct { - // Definitions []string - Streams []struct { - Cmd string - Filters []struct { - Regex []string - Retry uint - RetryPeriod string `yaml:"retry-period"` - Actions []struct { - Cmd string - After string `yaml:",omitempty"` + Streams map[string]Stream +} + +type Stream struct { + Cmd []string + Filters map[string]*Filter +} + +type Filter struct { + Regex []string + compiledRegex []regexp.Regexp + Retry uint + RetryPeriod string `yaml:"retry-period"` + Actions map[string]*Action +} + +type Action struct { + name, filterName, streamName string + Cmd []string + After string `yaml:",omitempty"` +} + +func (c *Conf) setup() { + for streamName, stream := range c.Streams { + for filterName, filter := range stream.Filters { + // Compute Regexes + for _, regex := range filter.Regex { + filter.compiledRegex = append(filter.compiledRegex, *regexp.MustCompile(regex)) + } + // Give all relevant infos to Actions + for actionName, action := range filter.Actions { + action.name = actionName + action.filterName = filterName + action.streamName = streamName } } } @@ -37,13 +63,10 @@ func parseConf(filename string) *Conf { if err != nil { log.Fatalln("Failed to parse configuration file:", err) } - log.Println(conf) - yaml, err := yaml.Marshal(conf) - if err != nil { - log.Fatalln("Failed to rewrite configuration file:", err) - } - log.Println(string(yaml)) + conf.setup() + fmt.Printf("conf.Streams[0].Filters[0].Actions: %s\n", conf.Streams["tailDown"].Filters["lookForProuts"].Actions) + return &conf } diff --git a/main.go b/main.go index db19ee9..bd881b0 100644 --- a/main.go +++ b/main.go @@ -4,95 +4,79 @@ import ( "bufio" "log" "os/exec" - "regexp" ) -type Action struct { - regex, cmd []string -} +func cmdStdout(commandline []string) chan string { + lines := make(chan string) -type compiledAction struct { - regex []regexp.Regexp - cmd []string -} - -type Stream struct { - cmd []string - actions []Action -} - -func compileAction(action Action) compiledAction { - var ca compiledAction - ca.cmd = action.cmd - for _, regex := range action.regex { - ca.regex = append(ca.regex, *regexp.MustCompile(regex)) - } - return ca -} - -// Handle a log command -// Must be started in a goroutine -func streamHandle(stream Stream, execQueue chan []string) { - log.Printf("streamHandle{%v}: start\n", stream.cmd) - cmd := exec.Command(stream.cmd[0], stream.cmd[1:]...) - stdout, err := cmd.StdoutPipe() - if err != nil { - log.Fatal("couldn't open stdout on command:", err) - } - if err := cmd.Start(); err != nil { - log.Fatal("couldn't start command:", err) - } - defer stdout.Close() - - compiledActions := make([]compiledAction, 0, len(stream.actions)) - for _, action := range stream.actions { - compiledActions = append(compiledActions, compileAction(action)) - } - - scanner := bufio.NewScanner(stdout) - for scanner.Scan() { - line := scanner.Text() - for _, action := range compiledActions { - for _, regex := range action.regex { - if match := regex.FindString(line); match != "" { - log.Printf("match `%v` in line: `%v`\n", regex.String(), line) - execQueue <- action.cmd - } - } - } - } -} - -func execQueue() chan []string { - queue := make(chan []string) go func() { - for { - command := <-queue - cmd := exec.Command(command[0], command[1:]...) - if ret := cmd.Run(); ret != nil { - log.Printf("Error launching `%v`: code %v\n", cmd, ret) + cmd := exec.Command(commandline[0], commandline[1:]...) + stdout, err := cmd.StdoutPipe() + if err != nil { + log.Fatal("couldn't open stdout on command:", err) + } + if err := cmd.Start(); err != nil { + log.Fatal("couldn't start command:", err) + } + defer stdout.Close() + + scanner := bufio.NewScanner(stdout) + for scanner.Scan() { + lines <- scanner.Text() + } + close(lines) + }() + + return lines +} + +func (f *Filter) match(line string) bool { + log.Printf("trying to match line {%s}...\n", line) + for _, regex := range f.compiledRegex { + log.Printf("...on %s\n", regex.String()) + if match := regex.FindString(line); match != "" { + log.Printf("match `%v` in line: `%v`\n", regex.String(), line) + return true + } + } + return false +} + +func (f *Filter) launch(line *string) { + for _, a := range f.Actions { + go a.launch(line) + } +} + +func (a *Action) launch(line *string) { + log.Printf("INFO %s.%s.%s: line {%s} → run {%s}\n", a.streamName, a.filterName, a.name, *line, a.Cmd) + + cmd := exec.Command(a.Cmd[0], a.Cmd[1:]...) + if ret := cmd.Run(); ret != nil { + log.Printf("ERR %s.%s.%s: line {%s} → run %s, code {%s}\n", a.streamName, a.filterName, a.name, *line, a.Cmd, ret) + } +} + +func (s *Stream) handle() { + log.Printf("streamHandle{%v}: start\n", s.Cmd) + + lines := cmdStdout(s.Cmd) + + for line := range lines { + for _, filter := range s.Filters { + if filter.match(line) { + filter.launch(&line) } } - }() - return queue + } } func main() { conf := parseConf("./reaction.yml") - conf = conf - // mockstreams := []Stream{Stream{ - // []string{"tail", "-f", "/home/ao/DOWN"}, - // []Action{Action{ - // []string{"prout.dev"}, - // []string{"touch", "/home/ao/DAMN"}, - // }}, - // }} - // streams := mockstreams - // log.Println(streams) - // queue := execQueue() - // for _, stream := range streams { - // go streamHandle(stream, queue) - // } - // // Infinite wait - // <-make(chan bool) + + for _, stream := range conf.Streams { + go stream.handle() + } + // Infinite wait + <-make(chan bool) } diff --git a/reaction.yml b/reaction.yml index 5cad19a..7459e09 100644 --- a/reaction.yml +++ b/reaction.yml @@ -7,13 +7,28 @@ definitions: # ip: '(([0-9]{1,3}\.){3}[0-9]{1,3})|([0-9a-fA-F:]{2,90})' streams: - - cmd: journalctl -fu phpfpm-nextcloud.service + tailDown: + cmd: [ "tail", "-f", "/home/ao/DOWN" ] filters: - - regex: - - '"message":"Login failed: .\+ (Remote IP: )"' - retry: 3 - retry-period: 1h + lookForProuts: + regex: + - prout + retry: 1 + retry-period: 1s actions: - - cmd: *iptablesban - - cmd: *iptablesunban - after: 1h + damn: + cmd: [ "echo", "DAMN" ] + sleepdamn: + cmd: [ "echo", "sleepDAMN" ] + after: 2s + + # - cmd: journalctl -fu phpfpm-nextcloud.service + # filters: + # - regex: + # - '"message":"Login failed: .\+ (Remote IP: )"' + # retry: 3 + # retry-period: 1h + # actions: + # - cmd: *iptablesban + # - cmd: *iptablesunban + # after: 1h