Execute now all pending actions when quiting

This commit is contained in:
ppom 2023-04-27 11:58:57 +02:00
parent a2be5a566c
commit 067fb13e79

View File

@ -64,10 +64,27 @@ func (f *Filter) execActions(match string, advance time.Duration) {
} }
} }
func sleep(d time.Duration) chan bool {
c := make(chan bool)
go func() {
time.Sleep(d)
c <- true
close(c)
}()
return c
}
func (a *Action) exec(match string, advance time.Duration) { func (a *Action) exec(match string, advance time.Duration) {
defer wgActions.Done() defer wgActions.Done()
// Wait for either end of sleep time, or stopActions channel being closed
if a.afterDuration != 0 && a.afterDuration > advance { if a.afterDuration != 0 && a.afterDuration > advance {
time.Sleep(a.afterDuration - advance) select {
case <-sleep(a.afterDuration - advance):
// no-op
case _, _ = <-stopActions:
// no-op
}
} }
computedCommand := make([]string, 0, len(a.Cmd)) computedCommand := make([]string, 0, len(a.Cmd))
@ -122,24 +139,7 @@ func (f *Filter) handle() chan *string {
return lines return lines
} }
func multiplex(input chan *string, outputs []chan *string) { func (s *Stream) handle(endedSignal chan *Stream) {
var wg sync.WaitGroup
for item := range input {
for _, output := range outputs {
wg.Add(1)
go func(s *string) {
output <- s
wg.Done()
}(item)
}
}
for _, output := range outputs {
wg.Wait()
close(output)
}
}
func (s *Stream) handle(signal chan *Stream) {
log.Printf("INFO %s: start %s\n", s.name, s.Cmd) log.Printf("INFO %s: start %s\n", s.name, s.Cmd)
lines := cmdStdout(s.Cmd) lines := cmdStdout(s.Cmd)
@ -148,12 +148,33 @@ func (s *Stream) handle(signal chan *Stream) {
for _, filter := range s.Filters { for _, filter := range s.Filters {
filterInputs = append(filterInputs, filter.handle()) filterInputs = append(filterInputs, filter.handle())
} }
defer func() {
for _, filterInput := range filterInputs {
close(filterInput)
}
}()
multiplex(lines, filterInputs) for {
select {
signal <- s case line, ok := <-lines:
if !ok {
endedSignal <- s
return
}
for _, filterInput := range filterInputs {
filterInput <- line
}
case _, ok := <-stopStreams:
if !ok {
return
}
}
} }
}
var stopStreams chan bool
var stopActions chan bool
var wgActions sync.WaitGroup var wgActions sync.WaitGroup
var db *gob.Encoder var db *gob.Encoder
@ -173,6 +194,9 @@ func Main() {
conf, localdb := parseConf(*confFilename) conf, localdb := parseConf(*confFilename)
db = localdb db = localdb
stopStreams = make(chan bool)
stopActions = make(chan bool)
endSignals := make(chan *Stream) endSignals := make(chan *Stream)
noStreamsInExecution := len(conf.Streams) noStreamsInExecution := len(conf.Streams)
@ -196,7 +220,12 @@ func Main() {
} }
func quit() { func quit() {
// TODO replace with advanced execution of all WIP actions // stop all streams
close(stopStreams)
// stop all actions
close(stopActions)
// wait for them to complete
wgActions.Wait() wgActions.Wait()
os.Exit(3) os.Exit(3)
} }