reaction/app/daemon.go

402 lines
9.0 KiB
Go
Raw Normal View History

2023-03-25 18:27:01 +01:00
package app
2023-03-19 23:10:18 +01:00
import (
2023-03-20 23:25:57 +01:00
"bufio"
2023-03-24 17:52:00 +01:00
"os"
2023-03-19 23:10:18 +01:00
"os/exec"
2023-04-27 10:42:19 +02:00
"os/signal"
2023-03-24 17:36:41 +01:00
"strings"
"sync"
2023-10-12 12:00:00 +02:00
"syscall"
2023-03-24 17:36:41 +01:00
"time"
2023-10-12 12:00:00 +02:00
"framagit.org/ppom/reaction/logger"
2023-03-19 23:10:18 +01:00
)
2023-03-24 17:36:41 +01:00
// Executes a command and channel-send its stdout
func cmdStdout(commandline []string) chan *string {
lines := make(chan *string)
2023-03-19 23:10:18 +01:00
2023-03-24 00:27:51 +01:00
go func() {
cmd := exec.Command(commandline[0], commandline[1:]...)
stdout, err := cmd.StdoutPipe()
if err != nil {
2023-10-12 12:00:00 +02:00
logger.Fatalln("couldn't open stdout on command:", err)
}
2023-03-24 00:27:51 +01:00
if err := cmd.Start(); err != nil {
2023-10-12 12:00:00 +02:00
logger.Fatalln("couldn't start command:", err)
2023-03-24 00:27:51 +01:00
}
defer stdout.Close()
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
line := scanner.Text()
lines <- &line
2023-10-12 12:00:00 +02:00
logger.Println(logger.DEBUG, "stdout:", line)
2023-03-24 00:27:51 +01:00
}
close(lines)
}()
return lines
2023-03-19 23:10:18 +01:00
}
// runCommands runs every entry of commands in order, waiting for each one to
// finish before starting the next. moment only labels the log lines (the
// callers in this file pass "start" and "stop"). A command that cannot be
// started or that fails is logged; the remaining commands still run.
func runCommands(commands [][]string, moment string) {
	for _, argv := range commands {
		proc := exec.Command(argv[0], argv[1:]...)
		proc.WaitDelay = time.Minute
		logger.Printf(logger.INFO, "%v command: run %v\n", moment, argv)
		// Run is Start followed by Wait; both failures are reported alike.
		if err := proc.Run(); err != nil {
			logger.Printf(logger.ERROR, "%v command: run %v: %v", moment, argv, err)
		}
	}
}
2023-08-21 23:33:56 +02:00
// notAnIgnore reports whether *match is different from every entry of the
// pattern's Ignore list, i.e. whether the match must be taken into account.
func (p *Pattern) notAnIgnore(match *string) bool {
	for i := range p.Ignore {
		if p.Ignore[i] == *match {
			return false
		}
	}
	return true
}
2023-03-24 17:36:41 +01:00
// Whether one of the filter's regexes is matched on a line
func (f *Filter) match(line *string) string {
2023-03-24 00:27:51 +01:00
for _, regex := range f.compiledRegex {
2023-03-24 17:36:41 +01:00
if matches := regex.FindStringSubmatch(*line); matches != nil {
2023-03-24 17:36:41 +01:00
2023-08-21 23:33:56 +02:00
match := matches[regex.SubexpIndex(f.pattern.name)]
2023-03-24 17:36:41 +01:00
2023-08-21 23:33:56 +02:00
if f.pattern.notAnIgnore(&match) {
2023-10-12 12:00:00 +02:00
logger.Printf(logger.INFO, "%s.%s: match [%v]\n", f.stream.name, f.name, match)
2023-08-21 23:33:56 +02:00
return match
}
2023-03-24 00:27:51 +01:00
}
2023-03-20 23:25:57 +01:00
}
2023-03-24 17:36:41 +01:00
return ""
2023-03-20 23:25:57 +01:00
}
func (f *Filter) sendActions(match string, at time.Time) {
2023-03-24 00:27:51 +01:00
for _, a := range f.Actions {
actionsC <- PAT{match, a, at.Add(a.afterDuration)}
2023-03-19 23:10:18 +01:00
}
2023-03-24 00:27:51 +01:00
}
2023-03-20 23:25:57 +01:00
func (a *Action) exec(match string) {
2023-10-01 12:00:00 +02:00
wgActions.Add(1)
go func() {
defer wgActions.Done()
2023-10-01 12:00:00 +02:00
computedCommand := make([]string, 0, len(a.Cmd))
for _, item := range a.Cmd {
computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match))
}
2023-03-24 17:36:41 +01:00
2023-10-12 12:00:00 +02:00
logger.Printf(logger.INFO, "%s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand)
2023-03-24 17:36:41 +01:00
2023-10-01 12:00:00 +02:00
cmd := exec.Command(computedCommand[0], computedCommand[1:]...)
2023-03-24 18:06:57 +01:00
2023-10-01 12:00:00 +02:00
if ret := cmd.Run(); ret != nil {
2023-10-12 12:00:00 +02:00
logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret)
2023-10-01 12:00:00 +02:00
}
}()
2023-03-19 23:10:18 +01:00
}
func ActionsManager() {
pendingActionsC := make(chan PAT)
for {
select {
2023-10-01 12:00:00 +02:00
case pat := <-actionsC:
pa := PA{pat.p, pat.a}
pattern, action, then := pat.p, pat.a, pat.t
now := time.Now()
// check if must be executed now
2023-09-23 00:14:20 +02:00
if then.Compare(now) <= 0 {
2023-10-01 12:00:00 +02:00
action.exec(pattern)
} else {
actionsLock.Lock()
2023-10-01 12:00:00 +02:00
if actions[pa] == nil {
actions[pa] = make(map[time.Time]struct{})
}
actions[PA{pattern, action}][then] = struct{}{}
actionsLock.Unlock()
2023-10-01 12:00:00 +02:00
go func(insidePat PAT, insideNow time.Time) {
time.Sleep(insidePat.t.Sub(insideNow))
pendingActionsC <- insidePat
2023-09-23 00:14:20 +02:00
}(pat, now)
}
2023-10-01 12:00:00 +02:00
case pat := <-pendingActionsC:
pa := PA{pat.p, pat.a}
pattern, action, then := pat.p, pat.a, pat.t
actionsLock.Lock()
2023-10-01 12:00:00 +02:00
if actions[pa] != nil {
if _, ok := actions[pa][then]; ok {
delete(actions[pa], then)
action.exec(pattern)
}
}
actionsLock.Unlock()
action.exec(pattern)
case fo := <-flushToActionsC:
2023-10-04 12:00:00 +02:00
ret := make(ActionsMap)
2023-10-01 12:00:00 +02:00
actionsLock.Lock()
for pa := range actions {
if pa.p == fo.p {
for range actions[pa] {
pa.a.exec(pa.p)
}
2023-10-04 12:00:00 +02:00
ret[pa] = actions[pa]
2023-10-01 12:00:00 +02:00
delete(actions, pa)
}
}
actionsLock.Unlock()
2023-10-01 12:00:00 +02:00
fo.ret <- ret
case _, _ = <-stopActions:
actionsLock.Lock()
for pat := range actions {
if pat.a.OnExit {
2023-10-01 12:00:00 +02:00
pat.a.exec(pat.p)
}
}
actionsLock.Unlock()
wgActions.Done()
return
}
}
}
func MatchesManager() {
2023-10-01 12:00:00 +02:00
var fo FlushMatchOrder
var pft PFT
end := false
for !end {
select {
2023-10-01 12:00:00 +02:00
case fo = <-flushToMatchesC:
matchesManagerHandleFlush(fo)
case fo, ok := <-startupMatchesC:
if !ok {
end = true
} else {
2023-10-01 12:00:00 +02:00
_ = matchesManagerHandleMatch(fo)
}
2023-03-25 19:12:11 +01:00
}
}
for {
select {
2023-10-01 12:00:00 +02:00
case fo = <-flushToMatchesC:
matchesManagerHandleFlush(fo)
case pft = <-matchesC:
2023-04-26 17:18:55 +02:00
entry := LogEntry{pft.t, 0, pft.p, pft.f.stream.name, pft.f.name, 0, false}
2023-03-25 19:12:11 +01:00
entry.Exec = matchesManagerHandleMatch(pft)
2023-03-25 19:12:11 +01:00
logsC <- entry
}
}
}
2023-04-26 17:18:55 +02:00
2023-10-01 12:00:00 +02:00
func matchesManagerHandleFlush(fo FlushMatchOrder) {
2023-10-04 12:00:00 +02:00
ret := make(MatchesMap)
2023-10-01 12:00:00 +02:00
matchesLock.Lock()
for pf := range matches {
if fo.p == pf.p {
if fo.ret != nil {
2023-10-04 12:00:00 +02:00
ret[pf] = matches[pf]
2023-10-01 12:00:00 +02:00
}
delete(matches, pf)
}
}
matchesLock.Unlock()
if fo.ret != nil {
fo.ret <- ret
}
}
func matchesManagerHandleMatch(pft PFT) bool {
2023-10-01 12:00:00 +02:00
matchesLock.Lock()
defer matchesLock.Unlock()
filter, pattern, then := pft.f, pft.p, pft.t
pf := PF{pft.p, pft.f}
if filter.Retry > 1 {
// make sure map exists
if matches[pf] == nil {
matches[pf] = make(map[time.Time]struct{})
}
// add new match
matches[pf][then] = struct{}{}
// remove match when expired
go func(pf PF, then time.Time) {
2023-10-01 12:00:00 +02:00
time.Sleep(then.Sub(time.Now()) + filter.retryDuration)
matchesLock.Lock()
if matches[pf] != nil {
// FIXME replace this and all similar occurences
// by clear() when switching to go 1.21
delete(matches[pf], then)
}
matchesLock.Unlock()
}(pf, then)
}
if filter.Retry <= 1 || len(matches[pf]) >= filter.Retry {
delete(matches, pf)
2023-10-01 12:00:00 +02:00
filter.sendActions(pattern, then)
return true
}
return false
}
func StreamManager(s *Stream, endedSignal chan *Stream) {
defer wgStreams.Done()
2023-10-12 12:00:00 +02:00
logger.Printf(logger.INFO, "%s: start %s\n", s.name, s.Cmd)
2023-03-24 00:27:51 +01:00
lines := cmdStdout(s.Cmd)
for {
select {
case line, ok := <-lines:
if !ok {
endedSignal <- s
return
}
for _, filter := range s.Filters {
if match := filter.match(line); match != "" {
matchesC <- PFT{match, filter, time.Now()}
}
}
case _, _ = <-stopStreams:
return
}
}
2023-03-19 23:10:18 +01:00
}
// State shared between the daemon's goroutines.
var (
	// actions records the actions scheduled for a later time; it is accessed
	// with actionsLock held.
	actions ActionsMap

	// matches records the match times per (pattern, filter); it is accessed
	// with matchesLock held.
	matches MatchesMap

	// actionsLock guards actions.
	actionsLock sync.Mutex

	// matchesLock guards matches.
	matchesLock sync.Mutex

	// stopStreams is closed by quit to make every StreamManager return.
	stopStreams chan bool

	// stopActions is closed by quit to make ActionsManager return.
	stopActions chan bool

	// wgActions counts the action commands currently running, plus one unit
	// added by quit and released by ActionsManager when it stops.
	wgActions sync.WaitGroup

	// wgStreams counts the running StreamManager goroutines.
	wgStreams sync.WaitGroup
)
2023-09-25 20:42:42 +02:00
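/*
Data flow between the managers (see the channels declared below):

	<StreamCmds>    → StreamManager
	StreamManager   → matches           → MatchesManager
	DatabaseManager → onstartup:matches → MatchesManager
	MatchesManager  → logs              → DatabaseManager
	MatchesManager  → actions           → ActionsManager
	SocketManager   → flushes           → MatchesManager, ActionsManager, DatabaseManager
	DatabaseManager → flushes           → MatchesManager
	<Clients>       → SocketManager
*/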
// DatabaseManager → MatchesManager
var startupMatchesC chan PFT
// StreamManager → MatchesManager
var matchesC chan PFT
2023-09-25 20:42:42 +02:00
// MatchesManager → DatabaseManager
var logsC chan LogEntry
2023-09-25 20:42:42 +02:00
// MatchesManager → ActionsManager
var actionsC chan PAT
2023-04-26 17:18:55 +02:00
2023-09-25 20:42:42 +02:00
// SocketManager, DatabaseManager → MatchesManager
2023-10-01 12:00:00 +02:00
var flushToMatchesC chan FlushMatchOrder
2023-09-25 20:42:42 +02:00
// SocketManager → ActionsManager
2023-10-01 12:00:00 +02:00
var flushToActionsC chan FlushActionOrder
2023-09-25 20:42:42 +02:00
// SocketManager → DatabaseManager
var flushToDatabaseC chan LogEntry
func Daemon(confFilename string) {
conf := parseConf(confFilename)
2023-09-06 02:00:33 +02:00
startupMatchesC = make(chan PFT)
2023-09-25 20:42:42 +02:00
matchesC = make(chan PFT)
logsC = make(chan LogEntry)
actionsC = make(chan PAT)
2023-10-01 12:00:00 +02:00
flushToMatchesC = make(chan FlushMatchOrder)
flushToActionsC = make(chan FlushActionOrder)
2023-09-25 20:42:42 +02:00
flushToDatabaseC = make(chan LogEntry)
stopActions = make(chan bool)
stopStreams = make(chan bool)
actions = make(ActionsMap)
matches = make(MatchesMap)
runCommands(conf.Start, "start")
2023-09-09 20:42:47 +02:00
go DatabaseManager(conf)
go MatchesManager()
go ActionsManager()
2023-05-01 18:21:31 +02:00
// Ready to start
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
2023-04-26 17:18:55 +02:00
endSignals := make(chan *Stream)
nbStreamsInExecution := len(conf.Streams)
2023-03-24 00:27:51 +01:00
for _, stream := range conf.Streams {
wgStreams.Add(1)
go StreamManager(stream, endSignals)
}
2023-10-01 12:00:00 +02:00
go SocketManager(conf.Streams)
2023-05-03 20:03:22 +02:00
2023-04-27 10:42:19 +02:00
for {
select {
case finishedStream := <-endSignals:
2023-10-12 12:00:00 +02:00
logger.Printf(logger.ERROR, "%s stream finished", finishedStream.name)
nbStreamsInExecution--
if nbStreamsInExecution == 0 {
quit(conf)
2023-04-27 10:42:19 +02:00
}
case <-sigs:
2023-10-12 12:00:00 +02:00
logger.Printf(logger.INFO, "Received SIGINT/SIGTERM, exiting")
quit(conf)
2023-04-27 10:42:19 +02:00
}
2023-03-24 00:27:51 +01:00
}
2023-04-27 10:42:19 +02:00
}
func quit(conf *Conf) {
// send stop to StreamManager·s
close(stopStreams)
2023-10-12 12:00:00 +02:00
logger.Println(logger.INFO, "Waiting for Streams to finish...")
wgStreams.Wait()
// ActionsManager calls wgActions.Done() when it has launched all pending actions
wgActions.Add(1)
// send stop to ActionsManager
close(stopActions)
// stop all actions
2023-10-12 12:00:00 +02:00
logger.Println(logger.INFO, "Waiting for Actions to finish...")
wgActions.Wait()
// run stop commands
runCommands(conf.Stop, "stop")
2023-05-03 20:03:22 +02:00
// delete pipe
err := os.Remove(*SocketPath)
2023-05-05 15:33:00 +02:00
if err != nil {
2023-10-12 12:00:00 +02:00
logger.Println(logger.ERROR, "Failed to remove socket:", err)
2023-05-05 15:33:00 +02:00
}
os.Exit(3)
2023-03-19 23:10:18 +01:00
}