Add global concurrency parameter. fix #56

Fix bug which caused pending actions to be run multiple times when pending time finished
Fix bug which caused
1. past pending actions to rerun on exit
2. maximum one pending action per pattern/action couple to run on exit
This commit is contained in:
ppom 2024-01-05 12:00:00 +01:00
parent 9ce589b07d
commit b441e91f84
7 changed files with 83 additions and 43 deletions

View File

@ -94,8 +94,6 @@ func (f *Filter) sendActions(match string, at time.Time) {
} }
func (a *Action) exec(match string) { func (a *Action) exec(match string) {
wgActions.Add(1)
go func() {
defer wgActions.Done() defer wgActions.Done()
computedCommand := make([]string, 0, len(a.Cmd)) computedCommand := make([]string, 0, len(a.Cmd))
@ -110,10 +108,38 @@ func (a *Action) exec(match string) {
if ret := cmd.Run(); ret != nil { if ret := cmd.Run(); ret != nil {
logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret) logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret)
} }
}()
} }
func ActionsManager() { func ActionsManager(concurrency int) {
// concurrency init
execActionsC := make(chan PA)
if concurrency > 0 {
for i := 0; i < concurrency; i++ {
go func() {
var pa PA
for {
pa = <-execActionsC
pa.a.exec(pa.p)
}
}()
}
} else {
go func() {
var pa PA
for {
pa = <-execActionsC
go func(pa PA) {
pa.a.exec(pa.p)
}(pa)
}
}()
}
execAction := func(a *Action, p string) {
wgActions.Add(1)
execActionsC <- PA{p, a}
}
// main
pendingActionsC := make(chan PAT) pendingActionsC := make(chan PAT)
for { for {
select { select {
@ -123,13 +149,13 @@ func ActionsManager() {
now := time.Now() now := time.Now()
// check if must be executed now // check if must be executed now
if then.Compare(now) <= 0 { if then.Compare(now) <= 0 {
action.exec(pattern) execAction(action, pattern)
} else { } else {
actionsLock.Lock() actionsLock.Lock()
if actions[pa] == nil { if actions[pa] == nil {
actions[pa] = make(map[time.Time]struct{}) actions[pa] = make(map[time.Time]struct{})
} }
actions[PA{pattern, action}][then] = struct{}{} actions[pa][then] = struct{}{}
actionsLock.Unlock() actionsLock.Unlock()
go func(insidePat PAT, insideNow time.Time) { go func(insidePat PAT, insideNow time.Time) {
time.Sleep(insidePat.t.Sub(insideNow)) time.Sleep(insidePat.t.Sub(insideNow))
@ -141,20 +167,17 @@ func ActionsManager() {
pattern, action, then := pat.p, pat.a, pat.t pattern, action, then := pat.p, pat.a, pat.t
actionsLock.Lock() actionsLock.Lock()
if actions[pa] != nil { if actions[pa] != nil {
if _, ok := actions[pa][then]; ok {
delete(actions[pa], then) delete(actions[pa], then)
action.exec(pattern)
}
} }
actionsLock.Unlock() actionsLock.Unlock()
action.exec(pattern) execAction(action, pattern)
case fo := <-flushToActionsC: case fo := <-flushToActionsC:
ret := make(ActionsMap) ret := make(ActionsMap)
actionsLock.Lock() actionsLock.Lock()
for pa := range actions { for pa := range actions {
if pa.p == fo.p { if pa.p == fo.p {
for range actions[pa] { for range actions[pa] {
pa.a.exec(pa.p) execAction(pa.a, pa.p)
} }
ret[pa] = actions[pa] ret[pa] = actions[pa]
delete(actions, pa) delete(actions, pa)
@ -164,9 +187,11 @@ func ActionsManager() {
fo.ret <- ret fo.ret <- ret
case _, _ = <-stopActions: case _, _ = <-stopActions:
actionsLock.Lock() actionsLock.Lock()
for pat := range actions { for pa := range actions {
if pat.a.OnExit { if pa.a.OnExit {
pat.a.exec(pat.p) for range actions[pa] {
execAction(pa.a, pa.p)
}
} }
} }
actionsLock.Unlock() actionsLock.Unlock()
@ -349,7 +374,7 @@ func Daemon(confFilename string) {
go DatabaseManager(conf) go DatabaseManager(conf)
go MatchesManager() go MatchesManager()
go ActionsManager() go ActionsManager(conf.Concurrency)
// Ready to start // Ready to start

View File

@ -8,6 +8,11 @@ definitions:
# ip46tables is a minimal C program (only POSIX dependencies) present as a subdirectory. # ip46tables is a minimal C program (only POSIX dependencies) present as a subdirectory.
# it permits to handle both ipv4/iptables and ipv6/ip6tables commands # it permits to handle both ipv4/iptables and ipv6/ip6tables commands
# if set to a positive number → max number of concurrent actions
# if set to a negative number → no limit
# if not specified or set to 0 → defaults to the number of CPUs on the system
concurrency: 0
# patterns are substitued in regexes. # patterns are substitued in regexes.
# when a filter performs an action, it replaces the found pattern # when a filter performs an action, it replaces the found pattern
patterns: patterns:

View File

@ -6,6 +6,7 @@ import (
"log" "log"
"os" "os"
"regexp" "regexp"
"runtime"
"strings" "strings"
"time" "time"
@ -15,6 +16,9 @@ import (
) )
func (c *Conf) setup() { func (c *Conf) setup() {
if c.Concurrency == 0 {
c.Concurrency = runtime.NumCPU()
}
for patternName := range c.Patterns { for patternName := range c.Patterns {
pattern := c.Patterns[patternName] pattern := c.Patterns[patternName]
@ -144,13 +148,11 @@ func parseConf(filename string) *Conf {
var conf Conf var conf Conf
if filename[len(filename)-4:] == ".yml" || filename[len(filename)-5:] == ".yaml" { if filename[len(filename)-4:] == ".yml" || filename[len(filename)-5:] == ".yaml" {
logger.Println(logger.DEBUG, "yaml")
err = jsonnet.NewYAMLToJSONDecoder(data).Decode(&conf) err = jsonnet.NewYAMLToJSONDecoder(data).Decode(&conf)
if err != nil { if err != nil {
logger.Fatalln("Failed to parse yaml configuration file:", err) logger.Fatalln("Failed to parse yaml configuration file:", err)
} }
} else { } else {
logger.Println(logger.DEBUG, "json")
var jsondata string var jsondata string
jsondata, err = jsonnet.MakeVM().EvaluateFile(filename) jsondata, err = jsonnet.MakeVM().EvaluateFile(filename)
if err == nil { if err == nil {

View File

@ -8,6 +8,7 @@ import (
) )
type Conf struct { type Conf struct {
Concurrency int `json:"concurrency"`
Patterns map[string]*Pattern `json:"patterns"` Patterns map[string]*Pattern `json:"patterns"`
Streams map[string]*Stream `json:"streams"` Streams map[string]*Stream `json:"streams"`
Start [][]string `json:"start"` Start [][]string `json:"start"`

View File

@ -21,6 +21,11 @@ local iptables(args) = ['ip46tables', '-w'] + args;
}, },
}, },
// if set to a positive number → max number of concurrent actions
// if set to a negative number → no limit
// if not specified or set to 0 → defaults to the number of CPUs on the system
concurrency: 0,
// Those commands will be executed in order at start, before everything else // Those commands will be executed in order at start, before everything else
start: [ start: [
// Create an iptables chain for reaction // Create an iptables chain for reaction

View File

@ -7,6 +7,8 @@ patterns:
ignore: ignore:
- 1.0.0.1 - 1.0.0.1
concurrency: 0
streams: streams:
tailDown1: tailDown1:
cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo found $(($i % 100)); done' ] cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo found $(($i % 100)); done' ]
@ -18,9 +20,9 @@ streams:
retryperiod: 1m retryperiod: 1m
actions: actions:
damn: damn:
cmd: [ 'echo', '<num>' ] cmd: [ 'sleep', '0.<num>' ]
undamn: undamn:
cmd: [ 'echo', 'undamn', '<num>' ] cmd: [ 'sleep', '0.<num>' ]
after: 1m after: 1m
onexit: false onexit: false
tailDown2: tailDown2:
@ -33,9 +35,9 @@ streams:
retryperiod: 1m retryperiod: 1m
actions: actions:
damn: damn:
cmd: [ 'echo', '<num>' ] cmd: [ 'sleep', '0.<num>' ]
undamn: undamn:
cmd: [ 'echo', 'undamn', '<num>' ] cmd: [ 'sleep', '0.<num>' ]
after: 1m after: 1m
onexit: false onexit: false
tailDown3: tailDown3:
@ -48,9 +50,9 @@ streams:
retryperiod: 2m retryperiod: 2m
actions: actions:
damn: damn:
cmd: [ 'true' ] cmd: [ 'sleep', '0.<num>' ]
undamn: undamn:
cmd: [ 'true' ] cmd: [ 'sleep', '0.<num>' ]
after: 1m after: 1m
onexit: false onexit: false
tailDown4: tailDown4:
@ -63,8 +65,8 @@ streams:
retryperiod: 2m retryperiod: 2m
actions: actions:
damn: damn:
cmd: [ 'echo', '<num>' ] cmd: [ 'sleep', '0.<num>' ]
undamn: undamn:
cmd: [ 'echo', 'undamn', '<num>' ] cmd: [ 'sleep', '0.<num>' ]
after: 1m after: 1m
onexit: false onexit: false

View File

@ -18,12 +18,12 @@
streams: { streams: {
tailDown1: { tailDown1: {
cmd: ['sh', '-c', "echo 1 2 3 4 5 | tr ' ' '\n' | while read i; do sleep 2; echo found $(($i % 10)); done"], cmd: ['sh', '-c', "echo 1 2 3 4 5 5 | tr ' ' '\n' | while read i; do sleep 1; echo found $(($i % 10)); done"],
// cmd: ['sh', '-c', "echo 1 2 3 4 5 1 2 3 4 5 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 | tr ' ' '\n' | while read i; do sleep 2; echo found $(($i % 10)); done"], // cmd: ['sh', '-c', "echo 1 2 3 4 5 1 2 3 4 5 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 | tr ' ' '\n' | while read i; do sleep 2; echo found $(($i % 10)); done"],
filters: { filters: {
findIP: { findIP: {
regex: ['^found <num>$'], regex: ['^found <num>$'],
retry: 3, retry: 1,
retryperiod: '30s', retryperiod: '30s',
actions: { actions: {
damn: { damn: {
@ -31,7 +31,7 @@
}, },
undamn: { undamn: {
cmd: ['echo', 'undamn', '<num>'], cmd: ['echo', 'undamn', '<num>'],
after: '30s', after: '4s',
onexit: true, onexit: true,
}, },
}, },