reaction/app/reaction.go

261 lines
5.3 KiB
Go
Raw Normal View History

2023-03-25 18:27:01 +01:00
package app
2023-03-19 23:10:18 +01:00
import (
2023-03-20 23:25:57 +01:00
"bufio"
2023-04-26 17:18:55 +02:00
"encoding/gob"
2023-03-24 17:52:00 +01:00
"flag"
2023-04-27 10:42:19 +02:00
"syscall"
2023-03-25 19:12:11 +01:00
// "fmt"
2023-03-19 23:10:18 +01:00
"log"
2023-03-24 17:52:00 +01:00
"os"
2023-03-19 23:10:18 +01:00
"os/exec"
2023-04-27 10:42:19 +02:00
"os/signal"
2023-03-24 17:36:41 +01:00
"strings"
"sync"
2023-03-24 17:36:41 +01:00
"time"
2023-03-19 23:10:18 +01:00
)
2023-03-24 17:36:41 +01:00
// cmdStdout starts commandline and returns a channel on which every line the
// command writes to its stdout is sent, without its line terminator. The
// channel is closed once stdout reaches EOF or can no longer be read.
//
// An empty command line, or a failure to set up or start the command, is
// fatal for the whole program.
func cmdStdout(commandline []string) chan *string {
	if len(commandline) == 0 {
		log.Fatalln("FATAL couldn't start command: empty command line")
	}
	lines := make(chan *string)

	go func() {
		cmd := exec.Command(commandline[0], commandline[1:]...)
		stdout, err := cmd.StdoutPipe()
		if err != nil {
			log.Fatalln("FATAL couldn't open stdout on command:", err)
		}
		if err := cmd.Start(); err != nil {
			log.Fatalln("FATAL couldn't start command:", err)
		}

		scanner := bufio.NewScanner(stdout)
		for scanner.Scan() {
			line := scanner.Text()
			lines <- &line
		}
		// Scan also stops on read errors and over-long lines: report those
		// instead of treating them as a normal end of stream.
		if err := scanner.Err(); err != nil {
			log.Println("ERROR couldn't read stdout of command:", err)
		}

		// Release our end of the pipe before waiting, so that a command which
		// is still writing is not left blocked on a full pipe.
		_ = stdout.Close()
		close(lines)

		// Reap the child so it does not stay around as a zombie.
		if err := cmd.Wait(); err != nil {
			log.Println("ERROR command exited:", err)
		}
	}()
	return lines
}
2023-08-21 23:33:56 +02:00
// notAnIgnore reports whether match is absent from the pattern's Ignore
// entries, i.e. true when no entry is equal to *match.
func (p *Pattern) notAnIgnore(match *string) bool {
	allowed := true
	for _, ignored := range p.Ignore {
		if *match == ignored {
			allowed = false
			break
		}
	}
	return allowed
}
2023-03-24 17:36:41 +01:00
// Whether one of the filter's regexes is matched on a line
func (f *Filter) match(line *string) string {
2023-03-24 00:27:51 +01:00
for _, regex := range f.compiledRegex {
2023-03-24 17:36:41 +01:00
if matches := regex.FindStringSubmatch(*line); matches != nil {
2023-03-24 17:36:41 +01:00
2023-08-21 23:33:56 +02:00
match := matches[regex.SubexpIndex(f.pattern.name)]
2023-03-24 17:36:41 +01:00
2023-08-21 23:33:56 +02:00
if f.pattern.notAnIgnore(&match) {
log.Printf("INFO %s.%s: match [%v]\n", f.stream.name, f.name, match)
return match
}
2023-03-24 00:27:51 +01:00
}
2023-03-20 23:25:57 +01:00
}
2023-03-24 17:36:41 +01:00
return ""
2023-03-20 23:25:57 +01:00
}
2023-04-26 17:18:55 +02:00
func (f *Filter) execActions(match string, advance time.Duration) {
2023-03-24 00:27:51 +01:00
for _, a := range f.Actions {
wgActions.Add(1)
2023-04-26 17:18:55 +02:00
go a.exec(match, advance)
2023-03-19 23:10:18 +01:00
}
2023-03-24 00:27:51 +01:00
}
2023-03-20 23:25:57 +01:00
// sleep returns a channel that receives true once d has elapsed and is then
// closed.
func sleep(d time.Duration) chan bool {
	// Buffered so the goroutine can deliver its value and exit even when the
	// caller has stopped listening; an unbuffered send would block forever.
	c := make(chan bool, 1)
	go func() {
		time.Sleep(d)
		c <- true
		close(c)
	}()
	return c
}
2023-04-26 17:18:55 +02:00
func (a *Action) exec(match string, advance time.Duration) {
defer wgActions.Done()
2023-07-12 17:45:16 +02:00
doExec := true
2023-05-03 20:03:22 +02:00
// Wait for either end of sleep time, or actionStore requesting stop
2023-04-26 17:18:55 +02:00
if a.afterDuration != 0 && a.afterDuration > advance {
2023-05-03 20:03:22 +02:00
stopAction := actionStore.Register(a, match)
select {
case <-sleep(a.afterDuration - advance):
// no-op
2023-07-12 17:45:16 +02:00
case doExec = <-stopAction:
// no-op
}
2023-05-03 20:03:22 +02:00
// Let's not wait for the lock
go actionStore.Unregister(a, match, stopAction)
2023-03-24 00:49:59 +01:00
}
2023-03-20 23:25:57 +01:00
2023-07-12 17:45:16 +02:00
if doExec {
computedCommand := make([]string, 0, len(a.Cmd))
for _, item := range a.Cmd {
2023-08-21 23:33:56 +02:00
computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match))
2023-07-12 17:45:16 +02:00
}
2023-03-24 17:36:41 +01:00
2023-07-12 17:45:16 +02:00
log.Printf("INFO %s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand)
2023-03-24 17:36:41 +01:00
2023-07-12 17:45:16 +02:00
cmd := exec.Command(computedCommand[0], computedCommand[1:]...)
2023-03-24 18:06:57 +01:00
2023-07-12 17:45:16 +02:00
if ret := cmd.Run(); ret != nil {
log.Printf("ERROR %s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret)
}
2023-03-19 23:10:18 +01:00
}
}
2023-03-25 19:12:11 +01:00
// cleanOldMatches replaces the times recorded for match by the subset that is
// still younger than the filter's retryDuration, keeping their order.
func (f *Filter) cleanOldMatches(match string) {
	cutoff := time.Now()
	previous := f.matches[match]
	kept := make([]time.Time, 0, len(previous))
	for _, seen := range previous {
		// Expired once seen + retryDuration is no longer in the future
		if expiry := seen.Add(f.retryDuration); !expiry.After(cutoff) {
			continue
		}
		kept = append(kept, seen)
	}
	f.matches[match] = kept
}
func (f *Filter) handle() chan *string {
lines := make(chan *string)
go func() {
for line := range lines {
if match := f.match(line); match != "" {
2023-03-25 19:12:11 +01:00
2023-04-26 17:18:55 +02:00
entry := LogEntry{time.Now(), match, f.stream.name, f.name, false}
if f.Retry > 1 {
f.cleanOldMatches(match)
2023-03-25 19:12:11 +01:00
f.matches[match] = append(f.matches[match], time.Now())
}
2023-03-25 19:12:11 +01:00
if f.Retry <= 1 || len(f.matches[match]) >= f.Retry {
2023-04-27 10:42:19 +02:00
entry.Exec = true
2023-04-26 17:18:55 +02:00
delete(f.matches, match)
2023-04-27 10:42:19 +02:00
f.execActions(match, time.Duration(0))
2023-03-25 19:12:11 +01:00
}
2023-04-26 17:18:55 +02:00
db.Encode(&entry)
}
}
}()
return lines
}
func (s *Stream) handle(endedSignal chan *Stream) {
2023-04-27 12:33:56 +02:00
log.Printf("INFO %s: start %s\n", s.name, s.Cmd)
2023-03-24 00:27:51 +01:00
lines := cmdStdout(s.Cmd)
var filterInputs = make([]chan *string, 0, len(s.Filters))
for _, filter := range s.Filters {
filterInputs = append(filterInputs, filter.handle())
2023-03-24 00:27:51 +01:00
}
defer func() {
for _, filterInput := range filterInputs {
close(filterInput)
}
}()
for {
select {
case line, ok := <-lines:
if !ok {
endedSignal <- s
return
}
for _, filterInput := range filterInputs {
filterInput <- line
}
case _, ok := <-stopStreams:
if !ok {
return
}
}
}
2023-03-19 23:10:18 +01:00
}
var stopStreams chan bool
2023-05-03 20:03:22 +02:00
var actionStore ActionStore
var wgActions sync.WaitGroup
2023-04-27 10:42:19 +02:00
var db *gob.Encoder
2023-04-26 17:18:55 +02:00
func Main() {
2023-03-24 17:52:00 +01:00
confFilename := flag.String("c", "", "configuration file. see an example at https://framagit.org/ppom/reaction/-/blob/main/reaction.yml")
flag.Parse()
if *confFilename == "" {
flag.PrintDefaults()
os.Exit(2)
}
2023-05-03 20:03:22 +02:00
actionStore.store = make(ActionMap)
2023-05-01 18:21:31 +02:00
conf := parseConf(*confFilename)
db = conf.updateFromDB()
// Ready to start
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
2023-04-26 17:18:55 +02:00
stopStreams = make(chan bool)
endSignals := make(chan *Stream)
2023-04-27 10:42:19 +02:00
noStreamsInExecution := len(conf.Streams)
2023-03-24 00:27:51 +01:00
for _, stream := range conf.Streams {
go stream.handle(endSignals)
}
2023-05-03 20:03:22 +02:00
go Serve()
2023-04-27 10:42:19 +02:00
for {
select {
case finishedStream := <-endSignals:
2023-04-27 12:33:56 +02:00
log.Printf("ERROR %s stream finished", finishedStream.name)
2023-04-27 10:42:19 +02:00
noStreamsInExecution--
if noStreamsInExecution == 0 {
quit()
}
case <-sigs:
log.Printf("INFO Received SIGINT/SIGTERM, exiting")
2023-04-27 10:42:19 +02:00
quit()
}
2023-03-24 00:27:51 +01:00
}
2023-04-27 10:42:19 +02:00
}
2023-04-27 10:42:19 +02:00
func quit() {
// stop all streams
close(stopStreams)
// stop all actions
2023-05-03 20:03:22 +02:00
actionStore.Quit()
// wait for them to complete
wgActions.Wait()
2023-05-03 20:03:22 +02:00
// delete pipe
2023-05-05 15:33:00 +02:00
err := os.Remove(SocketPath)
if err != nil {
log.Println("Failed to remove socket:", err)
}
os.Exit(3)
2023-03-19 23:10:18 +01:00
}