one goroutine handles all matches

it's not the filters that handles their matches anymore
This commit is contained in:
ppom 2023-09-09 23:38:53 +02:00
parent da9ed42363
commit 52556f69b9
6 changed files with 190 additions and 112 deletions

View File

@ -105,39 +105,74 @@ func (a *Action) exec(match string, advance time.Duration) {
} }
} }
func (f *Filter) cleanOldMatches(match string) { func MatchesManager() {
now := time.Now() matches := make(MatchesMap)
newMatches := make([]time.Time, 0, len(f.matches[match])) var pf PF
for _, old := range f.matches[match] { var pft PFT
if old.Add(f.retryDuration).After(now) { end := false
newMatches = append(newMatches, old)
for !end {
select {
case pf = <-cleanMatchesC:
delete(matches[pf.f], pf.p)
case pft, ok := <-startupMatchesC:
if !ok {
end = true
} else {
_ = matchesManagerHandleMatch(matches, pft)
}
} }
} }
f.matches[match] = newMatches
}
func (f *Filter) handle(line *string) { for {
if match := f.match(line); match != "" { select {
case pf = <-cleanMatchesC:
delete(matches[pf.f], pf.p)
case pft = <-matchesC:
entry := LogEntry{time.Now(), match, f.stream.name, f.name, false} entry := LogEntry{pft.t, pft.p, pft.f.stream.name, pft.f.name, false}
if f.Retry > 1 { entry.Exec = matchesManagerHandleMatch(matches, pft)
f.cleanOldMatches(match)
f.matches[match] = append(f.matches[match], time.Now()) logsC <- entry
} }
if f.Retry <= 1 || len(f.matches[match]) >= f.Retry {
entry.Exec = true
delete(f.matches, match)
f.execActions(match, time.Duration(0))
}
logs <- entry
} }
} }
func (s *Stream) handle(endedSignal chan *Stream) { func matchesManagerHandleMatch(matches MatchesMap, pft PFT) bool {
var filter *Filter
var match string
var now time.Time
filter = pft.f
match = pft.p
now = pft.t
if filter.Retry > 1 {
// make sure map exists
if matches[filter] == nil {
matches[filter] = make(map[string][]time.Time)
}
// clean old matches
newMatches := make([]time.Time, 0, len(matches[filter][match]))
for _, old := range matches[filter][match] {
if old.Add(filter.retryDuration).After(now) {
newMatches = append(newMatches, old)
}
}
// add new match
newMatches = append(newMatches, now)
matches[filter][match] = newMatches
}
if filter.Retry <= 1 || len(matches[filter][match]) >= filter.Retry {
delete(matches[filter], match)
filter.execActions(match, time.Duration(0))
return true
}
return false
}
func StreamManager(s *Stream, endedSignal chan *Stream) {
log.Printf("INFO %s: start %s\n", s.name, s.Cmd) log.Printf("INFO %s: start %s\n", s.name, s.Cmd)
lines := cmdStdout(s.Cmd) lines := cmdStdout(s.Cmd)
@ -149,7 +184,9 @@ func (s *Stream) handle(endedSignal chan *Stream) {
return return
} }
for _, filter := range s.Filters { for _, filter := range s.Filters {
filter.handle(line) if match := filter.match(line); match != "" {
matchesC <- PFT{match, filter, time.Now()}
}
} }
case _, ok := <-stopStreams: case _, ok := <-stopStreams:
if !ok { if !ok {
@ -164,17 +201,34 @@ var stopStreams chan bool
var actionStore ActionStore var actionStore ActionStore
var wgActions sync.WaitGroup var wgActions sync.WaitGroup
var logs chan LogEntry // MatchesManager → DatabaseManager
var flushes chan LogEntry var logsC chan LogEntry
// SocketManager → DatabaseManager
var flushesC chan LogEntry
// DatabaseManager → MatchesManager
var startupMatchesC chan PFT
// StreamManager → MatchesManager
var matchesC chan PFT
// StreamManager, DatabaseManager → MatchesManager
var cleanMatchesC chan PF
// MatchesManager → ExecsManager
var execsC chan PA
func Daemon(confFilename string) { func Daemon(confFilename string) {
actionStore.store = make(ActionMap) actionStore.store = make(ActionMap)
conf := parseConf(confFilename) conf := parseConf(confFilename)
logs = make(chan LogEntry) logsC = make(chan LogEntry)
flushes = make(chan LogEntry) flushesC = make(chan LogEntry)
matchesC = make(chan PFT)
startupMatchesC = make(chan PFT)
cleanMatchesC = make(chan PF)
execsC = make(chan PA)
go DatabaseManager(conf) go DatabaseManager(conf)
go MatchesManager()
// Ready to start // Ready to start
@ -187,10 +241,10 @@ func Daemon(confFilename string) {
nbStreamsInExecution := len(conf.Streams) nbStreamsInExecution := len(conf.Streams)
for _, stream := range conf.Streams { for _, stream := range conf.Streams {
go stream.handle(endSignals) go StreamManager(stream, endSignals)
} }
go ServeSocket() go SocketManager()
for { for {
select { select {
@ -208,12 +262,12 @@ func Daemon(confFilename string) {
} }
func quit() { func quit() {
log.Println("INFO Quitting...")
// stop all streams // stop all streams
close(stopStreams) close(stopStreams)
// stop all actions // stop all actions
actionStore.Quit() actionStore.Quit()
// wait for them to complete // wait for them to complete
log.Println("INFO Waiting for actions to complete")
wgActions.Wait() wgActions.Wait()
// delete pipe // delete pipe
err := os.Remove(*SocketPath) err := os.Remove(*SocketPath)

View File

@ -15,16 +15,6 @@ const (
flushDBName = "./reaction-flushes.db" flushDBName = "./reaction-flushes.db"
) )
type ReadDB struct {
file *os.File
dec *gob.Decoder
}
type WriteDB struct {
file *os.File
enc *gob.Encoder
}
func openDB(path string) (bool, *ReadDB) { func openDB(path string) (bool, *ReadDB) {
file, err := os.Open(path) file, err := os.Open(path)
if err != nil { if err != nil {
@ -54,9 +44,9 @@ func (c *Conf) manageLogs(logDB *WriteDB, flushDB *WriteDB) {
var cpt int var cpt int
for { for {
select { select {
case entry := <-flushes: case entry := <-flushesC:
flushDB.enc.Encode(entry) flushDB.enc.Encode(entry)
case entry := <-logs: case entry := <-logsC:
logDB.enc.Encode(entry) logDB.enc.Encode(entry)
cpt++ cpt++
// let's say 100 000 entries ~ 10 MB // let's say 100 000 entries ~ 10 MB
@ -120,7 +110,6 @@ func (c *Conf) RotateDB(startup bool) (*WriteDB, *WriteDB) {
func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.Encoder, startup bool) { func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.Encoder, startup bool) {
// This extra code is made to warn only one time for each non-existant filter // This extra code is made to warn only one time for each non-existant filter
type SF struct{ s, f string }
discardedEntries := make(map[SF]int) discardedEntries := make(map[SF]int)
malformedEntries := 0 malformedEntries := 0
defer func() { defer func() {
@ -138,8 +127,6 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E
var entry LogEntry var entry LogEntry
var filter *Filter var filter *Filter
type PSF struct{ p, s, f string }
// pattern, stream, fitler → last flush // pattern, stream, fitler → last flush
flushes := make(map[PSF]time.Time) flushes := make(map[PSF]time.Time)
for { for {
@ -204,7 +191,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E
// store matches // store matches
if !entry.Exec && entry.T.Add(filter.retryDuration).Unix() > now.Unix() { if !entry.Exec && entry.T.Add(filter.retryDuration).Unix() > now.Unix() {
if startup { if startup {
filter.matches[entry.Pattern] = append(filter.matches[entry.Pattern], entry.T) startupMatchesC <- PFT{entry.Pattern, filter, entry.T}
} }
encodeOrFatal(logEnc, entry) encodeOrFatal(logEnc, entry)
@ -213,13 +200,16 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E
// replay executions // replay executions
if entry.Exec && entry.T.Add(*filter.longuestActionDuration).Unix() > now.Unix() { if entry.Exec && entry.T.Add(*filter.longuestActionDuration).Unix() > now.Unix() {
if startup { if startup {
delete(filter.matches, entry.Pattern) cleanMatchesC <- PF{entry.Pattern, filter}
filter.execActions(entry.Pattern, now.Sub(entry.T)) filter.execActions(entry.Pattern, now.Sub(entry.T))
} }
encodeOrFatal(logEnc, entry) encodeOrFatal(logEnc, entry)
} }
} }
if startup {
close(startupMatchesC)
}
} }
func encodeOrFatal(enc *gob.Encoder, entry LogEntry) { func encodeOrFatal(enc *gob.Encoder, entry LogEntry) {

View File

@ -74,7 +74,7 @@ func (a *ActionStore) Flush(pattern string) int {
} }
} }
delete(a.store, pattern) delete(a.store, pattern)
flushes<-LogEntry{time.Now(), pattern, "", "", false} flushesC <- LogEntry{time.Now(), pattern, "", "", false}
return cpt return cpt
} }
@ -136,7 +136,7 @@ func createOpenSocket() net.Listener {
} }
// Handle connections // Handle connections
func ServeSocket() { func SocketManager() {
ln := createOpenSocket() ln := createOpenSocket()
defer ln.Close() defer ln.Close()
for { for {

View File

@ -11,66 +11,6 @@ import (
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
) )
type Conf struct {
Patterns map[string]*Pattern `yaml:"patterns"`
Streams map[string]*Stream `yaml:"streams"`
}
type Pattern struct {
Regex string `yaml:"regex"`
Ignore []string `yaml:"ignore"`
name string `yaml:"-"`
nameWithBraces string `yaml:"-"`
}
// Stream, Filter & Action structures must never be copied.
// They're always referenced through pointers
type Stream struct {
name string `yaml:"-"`
Cmd []string `yaml:"cmd"`
Filters map[string]*Filter `yaml:"filters"`
}
type Filter struct {
stream *Stream `yaml:"-"`
name string `yaml:"-"`
Regex []string `yaml:"regex"`
compiledRegex []regexp.Regexp `yaml:"-"`
pattern *Pattern `yaml:"-"`
Retry int `yaml:"retry"`
RetryPeriod string `yaml:"retry-period"`
retryDuration time.Duration `yaml:"-"`
Actions map[string]*Action `yaml:"actions"`
longuestActionDuration *time.Duration
matches map[string][]time.Time `yaml:"-"`
}
type Action struct {
filter *Filter `yaml:"-"`
name string `yaml:"-"`
Cmd []string `yaml:"cmd"`
After string `yaml:"after"`
afterDuration time.Duration `yaml:"-"`
OnExit bool `yaml:"onexit"`
}
type LogEntry struct {
T time.Time
Pattern string
Stream, Filter string
Exec bool
}
func (c *Conf) setup() { func (c *Conf) setup() {
for patternName := range c.Patterns { for patternName := range c.Patterns {
@ -114,7 +54,6 @@ func (c *Conf) setup() {
filter := stream.Filters[filterName] filter := stream.Filters[filterName]
filter.stream = stream filter.stream = stream
filter.name = filterName filter.name = filterName
filter.matches = make(map[string][]time.Time)
if strings.Contains(filter.name, ".") { if strings.Contains(filter.name, ".") {
log.Fatalln("FATAL Bad configuration: character '.' is not allowed in filter names", filter.name) log.Fatalln("FATAL Bad configuration: character '.' is not allowed in filter names", filter.name)

95
app/types.go Normal file
View File

@ -0,0 +1,95 @@
package app
import (
"encoding/gob"
"os"
"regexp"
"time"
)
type Conf struct {
Patterns map[string]*Pattern `yaml:"patterns"`
Streams map[string]*Stream `yaml:"streams"`
}
type Pattern struct {
Regex string `yaml:"regex"`
Ignore []string `yaml:"ignore"`
name string `yaml:"-"`
nameWithBraces string `yaml:"-"`
}
// Stream, Filter & Action structures must never be copied.
// They're always referenced through pointers
type Stream struct {
name string `yaml:"-"`
Cmd []string `yaml:"cmd"`
Filters map[string]*Filter `yaml:"filters"`
}
type Filter struct {
stream *Stream `yaml:"-"`
name string `yaml:"-"`
Regex []string `yaml:"regex"`
compiledRegex []regexp.Regexp `yaml:"-"`
pattern *Pattern `yaml:"-"`
Retry int `yaml:"retry"`
RetryPeriod string `yaml:"retry-period"`
retryDuration time.Duration `yaml:"-"`
Actions map[string]*Action `yaml:"actions"`
longuestActionDuration *time.Duration
}
type Action struct {
filter *Filter `yaml:"-"`
name string `yaml:"-"`
Cmd []string `yaml:"cmd"`
After string `yaml:"after"`
afterDuration time.Duration `yaml:"-"`
OnExit bool `yaml:"onexit"`
}
type LogEntry struct {
T time.Time
Pattern string
Stream, Filter string
Exec bool
}
type ReadDB struct {
file *os.File
dec *gob.Decoder
}
type WriteDB struct {
file *os.File
enc *gob.Encoder
}
type MatchesMap map[*Filter]map[string][]time.Time
// Helper structs made to carry information across channels
type SF struct{ s, f string }
type PSF struct{ p, s, f string }
type PF struct {
p string
f *Filter
}
type PFT struct {
p string
f *Filter
t time.Time
}
type PA struct {
p string
a *Action
}

View File

@ -5,7 +5,7 @@ patterns:
streams: streams:
tailDown: tailDown:
cmd: [ "sh", "-c", "sleep 6; echo found 1; echo found 2; sleep 10" ] cmd: [ "sh", "-c", "sleep 2; echo found 1; echo found 2; sleep 2" ]
filters: filters:
findIP: findIP:
regex: regex: