one goroutine handles all actions

pending actions are now data, they're not goroutines anymore ❤️

warn: cli currently doesn't work, but it's already a huge commit
This commit is contained in:
ppom 2023-09-22 18:09:31 +02:00
parent 52556f69b9
commit a119e0814b
4 changed files with 117 additions and 48 deletions

View File

@ -66,31 +66,15 @@ func (f *Filter) match(line *string) string {
return ""
}
func (f *Filter) execActions(match string, advance time.Duration) {
func (f *Filter) sendActions(match string, at time.Time) {
for _, a := range f.Actions {
wgActions.Add(1)
go a.exec(match, advance)
actionsC <- PAT{match, a, at.Add(a.afterDuration)}
}
}
func (a *Action) exec(match string, advance time.Duration) {
func (a *Action) exec(match string) {
defer wgActions.Done()
// Wait for either end of sleep time, or actionStore requesting stop
if a.afterDuration != 0 && a.afterDuration > advance {
stopAction := actionStore.Register(a, match)
select {
case <-time.After(a.afterDuration - advance):
// Let's not wait for the lock
go actionStore.Unregister(a, match, stopAction)
case doExec := <-stopAction:
// no need to unregister here
if !doExec {
return
}
}
}
computedCommand := make([]string, 0, len(a.Cmd))
for _, item := range a.Cmd {
computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match))
@ -105,6 +89,75 @@ func (a *Action) exec(match string, advance time.Duration) {
}
}
func quasiBefore(then, now time.Time) bool {
// We won't complain if it's executed less than 1sec earlier
return then.Unix() <= now.Add(1*time.Second).Unix()
}
func ActionsManager() {
actions := make(ActionsMap)
pendingActionsC := make(chan PAT)
var (
pat PAT
action *Action
match string
then time.Time
now time.Time
)
for {
select {
case pat = <-actionsC:
match = pat.p
action = pat.a
then = pat.t
now = time.Now()
// check
if quasiBefore(then, now) {
wgActions.Add(1)
go action.exec(match)
} else {
// make sure map exists
if actions[action] == nil {
actions[action] = make(PatternTimes)
}
// append() to nil is valid go
actions[action][match] = append(actions[action][match], then)
go func(pat PAT) {
log.Printf("DEBUG then: %v, now: %v, then.Sub(now): %v", then.String(), now.String(), then.Sub(now).String())
time.Sleep(then.Sub(now))
pendingActionsC <- pat
}(pat)
}
// FIXME convert to pendingActionsC to chan PA
// and forget about time checking
case pat = <-pendingActionsC:
match = pat.p
action = pat.a
then = pat.t
now = time.Now()
if quasiBefore(then, now) {
actions[action][match] = actions[action][match][1:]
wgActions.Add(1)
go action.exec(match)
} else {
// This should not happen
log.Fatalf("ERROR pendingActionsC then: %v << now %v\n", pat.t.String(), now)
}
case _, _ = <-stopActions:
for action := range actions {
if action.OnExit {
for match := range actions[action] {
wgActions.Add(1)
go action.exec(match)
}
}
}
wgActions.Done()
return
}
}
}
func MatchesManager() {
matches := make(MatchesMap)
var pf PF
@ -140,39 +193,35 @@ func MatchesManager() {
}
func matchesManagerHandleMatch(matches MatchesMap, pft PFT) bool {
var filter *Filter
var match string
var now time.Time
filter = pft.f
match = pft.p
now = pft.t
filter, match, then := pft.f, pft.p, pft.t
if filter.Retry > 1 {
// make sure map exists
if matches[filter] == nil {
matches[filter] = make(map[string][]time.Time)
matches[filter] = make(PatternTimes)
}
// clean old matches
newMatches := make([]time.Time, 0, len(matches[filter][match]))
for _, old := range matches[filter][match] {
if old.Add(filter.retryDuration).After(now) {
if old.Add(filter.retryDuration).After(then) {
newMatches = append(newMatches, old)
}
}
// add new match
newMatches = append(newMatches, now)
newMatches = append(newMatches, then)
matches[filter][match] = newMatches
}
if filter.Retry <= 1 || len(matches[filter][match]) >= filter.Retry {
delete(matches[filter], match)
filter.execActions(match, time.Duration(0))
filter.sendActions(match, then)
return true
}
return false
}
func StreamManager(s *Stream, endedSignal chan *Stream) {
defer wgStreams.Done()
log.Printf("INFO %s: start %s\n", s.name, s.Cmd)
lines := cmdStdout(s.Cmd)
@ -188,32 +237,36 @@ func StreamManager(s *Stream, endedSignal chan *Stream) {
matchesC <- PFT{match, filter, time.Now()}
}
}
case _, ok := <-stopStreams:
if !ok {
return
}
case _, _ = <-stopStreams:
return
}
}
}
var stopStreams chan bool
var stopActions chan bool
var actionStore ActionStore
var wgActions sync.WaitGroup
var wgStreams sync.WaitGroup
// MatchesManager → DatabaseManager
var logsC chan LogEntry
// SocketManager → DatabaseManager
var flushesC chan LogEntry
// DatabaseManager → MatchesManager
var startupMatchesC chan PFT
// StreamManager → MatchesManager
var matchesC chan PFT
// StreamManager, DatabaseManager → MatchesManager
var cleanMatchesC chan PF
// MatchesManager → ExecsManager
var execsC chan PA
var actionsC chan PAT
func Daemon(confFilename string) {
actionStore.store = make(ActionMap)
@ -225,22 +278,24 @@ func Daemon(confFilename string) {
matchesC = make(chan PFT)
startupMatchesC = make(chan PFT)
cleanMatchesC = make(chan PF)
execsC = make(chan PA)
actionsC = make(chan PAT)
stopActions = make(chan bool)
stopStreams = make(chan bool)
go DatabaseManager(conf)
go MatchesManager()
go ActionsManager()
// Ready to start
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
stopStreams = make(chan bool)
endSignals := make(chan *Stream)
nbStreamsInExecution := len(conf.Streams)
for _, stream := range conf.Streams {
wgStreams.Add(1)
go StreamManager(stream, endSignals)
}
@ -262,12 +317,17 @@ func Daemon(confFilename string) {
}
func quit() {
log.Println("INFO Quitting...")
// stop all streams
// send stop to StreamManager·s
close(stopStreams)
log.Println("INFO Waiting for Streams to finish...")
wgStreams.Wait()
// ActionsManager calls wgActions.Done() when it has launched all pending actions
wgActions.Add(1)
// send stop to ActionsManager
close(stopActions)
// stop all actions
actionStore.Quit()
// wait for them to complete
log.Println("INFO Waiting for Actions to finish...")
wgActions.Wait()
// delete pipe
err := os.Remove(*SocketPath)

View File

@ -63,6 +63,7 @@ func (c *Conf) manageLogs(logDB *WriteDB, flushDB *WriteDB) {
}
func (c *Conf) RotateDB(startup bool) (*WriteDB, *WriteDB) {
defer close(startupMatchesC)
var (
doesntExist bool
err error
@ -191,6 +192,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E
// store matches
if !entry.Exec && entry.T.Add(filter.retryDuration).Unix() > now.Unix() {
if startup {
log.Println("DEBUG db send match")
startupMatchesC <- PFT{entry.Pattern, filter, entry.T}
}
@ -200,16 +202,14 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E
// replay executions
if entry.Exec && entry.T.Add(*filter.longuestActionDuration).Unix() > now.Unix() {
if startup {
log.Println("DEBUG db send match")
cleanMatchesC <- PF{entry.Pattern, filter}
filter.execActions(entry.Pattern, now.Sub(entry.T))
filter.sendActions(entry.Pattern, entry.T)
}
encodeOrFatal(logEnc, entry)
}
}
if startup {
close(startupMatchesC)
}
}
func encodeOrFatal(enc *gob.Encoder, entry LogEntry) {

View File

@ -75,7 +75,11 @@ type WriteDB struct {
enc *gob.Encoder
}
type MatchesMap map[*Filter]map[string][]time.Time
type PatternTimes map[string][]time.Time
type MatchesMap map[*Filter]PatternTimes
type ActionsMap map[*Action]PatternTimes
// Helper structs made to carry information across channels
type SF struct{ s, f string }
@ -93,3 +97,8 @@ type PA struct {
p string
a *Action
}
type PAT struct {
p string
a *Action
t time.Time
}

View File

@ -5,17 +5,17 @@ patterns:
streams:
tailDown:
cmd: [ "sh", "-c", "sleep 2; echo found 1; echo found 2; sleep 2" ]
cmd: [ "sh", "-c", "sleep 0.5; echo found 1; sleep 1; echo found 1; sleep 10" ]
filters:
findIP:
regex:
- '^found <num>$'
retry: 2
retry-period: 2m
retry-period: 1m
actions:
damn:
cmd: [ "echo", "<num>" ]
undamn:
cmd: [ "echo", "undamn", "<num>" ]
after: 1m
after: 5s
onexit: true