Reimplement flushes

fix #33
fix #39
This commit is contained in:
ppom
2023-10-01 12:00:00 +02:00
parent 83e760ed72
commit 480aeb7f15
5 changed files with 157 additions and 79 deletions

View File

@ -4,7 +4,6 @@ import (
"bufio"
"syscall"
// "fmt"
"log"
"os"
"os/exec"
@ -73,64 +72,79 @@ func (f *Filter) sendActions(match string, at time.Time) {
}
func (a *Action) exec(match string) {
defer wgActions.Done()
wgActions.Add(1)
go func() {
defer wgActions.Done()
computedCommand := make([]string, 0, len(a.Cmd))
for _, item := range a.Cmd {
computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match))
}
computedCommand := make([]string, 0, len(a.Cmd))
for _, item := range a.Cmd {
computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match))
}
log.Printf("INFO %s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand)
log.Printf("INFO %s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand)
cmd := exec.Command(computedCommand[0], computedCommand[1:]...)
cmd := exec.Command(computedCommand[0], computedCommand[1:]...)
if ret := cmd.Run(); ret != nil {
log.Printf("ERROR %s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret)
}
if ret := cmd.Run(); ret != nil {
log.Printf("ERROR %s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret)
}
}()
}
func ActionsManager() {
pendingActionsC := make(chan PAT)
var (
pat PAT
action *Action
match string
then time.Time
now time.Time
)
for {
select {
case pat = <-actionsC:
match = pat.p
action = pat.a
then = pat.t
now = time.Now()
// check
case pat := <-actionsC:
pa := PA{pat.p, pat.a}
pattern, action, then := pat.p, pat.a, pat.t
now := time.Now()
// check if must be executed now
if then.Compare(now) <= 0 {
wgActions.Add(1)
go action.exec(match)
action.exec(pattern)
} else {
actionsLock.Lock()
actions[PAT{match, action, then}] = struct{}{}
if actions[pa] == nil {
actions[pa] = make(map[time.Time]struct{})
}
actions[PA{pattern, action}][then] = struct{}{}
actionsLock.Unlock()
go func(pat PAT, now time.Time) {
time.Sleep(pat.t.Sub(now))
pendingActionsC <- pat
go func(insidePat PAT, insideNow time.Time) {
time.Sleep(insidePat.t.Sub(insideNow))
pendingActionsC <- insidePat
}(pat, now)
}
case pat = <-pendingActionsC:
match, action, then = pat.p, pat.a, pat.t
case pat := <-pendingActionsC:
pa := PA{pat.p, pat.a}
pattern, action, then := pat.p, pat.a, pat.t
actionsLock.Lock()
delete(actions, PAT{match, action, then})
if actions[pa] != nil {
if _, ok := actions[pa][then]; ok {
delete(actions[pa], then)
action.exec(pattern)
}
}
actionsLock.Unlock()
wgActions.Add(1)
go action.exec(match)
action.exec(pattern)
case fo := <-flushToActionsC:
ret := make(map[*Action]int)
actionsLock.Lock()
for pa := range actions {
if pa.p == fo.p {
for range actions[pa] {
pa.a.exec(pa.p)
}
ret[pa.a] = len(actions[pa])
delete(actions, pa)
}
}
actionsLock.Unlock()
fo.ret <- ret
case _, _ = <-stopActions:
actionsLock.Lock()
for pat := range actions {
if pat.a.OnExit {
wgActions.Add(1)
go action.exec(match)
pat.a.exec(pat.p)
}
}
actionsLock.Unlock()
@ -141,44 +155,60 @@ func ActionsManager() {
}
func MatchesManager() {
var pf PF
var fo FlushMatchOrder
var pft PFT
end := false
for !end {
select {
case pf = <-flushToMatchesC:
delete(matches, pf)
case pft, ok := <-startupMatchesC:
case fo = <-flushToMatchesC:
matchesManagerHandleFlush(fo)
case fo, ok := <-startupMatchesC:
if !ok {
end = true
} else {
_ = matchesManagerHandleMatch(pft)
_ = matchesManagerHandleMatch(fo)
}
}
}
for {
select {
case pf = <-flushToMatchesC:
matchesLock.Lock()
delete(matches, pf)
matchesLock.Unlock()
case fo = <-flushToMatchesC:
matchesManagerHandleFlush(fo)
case pft = <-matchesC:
entry := LogEntry{pft.t, pft.p, pft.f.stream.name, pft.f.name, false}
matchesLock.Lock()
entry.Exec = matchesManagerHandleMatch(pft)
matchesLock.Unlock()
logsC <- entry
}
}
}
func matchesManagerHandleFlush(fo FlushMatchOrder) {
ret := make(map[*Filter]int)
matchesLock.Lock()
for pf := range matches {
if fo.p == pf.p {
if fo.ret != nil {
ret[pf.f] = len(matches[pf])
}
delete(matches, pf)
}
}
matchesLock.Unlock()
if fo.ret != nil {
fo.ret <- ret
}
}
func matchesManagerHandleMatch(pft PFT) bool {
filter, match, then := pft.f, pft.p, pft.t
matchesLock.Lock()
defer matchesLock.Unlock()
filter, pattern, then := pft.f, pft.p, pft.t
pf := PF{pft.p, pft.f}
if filter.Retry > 1 {
@ -190,7 +220,7 @@ func matchesManagerHandleMatch(pft PFT) bool {
matches[pf][then] = struct{}{}
// remove match when expired
go func(pf PF, then time.Time) {
time.Sleep(filter.retryDuration)
time.Sleep(then.Sub(time.Now()) + filter.retryDuration)
matchesLock.Lock()
if matches[pf] != nil {
// FIXME replace this and all similar occurences
@ -203,7 +233,7 @@ func matchesManagerHandleMatch(pft PFT) bool {
if filter.Retry <= 1 || len(matches[pf]) >= filter.Retry {
delete(matches, pf)
filter.sendActions(match, then)
filter.sendActions(pattern, then)
return true
}
return false
@ -244,7 +274,7 @@ var wgActions sync.WaitGroup
var wgStreams sync.WaitGroup
/*
<streamcmds>
<StreamCmds>
StreamManager onstartup:matches
↓ ↓ ↑
@ -254,7 +284,7 @@ StreamManager onstartup:matches
↑ ↑ ↑
SocketManager →flushes→→→→→→→→→→·→→→→→→→→→→→→→→→→·
<clients>
<Clients>
*/
// DatabaseManager → MatchesManager
@ -270,10 +300,10 @@ var logsC chan LogEntry
var actionsC chan PAT
// SocketManager, DatabaseManager → MatchesManager
var flushToMatchesC chan PF
var flushToMatchesC chan FlushMatchOrder
// SocketManager → ActionsManager
var flushToActionsC chan PF
var flushToActionsC chan FlushActionOrder
// SocketManager → DatabaseManager
var flushToDatabaseC chan LogEntry
@ -285,8 +315,8 @@ func Daemon(confFilename string) {
matchesC = make(chan PFT)
logsC = make(chan LogEntry)
actionsC = make(chan PAT)
flushToMatchesC = make(chan PF)
flushToActionsC = make(chan PF)
flushToMatchesC = make(chan FlushMatchOrder)
flushToActionsC = make(chan FlushActionOrder)
flushToDatabaseC = make(chan LogEntry)
stopActions = make(chan bool)
stopStreams = make(chan bool)
@ -310,7 +340,7 @@ func Daemon(confFilename string) {
go StreamManager(stream, endSignals)
}
go SocketManager()
go SocketManager(conf.Streams)
for {
select {