persist flushes accross restarts. fixes #23

This commit is contained in:
ppom
2023-09-09 19:32:23 +02:00
parent 2bff4ef1b3
commit 1cb0562e15
5 changed files with 157 additions and 89 deletions

View File

@ -147,7 +147,7 @@ func (f *Filter) handle() chan *string {
f.execActions(match, time.Duration(0))
}
logs <- &entry
logs <- entry
}
}
}()
@ -193,14 +193,17 @@ var stopStreams chan bool
var actionStore ActionStore
var wgActions sync.WaitGroup
var logs chan *LogEntry
var logs chan LogEntry
var flushes chan LogEntry
func Daemon(confFilename string) {
actionStore.store = make(ActionMap)
conf := parseConf(confFilename)
logs = conf.DatabaseManager()
logs = make(chan LogEntry)
flushes = make(chan LogEntry)
go conf.DatabaseManager()
// Ready to start
@ -210,7 +213,7 @@ func Daemon(confFilename string) {
stopStreams = make(chan bool)
endSignals := make(chan *Stream)
noStreamsInExecution := len(conf.Streams)
nbStreamsInExecution := len(conf.Streams)
for _, stream := range conf.Streams {
go stream.handle(endSignals)
@ -222,8 +225,8 @@ func Daemon(confFilename string) {
select {
case finishedStream := <-endSignals:
log.Printf("ERROR %s stream finished", finishedStream.name)
noStreamsInExecution--
if noStreamsInExecution == 0 {
nbStreamsInExecution--
if nbStreamsInExecution == 0 {
quit()
}
case <-sigs: