1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							@ -0,0 +1 @@
 | 
			
		||||
/reaction.db
 | 
			
		||||
							
								
								
									
										80
									
								
								app/conf.go
									
									
									
									
									
								
							
							
						
						
									
										80
									
								
								app/conf.go
									
									
									
									
									
								
							@ -2,6 +2,7 @@ package app
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/gob"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io"
 | 
			
		||||
	"log"
 | 
			
		||||
@ -55,10 +56,10 @@ type Action struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type LogEntry struct {
 | 
			
		||||
	t              time.Time
 | 
			
		||||
	pattern        string
 | 
			
		||||
	stream, filter string
 | 
			
		||||
	exec           bool
 | 
			
		||||
	T              time.Time
 | 
			
		||||
	Pattern        string
 | 
			
		||||
	Stream, Filter string
 | 
			
		||||
	Exec           bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Conf) setup() {
 | 
			
		||||
@ -135,6 +136,9 @@ func (c *Conf) setup() {
 | 
			
		||||
					}
 | 
			
		||||
					action.afterDuration = afterDuration
 | 
			
		||||
				}
 | 
			
		||||
				if filter.longuestActionDuration == nil || filter.longuestActionDuration.Milliseconds() < action.afterDuration.Milliseconds() {
 | 
			
		||||
					filter.longuestActionDuration = &action.afterDuration
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@ -143,22 +147,40 @@ func (c *Conf) setup() {
 | 
			
		||||
var DBname = "./reaction.db"
 | 
			
		||||
var DBnewName = "./reaction.new.db"
 | 
			
		||||
 | 
			
		||||
func (c *Conf) updateFromDB() {
 | 
			
		||||
func (c *Conf) updateFromDB() *gob.Encoder {
 | 
			
		||||
	file, err := os.Open(DBname)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if err == os.ErrNotExist {
 | 
			
		||||
		if errors.Is(err, os.ErrNotExist) {
 | 
			
		||||
			log.Printf("WARN: No DB found at %s\n", DBname)
 | 
			
		||||
			return
 | 
			
		||||
 | 
			
		||||
			file, err := os.Create(DBname)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				log.Fatalln("Failed to create DB:", err)
 | 
			
		||||
			}
 | 
			
		||||
			return gob.NewEncoder(file)
 | 
			
		||||
		}
 | 
			
		||||
		log.Fatalln("Failed to open DB:", err)
 | 
			
		||||
	}
 | 
			
		||||
	dec := gob.NewDecoder(&file)
 | 
			
		||||
	dec := gob.NewDecoder(file)
 | 
			
		||||
 | 
			
		||||
	newfile, err := os.Create(DBnewName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Fatalln("Failed to open DB:", err)
 | 
			
		||||
		log.Fatalln("Failed to create new DB:", err)
 | 
			
		||||
	}
 | 
			
		||||
	enc := gob.NewEncoder(&newfile)
 | 
			
		||||
	enc := gob.NewEncoder(newfile)
 | 
			
		||||
 | 
			
		||||
	defer func() {
 | 
			
		||||
		err := file.Close()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Fatalln("ERRO: Failed to close old DB:", err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// It should be ok to rename an open file
 | 
			
		||||
		err = os.Rename(DBnewName, DBname)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Fatalln("ERRO: Failed to replace old DB with new one:", err)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	// This extra code is made to warn only one time for each non-existant filter
 | 
			
		||||
	type SF struct{ s, f string }
 | 
			
		||||
@ -171,7 +193,7 @@ func (c *Conf) updateFromDB() {
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if malformedEntries > 0 {
 | 
			
		||||
			log.Printf("WARN: %v malformed entry discarded from the DB\n", malformedEntries)
 | 
			
		||||
			log.Printf("WARN: %v malformed entries discarded from the DB\n", malformedEntries)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
@ -191,52 +213,39 @@ func (c *Conf) updateFromDB() {
 | 
			
		||||
		err = dec.Decode(&entry)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			if err == io.EOF {
 | 
			
		||||
				return
 | 
			
		||||
				return enc
 | 
			
		||||
			}
 | 
			
		||||
			malformedEntries++
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// retrieve related filter
 | 
			
		||||
		if s := c.Streams[entry.stream]; stream != nil {
 | 
			
		||||
			filter = stream.Filters[entry.filter]
 | 
			
		||||
		if stream := c.Streams[entry.Stream]; stream != nil {
 | 
			
		||||
			filter = stream.Filters[entry.Filter]
 | 
			
		||||
		}
 | 
			
		||||
		if filter == nil {
 | 
			
		||||
			discardedEntries[SF{entry.stream, entry.filter}] = true
 | 
			
		||||
			discardedEntries[SF{entry.Stream, entry.Filter}] = true
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// store matches
 | 
			
		||||
		if !entry.exec && entry.t+filter.retryDuration > now {
 | 
			
		||||
			filter.matches[entry.pattern] = append(f.matches[entry.pattern], entry.t)
 | 
			
		||||
		if !entry.Exec && entry.T.Add(filter.retryDuration).Unix() > now.Unix() {
 | 
			
		||||
			filter.matches[entry.Pattern] = append(filter.matches[entry.Pattern], entry.T)
 | 
			
		||||
 | 
			
		||||
			encodeOrFatal(entry)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// replay executions
 | 
			
		||||
		if entry.exec && entry.t+filter.longuestActionDuration > now {
 | 
			
		||||
			delete(filter.matches, match)
 | 
			
		||||
			filter.execActions(match, now-entry.t)
 | 
			
		||||
		if entry.Exec && entry.T.Add(*filter.longuestActionDuration).Unix() > now.Unix() {
 | 
			
		||||
			delete(filter.matches, entry.Pattern)
 | 
			
		||||
			filter.execActions(entry.Pattern, now.Sub(entry.T))
 | 
			
		||||
 | 
			
		||||
			encodeOrFatal(entry)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err = os.Rename(DBnewName, DBname)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Fatalln("ERRO: Failed to replace old DB with new one:", err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func openDB() {
 | 
			
		||||
	f, err := os.OpenFile(DBname, os.O_APPEND|os.O_WRONLY, 0600)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Fatalln("Failed to open DB:", err)
 | 
			
		||||
	}
 | 
			
		||||
	return gob.NewEncoder(&f)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func parseConf(filename string) *Conf {
 | 
			
		||||
func parseConf(filename string) (*Conf, *gob.Encoder) {
 | 
			
		||||
 | 
			
		||||
	data, err := os.ReadFile(filename)
 | 
			
		||||
 | 
			
		||||
@ -251,6 +260,7 @@ func parseConf(filename string) *Conf {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	conf.setup()
 | 
			
		||||
	enc := conf.updateFromDB()
 | 
			
		||||
 | 
			
		||||
	return &conf
 | 
			
		||||
	return &conf, enc
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -4,11 +4,13 @@ import (
 | 
			
		||||
	"bufio"
 | 
			
		||||
	"encoding/gob"
 | 
			
		||||
	"flag"
 | 
			
		||||
	"syscall"
 | 
			
		||||
 | 
			
		||||
	// "fmt"
 | 
			
		||||
	"log"
 | 
			
		||||
	"os"
 | 
			
		||||
	"os/exec"
 | 
			
		||||
	"os/signal"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
@ -107,9 +109,9 @@ func (f *Filter) handle() chan *string {
 | 
			
		||||
				f.matches[match] = append(f.matches[match], time.Now())
 | 
			
		||||
 | 
			
		||||
				if len(f.matches[match]) >= f.Retry {
 | 
			
		||||
					entry.exec = true
 | 
			
		||||
					entry.Exec = true
 | 
			
		||||
					delete(f.matches, match)
 | 
			
		||||
					f.execActions(match, nil)
 | 
			
		||||
					f.execActions(match, time.Duration(0))
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				db.Encode(&entry)
 | 
			
		||||
@ -154,9 +156,12 @@ func (s *Stream) handle(signal chan *Stream) {
 | 
			
		||||
 | 
			
		||||
var wgActions sync.WaitGroup
 | 
			
		||||
 | 
			
		||||
var db gob.Encoder
 | 
			
		||||
var db *gob.Encoder
 | 
			
		||||
 | 
			
		||||
func Main() {
 | 
			
		||||
	sigs := make(chan os.Signal, 1)
 | 
			
		||||
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
 | 
			
		||||
 | 
			
		||||
	confFilename := flag.String("c", "", "configuration file. see an example at https://framagit.org/ppom/reaction/-/blob/main/reaction.yml")
 | 
			
		||||
	flag.Parse()
 | 
			
		||||
 | 
			
		||||
@ -165,22 +170,33 @@ func Main() {
 | 
			
		||||
		os.Exit(2)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	conf := parseConf(*confFilename)
 | 
			
		||||
 | 
			
		||||
	db = openDB()
 | 
			
		||||
	conf, localdb := parseConf(*confFilename)
 | 
			
		||||
	db = localdb
 | 
			
		||||
 | 
			
		||||
	endSignals := make(chan *Stream)
 | 
			
		||||
	noStreamsInExecution := len(conf.Streams)
 | 
			
		||||
 | 
			
		||||
	for _, stream := range conf.Streams {
 | 
			
		||||
		go stream.handle(endSignals)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for i := 0; i < len(conf.Streams); i++ {
 | 
			
		||||
		finishedStream := <-endSignals
 | 
			
		||||
		log.Printf("ERR  %s stream finished", finishedStream.name)
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case finishedStream := <-endSignals:
 | 
			
		||||
			log.Printf("ERR  %s stream finished", finishedStream.name)
 | 
			
		||||
			noStreamsInExecution--
 | 
			
		||||
			if noStreamsInExecution == 0 {
 | 
			
		||||
				quit()
 | 
			
		||||
			}
 | 
			
		||||
		case <-sigs:
 | 
			
		||||
			log.Printf("Received SIGINT or SIGTERM, exiting")
 | 
			
		||||
			quit()
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func quit() {
 | 
			
		||||
	// TODO replace with advanced execution of all WIP actions
 | 
			
		||||
	wgActions.Wait()
 | 
			
		||||
 | 
			
		||||
	os.Exit(3)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -10,10 +10,10 @@ streams:
 | 
			
		||||
        regex:
 | 
			
		||||
          - found <ip>
 | 
			
		||||
        retry: 2
 | 
			
		||||
        retry-period: 5s
 | 
			
		||||
        retry-period: 1m
 | 
			
		||||
        actions:
 | 
			
		||||
          damn:
 | 
			
		||||
            cmd: [ "echo", "<ip>" ]
 | 
			
		||||
          sleepdamn:
 | 
			
		||||
            cmd: [ "echo", "sleep", "<ip>" ]
 | 
			
		||||
            after: 1s
 | 
			
		||||
            after: 10s
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user