From e87c09283daf14bf69f6c75bcbfd9a2dd83ec176 Mon Sep 17 00:00:00 2001 From: ppom <> Date: Mon, 20 Mar 2023 23:25:57 +0100 Subject: [PATCH] first working prototype --- main.go | 54 +++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 43 insertions(+), 11 deletions(-) diff --git a/main.go b/main.go index d327425..5451066 100644 --- a/main.go +++ b/main.go @@ -1,22 +1,40 @@ package main import ( + "bufio" "log" "os/exec" + "regexp" ) type Action struct { regex, cmd []string } +type compiledAction struct { + regex []regexp.Regexp + cmd []string +} + type Stream struct { cmd []string actions []Action } -func streamHandle(stream Stream, execQueue chan string) { +func compileAction(action Action) compiledAction { + var ca compiledAction + ca.cmd = action.cmd + for _, regex := range action.regex { + ca.regex = append(ca.regex, *regexp.MustCompile(regex)) + } + return ca +} + +/// Handle a log command +/// Must be started in a goroutine +func streamHandle(stream Stream, execQueue chan []string) { log.Printf("streamHandle{%v}: start\n", stream.cmd) - cmd := exec.Command(stream.cmd...) + cmd := exec.Command(stream.cmd[0], stream.cmd[1:]...) stdout, err := cmd.StdoutPipe() if err != nil { log.Fatal("couldn't open stdout on command:", err) @@ -25,21 +43,34 @@ func streamHandle(stream Stream, execQueue chan string) { log.Fatal("couldn't start command:", err) } defer stdout.Close() + + compiledActions := make([]compiledAction, 0, len(stream.actions)) + for _, action := range stream.actions { + compiledActions = append(compiledActions, compileAction(action)) + } + scanner := bufio.NewScanner(stdout) for scanner.Scan() { line := scanner.Text() - // try to match and send to execQueue if matching + for _, action := range compiledActions { + for _, regex := range action.regex { + if match := regex.FindString(line); match != "" { + log.Printf("match `%v` in line: `%v`\n", regex.String(), line) + execQueue <- action.cmd + } + } + } } } -func execQueue() chan string { - queue := make(chan string) +func execQueue() chan []string { + queue := make(chan []string) go func() { for { command := <-queue - return_code := run(command) - if return_code != 0 { - log.Printf("Error launching `%v`\n", command) + cmd := exec.Command(command[0], command[1:]...) + if ret := cmd.Run(); ret != nil { + log.Printf("Error launching `%v`: code %v\n", cmd, ret) } } }() @@ -50,15 +81,16 @@ func main() { mockstreams := []Stream{Stream{ []string{"tail", "-f", "/home/ao/DOWN"}, []Action{Action{ - "prout.dev", - []string{"echo", "DAMN"}, + []string{"prout.dev"}, + []string{"touch", "/home/ao/DAMN"}, }}, }} streams := mockstreams log.Println(streams) queue := execQueue() - stop := make(chan bool) for _, stream := range streams { go streamHandle(stream, queue) } + // Infinite wait + <-make(chan bool) }