diff --git a/app/reaction.go b/app/reaction.go index a44a6c9..ec74005 100644 --- a/app/reaction.go +++ b/app/reaction.go @@ -64,10 +64,27 @@ func (f *Filter) execActions(match string, advance time.Duration) { } } +func sleep(d time.Duration) chan bool { + c := make(chan bool) + go func() { + time.Sleep(d) + c <- true + close(c) + }() + return c +} + func (a *Action) exec(match string, advance time.Duration) { defer wgActions.Done() + + // Wait for either end of sleep time, or stopActions channel being closed if a.afterDuration != 0 && a.afterDuration > advance { - time.Sleep(a.afterDuration - advance) + select { + case <-sleep(a.afterDuration - advance): + // no-op + case _, _ = <-stopActions: + // no-op + } } computedCommand := make([]string, 0, len(a.Cmd)) @@ -122,24 +139,7 @@ func (f *Filter) handle() chan *string { return lines } -func multiplex(input chan *string, outputs []chan *string) { - var wg sync.WaitGroup - for item := range input { - for _, output := range outputs { - wg.Add(1) - go func(s *string) { - output <- s - wg.Done() - }(item) - } - } - for _, output := range outputs { - wg.Wait() - close(output) - } -} - -func (s *Stream) handle(signal chan *Stream) { +func (s *Stream) handle(endedSignal chan *Stream) { log.Printf("INFO %s: start %s\n", s.name, s.Cmd) lines := cmdStdout(s.Cmd) @@ -148,12 +148,33 @@ func (s *Stream) handle(signal chan *Stream) { for _, filter := range s.Filters { filterInputs = append(filterInputs, filter.handle()) } + defer func() { + for _, filterInput := range filterInputs { + close(filterInput) + } + }() - multiplex(lines, filterInputs) + for { + select { + case line, ok := <-lines: + if !ok { + endedSignal <- s + return + } + for _, filterInput := range filterInputs { + filterInput <- line + } + case _, ok := <-stopStreams: + if !ok { + return + } + } + } - signal <- s } +var stopStreams chan bool +var stopActions chan bool var wgActions sync.WaitGroup var db *gob.Encoder @@ -173,6 +194,9 @@ func Main() { conf, localdb := parseConf(*confFilename) db = localdb + stopStreams = make(chan bool) + stopActions = make(chan bool) + endSignals := make(chan *Stream) noStreamsInExecution := len(conf.Streams) @@ -196,7 +220,12 @@ func Main() { } func quit() { - // TODO replace with advanced execution of all WIP actions + // stop all streams + close(stopStreams) + // stop all actions + close(stopActions) + // wait for them to complete wgActions.Wait() + os.Exit(3) }