From ccc114bb132dccb1d18111ee915f417f72a503b0 Mon Sep 17 00:00:00 2001 From: yo Date: Sat, 30 Mar 2024 17:26:10 +0100 Subject: [PATCH] Add output you can stream matches into, so we don't spwan a process at each match --- app/daemon.go | 99 ++++++++++++++++++++++++++++++++++++++++++++++++++ app/startup.go | 23 ++++++++++++ app/types.go | 22 ++++++++++- 3 files changed, 143 insertions(+), 1 deletion(-) diff --git a/app/daemon.go b/app/daemon.go index 5784acd..d91090f 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -2,6 +2,8 @@ package app import ( "bufio" + "bytes" + "fmt" "os" "os/exec" "os/signal" @@ -23,6 +25,61 @@ func IsStringArrayEqual(one, two []string) bool { return true } + +// Executes a command and write to its stdin via input channel until command, or reaction, dies +func cmdStdin(commandline []string, input <-chan string) { + cmd := exec.Command(commandline[0], commandline[1:]...) + stdin, err := cmd.StdinPipe() + if err != nil { + logger.Fatalln("couldn't open stdin on command:", err) + } + stdout, err := cmd.StdoutPipe() + if err != nil { + logger.Fatalln("couldn't open stdout on command:", err) + } + if err := cmd.Start(); err != nil { + logger.Fatalln("couldn't start command:", err) + } + defer stdin.Close() + + logger.Printf(logger.INFO, fmt.Sprintf("Output started with %v\n", commandline)) + + // stdout displaying thread + go func() { + for { + // FIXME + tmp := make([]byte, 1024) + _, err := stdout.Read(tmp) + if len(bytes.Trim(tmp, "\x00")) > 0 { + for _, line := range strings.Split(strings.ReplaceAll(string(bytes.Trim(tmp, "\x00")), "\r\n", "\n"), "\n") { + if len(line) > 0 { + logger.Printf(logger.INFO, fmt.Sprintf("Output returned %s", line)) + } + } + } + if err != nil { + logger.Printf(logger.ERROR, fmt.Sprintf("Reading output error: %v\n", err)) + break + } + } + }() + + // Stdin writing thread + go func() { + for { + in := <-input + _, err := stdin.Write([]byte(in)) + if err != nil { + logger.Printf(logger.ERROR, fmt.Sprintf("Writing to output error: %v\n", err)) + break + } + } + }() + + err = cmd.Wait() + logger.Fatalln("command %v stopped: %v", cmd, err) +} + // Executes a command and channel-send its stdout func cmdStdout(commandline []string) chan *string { lines := make(chan *string) @@ -127,6 +184,16 @@ func (f *Filter) sendActions(match []string, at time.Time) { func (a *Action) exec(match []string) { defer wgActions.Done() + if len(a.Cmd) > 0 { + a.execCmd(match) + } + + if a.Write != nil { + a.execWrite(match) + } +} + +func (a *Action) execCmd(match []string) { var computedCommand []string var cmdItem string @@ -153,6 +220,29 @@ func (a *Action) exec(match []string) { } } +func (a *Action) execWrite(match []string) { + var computedWrite string + var writeItem string + + if a.filter.pattern != nil { + for _, item := range a.Write.Text { + writeItem = strings.Clone(item) + for i, p := range a.filter.pattern { + writeItem = strings.ReplaceAll(writeItem, p.nameWithBraces, match[i]) + } + if len(computedWrite) > 0 { + computedWrite = computedWrite + " " + writeItem + } else { + computedWrite = writeItem + } + } + } else { + computedWrite = strings.Join(a.Write.Text, " ") + } + + a.Write.Output.Stdin <- fmt.Sprintf("%s\n", computedWrite) +} + func ActionsManager(concurrency int) { // concurrency init execActionsC := make(chan PA) @@ -353,6 +443,14 @@ func StreamManager(s *Stream, endedSignal chan *Stream) { } +func OutputsManager(c *Conf) { + for outputName := range c.Outputs { + output := c.Outputs[outputName] + output.Stdin = make(chan string) + cmdStdin(output.Start, output.Stdin) + } +} + var actions ActionsMap var matches MatchesMap var actionsLock sync.Mutex @@ -416,6 +514,7 @@ func Daemon(confFilename string) { _ = runCommands(conf.Start, "start") go DatabaseManager(conf) + go OutputsManager(conf) go MatchesManager() go ActionsManager(conf.Concurrency) diff --git a/app/startup.go b/app/startup.go index 9e002c8..6c2bdb5 100644 --- a/app/startup.go +++ b/app/startup.go @@ -21,6 +21,15 @@ func (c *Conf) setup() { c.Concurrency = runtime.NumCPU() } + for outputName := range c.Outputs { + output := c.Outputs[outputName] + output.name = outputName + + if len(output.Start) == 0 { + logger.Fatalf("Bad configuration: output's start %v is empty!", outputName) + } + } + for patternName := range c.Patterns { pattern := c.Patterns[patternName] pattern.name = patternName @@ -136,6 +145,20 @@ func (c *Conf) setup() { if filter.longuestActionDuration == nil || filter.longuestActionDuration.Milliseconds() < action.afterDuration.Milliseconds() { filter.longuestActionDuration = &action.afterDuration } + + if action.Write != nil { + found := false + for oname := range c.Outputs { + if strings.EqualFold(oname, action.Write.OutputName) { + action.Write.Output = c.Outputs[oname] + found = true + } + } + if !found { + logger.Fatalln(fmt.Sprintf("Bad configuration: action %s.%s.%s refers to undeclared output %s", + stream.name, filter.name, action.name, action.Write.OutputName)) + } + } } } } diff --git a/app/types.go b/app/types.go index caa7a5e..81be6fd 100644 --- a/app/types.go +++ b/app/types.go @@ -9,12 +9,24 @@ import ( type Conf struct { Concurrency int `json:"concurrency"` + Outputs map[string]*Output `json:"outputs"` Patterns map[string]*Pattern `json:"patterns"` Streams map[string]*Stream `json:"streams"` Start [][]string `json:"start"` Stop [][]string `json:"stop"` } +type Output struct { + Start []string `json:"start"` + Stop []string `json:"stop"` + // TODO: Restart when lost communication with output + //Restart string `json:"restart"` + + name string `json:"-"` + + Stdin chan string +} + type Pattern struct { Regex string `json:"regex"` Ignore []string `json:"ignore"` @@ -52,11 +64,19 @@ type Filter struct { longuestActionDuration *time.Duration } +type OutputWrite struct { + OutputName string `json:"output"` + Text []string `json:"text"` + + Output *Output +} + type Action struct { filter *Filter `json:"-"` name string `json:"-"` - Cmd []string `json:"cmd"` + Cmd []string `json:"cmd"` + Write *OutputWrite `json:"write"` After string `json:"after"` afterDuration time.Duration `json:"-"`