package app import ( "bufio" "bytes" "fmt" "os" "os/exec" "os/signal" "strings" "sync" "syscall" "time" "framagit.org/ppom/reaction/logger" ) // Compare content and ordering. Case sensitive. func IsStringArrayEqual(one, two []string) bool { for i, a := range one { if a != two[i] { return false } } return true } // Executes a command and write to its stdin via input channel until command, or reaction, dies func cmdStdin(commandline []string, input <-chan string) { cmd := exec.Command(commandline[0], commandline[1:]...) stdin, err := cmd.StdinPipe() if err != nil { logger.Fatalln("couldn't open stdin on command:", err) } stdout, err := cmd.StdoutPipe() if err != nil { logger.Fatalln("couldn't open stdout on command:", err) } if err := cmd.Start(); err != nil { logger.Fatalln("couldn't start command:", err) } defer stdin.Close() logger.Printf(logger.INFO, fmt.Sprintf("Output started with %v\n", commandline)) // stdout displaying thread go func() { // FIXME tmp := make([]byte, 1024) for { _, err := stdout.Read(tmp) if len(bytes.Trim(tmp, "\x00")) > 0 { for _, line := range strings.Split(strings.ReplaceAll(string(bytes.Trim(tmp, "\x00")), "\r\n", "\n"), "\n") { if len(line) > 0 { logger.Printf(logger.INFO, fmt.Sprintf("Output returned %s", line)) } } } if err != nil { logger.Printf(logger.ERROR, fmt.Sprintf("Reading output error: %v\n", err)) break } } }() // Stdin writing thread go func() { for { in := <-input _, err := stdin.Write([]byte(in)) if err != nil { logger.Printf(logger.ERROR, fmt.Sprintf("Writing to output error: %v\n", err)) break } } }() err = cmd.Wait() logger.Fatalln("command %v stopped: %v", cmd, err) } // Executes a command and channel-send its stdout func cmdStdout(commandline []string) chan *string { lines := make(chan *string) go func() { cmd := exec.Command(commandline[0], commandline[1:]...) stdout, err := cmd.StdoutPipe() if err != nil { logger.Fatalln("couldn't open stdout on command:", err) } if err := cmd.Start(); err != nil { logger.Fatalln("couldn't start command:", err) } defer stdout.Close() scanner := bufio.NewScanner(stdout) for scanner.Scan() { line := scanner.Text() lines <- &line logger.Println(logger.DEBUG, "stdout:", line) } close(lines) }() return lines } func runCommands(commands [][]string, moment string) bool { ok := true for _, command := range commands { cmd := exec.Command(command[0], command[1:]...) cmd.WaitDelay = time.Minute logger.Printf(logger.INFO, "%v command: run %v\n", moment, command) if err := cmd.Start(); err != nil { logger.Printf(logger.ERROR, "%v command: run %v: %v", moment, command, err) ok = false } else { err := cmd.Wait() if err != nil { logger.Printf(logger.ERROR, "%v command: run %v: %v", moment, command, err) ok = false } } } return ok } func (p *Pattern) notAnIgnore(match *string) bool { for _, regex := range p.compiledIgnoreRegex { if regex.MatchString(*match) { return false } } for _, ignore := range p.Ignore { if ignore == *match { return false } } return true } // Whether one of the filter's regexes is matched on a line func (f *Filter) match(line *string) []string { var result []string for _, regex := range f.compiledRegex { if matches := regex.FindStringSubmatch(*line); matches != nil { var pnames []string for _, p := range f.pattern { pnames = append(pnames, p.name) } for _, p := range f.pattern { match := matches[regex.SubexpIndex(p.name)] if p.notAnIgnore(&match) { logger.Printf(logger.INFO, "%s.%s: match [%v]\n", f.stream.name, f.name, match) result = append(result, match) } } if f.pattern == nil { // No pattern, so this match will never actually be used return nil } } } if len(result) == len(f.pattern) { return result } else { // Incomplete match = no match. return nil } } func (f *Filter) sendActions(match []string, at time.Time) { for _, a := range f.Actions { actionsC <- PAT{match, a, at.Add(a.afterDuration)} } } func (a *Action) exec(match []string) { defer wgActions.Done() if len(a.Cmd) > 0 { a.execCmd(match) } if a.Write != nil { a.execWrite(match) } } func (a *Action) execCmd(match []string) { var computedCommand []string var cmdItem string if a.filter.pattern != nil { computedCommand = make([]string, 0, len(a.Cmd)) for _, item := range a.Cmd { cmdItem = strings.Clone(item) for i, p := range a.filter.pattern { cmdItem = strings.ReplaceAll(cmdItem, p.nameWithBraces, match[i]) } computedCommand = append(computedCommand, cmdItem) } } else { computedCommand = a.Cmd } logger.Printf(logger.INFO, "%s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand) cmd := exec.Command(computedCommand[0], computedCommand[1:]...) if ret := cmd.Run(); ret != nil { logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret) } } func (a *Action) execWrite(match []string) { var computedWrite string var writeItem string if a.filter.pattern != nil { for _, item := range a.Write.Text { writeItem = strings.Clone(item) for i, p := range a.filter.pattern { writeItem = strings.ReplaceAll(writeItem, p.nameWithBraces, match[i]) } if len(computedWrite) > 0 { computedWrite = computedWrite + " " + writeItem } else { computedWrite = writeItem } } } else { computedWrite = strings.Join(a.Write.Text, " ") } a.Write.Output.Stdin <- fmt.Sprintf("%s\n", computedWrite) } func ActionsManager(concurrency int) { // concurrency init execActionsC := make(chan PA) if concurrency > 0 { for i := 0; i < concurrency; i++ { go func() { var pa PA for { pa = <-execActionsC pa.a.exec(pa.p) } }() } } else { go func() { var pa PA for { pa = <-execActionsC go func(pa PA) { pa.a.exec(pa.p) }(pa) } }() } execAction := func(a *Action, p []string) { wgActions.Add(1) execActionsC <- PA{p, a} } // main pendingActionsC := make(chan PAT) for { select { case pat := <-actionsC: pa := PA{pat.p, pat.a} pattern, action, then := pat.p, pat.a, pat.t now := time.Now() // check if must be executed now if then.Compare(now) <= 0 { execAction(action, pattern) } else { actionsLock.Lock() if actions[&pa] == nil { actions[&pa] = make(map[time.Time]struct{}) } actions[&pa][then] = struct{}{} actionsLock.Unlock() go func(insidePat PAT, insideNow time.Time) { time.Sleep(insidePat.t.Sub(insideNow)) pendingActionsC <- insidePat }(pat, now) } case pat := <-pendingActionsC: pa := PA{pat.p, pat.a} pattern, action, then := pat.p, pat.a, pat.t actionsLock.Lock() if actions[&pa] != nil { delete(actions[&pa], then) } actionsLock.Unlock() execAction(action, pattern) case fo := <-flushToActionsC: ret := make(ActionsMap) actionsLock.Lock() for pa := range actions { if IsStringArrayEqual(pa.p, fo.p) { for range actions[pa] { execAction(pa.a, pa.p) } ret[pa] = actions[pa] delete(actions, pa) } } actionsLock.Unlock() fo.ret <- ret case _, _ = <-stopActions: actionsLock.Lock() for pa := range actions { if pa.a.OnExit { for range actions[pa] { execAction(pa.a, pa.p) } } } actionsLock.Unlock() wgActions.Done() return } } } func MatchesManager() { var fo FlushMatchOrder var pft PFT end := false for !end { select { case fo = <-flushToMatchesC: matchesManagerHandleFlush(fo) case fo, ok := <-startupMatchesC: if !ok { end = true } else { _ = matchesManagerHandleMatch(fo) } } } for { select { case fo = <-flushToMatchesC: matchesManagerHandleFlush(fo) case pft = <-matchesC: entry := LogEntry{pft.t, 0, pft.p, pft.f.stream.name, pft.f.name, 0, false} entry.Exec = matchesManagerHandleMatch(pft) logsC <- entry } } } func matchesManagerHandleFlush(fo FlushMatchOrder) { ret := make(MatchesMap) matchesLock.Lock() for pf := range matches { if IsStringArrayEqual(fo.p, pf.p) { if fo.ret != nil { ret[pf] = matches[pf] } delete(matches, pf) } } matchesLock.Unlock() if fo.ret != nil { fo.ret <- ret } } func matchesManagerHandleMatch(pft PFT) bool { matchesLock.Lock() defer matchesLock.Unlock() filter, pattern, then := pft.f, pft.p, pft.t pf := PF{pft.p, pft.f} if filter.Retry > 1 { // make sure map exists if matches[&pf] == nil { matches[&pf] = make(map[time.Time]struct{}) } // add new match matches[&pf][then] = struct{}{} // remove match when expired go func(pf PF, then time.Time) { time.Sleep(then.Sub(time.Now()) + filter.retryDuration) matchesLock.Lock() if matches[&pf] != nil { // FIXME replace this and all similar occurences // by clear() when switching to go 1.21 delete(matches[&pf], then) } matchesLock.Unlock() }(pf, then) } if filter.Retry <= 1 || len(matches[&pf]) >= filter.Retry { delete(matches, &pf) filter.sendActions(pattern, then) return true } return false } func StreamManager(s *Stream, endedSignal chan *Stream) { defer wgStreams.Done() logger.Printf(logger.INFO, "%s: start %s\n", s.name, s.Cmd) lines := cmdStdout(s.Cmd) for { select { case line, ok := <-lines: if !ok { endedSignal <- s return } for _, filter := range s.Filters { if match := filter.match(line); len(match) > 0 { matchesC <- PFT{match, filter, time.Now()} } } case _, _ = <-stopStreams: return } } } func OutputsManager(c *Conf) { for outputName := range c.Outputs { output := c.Outputs[outputName] output.Stdin = make(chan string) cmdStdin(output.Start, output.Stdin) } } var actions ActionsMap var matches MatchesMap var actionsLock sync.Mutex var matchesLock sync.Mutex var stopStreams chan bool var stopActions chan bool var wgActions sync.WaitGroup var wgStreams sync.WaitGroup /* ↓ StreamManager onstartup:matches ↓ ↓ ↑ matches→ MatchesManager →logs→ DatabaseManager ←· ↑ ↓ ↑ ↑ actions→ ActionsManager ↑ ↑ ↑ ↑ SocketManager →flushes→→→→→→→→→→·→→→→→→→→→→→→→→→→· ↑ */ // DatabaseManager → MatchesManager var startupMatchesC chan PFT // StreamManager → MatchesManager var matchesC chan PFT // MatchesManager → DatabaseManager var logsC chan LogEntry // MatchesManager → ActionsManager var actionsC chan PAT // SocketManager, DatabaseManager → MatchesManager var flushToMatchesC chan FlushMatchOrder // SocketManager → ActionsManager var flushToActionsC chan FlushActionOrder // SocketManager → DatabaseManager var flushToDatabaseC chan LogEntry func Daemon(confFilename string) { conf := parseConf(confFilename) startupMatchesC = make(chan PFT) matchesC = make(chan PFT) logsC = make(chan LogEntry) actionsC = make(chan PAT) flushToMatchesC = make(chan FlushMatchOrder) flushToActionsC = make(chan FlushActionOrder) flushToDatabaseC = make(chan LogEntry) stopActions = make(chan bool) stopStreams = make(chan bool) actions = make(ActionsMap) matches = make(MatchesMap) _ = runCommands(conf.Start, "start") go DatabaseManager(conf) go OutputsManager(conf) go MatchesManager() go ActionsManager(conf.Concurrency) // Ready to start sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) endSignals := make(chan *Stream) nbStreamsInExecution := len(conf.Streams) for _, stream := range conf.Streams { wgStreams.Add(1) go StreamManager(stream, endSignals) } go SocketManager(conf.Streams) for { select { case finishedStream := <-endSignals: logger.Printf(logger.ERROR, "%s stream finished", finishedStream.name) nbStreamsInExecution-- if nbStreamsInExecution == 0 { quit(conf, false) } case <-sigs: logger.Printf(logger.INFO, "Received SIGINT/SIGTERM, exiting") quit(conf, true) } } } func quit(conf *Conf, graceful bool) { // send stop to StreamManager·s close(stopStreams) logger.Println(logger.INFO, "Waiting for Streams to finish...") wgStreams.Wait() // ActionsManager calls wgActions.Done() when it has launched all pending actions wgActions.Add(1) // send stop to ActionsManager close(stopActions) // stop all actions logger.Println(logger.INFO, "Waiting for Actions to finish...") wgActions.Wait() // run stop commands stopOk := runCommands(conf.Stop, "stop") // delete pipe err := os.Remove(*SocketPath) if err != nil { logger.Println(logger.ERROR, "Failed to remove socket:", err) } if !stopOk || !graceful { os.Exit(1) } os.Exit(0) }