diff --git a/app/daemon.go b/app/daemon.go index 31c9e6c..1bb1ab5 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -94,26 +94,52 @@ func (f *Filter) sendActions(match string, at time.Time) { } func (a *Action) exec(match string) { - wgActions.Add(1) - go func() { - defer wgActions.Done() + defer wgActions.Done() - computedCommand := make([]string, 0, len(a.Cmd)) - for _, item := range a.Cmd { - computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match)) - } + computedCommand := make([]string, 0, len(a.Cmd)) + for _, item := range a.Cmd { + computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match)) + } - logger.Printf(logger.INFO, "%s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand) + logger.Printf(logger.INFO, "%s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand) - cmd := exec.Command(computedCommand[0], computedCommand[1:]...) + cmd := exec.Command(computedCommand[0], computedCommand[1:]...) - if ret := cmd.Run(); ret != nil { - logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret) - } - }() + if ret := cmd.Run(); ret != nil { + logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret) + } } -func ActionsManager() { +func ActionsManager(concurrency int) { + // concurrency init + execActionsC := make(chan PA) + if concurrency > 0 { + for i := 0; i < concurrency; i++ { + go func() { + var pa PA + for { + pa = <-execActionsC + pa.a.exec(pa.p) + } + }() + } + } else { + go func() { + var pa PA + for { + pa = <-execActionsC + go func(pa PA) { + pa.a.exec(pa.p) + }(pa) + } + }() + } + execAction := func(a *Action, p string) { + wgActions.Add(1) + execActionsC <- PA{p, a} + } + + // main pendingActionsC := make(chan PAT) for { select { @@ -123,13 +149,13 @@ func ActionsManager() { now := time.Now() // check if must be executed now if then.Compare(now) <= 0 { - action.exec(pattern) + execAction(action, pattern) } else { actionsLock.Lock() if actions[pa] == nil { actions[pa] = make(map[time.Time]struct{}) } - actions[PA{pattern, action}][then] = struct{}{} + actions[pa][then] = struct{}{} actionsLock.Unlock() go func(insidePat PAT, insideNow time.Time) { time.Sleep(insidePat.t.Sub(insideNow)) @@ -141,20 +167,17 @@ func ActionsManager() { pattern, action, then := pat.p, pat.a, pat.t actionsLock.Lock() if actions[pa] != nil { - if _, ok := actions[pa][then]; ok { - delete(actions[pa], then) - action.exec(pattern) - } + delete(actions[pa], then) } actionsLock.Unlock() - action.exec(pattern) + execAction(action, pattern) case fo := <-flushToActionsC: ret := make(ActionsMap) actionsLock.Lock() for pa := range actions { if pa.p == fo.p { for range actions[pa] { - pa.a.exec(pa.p) + execAction(pa.a, pa.p) } ret[pa] = actions[pa] delete(actions, pa) @@ -164,9 +187,11 @@ func ActionsManager() { fo.ret <- ret case _, _ = <-stopActions: actionsLock.Lock() - for pat := range actions { - if pat.a.OnExit { - pat.a.exec(pat.p) + for pa := range actions { + if pa.a.OnExit { + for range actions[pa] { + execAction(pa.a, pa.p) + } } } actionsLock.Unlock() @@ -349,7 +374,7 @@ func Daemon(confFilename string) { go DatabaseManager(conf) go MatchesManager() - go ActionsManager() + go ActionsManager(conf.Concurrency) // Ready to start diff --git a/app/example.yml b/app/example.yml index 45264e1..e275e5a 100644 --- a/app/example.yml +++ b/app/example.yml @@ -8,6 +8,11 @@ definitions: # ip46tables is a minimal C program (only POSIX dependencies) present as a subdirectory. # it permits to handle both ipv4/iptables and ipv6/ip6tables commands +# if set to a positive number → max number of concurrent actions +# if set to a negative number → no limit +# if not specified or set to 0 → defaults to the number of CPUs on the system +concurrency: 0 + # patterns are substitued in regexes. # when a filter performs an action, it replaces the found pattern patterns: diff --git a/app/startup.go b/app/startup.go index 54d9097..1364d8b 100644 --- a/app/startup.go +++ b/app/startup.go @@ -6,6 +6,7 @@ import ( "log" "os" "regexp" + "runtime" "strings" "time" @@ -15,6 +16,9 @@ import ( ) func (c *Conf) setup() { + if c.Concurrency == 0 { + c.Concurrency = runtime.NumCPU() + } for patternName := range c.Patterns { pattern := c.Patterns[patternName] @@ -144,13 +148,11 @@ func parseConf(filename string) *Conf { var conf Conf if filename[len(filename)-4:] == ".yml" || filename[len(filename)-5:] == ".yaml" { - logger.Println(logger.DEBUG, "yaml") err = jsonnet.NewYAMLToJSONDecoder(data).Decode(&conf) if err != nil { logger.Fatalln("Failed to parse yaml configuration file:", err) } } else { - logger.Println(logger.DEBUG, "json") var jsondata string jsondata, err = jsonnet.MakeVM().EvaluateFile(filename) if err == nil { diff --git a/app/types.go b/app/types.go index 0825878..dd8b72a 100644 --- a/app/types.go +++ b/app/types.go @@ -8,10 +8,11 @@ import ( ) type Conf struct { - Patterns map[string]*Pattern `json:"patterns"` - Streams map[string]*Stream `json:"streams"` - Start [][]string `json:"start"` - Stop [][]string `json:"stop"` + Concurrency int `json:"concurrency"` + Patterns map[string]*Pattern `json:"patterns"` + Streams map[string]*Stream `json:"streams"` + Start [][]string `json:"start"` + Stop [][]string `json:"stop"` } type Pattern struct { diff --git a/config/example.jsonnet b/config/example.jsonnet index 8a693cc..add9588 100644 --- a/config/example.jsonnet +++ b/config/example.jsonnet @@ -21,6 +21,11 @@ local iptables(args) = ['ip46tables', '-w'] + args; }, }, + // if set to a positive number → max number of concurrent actions + // if set to a negative number → no limit + // if not specified or set to 0 → defaults to the number of CPUs on the system + concurrency: 0, + // Those commands will be executed in order at start, before everything else start: [ // Create an iptables chain for reaction diff --git a/config/heavy-load.yml b/config/heavy-load.yml index a63999a..1c149c3 100644 --- a/config/heavy-load.yml +++ b/config/heavy-load.yml @@ -7,6 +7,8 @@ patterns: ignore: - 1.0.0.1 +concurrency: 0 + streams: tailDown1: cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo found $(($i % 100)); done' ] @@ -18,9 +20,9 @@ streams: retryperiod: 1m actions: damn: - cmd: [ 'echo', '' ] + cmd: [ 'sleep', '0.' ] undamn: - cmd: [ 'echo', 'undamn', '' ] + cmd: [ 'sleep', '0.' ] after: 1m onexit: false tailDown2: @@ -33,9 +35,9 @@ streams: retryperiod: 1m actions: damn: - cmd: [ 'echo', '' ] + cmd: [ 'sleep', '0.' ] undamn: - cmd: [ 'echo', 'undamn', '' ] + cmd: [ 'sleep', '0.' ] after: 1m onexit: false tailDown3: @@ -48,9 +50,9 @@ streams: retryperiod: 2m actions: damn: - cmd: [ 'true' ] + cmd: [ 'sleep', '0.' ] undamn: - cmd: [ 'true' ] + cmd: [ 'sleep', '0.' ] after: 1m onexit: false tailDown4: @@ -63,8 +65,8 @@ streams: retryperiod: 2m actions: damn: - cmd: [ 'echo', '' ] + cmd: [ 'sleep', '0.' ] undamn: - cmd: [ 'echo', 'undamn', '' ] + cmd: [ 'sleep', '0.' ] after: 1m onexit: false diff --git a/config/test.jsonnet b/config/test.jsonnet index b5fc18e..63f6b96 100644 --- a/config/test.jsonnet +++ b/config/test.jsonnet @@ -18,12 +18,12 @@ streams: { tailDown1: { - cmd: ['sh', '-c', "echo 1 2 3 4 5 | tr ' ' '\n' | while read i; do sleep 2; echo found $(($i % 10)); done"], + cmd: ['sh', '-c', "echo 1 2 3 4 5 5 | tr ' ' '\n' | while read i; do sleep 1; echo found $(($i % 10)); done"], // cmd: ['sh', '-c', "echo 1 2 3 4 5 1 2 3 4 5 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 | tr ' ' '\n' | while read i; do sleep 2; echo found $(($i % 10)); done"], filters: { findIP: { regex: ['^found $'], - retry: 3, + retry: 1, retryperiod: '30s', actions: { damn: { @@ -31,7 +31,7 @@ }, undamn: { cmd: ['echo', 'undamn', ''], - after: '30s', + after: '4s', onexit: true, }, },