server-client "show" reimplemented
Bug → expired matches are still present
This commit is contained in:
		@ -8,10 +8,12 @@ import (
 | 
			
		||||
	"net"
 | 
			
		||||
	"os"
 | 
			
		||||
	"regexp"
 | 
			
		||||
 | 
			
		||||
	"gopkg.in/yaml.v3"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	Query = 0
 | 
			
		||||
	Show  = 0
 | 
			
		||||
	Flush = 1
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@ -21,9 +23,9 @@ type Request struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Response struct {
 | 
			
		||||
	Err     error
 | 
			
		||||
	Actions ReadableMap
 | 
			
		||||
	Number  int
 | 
			
		||||
	Err          error
 | 
			
		||||
	ClientStatus ClientStatus
 | 
			
		||||
	Number       int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func SendAndRetrieve(data Request) Response {
 | 
			
		||||
@ -31,6 +33,7 @@ func SendAndRetrieve(data Request) Response {
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Fatalln("Error opening connection top daemon:", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer conn.Close()
 | 
			
		||||
 | 
			
		||||
	err = gob.NewEncoder(conn).Encode(data)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@ -45,19 +48,60 @@ func SendAndRetrieve(data Request) Response {
 | 
			
		||||
	return response
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type PatternStatus struct {
 | 
			
		||||
	Matches int                 `yaml:"matches_since_last_trigger"`
 | 
			
		||||
	Actions map[string][]string `yaml:"pending_actions"`
 | 
			
		||||
}
 | 
			
		||||
type MapPatternStatus map[string]*PatternStatus
 | 
			
		||||
type ClientStatus map[string]map[string]MapPatternStatus
 | 
			
		||||
 | 
			
		||||
// This block is made to hide pending_actions when empty
 | 
			
		||||
// and matches_since_last_trigger when zero
 | 
			
		||||
type FullPatternStatus PatternStatus
 | 
			
		||||
type MatchesStatus struct {
 | 
			
		||||
	Matches int `yaml:"matches_since_last_trigger"`
 | 
			
		||||
}
 | 
			
		||||
type ActionsStatus struct {
 | 
			
		||||
	Actions map[string][]string `yaml:"pending_actions"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mps MapPatternStatus) MarshalYAML() (interface{}, error) {
 | 
			
		||||
	ret := make(map[string]interface{})
 | 
			
		||||
	for k, v := range mps {
 | 
			
		||||
		if v.Matches == 0 {
 | 
			
		||||
			if len(v.Actions) != 0 {
 | 
			
		||||
				ret[k] = ActionsStatus{v.Actions}
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			if len(v.Actions) != 0 {
 | 
			
		||||
				ret[k] = v
 | 
			
		||||
			} else {
 | 
			
		||||
				ret[k] = MatchesStatus{v.Matches}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return ret, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// end block
 | 
			
		||||
 | 
			
		||||
func usage(err string) {
 | 
			
		||||
	fmt.Println("Usage: reactionc")
 | 
			
		||||
	fmt.Println("Usage: reactionc flush <PATTERN>")
 | 
			
		||||
	log.Fatalln(err)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ClientQuery(streamfilter string) {
 | 
			
		||||
	response := SendAndRetrieve(Request{Query, streamfilter})
 | 
			
		||||
func ClientShow(streamfilter string) {
 | 
			
		||||
	response := SendAndRetrieve(Request{Show, streamfilter})
 | 
			
		||||
	if response.Err != nil {
 | 
			
		||||
		log.Fatalln("Received error from daemon:", response.Err)
 | 
			
		||||
		os.Exit(1)
 | 
			
		||||
	}
 | 
			
		||||
	fmt.Println(response.Actions.ToString())
 | 
			
		||||
	text, err := yaml.Marshal(response.ClientStatus)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Fatalln("Failed to convert daemon binary response to text format:", err)
 | 
			
		||||
	}
 | 
			
		||||
	fmt.Println(string(text))
 | 
			
		||||
	os.Exit(0)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -90,7 +90,6 @@ func (a *Action) exec(match string) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ActionsManager() {
 | 
			
		||||
	actions := make(ActionsMap)
 | 
			
		||||
	pendingActionsC := make(chan PAT)
 | 
			
		||||
	var (
 | 
			
		||||
		pat    PAT
 | 
			
		||||
@ -111,12 +110,14 @@ func ActionsManager() {
 | 
			
		||||
				wgActions.Add(1)
 | 
			
		||||
				go action.exec(match)
 | 
			
		||||
			} else {
 | 
			
		||||
				actionsLock.Lock()
 | 
			
		||||
				// make sure map exists
 | 
			
		||||
				if actions[action] == nil {
 | 
			
		||||
					actions[action] = make(PatternTimes)
 | 
			
		||||
				}
 | 
			
		||||
				// append() to nil is valid go
 | 
			
		||||
				actions[action][match] = append(actions[action][match], then)
 | 
			
		||||
				actionsLock.Unlock()
 | 
			
		||||
				go func(pat PAT, now time.Time) {
 | 
			
		||||
					time.Sleep(pat.t.Sub(now))
 | 
			
		||||
					pendingActionsC <- pat
 | 
			
		||||
@ -125,10 +126,13 @@ func ActionsManager() {
 | 
			
		||||
		case pat = <-pendingActionsC:
 | 
			
		||||
			match = pat.p
 | 
			
		||||
			action = pat.a
 | 
			
		||||
			actionsLock.Lock()
 | 
			
		||||
			actions[action][match] = actions[action][match][1:]
 | 
			
		||||
			actionsLock.Unlock()
 | 
			
		||||
			wgActions.Add(1)
 | 
			
		||||
			go action.exec(match)
 | 
			
		||||
		case _, _ = <-stopActions:
 | 
			
		||||
			actionsLock.Lock()
 | 
			
		||||
			for action := range actions {
 | 
			
		||||
				if action.OnExit {
 | 
			
		||||
					for match := range actions[action] {
 | 
			
		||||
@ -137,6 +141,7 @@ func ActionsManager() {
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			actionsLock.Unlock()
 | 
			
		||||
			wgActions.Done()
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
@ -144,7 +149,6 @@ func ActionsManager() {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func MatchesManager() {
 | 
			
		||||
	matches := make(MatchesMap)
 | 
			
		||||
	var pf PF
 | 
			
		||||
	var pft PFT
 | 
			
		||||
	end := false
 | 
			
		||||
@ -157,7 +161,7 @@ func MatchesManager() {
 | 
			
		||||
			if !ok {
 | 
			
		||||
				end = true
 | 
			
		||||
			} else {
 | 
			
		||||
				_ = matchesManagerHandleMatch(matches, pft)
 | 
			
		||||
				_ = matchesManagerHandleMatch(pft)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@ -165,19 +169,23 @@ func MatchesManager() {
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case pf = <-cleanMatchesC:
 | 
			
		||||
			matchesLock.Lock()
 | 
			
		||||
			delete(matches[pf.f], pf.p)
 | 
			
		||||
			matchesLock.Unlock()
 | 
			
		||||
		case pft = <-matchesC:
 | 
			
		||||
 | 
			
		||||
			entry := LogEntry{pft.t, pft.p, pft.f.stream.name, pft.f.name, false}
 | 
			
		||||
 | 
			
		||||
			entry.Exec = matchesManagerHandleMatch(matches, pft)
 | 
			
		||||
			matchesLock.Lock()
 | 
			
		||||
			entry.Exec = matchesManagerHandleMatch(pft)
 | 
			
		||||
			matchesLock.Unlock()
 | 
			
		||||
 | 
			
		||||
			logsC <- entry
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func matchesManagerHandleMatch(matches MatchesMap, pft PFT) bool {
 | 
			
		||||
func matchesManagerHandleMatch(pft PFT) bool {
 | 
			
		||||
	filter, match, then := pft.f, pft.p, pft.t
 | 
			
		||||
 | 
			
		||||
	if filter.Retry > 1 {
 | 
			
		||||
@ -229,9 +237,13 @@ func StreamManager(s *Stream, endedSignal chan *Stream) {
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var actions ActionsMap
 | 
			
		||||
var matches MatchesMap
 | 
			
		||||
var actionsLock sync.Mutex
 | 
			
		||||
var matchesLock sync.Mutex
 | 
			
		||||
 | 
			
		||||
var stopStreams chan bool
 | 
			
		||||
var stopActions chan bool
 | 
			
		||||
var actionStore ActionStore
 | 
			
		||||
var wgActions sync.WaitGroup
 | 
			
		||||
var wgStreams sync.WaitGroup
 | 
			
		||||
 | 
			
		||||
@ -254,8 +266,6 @@ var cleanMatchesC chan PF
 | 
			
		||||
var actionsC chan PAT
 | 
			
		||||
 | 
			
		||||
func Daemon(confFilename string) {
 | 
			
		||||
	actionStore.store = make(ActionMap)
 | 
			
		||||
 | 
			
		||||
	conf := parseConf(confFilename)
 | 
			
		||||
 | 
			
		||||
	logsC = make(chan LogEntry)
 | 
			
		||||
@ -266,6 +276,8 @@ func Daemon(confFilename string) {
 | 
			
		||||
	actionsC = make(chan PAT)
 | 
			
		||||
	stopActions = make(chan bool)
 | 
			
		||||
	stopStreams = make(chan bool)
 | 
			
		||||
	actions = make(ActionsMap)
 | 
			
		||||
	matches = make(MatchesMap)
 | 
			
		||||
 | 
			
		||||
	go DatabaseManager(conf)
 | 
			
		||||
	go MatchesManager()
 | 
			
		||||
@ -311,7 +323,6 @@ func quit() {
 | 
			
		||||
	// send stop to ActionsManager
 | 
			
		||||
	close(stopActions)
 | 
			
		||||
	// stop all actions
 | 
			
		||||
	actionStore.Quit()
 | 
			
		||||
	log.Println("INFO  Waiting for Actions to finish...")
 | 
			
		||||
	wgActions.Wait()
 | 
			
		||||
	// delete pipe
 | 
			
		||||
 | 
			
		||||
@ -146,7 +146,7 @@ func Main() {
 | 
			
		||||
			os.Exit(1)
 | 
			
		||||
		}
 | 
			
		||||
		// f.Arg(0) is "" if there is no remaining argument
 | 
			
		||||
		ClientQuery(*limit)
 | 
			
		||||
		ClientShow(*limit)
 | 
			
		||||
 | 
			
		||||
	case "flush":
 | 
			
		||||
		SocketPath = addSocketFlag(f)
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										143
									
								
								app/pipe.go
									
									
									
									
									
								
							
							
						
						
									
										143
									
								
								app/pipe.go
									
									
									
									
									
								
							@ -6,115 +6,58 @@ import (
 | 
			
		||||
	"net"
 | 
			
		||||
	"os"
 | 
			
		||||
	"path"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"gopkg.in/yaml.v3"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type ActionMap map[string]map[*Action]map[chan bool]bool
 | 
			
		||||
type ReadableMap map[string]map[string]map[string]int
 | 
			
		||||
func genClientStatus() ClientStatus {
 | 
			
		||||
	cs := make(ClientStatus)
 | 
			
		||||
	matchesLock.Lock()
 | 
			
		||||
 | 
			
		||||
type ActionStore struct {
 | 
			
		||||
	store ActionMap
 | 
			
		||||
	mutex sync.Mutex
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Called by an Action before entering sleep
 | 
			
		||||
func (a *ActionStore) Register(action *Action, pattern string) chan bool {
 | 
			
		||||
	a.mutex.Lock()
 | 
			
		||||
	defer a.mutex.Unlock()
 | 
			
		||||
	if a.store[pattern] == nil {
 | 
			
		||||
		a.store[pattern] = make(map[*Action]map[chan bool]bool)
 | 
			
		||||
	}
 | 
			
		||||
	if a.store[pattern][action] == nil {
 | 
			
		||||
		a.store[pattern][action] = make(map[chan bool]bool)
 | 
			
		||||
	}
 | 
			
		||||
	sig := make(chan bool)
 | 
			
		||||
	a.store[pattern][action][sig] = true
 | 
			
		||||
	return sig
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Called by an Action after sleep
 | 
			
		||||
func (a *ActionStore) Unregister(action *Action, pattern string, sig chan bool) {
 | 
			
		||||
	a.mutex.Lock()
 | 
			
		||||
	defer a.mutex.Unlock()
 | 
			
		||||
	if a.store[pattern] == nil || a.store[pattern][action] == nil || len(a.store[pattern][action]) == 0 {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	close(sig)
 | 
			
		||||
	delete(a.store[pattern][action], sig)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Called by Main
 | 
			
		||||
func (a *ActionStore) Quit() {
 | 
			
		||||
	a.mutex.Lock()
 | 
			
		||||
	defer a.mutex.Unlock()
 | 
			
		||||
	for _, actions := range a.store {
 | 
			
		||||
		for action, sigs := range actions {
 | 
			
		||||
			for sig := range sigs {
 | 
			
		||||
				sig <- action.OnExit
 | 
			
		||||
			}
 | 
			
		||||
	// Painful data manipulation
 | 
			
		||||
	for filter, filterMatches := range matches {
 | 
			
		||||
		if cs[filter.stream.name] == nil {
 | 
			
		||||
			cs[filter.stream.name] = make(map[string]MapPatternStatus)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	a.store = make(ActionMap)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Called by a CLI
 | 
			
		||||
func (a *ActionStore) Flush(pattern string) int {
 | 
			
		||||
	var cpt int
 | 
			
		||||
	a.mutex.Lock()
 | 
			
		||||
	defer a.mutex.Unlock()
 | 
			
		||||
	if a.store[pattern] != nil {
 | 
			
		||||
		for _, action := range a.store[pattern] {
 | 
			
		||||
			for sig := range action {
 | 
			
		||||
				sig <- true
 | 
			
		||||
			}
 | 
			
		||||
			cpt++
 | 
			
		||||
		if cs[filter.stream.name][filter.name] == nil {
 | 
			
		||||
			cs[filter.stream.name][filter.name] = make(MapPatternStatus)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	delete(a.store, pattern)
 | 
			
		||||
	flushesC <- LogEntry{time.Now(), pattern, "", "", false}
 | 
			
		||||
	return cpt
 | 
			
		||||
}
 | 
			
		||||
		for pattern, patternMatches := range filterMatches {
 | 
			
		||||
			var ps PatternStatus
 | 
			
		||||
			cs[filter.stream.name][filter.name][pattern] = &ps
 | 
			
		||||
 | 
			
		||||
// Called by a CLI
 | 
			
		||||
func (a *ActionStore) pendingActions() ReadableMap {
 | 
			
		||||
	a.mutex.Lock()
 | 
			
		||||
	defer a.mutex.Unlock()
 | 
			
		||||
	return a.store.ToReadable()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a ActionMap) ToReadable() ReadableMap {
 | 
			
		||||
	res := make(ReadableMap)
 | 
			
		||||
 | 
			
		||||
	for pattern, actions := range a {
 | 
			
		||||
		for action := range actions {
 | 
			
		||||
			filter := action.filter.name
 | 
			
		||||
			stream := action.filter.stream.name
 | 
			
		||||
			if res[stream] == nil {
 | 
			
		||||
				res[stream] = make(map[string]map[string]int)
 | 
			
		||||
			}
 | 
			
		||||
			if res[stream][filter] == nil {
 | 
			
		||||
				res[stream][filter] = make(map[string]int)
 | 
			
		||||
			}
 | 
			
		||||
			res[stream][filter][pattern] = res[stream][filter][pattern] + 1
 | 
			
		||||
			ps.Matches = len(patternMatches)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return res
 | 
			
		||||
}
 | 
			
		||||
	matchesLock.Unlock()
 | 
			
		||||
	actionsLock.Lock()
 | 
			
		||||
 | 
			
		||||
func (r ReadableMap) ToString() string {
 | 
			
		||||
	text, err := yaml.Marshal(r)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Fatalln(err)
 | 
			
		||||
	// Painful data manipulation
 | 
			
		||||
	for action, pendingActions := range actions {
 | 
			
		||||
		if cs[action.filter.stream.name] == nil {
 | 
			
		||||
			cs[action.filter.stream.name] = make(map[string]MapPatternStatus)
 | 
			
		||||
		}
 | 
			
		||||
		if cs[action.filter.stream.name][action.filter.name] == nil {
 | 
			
		||||
			cs[action.filter.stream.name][action.filter.name] = make(MapPatternStatus)
 | 
			
		||||
		}
 | 
			
		||||
		for pattern, patternPendingActions := range pendingActions {
 | 
			
		||||
			if cs[action.filter.stream.name][action.filter.name][pattern] == nil {
 | 
			
		||||
				var ps PatternStatus
 | 
			
		||||
				cs[action.filter.stream.name][action.filter.name][pattern] = &ps
 | 
			
		||||
			}
 | 
			
		||||
			var ps *PatternStatus
 | 
			
		||||
			ps = cs[action.filter.stream.name][action.filter.name][pattern]
 | 
			
		||||
			ps.Actions = make(map[string][]string)
 | 
			
		||||
 | 
			
		||||
			for _, t := range patternPendingActions {
 | 
			
		||||
				ps.Actions[action.name] = append(ps.Actions[action.name], t.Format(time.DateTime))
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return string(text)
 | 
			
		||||
	actionsLock.Unlock()
 | 
			
		||||
	return cs
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Socket-related, server-related functions
 | 
			
		||||
 | 
			
		||||
func createOpenSocket() net.Listener {
 | 
			
		||||
	err := os.MkdirAll(path.Dir(*SocketPath), 0755)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@ -146,6 +89,7 @@ func SocketManager() {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		go func(conn net.Conn) {
 | 
			
		||||
			defer conn.Close()
 | 
			
		||||
			var request Request
 | 
			
		||||
			var response Response
 | 
			
		||||
 | 
			
		||||
@ -156,16 +100,17 @@ func SocketManager() {
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			switch request.Request {
 | 
			
		||||
			case Query:
 | 
			
		||||
				response.Actions = actionStore.store.ToReadable()
 | 
			
		||||
			case Show:
 | 
			
		||||
				response.ClientStatus = genClientStatus()
 | 
			
		||||
			case Flush:
 | 
			
		||||
				response.Number = actionStore.Flush(request.Pattern)
 | 
			
		||||
				// FIXME reimplement flush
 | 
			
		||||
				response.Number = 0
 | 
			
		||||
			default:
 | 
			
		||||
				log.Println("ERROR Invalid Message from cli: unrecognised Request type")
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			gob.NewEncoder(conn).Encode(response)
 | 
			
		||||
			err = gob.NewEncoder(conn).Encode(response)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				log.Println("ERROR Can't respond to cli:", err)
 | 
			
		||||
				return
 | 
			
		||||
 | 
			
		||||
@ -2,20 +2,24 @@
 | 
			
		||||
patterns:
 | 
			
		||||
  num:
 | 
			
		||||
    regex: '[0-9]+'
 | 
			
		||||
  ip:
 | 
			
		||||
    regex: '(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F:]{2,90})'
 | 
			
		||||
    ignore:
 | 
			
		||||
      - 1.0.0.1
 | 
			
		||||
 | 
			
		||||
streams:
 | 
			
		||||
  tailDown:
 | 
			
		||||
    cmd: [ "sh", "-c", "sleep 0.5; echo found 1; sleep 1; echo found 1; sleep 10" ]
 | 
			
		||||
  tailDown1:
 | 
			
		||||
    cmd: [ "sh", "-c", "echo 1 2 3 4 5 1 2 3 4 5 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 1 2 3 4 | tr ' ' '\n' | while read i; do sleep 2; echo found $(($i % 10)); done" ]
 | 
			
		||||
    filters:
 | 
			
		||||
      findIP:
 | 
			
		||||
        regex:
 | 
			
		||||
          - '^found <num>$'
 | 
			
		||||
        retry: 2
 | 
			
		||||
        retry-period: 1m
 | 
			
		||||
        retry: 3
 | 
			
		||||
        retry-period: 30s
 | 
			
		||||
        actions:
 | 
			
		||||
          damn:
 | 
			
		||||
            cmd: [ "echo", "<num>" ]
 | 
			
		||||
          undamn:
 | 
			
		||||
            cmd: [ "echo", "undamn", "<num>" ]
 | 
			
		||||
            after: 5s
 | 
			
		||||
            after: 30s
 | 
			
		||||
            onexit: true
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user