Add output you can stream matches into, so we don't spwan a process at each match

This commit is contained in:
yo 2024-03-30 17:26:10 +01:00
parent 525d9cd270
commit ccc114bb13
3 changed files with 143 additions and 1 deletions

View File

@ -2,6 +2,8 @@ package app
import ( import (
"bufio" "bufio"
"bytes"
"fmt"
"os" "os"
"os/exec" "os/exec"
"os/signal" "os/signal"
@ -23,6 +25,61 @@ func IsStringArrayEqual(one, two []string) bool {
return true return true
} }
// Executes a command and write to its stdin via input channel until command, or reaction, dies
func cmdStdin(commandline []string, input <-chan string) {
cmd := exec.Command(commandline[0], commandline[1:]...)
stdin, err := cmd.StdinPipe()
if err != nil {
logger.Fatalln("couldn't open stdin on command:", err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
logger.Fatalln("couldn't open stdout on command:", err)
}
if err := cmd.Start(); err != nil {
logger.Fatalln("couldn't start command:", err)
}
defer stdin.Close()
logger.Printf(logger.INFO, fmt.Sprintf("Output started with %v\n", commandline))
// stdout displaying thread
go func() {
for {
// FIXME
tmp := make([]byte, 1024)
_, err := stdout.Read(tmp)
if len(bytes.Trim(tmp, "\x00")) > 0 {
for _, line := range strings.Split(strings.ReplaceAll(string(bytes.Trim(tmp, "\x00")), "\r\n", "\n"), "\n") {
if len(line) > 0 {
logger.Printf(logger.INFO, fmt.Sprintf("Output returned %s", line))
}
}
}
if err != nil {
logger.Printf(logger.ERROR, fmt.Sprintf("Reading output error: %v\n", err))
break
}
}
}()
// Stdin writing thread
go func() {
for {
in := <-input
_, err := stdin.Write([]byte(in))
if err != nil {
logger.Printf(logger.ERROR, fmt.Sprintf("Writing to output error: %v\n", err))
break
}
}
}()
err = cmd.Wait()
logger.Fatalln("command %v stopped: %v", cmd, err)
}
// Executes a command and channel-send its stdout // Executes a command and channel-send its stdout
func cmdStdout(commandline []string) chan *string { func cmdStdout(commandline []string) chan *string {
lines := make(chan *string) lines := make(chan *string)
@ -127,6 +184,16 @@ func (f *Filter) sendActions(match []string, at time.Time) {
func (a *Action) exec(match []string) { func (a *Action) exec(match []string) {
defer wgActions.Done() defer wgActions.Done()
if len(a.Cmd) > 0 {
a.execCmd(match)
}
if a.Write != nil {
a.execWrite(match)
}
}
func (a *Action) execCmd(match []string) {
var computedCommand []string var computedCommand []string
var cmdItem string var cmdItem string
@ -153,6 +220,29 @@ func (a *Action) exec(match []string) {
} }
} }
func (a *Action) execWrite(match []string) {
var computedWrite string
var writeItem string
if a.filter.pattern != nil {
for _, item := range a.Write.Text {
writeItem = strings.Clone(item)
for i, p := range a.filter.pattern {
writeItem = strings.ReplaceAll(writeItem, p.nameWithBraces, match[i])
}
if len(computedWrite) > 0 {
computedWrite = computedWrite + " " + writeItem
} else {
computedWrite = writeItem
}
}
} else {
computedWrite = strings.Join(a.Write.Text, " ")
}
a.Write.Output.Stdin <- fmt.Sprintf("%s\n", computedWrite)
}
func ActionsManager(concurrency int) { func ActionsManager(concurrency int) {
// concurrency init // concurrency init
execActionsC := make(chan PA) execActionsC := make(chan PA)
@ -353,6 +443,14 @@ func StreamManager(s *Stream, endedSignal chan *Stream) {
} }
func OutputsManager(c *Conf) {
for outputName := range c.Outputs {
output := c.Outputs[outputName]
output.Stdin = make(chan string)
cmdStdin(output.Start, output.Stdin)
}
}
var actions ActionsMap var actions ActionsMap
var matches MatchesMap var matches MatchesMap
var actionsLock sync.Mutex var actionsLock sync.Mutex
@ -416,6 +514,7 @@ func Daemon(confFilename string) {
_ = runCommands(conf.Start, "start") _ = runCommands(conf.Start, "start")
go DatabaseManager(conf) go DatabaseManager(conf)
go OutputsManager(conf)
go MatchesManager() go MatchesManager()
go ActionsManager(conf.Concurrency) go ActionsManager(conf.Concurrency)

View File

@ -21,6 +21,15 @@ func (c *Conf) setup() {
c.Concurrency = runtime.NumCPU() c.Concurrency = runtime.NumCPU()
} }
for outputName := range c.Outputs {
output := c.Outputs[outputName]
output.name = outputName
if len(output.Start) == 0 {
logger.Fatalf("Bad configuration: output's start %v is empty!", outputName)
}
}
for patternName := range c.Patterns { for patternName := range c.Patterns {
pattern := c.Patterns[patternName] pattern := c.Patterns[patternName]
pattern.name = patternName pattern.name = patternName
@ -136,6 +145,20 @@ func (c *Conf) setup() {
if filter.longuestActionDuration == nil || filter.longuestActionDuration.Milliseconds() < action.afterDuration.Milliseconds() { if filter.longuestActionDuration == nil || filter.longuestActionDuration.Milliseconds() < action.afterDuration.Milliseconds() {
filter.longuestActionDuration = &action.afterDuration filter.longuestActionDuration = &action.afterDuration
} }
if action.Write != nil {
found := false
for oname := range c.Outputs {
if strings.EqualFold(oname, action.Write.OutputName) {
action.Write.Output = c.Outputs[oname]
found = true
}
}
if !found {
logger.Fatalln(fmt.Sprintf("Bad configuration: action %s.%s.%s refers to undeclared output %s",
stream.name, filter.name, action.name, action.Write.OutputName))
}
}
} }
} }
} }

View File

@ -9,12 +9,24 @@ import (
type Conf struct { type Conf struct {
Concurrency int `json:"concurrency"` Concurrency int `json:"concurrency"`
Outputs map[string]*Output `json:"outputs"`
Patterns map[string]*Pattern `json:"patterns"` Patterns map[string]*Pattern `json:"patterns"`
Streams map[string]*Stream `json:"streams"` Streams map[string]*Stream `json:"streams"`
Start [][]string `json:"start"` Start [][]string `json:"start"`
Stop [][]string `json:"stop"` Stop [][]string `json:"stop"`
} }
type Output struct {
Start []string `json:"start"`
Stop []string `json:"stop"`
// TODO: Restart when lost communication with output
//Restart string `json:"restart"`
name string `json:"-"`
Stdin chan string
}
type Pattern struct { type Pattern struct {
Regex string `json:"regex"` Regex string `json:"regex"`
Ignore []string `json:"ignore"` Ignore []string `json:"ignore"`
@ -52,11 +64,19 @@ type Filter struct {
longuestActionDuration *time.Duration longuestActionDuration *time.Duration
} }
type OutputWrite struct {
OutputName string `json:"output"`
Text []string `json:"text"`
Output *Output
}
type Action struct { type Action struct {
filter *Filter `json:"-"` filter *Filter `json:"-"`
name string `json:"-"` name string `json:"-"`
Cmd []string `json:"cmd"` Cmd []string `json:"cmd"`
Write *OutputWrite `json:"write"`
After string `json:"after"` After string `json:"after"`
afterDuration time.Duration `json:"-"` afterDuration time.Duration `json:"-"`