database refactor

fixes #21
This commit is contained in:
ppom
2023-09-06 02:00:33 +02:00
parent 9555342741
commit 48f2517074
4 changed files with 249 additions and 130 deletions

View File

@ -2,7 +2,6 @@ package app
import (
"bufio"
"encoding/gob"
"syscall"
// "fmt"
@ -86,34 +85,33 @@ func sleep(d time.Duration) chan bool {
func (a *Action) exec(match string, advance time.Duration) {
defer wgActions.Done()
doExec := true
// Wait for either end of sleep time, or actionStore requesting stop
if a.afterDuration != 0 && a.afterDuration > advance {
stopAction := actionStore.Register(a, match)
select {
case <-sleep(a.afterDuration - advance):
// no-op
case doExec = <-stopAction:
// no-op
// Let's not wait for the lock
go actionStore.Unregister(a, match, stopAction)
case doExec := <-stopAction:
// no need to unregister here
if !doExec {
return
}
}
// Let's not wait for the lock
go actionStore.Unregister(a, match, stopAction)
}
if doExec {
computedCommand := make([]string, 0, len(a.Cmd))
for _, item := range a.Cmd {
computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match))
}
computedCommand := make([]string, 0, len(a.Cmd))
for _, item := range a.Cmd {
computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match))
}
log.Printf("INFO %s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand)
log.Printf("INFO %s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand)
cmd := exec.Command(computedCommand[0], computedCommand[1:]...)
cmd := exec.Command(computedCommand[0], computedCommand[1:]...)
if ret := cmd.Run(); ret != nil {
log.Printf("ERROR %s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret)
}
if ret := cmd.Run(); ret != nil {
log.Printf("ERROR %s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret)
}
}
@ -149,7 +147,7 @@ func (f *Filter) handle() chan *string {
f.execActions(match, time.Duration(0))
}
db.Encode(&entry)
logs <- &entry
}
}
}()
@ -195,13 +193,14 @@ var stopStreams chan bool
var actionStore ActionStore
var wgActions sync.WaitGroup
var db *gob.Encoder
var logs chan *LogEntry
func Daemon(confFilename string) {
actionStore.store = make(ActionMap)
conf := parseConf(confFilename)
db = conf.updateFromDB()
logs = conf.DatabaseManager()
// Ready to start
@ -240,6 +239,7 @@ func quit() {
// stop all actions
actionStore.Quit()
// wait for them to complete
log.Println("INFO Waiting for actions to complete")
wgActions.Wait()
// delete pipe
err := os.Remove(*SocketPath)