reaction/app/daemon.go

576 lines
13 KiB
Go
Raw Permalink Normal View History

2023-03-25 18:27:01 +01:00
package app
2023-03-19 23:10:18 +01:00
import (
2023-03-20 23:25:57 +01:00
"bufio"
"bytes"
"fmt"
2023-03-24 17:52:00 +01:00
"os"
2023-03-19 23:10:18 +01:00
"os/exec"
2023-04-27 10:42:19 +02:00
"os/signal"
2023-03-24 17:36:41 +01:00
"strings"
"sync"
2023-10-12 12:00:00 +02:00
"syscall"
2023-03-24 17:36:41 +01:00
"time"
2023-10-12 12:00:00 +02:00
"framagit.org/ppom/reaction/logger"
2023-03-19 23:10:18 +01:00
)
2024-02-24 11:42:31 +01:00
// Compare content and ordering. Case sensitive.
func IsStringArrayEqual(one, two []string) bool {
for i, a := range one {
if a != two[i] {
return false
}
}
return true
}
// Executes a command and write to its stdin via input channel until command, or reaction, dies
func cmdStdin(commandline []string, input <-chan string) {
cmd := exec.Command(commandline[0], commandline[1:]...)
stdin, err := cmd.StdinPipe()
if err != nil {
logger.Fatalln("couldn't open stdin on command:", err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
logger.Fatalln("couldn't open stdout on command:", err)
}
if err := cmd.Start(); err != nil {
logger.Fatalln("couldn't start command:", err)
}
defer stdin.Close()
logger.Printf(logger.INFO, fmt.Sprintf("Output started with %v\n", commandline))
// stdout displaying thread
go func() {
2024-03-31 16:52:10 +02:00
// FIXME
tmp := make([]byte, 1024)
for {
_, err := stdout.Read(tmp)
if len(bytes.Trim(tmp, "\x00")) > 0 {
for _, line := range strings.Split(strings.ReplaceAll(string(bytes.Trim(tmp, "\x00")), "\r\n", "\n"), "\n") {
if len(line) > 0 {
logger.Printf(logger.INFO, fmt.Sprintf("Output returned %s", line))
}
}
}
if err != nil {
logger.Printf(logger.ERROR, fmt.Sprintf("Reading output error: %v\n", err))
break
}
}
}()
// Stdin writing thread
go func() {
for {
in := <-input
_, err := stdin.Write([]byte(in))
if err != nil {
logger.Printf(logger.ERROR, fmt.Sprintf("Writing to output error: %v\n", err))
break
}
}
}()
err = cmd.Wait()
logger.Fatalln("command %v stopped: %v", cmd, err)
}
2023-03-24 17:36:41 +01:00
// Executes a command and channel-send its stdout
func cmdStdout(commandline []string) chan *string {
lines := make(chan *string)
2023-03-19 23:10:18 +01:00
2023-03-24 00:27:51 +01:00
go func() {
cmd := exec.Command(commandline[0], commandline[1:]...)
stdout, err := cmd.StdoutPipe()
if err != nil {
2023-10-12 12:00:00 +02:00
logger.Fatalln("couldn't open stdout on command:", err)
}
2023-03-24 00:27:51 +01:00
if err := cmd.Start(); err != nil {
2023-10-12 12:00:00 +02:00
logger.Fatalln("couldn't start command:", err)
2023-03-24 00:27:51 +01:00
}
defer stdout.Close()
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
line := scanner.Text()
lines <- &line
2023-10-12 12:00:00 +02:00
logger.Println(logger.DEBUG, "stdout:", line)
2023-03-24 00:27:51 +01:00
}
close(lines)
}()
return lines
2023-03-19 23:10:18 +01:00
}
2023-11-23 12:00:00 +01:00
func runCommands(commands [][]string, moment string) bool {
ok := true
for _, command := range commands {
cmd := exec.Command(command[0], command[1:]...)
cmd.WaitDelay = time.Minute
logger.Printf(logger.INFO, "%v command: run %v\n", moment, command)
if err := cmd.Start(); err != nil {
logger.Printf(logger.ERROR, "%v command: run %v: %v", moment, command, err)
2023-11-23 12:00:00 +01:00
ok = false
} else {
err := cmd.Wait()
if err != nil {
logger.Printf(logger.ERROR, "%v command: run %v: %v", moment, command, err)
2023-11-23 12:00:00 +01:00
ok = false
}
}
}
2023-11-23 12:00:00 +01:00
return ok
}
2023-08-21 23:33:56 +02:00
func (p *Pattern) notAnIgnore(match *string) bool {
for _, regex := range p.compiledIgnoreRegex {
if regex.MatchString(*match) {
return false
}
}
2023-08-21 23:33:56 +02:00
for _, ignore := range p.Ignore {
if ignore == *match {
return false
}
}
return true
}
2023-03-24 17:36:41 +01:00
// Whether one of the filter's regexes is matched on a line
2024-02-24 11:42:31 +01:00
func (f *Filter) match(line *string) []string {
var result []string
2023-03-24 00:27:51 +01:00
for _, regex := range f.compiledRegex {
2023-03-24 17:36:41 +01:00
if matches := regex.FindStringSubmatch(*line); matches != nil {
2024-02-24 11:42:31 +01:00
var pnames []string
for _, p := range f.pattern {
pnames = append(pnames, p.name)
}
2023-03-24 17:36:41 +01:00
2024-02-24 11:42:31 +01:00
for _, p := range f.pattern {
match := matches[regex.SubexpIndex(p.name)]
if p.notAnIgnore(&match) {
logger.Printf(logger.INFO, "%s.%s: match [%v]\n", f.stream.name, f.name, match)
2024-02-24 11:42:31 +01:00
result = append(result, match)
}
2024-02-24 11:42:31 +01:00
}
if f.pattern == nil {
// No pattern, so this match will never actually be used
2024-03-30 08:50:44 +01:00
return nil
2023-08-21 23:33:56 +02:00
}
2023-03-24 00:27:51 +01:00
}
2023-03-20 23:25:57 +01:00
}
2024-03-30 08:50:44 +01:00
if len(result) == len(f.pattern) {
return result
} else {
// Incomplete match = no match.
return nil
}
2023-03-20 23:25:57 +01:00
}
2024-02-24 11:42:31 +01:00
func (f *Filter) sendActions(match []string, at time.Time) {
2023-03-24 00:27:51 +01:00
for _, a := range f.Actions {
actionsC <- PAT{match, a, at.Add(a.afterDuration)}
2023-03-19 23:10:18 +01:00
}
2023-03-24 00:27:51 +01:00
}
2023-03-20 23:25:57 +01:00
2024-02-24 11:42:31 +01:00
func (a *Action) exec(match []string) {
defer wgActions.Done()
if len(a.Cmd) > 0 {
a.execCmd(match)
}
if a.Write != nil {
a.execWrite(match)
}
}
func (a *Action) execCmd(match []string) {
var computedCommand []string
2024-02-24 11:42:31 +01:00
var cmdItem string
if a.filter.pattern != nil {
computedCommand = make([]string, 0, len(a.Cmd))
for _, item := range a.Cmd {
2024-02-24 11:42:31 +01:00
cmdItem = strings.Clone(item)
for i, p := range a.filter.pattern {
cmdItem = strings.ReplaceAll(cmdItem, p.nameWithBraces, match[i])
}
computedCommand = append(computedCommand, cmdItem)
}
} else {
computedCommand = a.Cmd
}
2023-03-24 17:36:41 +01:00
logger.Printf(logger.INFO, "%s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand)
2023-03-24 17:36:41 +01:00
cmd := exec.Command(computedCommand[0], computedCommand[1:]...)
2023-03-24 18:06:57 +01:00
if ret := cmd.Run(); ret != nil {
logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret)
}
2023-03-19 23:10:18 +01:00
}
func (a *Action) execWrite(match []string) {
var computedWrite string
var writeItem string
if a.filter.pattern != nil {
for _, item := range a.Write.Text {
writeItem = strings.Clone(item)
for i, p := range a.filter.pattern {
writeItem = strings.ReplaceAll(writeItem, p.nameWithBraces, match[i])
}
if len(computedWrite) > 0 {
computedWrite = computedWrite + " " + writeItem
} else {
computedWrite = writeItem
}
}
} else {
computedWrite = strings.Join(a.Write.Text, " ")
}
a.Write.Output.Stdin <- fmt.Sprintf("%s\n", computedWrite)
}
func ActionsManager(concurrency int) {
// concurrency init
execActionsC := make(chan PA)
if concurrency > 0 {
for i := 0; i < concurrency; i++ {
go func() {
var pa PA
for {
pa = <-execActionsC
pa.a.exec(pa.p)
}
}()
}
} else {
go func() {
var pa PA
for {
pa = <-execActionsC
go func(pa PA) {
pa.a.exec(pa.p)
}(pa)
}
}()
}
2024-02-24 11:42:31 +01:00
execAction := func(a *Action, p []string) {
wgActions.Add(1)
execActionsC <- PA{p, a}
}
// main
pendingActionsC := make(chan PAT)
for {
select {
2023-10-01 12:00:00 +02:00
case pat := <-actionsC:
pa := PA{pat.p, pat.a}
pattern, action, then := pat.p, pat.a, pat.t
now := time.Now()
// check if must be executed now
2023-09-23 00:14:20 +02:00
if then.Compare(now) <= 0 {
execAction(action, pattern)
} else {
actionsLock.Lock()
2024-02-24 11:42:31 +01:00
if actions[&pa] == nil {
actions[&pa] = make(map[time.Time]struct{})
2023-10-01 12:00:00 +02:00
}
2024-02-24 11:42:31 +01:00
actions[&pa][then] = struct{}{}
actionsLock.Unlock()
2023-10-01 12:00:00 +02:00
go func(insidePat PAT, insideNow time.Time) {
time.Sleep(insidePat.t.Sub(insideNow))
pendingActionsC <- insidePat
2023-09-23 00:14:20 +02:00
}(pat, now)
}
2023-10-01 12:00:00 +02:00
case pat := <-pendingActionsC:
pa := PA{pat.p, pat.a}
pattern, action, then := pat.p, pat.a, pat.t
actionsLock.Lock()
2024-02-24 11:42:31 +01:00
if actions[&pa] != nil {
delete(actions[&pa], then)
2023-10-01 12:00:00 +02:00
}
actionsLock.Unlock()
execAction(action, pattern)
2023-10-01 12:00:00 +02:00
case fo := <-flushToActionsC:
2023-10-04 12:00:00 +02:00
ret := make(ActionsMap)
2023-10-01 12:00:00 +02:00
actionsLock.Lock()
for pa := range actions {
2024-02-24 11:42:31 +01:00
if IsStringArrayEqual(pa.p, fo.p) {
2023-10-01 12:00:00 +02:00
for range actions[pa] {
execAction(pa.a, pa.p)
2023-10-01 12:00:00 +02:00
}
2023-10-04 12:00:00 +02:00
ret[pa] = actions[pa]
2023-10-01 12:00:00 +02:00
delete(actions, pa)
}
}
actionsLock.Unlock()
2023-10-01 12:00:00 +02:00
fo.ret <- ret
case _, _ = <-stopActions:
actionsLock.Lock()
for pa := range actions {
if pa.a.OnExit {
for range actions[pa] {
execAction(pa.a, pa.p)
}
}
}
actionsLock.Unlock()
wgActions.Done()
return
}
}
}
func MatchesManager() {
2023-10-01 12:00:00 +02:00
var fo FlushMatchOrder
var pft PFT
end := false
for !end {
select {
2023-10-01 12:00:00 +02:00
case fo = <-flushToMatchesC:
matchesManagerHandleFlush(fo)
case fo, ok := <-startupMatchesC:
if !ok {
end = true
} else {
2023-10-01 12:00:00 +02:00
_ = matchesManagerHandleMatch(fo)
}
2023-03-25 19:12:11 +01:00
}
}
for {
select {
2023-10-01 12:00:00 +02:00
case fo = <-flushToMatchesC:
matchesManagerHandleFlush(fo)
case pft = <-matchesC:
2023-04-26 17:18:55 +02:00
entry := LogEntry{pft.t, 0, pft.p, pft.f.stream.name, pft.f.name, 0, false}
2023-03-25 19:12:11 +01:00
entry.Exec = matchesManagerHandleMatch(pft)
2023-03-25 19:12:11 +01:00
logsC <- entry
}
}
}
2023-04-26 17:18:55 +02:00
2023-10-01 12:00:00 +02:00
func matchesManagerHandleFlush(fo FlushMatchOrder) {
2023-10-04 12:00:00 +02:00
ret := make(MatchesMap)
2023-10-01 12:00:00 +02:00
matchesLock.Lock()
for pf := range matches {
2024-02-24 11:42:31 +01:00
if IsStringArrayEqual(fo.p, pf.p) {
2023-10-01 12:00:00 +02:00
if fo.ret != nil {
2023-10-04 12:00:00 +02:00
ret[pf] = matches[pf]
2023-10-01 12:00:00 +02:00
}
delete(matches, pf)
}
}
matchesLock.Unlock()
if fo.ret != nil {
fo.ret <- ret
}
}
func matchesManagerHandleMatch(pft PFT) bool {
2023-10-01 12:00:00 +02:00
matchesLock.Lock()
defer matchesLock.Unlock()
filter, pattern, then := pft.f, pft.p, pft.t
pf := PF{pft.p, pft.f}
if filter.Retry > 1 {
// make sure map exists
2024-02-24 11:42:31 +01:00
if matches[&pf] == nil {
matches[&pf] = make(map[time.Time]struct{})
}
// add new match
2024-02-24 11:42:31 +01:00
matches[&pf][then] = struct{}{}
// remove match when expired
go func(pf PF, then time.Time) {
2023-10-01 12:00:00 +02:00
time.Sleep(then.Sub(time.Now()) + filter.retryDuration)
matchesLock.Lock()
2024-02-24 11:42:31 +01:00
if matches[&pf] != nil {
// FIXME replace this and all similar occurences
// by clear() when switching to go 1.21
2024-02-24 11:42:31 +01:00
delete(matches[&pf], then)
}
matchesLock.Unlock()
}(pf, then)
}
2024-02-24 11:42:31 +01:00
if filter.Retry <= 1 || len(matches[&pf]) >= filter.Retry {
delete(matches, &pf)
2023-10-01 12:00:00 +02:00
filter.sendActions(pattern, then)
return true
}
return false
}
func StreamManager(s *Stream, endedSignal chan *Stream) {
defer wgStreams.Done()
2023-10-12 12:00:00 +02:00
logger.Printf(logger.INFO, "%s: start %s\n", s.name, s.Cmd)
2023-03-24 00:27:51 +01:00
lines := cmdStdout(s.Cmd)
for {
select {
case line, ok := <-lines:
if !ok {
endedSignal <- s
return
}
for _, filter := range s.Filters {
2024-02-24 11:42:31 +01:00
if match := filter.match(line); len(match) > 0 {
matchesC <- PFT{match, filter, time.Now()}
}
}
case _, _ = <-stopStreams:
return
}
}
2023-03-19 23:10:18 +01:00
}
func OutputsManager(c *Conf) {
for outputName := range c.Outputs {
output := c.Outputs[outputName]
output.Stdin = make(chan string)
cmdStdin(output.Start, output.Stdin)
}
}
var actions ActionsMap
var matches MatchesMap
var actionsLock sync.Mutex
var matchesLock sync.Mutex
var stopStreams chan bool
var stopActions chan bool
var wgActions sync.WaitGroup
var wgStreams sync.WaitGroup
2023-09-25 20:42:42 +02:00
/*
2023-10-01 12:00:00 +02:00
<StreamCmds>
2023-09-25 20:42:42 +02:00
StreamManager onstartup:matches
matches MatchesManager logs DatabaseManager ·
actions ActionsManager
SocketManager flushes··
2023-10-01 12:00:00 +02:00
<Clients>
2023-09-25 20:42:42 +02:00
*/
// DatabaseManager → MatchesManager
var startupMatchesC chan PFT
// StreamManager → MatchesManager
var matchesC chan PFT
2023-09-25 20:42:42 +02:00
// MatchesManager → DatabaseManager
var logsC chan LogEntry
2023-09-25 20:42:42 +02:00
// MatchesManager → ActionsManager
var actionsC chan PAT
2023-04-26 17:18:55 +02:00
2023-09-25 20:42:42 +02:00
// SocketManager, DatabaseManager → MatchesManager
2023-10-01 12:00:00 +02:00
var flushToMatchesC chan FlushMatchOrder
2023-09-25 20:42:42 +02:00
// SocketManager → ActionsManager
2023-10-01 12:00:00 +02:00
var flushToActionsC chan FlushActionOrder
2023-09-25 20:42:42 +02:00
// SocketManager → DatabaseManager
var flushToDatabaseC chan LogEntry
func Daemon(confFilename string) {
conf := parseConf(confFilename)
2023-09-06 02:00:33 +02:00
startupMatchesC = make(chan PFT)
2023-09-25 20:42:42 +02:00
matchesC = make(chan PFT)
logsC = make(chan LogEntry)
actionsC = make(chan PAT)
2023-10-01 12:00:00 +02:00
flushToMatchesC = make(chan FlushMatchOrder)
flushToActionsC = make(chan FlushActionOrder)
2023-09-25 20:42:42 +02:00
flushToDatabaseC = make(chan LogEntry)
stopActions = make(chan bool)
stopStreams = make(chan bool)
actions = make(ActionsMap)
matches = make(MatchesMap)
2023-11-23 12:00:00 +01:00
_ = runCommands(conf.Start, "start")
2023-09-09 20:42:47 +02:00
go DatabaseManager(conf)
go OutputsManager(conf)
go MatchesManager()
go ActionsManager(conf.Concurrency)
2023-05-01 18:21:31 +02:00
// Ready to start
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
2023-04-26 17:18:55 +02:00
endSignals := make(chan *Stream)
nbStreamsInExecution := len(conf.Streams)
2023-03-24 00:27:51 +01:00
for _, stream := range conf.Streams {
wgStreams.Add(1)
go StreamManager(stream, endSignals)
}
2023-10-01 12:00:00 +02:00
go SocketManager(conf.Streams)
2023-05-03 20:03:22 +02:00
2023-04-27 10:42:19 +02:00
for {
select {
case finishedStream := <-endSignals:
2023-10-12 12:00:00 +02:00
logger.Printf(logger.ERROR, "%s stream finished", finishedStream.name)
nbStreamsInExecution--
if nbStreamsInExecution == 0 {
2023-11-23 12:00:00 +01:00
quit(conf, false)
2023-04-27 10:42:19 +02:00
}
case <-sigs:
2023-10-12 12:00:00 +02:00
logger.Printf(logger.INFO, "Received SIGINT/SIGTERM, exiting")
2023-11-23 12:00:00 +01:00
quit(conf, true)
2023-04-27 10:42:19 +02:00
}
2023-03-24 00:27:51 +01:00
}
2023-04-27 10:42:19 +02:00
}
2023-11-23 12:00:00 +01:00
func quit(conf *Conf, graceful bool) {
// send stop to StreamManager·s
close(stopStreams)
2023-10-12 12:00:00 +02:00
logger.Println(logger.INFO, "Waiting for Streams to finish...")
wgStreams.Wait()
// ActionsManager calls wgActions.Done() when it has launched all pending actions
wgActions.Add(1)
// send stop to ActionsManager
close(stopActions)
// stop all actions
2023-10-12 12:00:00 +02:00
logger.Println(logger.INFO, "Waiting for Actions to finish...")
wgActions.Wait()
// run stop commands
2023-11-23 12:00:00 +01:00
stopOk := runCommands(conf.Stop, "stop")
2023-05-03 20:03:22 +02:00
// delete pipe
err := os.Remove(*SocketPath)
2023-05-05 15:33:00 +02:00
if err != nil {
2023-10-12 12:00:00 +02:00
logger.Println(logger.ERROR, "Failed to remove socket:", err)
2023-05-05 15:33:00 +02:00
}
2023-11-23 12:00:00 +01:00
if !stopOk || !graceful {
os.Exit(1)
}
os.Exit(0)
2023-03-19 23:10:18 +01:00
}