This commit is contained in:
ppom 2023-03-24 00:27:51 +01:00
parent 9e702c4cdd
commit 94d023e78c
3 changed files with 127 additions and 105 deletions

47
conf.go
View File

@ -2,24 +2,50 @@ package main
import (
// "flag"
"fmt"
"log"
"os"
"regexp"
"gopkg.in/yaml.v3"
)
type Conf struct {
// Definitions []string
Streams []struct {
Cmd string
Filters []struct {
Streams map[string]Stream
}
type Stream struct {
Cmd []string
Filters map[string]*Filter
}
type Filter struct {
Regex []string
compiledRegex []regexp.Regexp
Retry uint
RetryPeriod string `yaml:"retry-period"`
Actions []struct {
Cmd string
Actions map[string]*Action
}
type Action struct {
name, filterName, streamName string
Cmd []string
After string `yaml:",omitempty"`
}
func (c *Conf) setup() {
for streamName, stream := range c.Streams {
for filterName, filter := range stream.Filters {
// Compute Regexes
for _, regex := range filter.Regex {
filter.compiledRegex = append(filter.compiledRegex, *regexp.MustCompile(regex))
}
// Give all relevant infos to Actions
for actionName, action := range filter.Actions {
action.name = actionName
action.filterName = filterName
action.streamName = streamName
}
}
}
}
@ -37,13 +63,10 @@ func parseConf(filename string) *Conf {
if err != nil {
log.Fatalln("Failed to parse configuration file:", err)
}
log.Println(conf)
yaml, err := yaml.Marshal(conf)
if err != nil {
log.Fatalln("Failed to rewrite configuration file:", err)
}
log.Println(string(yaml))
conf.setup()
fmt.Printf("conf.Streams[0].Filters[0].Actions: %s\n", conf.Streams["tailDown"].Filters["lookForProuts"].Actions)
return &conf
}

108
main.go
View File

@ -4,37 +4,13 @@ import (
"bufio"
"log"
"os/exec"
"regexp"
)
type Action struct {
regex, cmd []string
}
func cmdStdout(commandline []string) chan string {
lines := make(chan string)
type compiledAction struct {
regex []regexp.Regexp
cmd []string
}
type Stream struct {
cmd []string
actions []Action
}
func compileAction(action Action) compiledAction {
var ca compiledAction
ca.cmd = action.cmd
for _, regex := range action.regex {
ca.regex = append(ca.regex, *regexp.MustCompile(regex))
}
return ca
}
// Handle a log command
// Must be started in a goroutine
func streamHandle(stream Stream, execQueue chan []string) {
log.Printf("streamHandle{%v}: start\n", stream.cmd)
cmd := exec.Command(stream.cmd[0], stream.cmd[1:]...)
go func() {
cmd := exec.Command(commandline[0], commandline[1:]...)
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Fatal("couldn't open stdout on command:", err)
@ -44,55 +20,63 @@ func streamHandle(stream Stream, execQueue chan []string) {
}
defer stdout.Close()
compiledActions := make([]compiledAction, 0, len(stream.actions))
for _, action := range stream.actions {
compiledActions = append(compiledActions, compileAction(action))
}
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
line := scanner.Text()
for _, action := range compiledActions {
for _, regex := range action.regex {
lines <- scanner.Text()
}
close(lines)
}()
return lines
}
func (f *Filter) match(line string) bool {
log.Printf("trying to match line {%s}...\n", line)
for _, regex := range f.compiledRegex {
log.Printf("...on %s\n", regex.String())
if match := regex.FindString(line); match != "" {
log.Printf("match `%v` in line: `%v`\n", regex.String(), line)
execQueue <- action.cmd
return true
}
}
return false
}
func (f *Filter) launch(line *string) {
for _, a := range f.Actions {
go a.launch(line)
}
}
func execQueue() chan []string {
queue := make(chan []string)
go func() {
for {
command := <-queue
cmd := exec.Command(command[0], command[1:]...)
func (a *Action) launch(line *string) {
log.Printf("INFO %s.%s.%s: line {%s} → run {%s}\n", a.streamName, a.filterName, a.name, *line, a.Cmd)
cmd := exec.Command(a.Cmd[0], a.Cmd[1:]...)
if ret := cmd.Run(); ret != nil {
log.Printf("Error launching `%v`: code %v\n", cmd, ret)
log.Printf("ERR %s.%s.%s: line {%s} → run %s, code {%s}\n", a.streamName, a.filterName, a.name, *line, a.Cmd, ret)
}
}
func (s *Stream) handle() {
log.Printf("streamHandle{%v}: start\n", s.Cmd)
lines := cmdStdout(s.Cmd)
for line := range lines {
for _, filter := range s.Filters {
if filter.match(line) {
filter.launch(&line)
}
}
}
}()
return queue
}
func main() {
conf := parseConf("./reaction.yml")
conf = conf
// mockstreams := []Stream{Stream{
// []string{"tail", "-f", "/home/ao/DOWN"},
// []Action{Action{
// []string{"prout.dev"},
// []string{"touch", "/home/ao/DAMN"},
// }},
// }}
// streams := mockstreams
// log.Println(streams)
// queue := execQueue()
// for _, stream := range streams {
// go streamHandle(stream, queue)
// }
// // Infinite wait
// <-make(chan bool)
for _, stream := range conf.Streams {
go stream.handle()
}
// Infinite wait
<-make(chan bool)
}

View File

@ -7,13 +7,28 @@ definitions:
# ip: '(([0-9]{1,3}\.){3}[0-9]{1,3})|([0-9a-fA-F:]{2,90})'
streams:
- cmd: journalctl -fu phpfpm-nextcloud.service
tailDown:
cmd: [ "tail", "-f", "/home/ao/DOWN" ]
filters:
- regex:
- '"message":"Login failed: .\+ (Remote IP: <ip>)"'
retry: 3
retry-period: 1h
lookForProuts:
regex:
- prout
retry: 1
retry-period: 1s
actions:
- cmd: *iptablesban
- cmd: *iptablesunban
after: 1h
damn:
cmd: [ "echo", "DAMN" ]
sleepdamn:
cmd: [ "echo", "sleepDAMN" ]
after: 2s
# - cmd: journalctl -fu phpfpm-nextcloud.service
# filters:
# - regex:
# - '"message":"Login failed: .\+ (Remote IP: <ip>)"'
# retry: 3
# retry-period: 1h
# actions:
# - cmd: *iptablesban
# - cmd: *iptablesunban
# after: 1h