diff --git a/app/client.go b/app/client.go index 94d1220..5889a0e 100644 --- a/app/client.go +++ b/app/client.go @@ -8,10 +8,12 @@ import ( "net" "os" "regexp" + + "gopkg.in/yaml.v3" ) const ( - Query = 0 + Show = 0 Flush = 1 ) @@ -21,9 +23,9 @@ type Request struct { } type Response struct { - Err error - Actions ReadableMap - Number int + Err error + ClientStatus ClientStatus + Number int } func SendAndRetrieve(data Request) Response { @@ -31,6 +33,7 @@ func SendAndRetrieve(data Request) Response { if err != nil { log.Fatalln("Error opening connection top daemon:", err) } + defer conn.Close() err = gob.NewEncoder(conn).Encode(data) if err != nil { @@ -45,19 +48,60 @@ func SendAndRetrieve(data Request) Response { return response } +type PatternStatus struct { + Matches int `yaml:"matches_since_last_trigger"` + Actions map[string][]string `yaml:"pending_actions"` +} +type MapPatternStatus map[string]*PatternStatus +type ClientStatus map[string]map[string]MapPatternStatus + +// This block is made to hide pending_actions when empty +// and matches_since_last_trigger when zero +type FullPatternStatus PatternStatus +type MatchesStatus struct { + Matches int `yaml:"matches_since_last_trigger"` +} +type ActionsStatus struct { + Actions map[string][]string `yaml:"pending_actions"` +} + +func (mps MapPatternStatus) MarshalYAML() (interface{}, error) { + ret := make(map[string]interface{}) + for k, v := range mps { + if v.Matches == 0 { + if len(v.Actions) != 0 { + ret[k] = ActionsStatus{v.Actions} + } + } else { + if len(v.Actions) != 0 { + ret[k] = v + } else { + ret[k] = MatchesStatus{v.Matches} + } + } + } + return ret, nil +} + +// end block + func usage(err string) { fmt.Println("Usage: reactionc") fmt.Println("Usage: reactionc flush ") log.Fatalln(err) } -func ClientQuery(streamfilter string) { - response := SendAndRetrieve(Request{Query, streamfilter}) +func ClientShow(streamfilter string) { + response := SendAndRetrieve(Request{Show, streamfilter}) if response.Err != nil { log.Fatalln("Received error from daemon:", response.Err) os.Exit(1) } - fmt.Println(response.Actions.ToString()) + text, err := yaml.Marshal(response.ClientStatus) + if err != nil { + log.Fatalln("Failed to convert daemon binary response to text format:", err) + } + fmt.Println(string(text)) os.Exit(0) } diff --git a/app/daemon.go b/app/daemon.go index 209381e..1c18913 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -90,7 +90,6 @@ func (a *Action) exec(match string) { } func ActionsManager() { - actions := make(ActionsMap) pendingActionsC := make(chan PAT) var ( pat PAT @@ -111,12 +110,14 @@ func ActionsManager() { wgActions.Add(1) go action.exec(match) } else { + actionsLock.Lock() // make sure map exists if actions[action] == nil { actions[action] = make(PatternTimes) } // append() to nil is valid go actions[action][match] = append(actions[action][match], then) + actionsLock.Unlock() go func(pat PAT, now time.Time) { time.Sleep(pat.t.Sub(now)) pendingActionsC <- pat @@ -125,10 +126,13 @@ func ActionsManager() { case pat = <-pendingActionsC: match = pat.p action = pat.a + actionsLock.Lock() actions[action][match] = actions[action][match][1:] + actionsLock.Unlock() wgActions.Add(1) go action.exec(match) case _, _ = <-stopActions: + actionsLock.Lock() for action := range actions { if action.OnExit { for match := range actions[action] { @@ -137,6 +141,7 @@ func ActionsManager() { } } } + actionsLock.Unlock() wgActions.Done() return } @@ -144,7 +149,6 @@ func ActionsManager() { } func MatchesManager() { - matches := make(MatchesMap) var pf PF var pft PFT end := false @@ -157,7 +161,7 @@ func MatchesManager() { if !ok { end = true } else { - _ = matchesManagerHandleMatch(matches, pft) + _ = matchesManagerHandleMatch(pft) } } } @@ -165,19 +169,23 @@ func MatchesManager() { for { select { case pf = <-cleanMatchesC: + matchesLock.Lock() delete(matches[pf.f], pf.p) + matchesLock.Unlock() case pft = <-matchesC: entry := LogEntry{pft.t, pft.p, pft.f.stream.name, pft.f.name, false} - entry.Exec = matchesManagerHandleMatch(matches, pft) + matchesLock.Lock() + entry.Exec = matchesManagerHandleMatch(pft) + matchesLock.Unlock() logsC <- entry } } } -func matchesManagerHandleMatch(matches MatchesMap, pft PFT) bool { +func matchesManagerHandleMatch(pft PFT) bool { filter, match, then := pft.f, pft.p, pft.t if filter.Retry > 1 { @@ -229,9 +237,13 @@ func StreamManager(s *Stream, endedSignal chan *Stream) { } +var actions ActionsMap +var matches MatchesMap +var actionsLock sync.Mutex +var matchesLock sync.Mutex + var stopStreams chan bool var stopActions chan bool -var actionStore ActionStore var wgActions sync.WaitGroup var wgStreams sync.WaitGroup @@ -254,8 +266,6 @@ var cleanMatchesC chan PF var actionsC chan PAT func Daemon(confFilename string) { - actionStore.store = make(ActionMap) - conf := parseConf(confFilename) logsC = make(chan LogEntry) @@ -266,6 +276,8 @@ func Daemon(confFilename string) { actionsC = make(chan PAT) stopActions = make(chan bool) stopStreams = make(chan bool) + actions = make(ActionsMap) + matches = make(MatchesMap) go DatabaseManager(conf) go MatchesManager() @@ -311,7 +323,6 @@ func quit() { // send stop to ActionsManager close(stopActions) // stop all actions - actionStore.Quit() log.Println("INFO Waiting for Actions to finish...") wgActions.Wait() // delete pipe diff --git a/app/main.go b/app/main.go index c9dddfb..9af1ccd 100644 --- a/app/main.go +++ b/app/main.go @@ -146,7 +146,7 @@ func Main() { os.Exit(1) } // f.Arg(0) is "" if there is no remaining argument - ClientQuery(*limit) + ClientShow(*limit) case "flush": SocketPath = addSocketFlag(f) diff --git a/app/pipe.go b/app/pipe.go index 3434719..92c4f5b 100644 --- a/app/pipe.go +++ b/app/pipe.go @@ -6,115 +6,58 @@ import ( "net" "os" "path" - "sync" "time" - - "gopkg.in/yaml.v3" ) -type ActionMap map[string]map[*Action]map[chan bool]bool -type ReadableMap map[string]map[string]map[string]int +func genClientStatus() ClientStatus { + cs := make(ClientStatus) + matchesLock.Lock() -type ActionStore struct { - store ActionMap - mutex sync.Mutex -} - -// Called by an Action before entering sleep -func (a *ActionStore) Register(action *Action, pattern string) chan bool { - a.mutex.Lock() - defer a.mutex.Unlock() - if a.store[pattern] == nil { - a.store[pattern] = make(map[*Action]map[chan bool]bool) - } - if a.store[pattern][action] == nil { - a.store[pattern][action] = make(map[chan bool]bool) - } - sig := make(chan bool) - a.store[pattern][action][sig] = true - return sig -} - -// Called by an Action after sleep -func (a *ActionStore) Unregister(action *Action, pattern string, sig chan bool) { - a.mutex.Lock() - defer a.mutex.Unlock() - if a.store[pattern] == nil || a.store[pattern][action] == nil || len(a.store[pattern][action]) == 0 { - return - } - close(sig) - delete(a.store[pattern][action], sig) -} - -// Called by Main -func (a *ActionStore) Quit() { - a.mutex.Lock() - defer a.mutex.Unlock() - for _, actions := range a.store { - for action, sigs := range actions { - for sig := range sigs { - sig <- action.OnExit - } + // Painful data manipulation + for filter, filterMatches := range matches { + if cs[filter.stream.name] == nil { + cs[filter.stream.name] = make(map[string]MapPatternStatus) } - } - a.store = make(ActionMap) -} - -// Called by a CLI -func (a *ActionStore) Flush(pattern string) int { - var cpt int - a.mutex.Lock() - defer a.mutex.Unlock() - if a.store[pattern] != nil { - for _, action := range a.store[pattern] { - for sig := range action { - sig <- true - } - cpt++ + if cs[filter.stream.name][filter.name] == nil { + cs[filter.stream.name][filter.name] = make(MapPatternStatus) } - } - delete(a.store, pattern) - flushesC <- LogEntry{time.Now(), pattern, "", "", false} - return cpt -} + for pattern, patternMatches := range filterMatches { + var ps PatternStatus + cs[filter.stream.name][filter.name][pattern] = &ps -// Called by a CLI -func (a *ActionStore) pendingActions() ReadableMap { - a.mutex.Lock() - defer a.mutex.Unlock() - return a.store.ToReadable() -} - -func (a ActionMap) ToReadable() ReadableMap { - res := make(ReadableMap) - - for pattern, actions := range a { - for action := range actions { - filter := action.filter.name - stream := action.filter.stream.name - if res[stream] == nil { - res[stream] = make(map[string]map[string]int) - } - if res[stream][filter] == nil { - res[stream][filter] = make(map[string]int) - } - res[stream][filter][pattern] = res[stream][filter][pattern] + 1 + ps.Matches = len(patternMatches) } } - return res -} + matchesLock.Unlock() + actionsLock.Lock() -func (r ReadableMap) ToString() string { - text, err := yaml.Marshal(r) - if err != nil { - log.Fatalln(err) + // Painful data manipulation + for action, pendingActions := range actions { + if cs[action.filter.stream.name] == nil { + cs[action.filter.stream.name] = make(map[string]MapPatternStatus) + } + if cs[action.filter.stream.name][action.filter.name] == nil { + cs[action.filter.stream.name][action.filter.name] = make(MapPatternStatus) + } + for pattern, patternPendingActions := range pendingActions { + if cs[action.filter.stream.name][action.filter.name][pattern] == nil { + var ps PatternStatus + cs[action.filter.stream.name][action.filter.name][pattern] = &ps + } + var ps *PatternStatus + ps = cs[action.filter.stream.name][action.filter.name][pattern] + ps.Actions = make(map[string][]string) + + for _, t := range patternPendingActions { + ps.Actions[action.name] = append(ps.Actions[action.name], t.Format(time.DateTime)) + } + } } - return string(text) + actionsLock.Unlock() + return cs } -// Socket-related, server-related functions - func createOpenSocket() net.Listener { err := os.MkdirAll(path.Dir(*SocketPath), 0755) if err != nil { @@ -146,6 +89,7 @@ func SocketManager() { continue } go func(conn net.Conn) { + defer conn.Close() var request Request var response Response @@ -156,16 +100,17 @@ func SocketManager() { } switch request.Request { - case Query: - response.Actions = actionStore.store.ToReadable() + case Show: + response.ClientStatus = genClientStatus() case Flush: - response.Number = actionStore.Flush(request.Pattern) + // FIXME reimplement flush + response.Number = 0 default: log.Println("ERROR Invalid Message from cli: unrecognised Request type") return } - gob.NewEncoder(conn).Encode(response) + err = gob.NewEncoder(conn).Encode(response) if err != nil { log.Println("ERROR Can't respond to cli:", err) return diff --git a/config/reaction.test.yml b/config/reaction.test.yml index fbfe1f2..89d8fbc 100644 --- a/config/reaction.test.yml +++ b/config/reaction.test.yml @@ -2,20 +2,24 @@ patterns: num: regex: '[0-9]+' + ip: + regex: '(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F:]{2,90})' + ignore: + - 1.0.0.1 streams: - tailDown: - cmd: [ "sh", "-c", "sleep 0.5; echo found 1; sleep 1; echo found 1; sleep 10" ] + tailDown1: + cmd: [ "sh", "-c", "echo 1 2 3 4 5 1 2 3 4 5 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 | tr ' ' '\n' | while read i; do sleep 2; echo found $(($i % 10)); done" ] filters: findIP: regex: - '^found $' - retry: 2 - retry-period: 1m + retry: 3 + retry-period: 30s actions: damn: cmd: [ "echo", "" ] undamn: cmd: [ "echo", "undamn", "" ] - after: 5s + after: 30s onexit: true