diff --git a/app/cli.go b/app/cli.go new file mode 100644 index 0000000..b63802f --- /dev/null +++ b/app/cli.go @@ -0,0 +1,119 @@ +package app + +import ( + "encoding/gob" + "errors" + "fmt" + "log" + "math/rand" + "os" + "path" + "time" +) + +const ( + Query = 0 + Flush = 1 +) + +type Request struct { + Request int + Id int + Pattern string +} + +type Response struct { + Err error + Actions ReadableMap +} + +// Runtime files: +// /run/user//reaction/reaction.pipe +// /run/user//reaction/id.response + +func RuntimeDirectory() string { + return fmt.Sprintf("/run/user/%v/reaction/", os.Getuid()) +} + +func PipePath() string { + return path.Join(RuntimeDirectory(), "reaction.pipe") +} + +func (r Request) ResponsePath() string { + return path.Join(RuntimeDirectory(), string(r.Id)) +} + +func Send(data Request) { + pipePath := PipePath() + pipe, err := os.OpenFile(pipePath, os.O_APPEND, os.ModeNamedPipe) + if err != nil { + log.Println("Failed to open", pipePath, ":", err) + log.Fatalln("Is the reaction daemon running? Does the CLI run as the same user?") + } + log.Println("DEBUG opening ok, encoding...") + enc := gob.NewEncoder(pipe) + err = enc.Encode(data) + if err != nil { + log.Fatalf("Failed to write to %s: %s", pipePath, err) + } +} + +func SendAndRetrieve(data Request) Response { + if data.Id == 0 { + data.Id = rand.Int() + } + log.Println("DEBUG sending:", data) + Send(data) + responsePath := data.ResponsePath() + d, _ := time.ParseDuration("100ms") + for tries := 20; tries > 0; tries-- { + log.Println("DEBUG waiting for answer...") + file, err := os.Open(responsePath) + if errors.Is(err, os.ErrNotExist) { + time.Sleep(d) + continue + } + defer os.Remove(responsePath) + if err != nil { + log.Fatalf("Error opening daemon answer: %s", err) + } + var response Response + err = gob.NewDecoder(file).Decode(&response) + if err != nil { + log.Fatalf("Error parsing daemon answer: %s", err) + } + return response + } + log.Fatalln("Timeout while waiting answer from the daemon") + return Response{errors.New("unreachable code"), nil} +} + +func usage(err string) { + fmt.Println("Usage: reactionc") + fmt.Println("Usage: reactionc flush ") + log.Fatalln(err) +} + +func CLI() { + if len(os.Args) <= 1 { + response := SendAndRetrieve(Request{Query, 0, ""}) + if response.Err != nil { + log.Fatalln("Received error from daemon:", response.Err) + } + fmt.Println(response.Actions.ToString()) + os.Exit(0) + } + switch os.Args[1] { + case "flush": + if len(os.Args) != 3 { + usage("flush takes one argument") + } + response := SendAndRetrieve(Request{Flush, 0, os.Args[2]}) + if response.Err != nil { + log.Fatalln("Received error from daemon:", response.Err) + } + os.Exit(0) + default: + usage("first argument must be `flush`") + } +} diff --git a/app/pipe.go b/app/pipe.go new file mode 100644 index 0000000..731659f --- /dev/null +++ b/app/pipe.go @@ -0,0 +1,174 @@ +package app + +import ( + "encoding/gob" + "errors" + "io/fs" + "log" + "os" + "sync" + "syscall" + + "gopkg.in/yaml.v3" +) + +type ActionMap map[string]map[*Action]map[chan bool]bool +type ReadableMap map[string]map[string]map[string]int + +type ActionStore struct { + store ActionMap + mutex sync.Mutex +} + +// Called by an Action before entering sleep +func (a *ActionStore) Register(action *Action, pattern string) chan bool { + a.mutex.Lock() + defer a.mutex.Unlock() + if a.store[pattern] == nil { + a.store[pattern] = make(map[*Action]map[chan bool]bool) + } + if a.store[pattern][action] == nil { + a.store[pattern][action] = make(map[chan bool]bool) + } + sig := make(chan bool) + a.store[pattern][action][sig] = true + return sig +} + +// Called by an Action after sleep +func (a *ActionStore) Unregister(action *Action, pattern string, sig chan bool) { + a.mutex.Lock() + defer a.mutex.Unlock() + if a.store[pattern] == nil || a.store[pattern][action] == nil || len(a.store[pattern][action]) == 0 { + return + } + close(sig) + delete(a.store[pattern][action], sig) +} + +// Called by Main +func (a *ActionStore) Quit() { + a.mutex.Lock() + defer a.mutex.Unlock() + for _, actions := range a.store { + for _, sigs := range actions { + for sig := range sigs { + close(sig) + } + } + } + a.store = make(ActionMap) +} + +// Called by a CLI +func (a *ActionStore) Flush(pattern string) { + a.mutex.Lock() + defer a.mutex.Unlock() + if a.store[pattern] != nil { + for _, action := range a.store[pattern] { + for sig := range action { + close(sig) + } + } + } + delete(a.store, pattern) +} + +// Called by a CLI +func (a *ActionStore) pendingActions() ReadableMap { + a.mutex.Lock() + defer a.mutex.Unlock() + return a.store.ToReadable() +} + +func (a ActionMap) ToReadable() ReadableMap { + res := make(ReadableMap) + + for pattern, actions := range a { + for action := range actions { + filter := action.filter.name + stream := action.filter.stream.name + if res[stream] == nil { + res[stream] = make(map[string]map[string]int) + } + if res[stream][filter] == nil { + res[stream][filter] = make(map[string]int) + } + res[stream][filter][pattern] = res[stream][filter][pattern] + 1 + } + } + + return res +} + +func (r ReadableMap) ToString() string { + text, err := yaml.Marshal(r) + if err != nil { + log.Fatalln(err) + } + return string(text) +} + +// Pipe-related, server-related functions + +func createOpenPipe() fs.File { + err := os.Mkdir(RuntimeDirectory(), 0755) + if err != nil && !errors.Is(err, os.ErrExist) { + log.Fatalln("FATAL Failed to create runtime directory", err) + } + pipePath := PipePath() + _, err = os.Stat(pipePath) + if err == nil { + log.Println("WARN Runtime file", pipePath, "already exists: Is the daemon already running? Deleting.") + err = os.Remove(pipePath) + if err != nil { + log.Println("FATAL Failed to remove runtime file:", err) + } + } + err = syscall.Mkfifo(pipePath, 0600) + if err != nil { + log.Println("FATAL Failed to create runtime file:", err) + } + file, err := os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe) + if err != nil { + log.Println("FATAL Failed to open runtime file:", err) + } + return file +} + +func Respond(request Request, response Response) { + file, err := os.Create(request.ResponsePath()) + if err != nil { + log.Println("WARN Can't respond to message:", err) + return + } + err = gob.NewEncoder(file).Encode(response) + if err != nil { + log.Println("WARN Can't respond to message:", err) + return + } +} + +// Handle connections +func Serve() { + pipe := createOpenPipe() + for { + var request Request + err := gob.NewDecoder(pipe).Decode(&request) + if err != nil { + log.Println("WARN Invalid Message received: ", err) + } + go func(request Request) { + var response Response + switch request.Request { + case Query: + response.Actions = actionStore.store.ToReadable() + case Flush: + actionStore.Flush(request.Pattern) + default: + log.Println("WARN Invalid Message: unrecognised Request type") + } + Respond(request, response) + }(request) + } +} diff --git a/app/reaction.go b/app/reaction.go index f662b72..4aa51a8 100644 --- a/app/reaction.go +++ b/app/reaction.go @@ -24,10 +24,10 @@ func cmdStdout(commandline []string) chan *string { cmd := exec.Command(commandline[0], commandline[1:]...) stdout, err := cmd.StdoutPipe() if err != nil { - log.Fatal("couldn't open stdout on command:", err) + log.Fatalln("couldn't open stdout on command:", err) } if err := cmd.Start(); err != nil { - log.Fatal("couldn't start command:", err) + log.Fatalln("couldn't start command:", err) } defer stdout.Close() @@ -77,14 +77,17 @@ func sleep(d time.Duration) chan bool { func (a *Action) exec(match string, advance time.Duration) { defer wgActions.Done() - // Wait for either end of sleep time, or stopActions channel being closed + // Wait for either end of sleep time, or actionStore requesting stop if a.afterDuration != 0 && a.afterDuration > advance { + stopAction := actionStore.Register(a, match) select { case <-sleep(a.afterDuration - advance): // no-op - case _, _ = <-stopActions: + case _, _ = <-stopAction: // no-op } + // Let's not wait for the lock + go actionStore.Unregister(a, match, stopAction) } computedCommand := make([]string, 0, len(a.Cmd)) @@ -174,7 +177,7 @@ func (s *Stream) handle(endedSignal chan *Stream) { } var stopStreams chan bool -var stopActions chan bool +var actionStore ActionStore var wgActions sync.WaitGroup var db *gob.Encoder @@ -188,6 +191,8 @@ func Main() { os.Exit(2) } + actionStore.store = make(ActionMap) + conf := parseConf(*confFilename) db = conf.updateFromDB() @@ -197,7 +202,6 @@ func Main() { signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) stopStreams = make(chan bool) - stopActions = make(chan bool) endSignals := make(chan *Stream) noStreamsInExecution := len(conf.Streams) @@ -206,6 +210,8 @@ func Main() { go stream.handle(endSignals) } + go Serve() + for { select { case finishedStream := <-endSignals: @@ -225,9 +231,11 @@ func quit() { // stop all streams close(stopStreams) // stop all actions - close(stopActions) + actionStore.Quit() // wait for them to complete wgActions.Wait() + // delete pipe + os.Remove(PipePath()) os.Exit(3) } diff --git a/app/startup.go b/app/startup.go index 5eddb0f..75867e3 100644 --- a/app/startup.go +++ b/app/startup.go @@ -248,7 +248,7 @@ func (c *Conf) updateFromDB() *gob.Encoder { } } -func parseConf(filename string) (*Conf, *gob.Encoder) { +func parseConf(filename string) *Conf { data, err := os.ReadFile(filename) if err != nil { diff --git a/cli.go b/cli.go new file mode 100644 index 0000000..2a73dbb --- /dev/null +++ b/cli.go @@ -0,0 +1,9 @@ +package main + +import ( + "reaction/app" +) + +func main() { + app.CLI() +} diff --git a/config/reaction.test.yml b/config/reaction.test.yml index 6109669..3d5e999 100644 --- a/config/reaction.test.yml +++ b/config/reaction.test.yml @@ -4,7 +4,7 @@ patterns: streams: tailDown: - cmd: [ "sh", "-c", "echo 'found 1.1.1.1' && sleep 2s && echo 'found 1.1.1.2' && sleep 2s && echo 'found 1.1.1.1' && sleep 1s" ] + cmd: [ "sh", "-c", "echo 'found 1.1.1.1' && sleep 2s && echo 'found 1.1.1.2' && sleep 2s && echo 'found 1.1.1.1' && sleep 10m" ] filters: findIP: regex: @@ -16,4 +16,4 @@ streams: cmd: [ "echo", "" ] sleepdamn: cmd: [ "echo", "sleep", "" ] - after: 10s + after: 8m diff --git a/main.go b/daemon.go similarity index 100% rename from main.go rename to daemon.go