From b441e91f84da00d7151e01da34093ce4b7652e35 Mon Sep 17 00:00:00 2001 From: ppom <> Date: Fri, 5 Jan 2024 12:00:00 +0100 Subject: [PATCH] Add global concurrency parameter. fix #56 Fix bug which caused pending actions to be run multiple times when pending time finished Fix bug which caused 1. past pending actions to rerun on exit 2. maximum one pending action per pattern/action couple to run on exit --- app/daemon.go | 77 ++++++++++++++++++++++++++++-------------- app/example.yml | 5 +++ app/startup.go | 6 ++-- app/types.go | 9 ++--- config/example.jsonnet | 5 +++ config/heavy-load.yml | 18 +++++----- config/test.jsonnet | 6 ++-- 7 files changed, 83 insertions(+), 43 deletions(-) diff --git a/app/daemon.go b/app/daemon.go index 31c9e6c..1bb1ab5 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -94,26 +94,52 @@ func (f *Filter) sendActions(match string, at time.Time) { } func (a *Action) exec(match string) { - wgActions.Add(1) - go func() { - defer wgActions.Done() + defer wgActions.Done() - computedCommand := make([]string, 0, len(a.Cmd)) - for _, item := range a.Cmd { - computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match)) - } + computedCommand := make([]string, 0, len(a.Cmd)) + for _, item := range a.Cmd { + computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match)) + } - logger.Printf(logger.INFO, "%s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand) + logger.Printf(logger.INFO, "%s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand) - cmd := exec.Command(computedCommand[0], computedCommand[1:]...) + cmd := exec.Command(computedCommand[0], computedCommand[1:]...) - if ret := cmd.Run(); ret != nil { - logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret) - } - }() + if ret := cmd.Run(); ret != nil { + logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret) + } } -func ActionsManager() { +func ActionsManager(concurrency int) { + // concurrency init + execActionsC := make(chan PA) + if concurrency > 0 { + for i := 0; i < concurrency; i++ { + go func() { + var pa PA + for { + pa = <-execActionsC + pa.a.exec(pa.p) + } + }() + } + } else { + go func() { + var pa PA + for { + pa = <-execActionsC + go func(pa PA) { + pa.a.exec(pa.p) + }(pa) + } + }() + } + execAction := func(a *Action, p string) { + wgActions.Add(1) + execActionsC <- PA{p, a} + } + + // main pendingActionsC := make(chan PAT) for { select { @@ -123,13 +149,13 @@ func ActionsManager() { now := time.Now() // check if must be executed now if then.Compare(now) <= 0 { - action.exec(pattern) + execAction(action, pattern) } else { actionsLock.Lock() if actions[pa] == nil { actions[pa] = make(map[time.Time]struct{}) } - actions[PA{pattern, action}][then] = struct{}{} + actions[pa][then] = struct{}{} actionsLock.Unlock() go func(insidePat PAT, insideNow time.Time) { time.Sleep(insidePat.t.Sub(insideNow)) @@ -141,20 +167,17 @@ func ActionsManager() { pattern, action, then := pat.p, pat.a, pat.t actionsLock.Lock() if actions[pa] != nil { - if _, ok := actions[pa][then]; ok { - delete(actions[pa], then) - action.exec(pattern) - } + delete(actions[pa], then) } actionsLock.Unlock() - action.exec(pattern) + execAction(action, pattern) case fo := <-flushToActionsC: ret := make(ActionsMap) actionsLock.Lock() for pa := range actions { if pa.p == fo.p { for range actions[pa] { - pa.a.exec(pa.p) + execAction(pa.a, pa.p) } ret[pa] = actions[pa] delete(actions, pa) @@ -164,9 +187,11 @@ func ActionsManager() { fo.ret <- ret case _, _ = <-stopActions: actionsLock.Lock() - for pat := range actions { - if pat.a.OnExit { - pat.a.exec(pat.p) + for pa := range actions { + if pa.a.OnExit { + for range actions[pa] { + execAction(pa.a, pa.p) + } } } actionsLock.Unlock() @@ -349,7 +374,7 @@ func Daemon(confFilename string) { go DatabaseManager(conf) go MatchesManager() - go ActionsManager() + go ActionsManager(conf.Concurrency) // Ready to start diff --git a/app/example.yml b/app/example.yml index 45264e1..e275e5a 100644 --- a/app/example.yml +++ b/app/example.yml @@ -8,6 +8,11 @@ definitions: # ip46tables is a minimal C program (only POSIX dependencies) present as a subdirectory. # it permits to handle both ipv4/iptables and ipv6/ip6tables commands +# if set to a positive number → max number of concurrent actions +# if set to a negative number → no limit +# if not specified or set to 0 → defaults to the number of CPUs on the system +concurrency: 0 + # patterns are substitued in regexes. # when a filter performs an action, it replaces the found pattern patterns: diff --git a/app/startup.go b/app/startup.go index 54d9097..1364d8b 100644 --- a/app/startup.go +++ b/app/startup.go @@ -6,6 +6,7 @@ import ( "log" "os" "regexp" + "runtime" "strings" "time" @@ -15,6 +16,9 @@ import ( ) func (c *Conf) setup() { + if c.Concurrency == 0 { + c.Concurrency = runtime.NumCPU() + } for patternName := range c.Patterns { pattern := c.Patterns[patternName] @@ -144,13 +148,11 @@ func parseConf(filename string) *Conf { var conf Conf if filename[len(filename)-4:] == ".yml" || filename[len(filename)-5:] == ".yaml" { - logger.Println(logger.DEBUG, "yaml") err = jsonnet.NewYAMLToJSONDecoder(data).Decode(&conf) if err != nil { logger.Fatalln("Failed to parse yaml configuration file:", err) } } else { - logger.Println(logger.DEBUG, "json") var jsondata string jsondata, err = jsonnet.MakeVM().EvaluateFile(filename) if err == nil { diff --git a/app/types.go b/app/types.go index 0825878..dd8b72a 100644 --- a/app/types.go +++ b/app/types.go @@ -8,10 +8,11 @@ import ( ) type Conf struct { - Patterns map[string]*Pattern `json:"patterns"` - Streams map[string]*Stream `json:"streams"` - Start [][]string `json:"start"` - Stop [][]string `json:"stop"` + Concurrency int `json:"concurrency"` + Patterns map[string]*Pattern `json:"patterns"` + Streams map[string]*Stream `json:"streams"` + Start [][]string `json:"start"` + Stop [][]string `json:"stop"` } type Pattern struct { diff --git a/config/example.jsonnet b/config/example.jsonnet index 8a693cc..add9588 100644 --- a/config/example.jsonnet +++ b/config/example.jsonnet @@ -21,6 +21,11 @@ local iptables(args) = ['ip46tables', '-w'] + args; }, }, + // if set to a positive number → max number of concurrent actions + // if set to a negative number → no limit + // if not specified or set to 0 → defaults to the number of CPUs on the system + concurrency: 0, + // Those commands will be executed in order at start, before everything else start: [ // Create an iptables chain for reaction diff --git a/config/heavy-load.yml b/config/heavy-load.yml index a63999a..1c149c3 100644 --- a/config/heavy-load.yml +++ b/config/heavy-load.yml @@ -7,6 +7,8 @@ patterns: ignore: - 1.0.0.1 +concurrency: 0 + streams: tailDown1: cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo found $(($i % 100)); done' ] @@ -18,9 +20,9 @@ streams: retryperiod: 1m actions: damn: - cmd: [ 'echo', '' ] + cmd: [ 'sleep', '0.' ] undamn: - cmd: [ 'echo', 'undamn', '' ] + cmd: [ 'sleep', '0.' ] after: 1m onexit: false tailDown2: @@ -33,9 +35,9 @@ streams: retryperiod: 1m actions: damn: - cmd: [ 'echo', '' ] + cmd: [ 'sleep', '0.' ] undamn: - cmd: [ 'echo', 'undamn', '' ] + cmd: [ 'sleep', '0.' ] after: 1m onexit: false tailDown3: @@ -48,9 +50,9 @@ streams: retryperiod: 2m actions: damn: - cmd: [ 'true' ] + cmd: [ 'sleep', '0.' ] undamn: - cmd: [ 'true' ] + cmd: [ 'sleep', '0.' ] after: 1m onexit: false tailDown4: @@ -63,8 +65,8 @@ streams: retryperiod: 2m actions: damn: - cmd: [ 'echo', '' ] + cmd: [ 'sleep', '0.' ] undamn: - cmd: [ 'echo', 'undamn', '' ] + cmd: [ 'sleep', '0.' ] after: 1m onexit: false diff --git a/config/test.jsonnet b/config/test.jsonnet index b5fc18e..63f6b96 100644 --- a/config/test.jsonnet +++ b/config/test.jsonnet @@ -18,12 +18,12 @@ streams: { tailDown1: { - cmd: ['sh', '-c', "echo 1 2 3 4 5 | tr ' ' '\n' | while read i; do sleep 2; echo found $(($i % 10)); done"], + cmd: ['sh', '-c', "echo 1 2 3 4 5 5 | tr ' ' '\n' | while read i; do sleep 1; echo found $(($i % 10)); done"], // cmd: ['sh', '-c', "echo 1 2 3 4 5 1 2 3 4 5 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 | tr ' ' '\n' | while read i; do sleep 2; echo found $(($i % 10)); done"], filters: { findIP: { regex: ['^found $'], - retry: 3, + retry: 1, retryperiod: '30s', actions: { damn: { @@ -31,7 +31,7 @@ }, undamn: { cmd: ['echo', 'undamn', ''], - after: '30s', + after: '4s', onexit: true, }, },