reaction/app/daemon.go

470 lines
10 KiB
Go
Raw Permalink Normal View History

2023-03-25 18:27:01 +01:00
package app
2023-03-19 23:10:18 +01:00
import (
2023-03-20 23:25:57 +01:00
"bufio"
2023-03-24 17:52:00 +01:00
"os"
2023-03-19 23:10:18 +01:00
"os/exec"
2023-04-27 10:42:19 +02:00
"os/signal"
2023-03-24 17:36:41 +01:00
"strings"
"sync"
2023-10-12 12:00:00 +02:00
"syscall"
2023-03-24 17:36:41 +01:00
"time"
2023-10-12 12:00:00 +02:00
"framagit.org/ppom/reaction/logger"
2023-03-19 23:10:18 +01:00
)
2023-03-24 17:36:41 +01:00
// Executes a command and channel-send its stdout
func cmdStdout(commandline []string) chan *string {
lines := make(chan *string)
2023-03-19 23:10:18 +01:00
2023-03-24 00:27:51 +01:00
go func() {
cmd := exec.Command(commandline[0], commandline[1:]...)
stdout, err := cmd.StdoutPipe()
if err != nil {
2023-10-12 12:00:00 +02:00
logger.Fatalln("couldn't open stdout on command:", err)
}
2023-03-24 00:27:51 +01:00
if err := cmd.Start(); err != nil {
2023-10-12 12:00:00 +02:00
logger.Fatalln("couldn't start command:", err)
2023-03-24 00:27:51 +01:00
}
defer stdout.Close()
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
line := scanner.Text()
lines <- &line
2023-10-12 12:00:00 +02:00
logger.Println(logger.DEBUG, "stdout:", line)
2023-03-24 00:27:51 +01:00
}
close(lines)
}()
return lines
2023-03-19 23:10:18 +01:00
}
2023-11-23 12:00:00 +01:00
func runCommands(commands [][]string, moment string) bool {
ok := true
for _, command := range commands {
cmd := exec.Command(command[0], command[1:]...)
cmd.WaitDelay = time.Minute
logger.Printf(logger.INFO, "%v command: run %v\n", moment, command)
if err := cmd.Start(); err != nil {
logger.Printf(logger.ERROR, "%v command: run %v: %v", moment, command, err)
2023-11-23 12:00:00 +01:00
ok = false
} else {
err := cmd.Wait()
if err != nil {
logger.Printf(logger.ERROR, "%v command: run %v: %v", moment, command, err)
2023-11-23 12:00:00 +01:00
ok = false
}
}
}
2023-11-23 12:00:00 +01:00
return ok
}
2023-08-21 23:33:56 +02:00
func (p *Pattern) notAnIgnore(match *string) bool {
for _, regex := range p.compiledIgnoreRegex {
if regex.MatchString(*match) {
return false
}
}
2023-08-21 23:33:56 +02:00
for _, ignore := range p.Ignore {
if ignore == *match {
return false
}
}
return true
}
2023-03-24 17:36:41 +01:00
// Whether one of the filter's regexes is matched on a line
func (f *Filter) match(line *string) string {
var result string
2023-03-24 00:27:51 +01:00
for _, regex := range f.compiledRegex {
2023-03-24 17:36:41 +01:00
if matches := regex.FindStringSubmatch(*line); matches != nil {
var pnames []string
for _, p := range f.pattern {
pnames = append(pnames, p.name)
}
2023-03-24 17:36:41 +01:00
for _, p := range f.pattern {
match := matches[regex.SubexpIndex(p.name)]
if p.notAnIgnore(&match) {
logger.Printf(logger.INFO, "%s.%s: match [%v]\n", f.stream.name, f.name, match)
if len(result) == 0 {
result = match
} else {
result = strings.Join([]string{result, match}, "\x00")
}
}
}
if f.pattern == nil {
// No pattern, so this match will never actually be used
return ""
2023-08-21 23:33:56 +02:00
}
2023-03-24 00:27:51 +01:00
}
2023-03-20 23:25:57 +01:00
}
if len(strings.Split(result, "\x00")) == len(f.pattern) {
2024-03-31 16:58:14 +02:00
return result
} else {
// Incomplete match = no match
return ""
2024-03-31 16:58:14 +02:00
}
2023-03-20 23:25:57 +01:00
}
func (f *Filter) sendActions(match string, at time.Time) {
2023-03-24 00:27:51 +01:00
for _, a := range f.Actions {
actionsC <- PAT{match, a, at.Add(a.afterDuration)}
2023-03-19 23:10:18 +01:00
}
2023-03-24 00:27:51 +01:00
}
2023-03-20 23:25:57 +01:00
func (a *Action) exec(match string) {
defer wgActions.Done()
var computedCommand []string
if a.filter.pattern != nil {
computedCommand = make([]string, 0, len(a.Cmd))
matches := strings.Split(match, "\x00")
for _, item := range a.Cmd {
for i, p := range a.filter.pattern {
item = strings.ReplaceAll(item, p.nameWithBraces, matches[i])
}
computedCommand = append(computedCommand, item)
}
} else {
computedCommand = a.Cmd
}
2023-03-24 17:36:41 +01:00
logger.Printf(logger.INFO, "%s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand)
2023-03-24 17:36:41 +01:00
cmd := exec.Command(computedCommand[0], computedCommand[1:]...)
2023-03-24 18:06:57 +01:00
if ret := cmd.Run(); ret != nil {
logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret)
}
2023-03-19 23:10:18 +01:00
}
func ActionsManager(concurrency int) {
// concurrency init
execActionsC := make(chan PA)
if concurrency > 0 {
for i := 0; i < concurrency; i++ {
go func() {
var pa PA
for {
pa = <-execActionsC
pa.a.exec(pa.p)
}
}()
}
} else {
go func() {
var pa PA
for {
pa = <-execActionsC
go func(pa PA) {
pa.a.exec(pa.p)
}(pa)
}
}()
}
execAction := func(a *Action, p string) {
wgActions.Add(1)
execActionsC <- PA{p, a}
}
// main
pendingActionsC := make(chan PAT)
for {
select {
2023-10-01 12:00:00 +02:00
case pat := <-actionsC:
pa := PA{pat.p, pat.a}
pattern, action, then := pat.p, pat.a, pat.t
now := time.Now()
// check if must be executed now
2023-09-23 00:14:20 +02:00
if then.Compare(now) <= 0 {
execAction(action, pattern)
} else {
actionsLock.Lock()
if actions[pa] == nil {
actions[pa] = make(map[time.Time]struct{})
2023-10-01 12:00:00 +02:00
}
actions[pa][then] = struct{}{}
actionsLock.Unlock()
2023-10-01 12:00:00 +02:00
go func(insidePat PAT, insideNow time.Time) {
time.Sleep(insidePat.t.Sub(insideNow))
pendingActionsC <- insidePat
2023-09-23 00:14:20 +02:00
}(pat, now)
}
2023-10-01 12:00:00 +02:00
case pat := <-pendingActionsC:
pa := PA{pat.p, pat.a}
pattern, action, then := pat.p, pat.a, pat.t
actionsLock.Lock()
if actions[pa] != nil {
delete(actions[pa], then)
2023-10-01 12:00:00 +02:00
}
actionsLock.Unlock()
execAction(action, pattern)
2023-10-01 12:00:00 +02:00
case fo := <-flushToActionsC:
2023-10-04 12:00:00 +02:00
ret := make(ActionsMap)
2023-10-01 12:00:00 +02:00
actionsLock.Lock()
for pa := range actions {
if pa.p == fo.p {
2023-10-01 12:00:00 +02:00
for range actions[pa] {
execAction(pa.a, pa.p)
2023-10-01 12:00:00 +02:00
}
2023-10-04 12:00:00 +02:00
ret[pa] = actions[pa]
2023-10-01 12:00:00 +02:00
delete(actions, pa)
}
}
actionsLock.Unlock()
2023-10-01 12:00:00 +02:00
fo.ret <- ret
case _, _ = <-stopActions:
actionsLock.Lock()
for pa := range actions {
if pa.a.OnExit {
for range actions[pa] {
execAction(pa.a, pa.p)
}
}
}
actionsLock.Unlock()
wgActions.Done()
return
}
}
}
func MatchesManager() {
2023-10-01 12:00:00 +02:00
var fo FlushMatchOrder
var pft PFT
end := false
for !end {
select {
2023-10-01 12:00:00 +02:00
case fo = <-flushToMatchesC:
matchesManagerHandleFlush(fo)
case fo, ok := <-startupMatchesC:
if !ok {
end = true
} else {
2023-10-01 12:00:00 +02:00
_ = matchesManagerHandleMatch(fo)
}
2023-03-25 19:12:11 +01:00
}
}
for {
select {
2023-10-01 12:00:00 +02:00
case fo = <-flushToMatchesC:
matchesManagerHandleFlush(fo)
case pft = <-matchesC:
2023-04-26 17:18:55 +02:00
entry := LogEntry{pft.t, 0, strings.Join(strings.Split(pft.p, "\x00"), " / "), pft.f.stream.name, pft.f.name, 0, false}
2023-03-25 19:12:11 +01:00
entry.Exec = matchesManagerHandleMatch(pft)
2023-03-25 19:12:11 +01:00
logsC <- entry
}
}
}
2023-04-26 17:18:55 +02:00
2023-10-01 12:00:00 +02:00
func matchesManagerHandleFlush(fo FlushMatchOrder) {
2023-10-04 12:00:00 +02:00
ret := make(MatchesMap)
2023-10-01 12:00:00 +02:00
matchesLock.Lock()
for pf := range matches {
if fo.p == pf.p {
2023-10-01 12:00:00 +02:00
if fo.ret != nil {
2023-10-04 12:00:00 +02:00
ret[pf] = matches[pf]
2023-10-01 12:00:00 +02:00
}
delete(matches, pf)
}
}
matchesLock.Unlock()
if fo.ret != nil {
fo.ret <- ret
}
}
func matchesManagerHandleMatch(pft PFT) bool {
2023-10-01 12:00:00 +02:00
matchesLock.Lock()
defer matchesLock.Unlock()
filter, patterns, then := pft.f, pft.p, pft.t
pf := PF{pft.p, pft.f}
if filter.Retry > 1 {
// make sure map exists
if matches[pf] == nil {
matches[pf] = make(map[time.Time]struct{})
}
// add new match
matches[pf][then] = struct{}{}
// remove match when expired
go func(pf PF, then time.Time) {
2023-10-01 12:00:00 +02:00
time.Sleep(then.Sub(time.Now()) + filter.retryDuration)
matchesLock.Lock()
if matches[pf] != nil {
// FIXME replace this and all similar occurences
// by clear() when switching to go 1.21
delete(matches[pf], then)
}
matchesLock.Unlock()
}(pf, then)
}
if filter.Retry <= 1 || len(matches[pf]) >= filter.Retry {
delete(matches, pf)
filter.sendActions(patterns, then)
return true
}
return false
}
func StreamManager(s *Stream, endedSignal chan *Stream) {
defer wgStreams.Done()
2023-10-12 12:00:00 +02:00
logger.Printf(logger.INFO, "%s: start %s\n", s.name, s.Cmd)
2023-03-24 00:27:51 +01:00
lines := cmdStdout(s.Cmd)
for {
select {
case line, ok := <-lines:
if !ok {
endedSignal <- s
return
}
for _, filter := range s.Filters {
if match := filter.match(line); len(match) > 0 {
matchesC <- PFT{match, filter, time.Now()}
}
}
case _, _ = <-stopStreams:
return
}
}
2023-03-19 23:10:18 +01:00
}
var actions ActionsMap
var matches MatchesMap
var actionsLock sync.Mutex
var matchesLock sync.Mutex
var stopStreams chan bool
var stopActions chan bool
var wgActions sync.WaitGroup
var wgStreams sync.WaitGroup
2023-09-25 20:42:42 +02:00
/*
2023-10-01 12:00:00 +02:00
<StreamCmds>
2023-09-25 20:42:42 +02:00
StreamManager onstartup:matches
matches MatchesManager logs DatabaseManager ·
actions ActionsManager
SocketManager flushes··
2023-10-01 12:00:00 +02:00
<Clients>
2023-09-25 20:42:42 +02:00
*/
// DatabaseManager → MatchesManager
var startupMatchesC chan PFT
// StreamManager → MatchesManager
var matchesC chan PFT
2023-09-25 20:42:42 +02:00
// MatchesManager → DatabaseManager
var logsC chan LogEntry
2023-09-25 20:42:42 +02:00
// MatchesManager → ActionsManager
var actionsC chan PAT
2023-04-26 17:18:55 +02:00
2023-09-25 20:42:42 +02:00
// SocketManager, DatabaseManager → MatchesManager
2023-10-01 12:00:00 +02:00
var flushToMatchesC chan FlushMatchOrder
2023-09-25 20:42:42 +02:00
// SocketManager → ActionsManager
2023-10-01 12:00:00 +02:00
var flushToActionsC chan FlushActionOrder
2023-09-25 20:42:42 +02:00
// SocketManager → DatabaseManager
var flushToDatabaseC chan LogEntry
func Daemon(confFilename string) {
conf := parseConf(confFilename)
2023-09-06 02:00:33 +02:00
startupMatchesC = make(chan PFT)
2023-09-25 20:42:42 +02:00
matchesC = make(chan PFT)
logsC = make(chan LogEntry)
actionsC = make(chan PAT)
2023-10-01 12:00:00 +02:00
flushToMatchesC = make(chan FlushMatchOrder)
flushToActionsC = make(chan FlushActionOrder)
2023-09-25 20:42:42 +02:00
flushToDatabaseC = make(chan LogEntry)
stopActions = make(chan bool)
stopStreams = make(chan bool)
actions = make(ActionsMap)
matches = make(MatchesMap)
2023-11-23 12:00:00 +01:00
_ = runCommands(conf.Start, "start")
2023-09-09 20:42:47 +02:00
go DatabaseManager(conf)
go MatchesManager()
go ActionsManager(conf.Concurrency)
2023-05-01 18:21:31 +02:00
// Ready to start
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
2023-04-26 17:18:55 +02:00
endSignals := make(chan *Stream)
nbStreamsInExecution := len(conf.Streams)
2023-03-24 00:27:51 +01:00
for _, stream := range conf.Streams {
wgStreams.Add(1)
go StreamManager(stream, endSignals)
}
2023-10-01 12:00:00 +02:00
go SocketManager(conf.Streams)
2023-05-03 20:03:22 +02:00
2023-04-27 10:42:19 +02:00
for {
select {
case finishedStream := <-endSignals:
2023-10-12 12:00:00 +02:00
logger.Printf(logger.ERROR, "%s stream finished", finishedStream.name)
nbStreamsInExecution--
if nbStreamsInExecution == 0 {
2023-11-23 12:00:00 +01:00
quit(conf, false)
2023-04-27 10:42:19 +02:00
}
case <-sigs:
2023-10-12 12:00:00 +02:00
logger.Printf(logger.INFO, "Received SIGINT/SIGTERM, exiting")
2023-11-23 12:00:00 +01:00
quit(conf, true)
2023-04-27 10:42:19 +02:00
}
2023-03-24 00:27:51 +01:00
}
2023-04-27 10:42:19 +02:00
}
2023-11-23 12:00:00 +01:00
func quit(conf *Conf, graceful bool) {
// send stop to StreamManager·s
close(stopStreams)
2023-10-12 12:00:00 +02:00
logger.Println(logger.INFO, "Waiting for Streams to finish...")
wgStreams.Wait()
// ActionsManager calls wgActions.Done() when it has launched all pending actions
wgActions.Add(1)
// send stop to ActionsManager
close(stopActions)
// stop all actions
2023-10-12 12:00:00 +02:00
logger.Println(logger.INFO, "Waiting for Actions to finish...")
wgActions.Wait()
// run stop commands
2023-11-23 12:00:00 +01:00
stopOk := runCommands(conf.Stop, "stop")
2023-05-03 20:03:22 +02:00
// delete pipe
err := os.Remove(*SocketPath)
2023-05-05 15:33:00 +02:00
if err != nil {
2023-10-12 12:00:00 +02:00
logger.Println(logger.ERROR, "Failed to remove socket:", err)
2023-05-05 15:33:00 +02:00
}
2023-11-23 12:00:00 +01:00
if !stopOk || !graceful {
os.Exit(1)
}
os.Exit(0)
2023-03-19 23:10:18 +01:00
}