Compare commits
	
		
			6 Commits
		
	
	
		
			persistent
			...
			multipatte
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 9ef2ae585e | |||
| 61bf6f92b9 | |||
| 463c5b709f | |||
| 4c18161c9c | |||
| 833ffde474 | |||
| 00f1647aa6 | 
@ -8,6 +8,7 @@ import (
 | 
			
		||||
	"net"
 | 
			
		||||
	"os"
 | 
			
		||||
	"regexp"
 | 
			
		||||
	"strings"
 | 
			
		||||
 | 
			
		||||
	"framagit.org/ppom/reaction/logger"
 | 
			
		||||
	"sigs.k8s.io/yaml"
 | 
			
		||||
@ -20,7 +21,7 @@ const (
 | 
			
		||||
 | 
			
		||||
type Request struct {
 | 
			
		||||
	Request int
 | 
			
		||||
	Pattern []string
 | 
			
		||||
	Pattern string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Response struct {
 | 
			
		||||
@ -85,7 +86,7 @@ func usage(err string) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ClientShow(format, stream, filter string, regex *regexp.Regexp) {
 | 
			
		||||
	response := SendAndRetrieve(Request{Show, []string{""}})
 | 
			
		||||
	response := SendAndRetrieve(Request{Show, ""})
 | 
			
		||||
	if response.Err != nil {
 | 
			
		||||
		logger.Fatalln("Received error from daemon:", response.Err)
 | 
			
		||||
	}
 | 
			
		||||
@ -137,9 +138,15 @@ func ClientShow(format, stream, filter string, regex *regexp.Regexp) {
 | 
			
		||||
	if regex != nil {
 | 
			
		||||
		for streamName := range response.ClientStatus {
 | 
			
		||||
			for filterName := range response.ClientStatus[streamName] {
 | 
			
		||||
				for patternName := range response.ClientStatus[streamName][filterName] {
 | 
			
		||||
					if !regex.MatchString(patternName) {
 | 
			
		||||
						delete(response.ClientStatus[streamName][filterName], patternName)
 | 
			
		||||
				for patterns := range response.ClientStatus[streamName][filterName] {
 | 
			
		||||
					pmatch := false
 | 
			
		||||
					for _, p := range strings.Split(patterns, "\x00") {
 | 
			
		||||
						if regex.MatchString(p) {
 | 
			
		||||
							pmatch = true
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
					if !pmatch {
 | 
			
		||||
						delete(response.ClientStatus[streamName][filterName], patterns)
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				if len(response.ClientStatus[streamName][filterName]) == 0 {
 | 
			
		||||
@ -162,12 +169,22 @@ func ClientShow(format, stream, filter string, regex *regexp.Regexp) {
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		logger.Fatalln("Failed to convert daemon binary response to text format:", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Replace \0 joined string with space joined string ("1.2.3.4\0root" -> "1.2.3.4 root")
 | 
			
		||||
	for streamName := range response.ClientStatus {
 | 
			
		||||
		for filterName := range response.ClientStatus[streamName] {
 | 
			
		||||
			for patterns := range response.ClientStatus[streamName][filterName] {
 | 
			
		||||
				text = []byte(strings.ReplaceAll(string(text), strings.Join(strings.Split(patterns, "\x00"), "\\0"), strings.Join(strings.Split(patterns, "\x00"), " ")))
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	fmt.Println(string(text))
 | 
			
		||||
 | 
			
		||||
	os.Exit(0)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ClientFlush(pattern []string, streamfilter, format string) {
 | 
			
		||||
	response := SendAndRetrieve(Request{Flush, pattern})
 | 
			
		||||
func ClientFlush(patterns []string, streamfilter, format string) {
 | 
			
		||||
	response := SendAndRetrieve(Request{Flush, strings.Join(patterns, "\x00")})
 | 
			
		||||
	if response.Err != nil {
 | 
			
		||||
		logger.Fatalln("Received error from daemon:", response.Err)
 | 
			
		||||
		os.Exit(1)
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										174
									
								
								app/daemon.go
									
									
									
									
									
								
							
							
						
						
									
										174
									
								
								app/daemon.go
									
									
									
									
									
								
							@ -2,8 +2,6 @@ package app
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bufio"
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"os"
 | 
			
		||||
	"os/exec"
 | 
			
		||||
	"os/signal"
 | 
			
		||||
@ -15,71 +13,6 @@ import (
 | 
			
		||||
	"framagit.org/ppom/reaction/logger"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Compare content and ordering. Case sensitive.
 | 
			
		||||
func IsStringArrayEqual(one, two []string) bool {
 | 
			
		||||
	for i, a := range one {
 | 
			
		||||
		if a != two[i] {
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
// Executes a command and write to its stdin via input channel until command, or reaction, dies
 | 
			
		||||
func cmdStdin(commandline []string, input <-chan string) {
 | 
			
		||||
	cmd := exec.Command(commandline[0], commandline[1:]...)
 | 
			
		||||
	stdin, err := cmd.StdinPipe()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		logger.Fatalln("couldn't open stdin on command:", err)
 | 
			
		||||
	}
 | 
			
		||||
	stdout, err := cmd.StdoutPipe()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		logger.Fatalln("couldn't open stdout on command:", err)
 | 
			
		||||
	}
 | 
			
		||||
	if err := cmd.Start(); err != nil {
 | 
			
		||||
		logger.Fatalln("couldn't start command:", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer stdin.Close()
 | 
			
		||||
 | 
			
		||||
	logger.Printf(logger.INFO, fmt.Sprintf("Output started with %v\n", commandline))
 | 
			
		||||
 | 
			
		||||
	// stdout displaying thread
 | 
			
		||||
	go func() {
 | 
			
		||||
		// FIXME
 | 
			
		||||
		tmp := make([]byte, 1024)
 | 
			
		||||
		for {
 | 
			
		||||
			_, err := stdout.Read(tmp)
 | 
			
		||||
			if len(bytes.Trim(tmp, "\x00")) > 0 {
 | 
			
		||||
				for _, line := range strings.Split(strings.ReplaceAll(string(bytes.Trim(tmp, "\x00")), "\r\n", "\n"), "\n") {
 | 
			
		||||
					if len(line) > 0 {
 | 
			
		||||
						logger.Printf(logger.INFO, fmt.Sprintf("Output returned %s", line))
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				logger.Printf(logger.ERROR, fmt.Sprintf("Reading output error: %v\n", err))
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	// Stdin writing thread
 | 
			
		||||
	go func() {
 | 
			
		||||
		for {
 | 
			
		||||
			in := <-input
 | 
			
		||||
			_, err := stdin.Write([]byte(in))
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				logger.Printf(logger.ERROR, fmt.Sprintf("Writing to output error: %v\n", err))
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	err = cmd.Wait()
 | 
			
		||||
	logger.Fatalln("command %v stopped: %v", cmd, err)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Executes a command and channel-send its stdout
 | 
			
		||||
func cmdStdout(commandline []string) chan *string {
 | 
			
		||||
	lines := make(chan *string)
 | 
			
		||||
@ -144,8 +77,8 @@ func (p *Pattern) notAnIgnore(match *string) bool {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Whether one of the filter's regexes is matched on a line
 | 
			
		||||
func (f *Filter) match(line *string) []string {
 | 
			
		||||
	var result []string
 | 
			
		||||
func (f *Filter) match(line *string) string {
 | 
			
		||||
	var result string
 | 
			
		||||
	for _, regex := range f.compiledRegex {
 | 
			
		||||
 | 
			
		||||
		if matches := regex.FindStringSubmatch(*line); matches != nil {
 | 
			
		||||
@ -158,54 +91,47 @@ func (f *Filter) match(line *string) []string {
 | 
			
		||||
				match := matches[regex.SubexpIndex(p.name)]
 | 
			
		||||
				if p.notAnIgnore(&match) {
 | 
			
		||||
					logger.Printf(logger.INFO, "%s.%s: match [%v]\n", f.stream.name, f.name, match)
 | 
			
		||||
					result = append(result, match)
 | 
			
		||||
					if len(result) == 0 {
 | 
			
		||||
						result = match
 | 
			
		||||
					} else {
 | 
			
		||||
						result = strings.Join([]string{result, match}, "\x00")
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			if f.pattern == nil {
 | 
			
		||||
				// No pattern, so this match will never actually be used
 | 
			
		||||
				return nil
 | 
			
		||||
				return ""
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if len(result) == len(f.pattern) {
 | 
			
		||||
	if len(strings.Split(result, "\x00")) == len(f.pattern) {
 | 
			
		||||
		return result
 | 
			
		||||
	} else {
 | 
			
		||||
		// Incomplete match = no match.
 | 
			
		||||
		return nil
 | 
			
		||||
		// Incomplete match = no match
 | 
			
		||||
		return ""
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *Filter) sendActions(match []string, at time.Time) {
 | 
			
		||||
func (f *Filter) sendActions(match string, at time.Time) {
 | 
			
		||||
	for _, a := range f.Actions {
 | 
			
		||||
		actionsC <- PAT{match, a, at.Add(a.afterDuration)}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *Action) exec(match []string) {
 | 
			
		||||
func (a *Action) exec(match string) {
 | 
			
		||||
	defer wgActions.Done()
 | 
			
		||||
 | 
			
		||||
	if len(a.Cmd) > 0 {
 | 
			
		||||
		a.execCmd(match)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if a.Write != nil {
 | 
			
		||||
		a.execWrite(match)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *Action) execCmd(match []string) {
 | 
			
		||||
	var computedCommand []string
 | 
			
		||||
	var cmdItem string
 | 
			
		||||
 | 
			
		||||
	if a.filter.pattern != nil {
 | 
			
		||||
		computedCommand = make([]string, 0, len(a.Cmd))
 | 
			
		||||
		matches := strings.Split(match, "\x00")
 | 
			
		||||
 | 
			
		||||
		for _, item := range a.Cmd {
 | 
			
		||||
			cmdItem = strings.Clone(item)
 | 
			
		||||
			for i, p := range a.filter.pattern {
 | 
			
		||||
				cmdItem = strings.ReplaceAll(cmdItem, p.nameWithBraces, match[i])
 | 
			
		||||
				item = strings.ReplaceAll(item, p.nameWithBraces, matches[i])
 | 
			
		||||
			}
 | 
			
		||||
			computedCommand = append(computedCommand, cmdItem)
 | 
			
		||||
			computedCommand = append(computedCommand, item)
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		computedCommand = a.Cmd
 | 
			
		||||
@ -220,29 +146,6 @@ func (a *Action) execCmd(match []string) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *Action) execWrite(match []string) {
 | 
			
		||||
	var computedWrite string
 | 
			
		||||
	var writeItem string
 | 
			
		||||
 | 
			
		||||
	if a.filter.pattern != nil {
 | 
			
		||||
		for _, item := range a.Write.Text {
 | 
			
		||||
			writeItem = strings.Clone(item)
 | 
			
		||||
			for i, p := range a.filter.pattern {
 | 
			
		||||
				writeItem = strings.ReplaceAll(writeItem, p.nameWithBraces, match[i])
 | 
			
		||||
			}
 | 
			
		||||
			if len(computedWrite) > 0 {
 | 
			
		||||
				computedWrite = computedWrite + " " + writeItem
 | 
			
		||||
			} else {
 | 
			
		||||
				computedWrite = writeItem
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		computedWrite = strings.Join(a.Write.Text, " ")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	a.Write.Output.Stdin <- fmt.Sprintf("%s\n", computedWrite)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ActionsManager(concurrency int) {
 | 
			
		||||
	// concurrency init
 | 
			
		||||
	execActionsC := make(chan PA)
 | 
			
		||||
@ -267,7 +170,7 @@ func ActionsManager(concurrency int) {
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
	}
 | 
			
		||||
	execAction := func(a *Action, p []string) {
 | 
			
		||||
	execAction := func(a *Action, p string) {
 | 
			
		||||
		wgActions.Add(1)
 | 
			
		||||
		execActionsC <- PA{p, a}
 | 
			
		||||
	}
 | 
			
		||||
@ -285,10 +188,10 @@ func ActionsManager(concurrency int) {
 | 
			
		||||
				execAction(action, pattern)
 | 
			
		||||
			} else {
 | 
			
		||||
				actionsLock.Lock()
 | 
			
		||||
				if actions[&pa] == nil {
 | 
			
		||||
					actions[&pa] = make(map[time.Time]struct{})
 | 
			
		||||
				if actions[pa] == nil {
 | 
			
		||||
					actions[pa] = make(map[time.Time]struct{})
 | 
			
		||||
				}
 | 
			
		||||
				actions[&pa][then] = struct{}{}
 | 
			
		||||
				actions[pa][then] = struct{}{}
 | 
			
		||||
				actionsLock.Unlock()
 | 
			
		||||
				go func(insidePat PAT, insideNow time.Time) {
 | 
			
		||||
					time.Sleep(insidePat.t.Sub(insideNow))
 | 
			
		||||
@ -299,8 +202,8 @@ func ActionsManager(concurrency int) {
 | 
			
		||||
			pa := PA{pat.p, pat.a}
 | 
			
		||||
			pattern, action, then := pat.p, pat.a, pat.t
 | 
			
		||||
			actionsLock.Lock()
 | 
			
		||||
			if actions[&pa] != nil {
 | 
			
		||||
				delete(actions[&pa], then)
 | 
			
		||||
			if actions[pa] != nil {
 | 
			
		||||
				delete(actions[pa], then)
 | 
			
		||||
			}
 | 
			
		||||
			actionsLock.Unlock()
 | 
			
		||||
			execAction(action, pattern)
 | 
			
		||||
@ -308,7 +211,7 @@ func ActionsManager(concurrency int) {
 | 
			
		||||
			ret := make(ActionsMap)
 | 
			
		||||
			actionsLock.Lock()
 | 
			
		||||
			for pa := range actions {
 | 
			
		||||
				if IsStringArrayEqual(pa.p, fo.p) {
 | 
			
		||||
				if pa.p == fo.p {
 | 
			
		||||
					for range actions[pa] {
 | 
			
		||||
						execAction(pa.a, pa.p)
 | 
			
		||||
					}
 | 
			
		||||
@ -358,7 +261,7 @@ func MatchesManager() {
 | 
			
		||||
			matchesManagerHandleFlush(fo)
 | 
			
		||||
		case pft = <-matchesC:
 | 
			
		||||
 | 
			
		||||
			entry := LogEntry{pft.t, 0, pft.p, pft.f.stream.name, pft.f.name, 0, false}
 | 
			
		||||
			entry := LogEntry{pft.t, 0, strings.Join(strings.Split(pft.p, "\x00"), " / "), pft.f.stream.name, pft.f.name, 0, false}
 | 
			
		||||
 | 
			
		||||
			entry.Exec = matchesManagerHandleMatch(pft)
 | 
			
		||||
 | 
			
		||||
@ -371,7 +274,7 @@ func matchesManagerHandleFlush(fo FlushMatchOrder) {
 | 
			
		||||
	ret := make(MatchesMap)
 | 
			
		||||
	matchesLock.Lock()
 | 
			
		||||
	for pf := range matches {
 | 
			
		||||
		if IsStringArrayEqual(fo.p, pf.p) {
 | 
			
		||||
		if fo.p == pf.p {
 | 
			
		||||
			if fo.ret != nil {
 | 
			
		||||
				ret[pf] = matches[pf]
 | 
			
		||||
			}
 | 
			
		||||
@ -388,32 +291,32 @@ func matchesManagerHandleMatch(pft PFT) bool {
 | 
			
		||||
	matchesLock.Lock()
 | 
			
		||||
	defer matchesLock.Unlock()
 | 
			
		||||
 | 
			
		||||
	filter, pattern, then := pft.f, pft.p, pft.t
 | 
			
		||||
	filter, patterns, then := pft.f, pft.p, pft.t
 | 
			
		||||
	pf := PF{pft.p, pft.f}
 | 
			
		||||
 | 
			
		||||
	if filter.Retry > 1 {
 | 
			
		||||
		// make sure map exists
 | 
			
		||||
		if matches[&pf] == nil {
 | 
			
		||||
			matches[&pf] = make(map[time.Time]struct{})
 | 
			
		||||
		if matches[pf] == nil {
 | 
			
		||||
			matches[pf] = make(map[time.Time]struct{})
 | 
			
		||||
		}
 | 
			
		||||
		// add new match
 | 
			
		||||
		matches[&pf][then] = struct{}{}
 | 
			
		||||
		matches[pf][then] = struct{}{}
 | 
			
		||||
		// remove match when expired
 | 
			
		||||
		go func(pf PF, then time.Time) {
 | 
			
		||||
			time.Sleep(then.Sub(time.Now()) + filter.retryDuration)
 | 
			
		||||
			matchesLock.Lock()
 | 
			
		||||
			if matches[&pf] != nil {
 | 
			
		||||
			if matches[pf] != nil {
 | 
			
		||||
				// FIXME replace this and all similar occurences
 | 
			
		||||
				// by clear() when switching to go 1.21
 | 
			
		||||
				delete(matches[&pf], then)
 | 
			
		||||
				delete(matches[pf], then)
 | 
			
		||||
			}
 | 
			
		||||
			matchesLock.Unlock()
 | 
			
		||||
		}(pf, then)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if filter.Retry <= 1 || len(matches[&pf]) >= filter.Retry {
 | 
			
		||||
		delete(matches, &pf)
 | 
			
		||||
		filter.sendActions(pattern, then)
 | 
			
		||||
	if filter.Retry <= 1 || len(matches[pf]) >= filter.Retry {
 | 
			
		||||
		delete(matches, pf)
 | 
			
		||||
		filter.sendActions(patterns, then)
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
	return false
 | 
			
		||||
@ -443,14 +346,6 @@ func StreamManager(s *Stream, endedSignal chan *Stream) {
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func OutputsManager(c *Conf) {
 | 
			
		||||
	for outputName := range c.Outputs {
 | 
			
		||||
		output := c.Outputs[outputName]
 | 
			
		||||
		output.Stdin = make(chan string)
 | 
			
		||||
		cmdStdin(output.Start, output.Stdin)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var actions ActionsMap
 | 
			
		||||
var matches MatchesMap
 | 
			
		||||
var actionsLock sync.Mutex
 | 
			
		||||
@ -514,7 +409,6 @@ func Daemon(confFilename string) {
 | 
			
		||||
	_ = runCommands(conf.Start, "start")
 | 
			
		||||
 | 
			
		||||
	go DatabaseManager(conf)
 | 
			
		||||
	go OutputsManager(conf)
 | 
			
		||||
	go MatchesManager()
 | 
			
		||||
	go ActionsManager(conf.Concurrency)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										11
									
								
								app/main.go
									
									
									
									
									
								
							
							
						
						
									
										11
									
								
								app/main.go
									
									
									
									
									
								
							@ -60,7 +60,8 @@ func subCommandParse(f *flag.FlagSet, maxRemainingArgs int) {
 | 
			
		||||
		basicUsage()
 | 
			
		||||
		os.Exit(0)
 | 
			
		||||
	}
 | 
			
		||||
	if len(f.Args()) > maxRemainingArgs {
 | 
			
		||||
	// -1 = no limit to remaining args
 | 
			
		||||
	if maxRemainingArgs > -1 && len(f.Args()) > maxRemainingArgs {
 | 
			
		||||
		fmt.Printf("ERROR unrecognized argument(s): %v\n", f.Args()[maxRemainingArgs:])
 | 
			
		||||
		basicUsage()
 | 
			
		||||
		os.Exit(1)
 | 
			
		||||
@ -102,9 +103,7 @@ func basicUsage() {
 | 
			
		||||
` + bold + `reaction flush` + reset + ` TARGET
 | 
			
		||||
  # remove currently active matches and run currently pending actions for the specified TARGET
 | 
			
		||||
  # (then show flushed matches and actions)
 | 
			
		||||
  # e.g. reaction flush 192.168.1.1
 | 
			
		||||
  # Concatenate patterns with " / " if several patterns in TARGET
 | 
			
		||||
  # e.g. reaction flush "192.168.1.1 / root"
 | 
			
		||||
  # e.g. reaction flush 192.168.1.1 root
 | 
			
		||||
 | 
			
		||||
  # options:
 | 
			
		||||
    -s/--socket SOCKET               # path to the client-daemon communication socket
 | 
			
		||||
@ -196,7 +195,7 @@ func Main(version, commit string) {
 | 
			
		||||
		SocketPath = addSocketFlag(f)
 | 
			
		||||
		queryFormat := addFormatFlag(f)
 | 
			
		||||
		limit := addLimitFlag(f)
 | 
			
		||||
		subCommandParse(f, 1)
 | 
			
		||||
		subCommandParse(f, -1)
 | 
			
		||||
		if *queryFormat != "yaml" && *queryFormat != "json" {
 | 
			
		||||
			logger.Fatalln("only yaml and json formats are supported")
 | 
			
		||||
			f.PrintDefaults()
 | 
			
		||||
@ -211,7 +210,7 @@ func Main(version, commit string) {
 | 
			
		||||
			logger.Fatalln("for now, -l/--limit is not supported")
 | 
			
		||||
			os.Exit(1)
 | 
			
		||||
		}
 | 
			
		||||
		ClientFlush(strings.Split(f.Arg(0), " / "), *limit, *queryFormat)
 | 
			
		||||
		ClientFlush(f.Args(), *limit, *queryFormat)
 | 
			
		||||
 | 
			
		||||
	case "test-regex":
 | 
			
		||||
		// socket not needed, no interaction with the daemon
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										13
									
								
								app/pipe.go
									
									
									
									
									
								
							
							
						
						
									
										13
									
								
								app/pipe.go
									
									
									
									
									
								
							@ -7,7 +7,6 @@ import (
 | 
			
		||||
	"path"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
	"strings"
 | 
			
		||||
 | 
			
		||||
	"framagit.org/ppom/reaction/logger"
 | 
			
		||||
)
 | 
			
		||||
@ -18,14 +17,14 @@ func genClientStatus(local_actions ActionsMap, local_matches MatchesMap, local_a
 | 
			
		||||
 | 
			
		||||
	// Painful data manipulation
 | 
			
		||||
	for pf, times := range local_matches {
 | 
			
		||||
		pattern, filter := pf.p, pf.f
 | 
			
		||||
		patterns, filter := pf.p, pf.f
 | 
			
		||||
		if cs[filter.stream.name] == nil {
 | 
			
		||||
			cs[filter.stream.name] = make(map[string]MapPatternStatus)
 | 
			
		||||
		}
 | 
			
		||||
		if cs[filter.stream.name][filter.name] == nil {
 | 
			
		||||
			cs[filter.stream.name][filter.name] = make(MapPatternStatus)
 | 
			
		||||
		}
 | 
			
		||||
		cs[filter.stream.name][filter.name][strings.Join(pattern, " / ")] = &PatternStatus{len(times), nil}
 | 
			
		||||
		cs[filter.stream.name][filter.name][patterns] = &PatternStatus{len(times), nil}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	local_matchesLock.Unlock()
 | 
			
		||||
@ -33,17 +32,17 @@ func genClientStatus(local_actions ActionsMap, local_matches MatchesMap, local_a
 | 
			
		||||
 | 
			
		||||
	// Painful data manipulation
 | 
			
		||||
	for pa, times := range local_actions {
 | 
			
		||||
		pattern, action := pa.p, pa.a
 | 
			
		||||
		patterns, action := pa.p, pa.a
 | 
			
		||||
		if cs[action.filter.stream.name] == nil {
 | 
			
		||||
			cs[action.filter.stream.name] = make(map[string]MapPatternStatus)
 | 
			
		||||
		}
 | 
			
		||||
		if cs[action.filter.stream.name][action.filter.name] == nil {
 | 
			
		||||
			cs[action.filter.stream.name][action.filter.name] = make(MapPatternStatus)
 | 
			
		||||
		}
 | 
			
		||||
		if cs[action.filter.stream.name][action.filter.name][strings.Join(pattern, " / ")] == nil {
 | 
			
		||||
			cs[action.filter.stream.name][action.filter.name][strings.Join(pattern, " / ")] = new(PatternStatus)
 | 
			
		||||
		if cs[action.filter.stream.name][action.filter.name][patterns] == nil {
 | 
			
		||||
			cs[action.filter.stream.name][action.filter.name][patterns] = new(PatternStatus)
 | 
			
		||||
		}
 | 
			
		||||
		ps := cs[action.filter.stream.name][action.filter.name][strings.Join(pattern, " / ")]
 | 
			
		||||
		ps := cs[action.filter.stream.name][action.filter.name][patterns]
 | 
			
		||||
		if ps.Actions == nil {
 | 
			
		||||
			ps.Actions = make(map[string][]string)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
@ -21,15 +21,6 @@ func (c *Conf) setup() {
 | 
			
		||||
		c.Concurrency = runtime.NumCPU()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for outputName := range c.Outputs {
 | 
			
		||||
		output := c.Outputs[outputName]
 | 
			
		||||
		output.name = outputName
 | 
			
		||||
 | 
			
		||||
		if len(output.Start) == 0 {
 | 
			
		||||
			logger.Fatalf("Bad configuration: output's start %v is empty!", outputName)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for patternName := range c.Patterns {
 | 
			
		||||
		pattern := c.Patterns[patternName]
 | 
			
		||||
		pattern.name = patternName
 | 
			
		||||
@ -84,17 +75,17 @@ func (c *Conf) setup() {
 | 
			
		||||
			filter.name = filterName
 | 
			
		||||
 | 
			
		||||
			if strings.Contains(filter.name, ".") {
 | 
			
		||||
				logger.Fatalf(fmt.Sprintf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.name))
 | 
			
		||||
				logger.Fatalf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.name)
 | 
			
		||||
			}
 | 
			
		||||
			// Parse Duration
 | 
			
		||||
			if filter.RetryPeriod == "" {
 | 
			
		||||
				if filter.Retry > 1 {
 | 
			
		||||
					logger.Fatalf(fmt.Sprintf("Bad configuration: retry but no retryperiod in %v.%v", stream.name, filter.name))
 | 
			
		||||
					logger.Fatalf("Bad configuration: retry but no retryperiod in %v.%v", stream.name, filter.name)
 | 
			
		||||
				}
 | 
			
		||||
			} else {
 | 
			
		||||
				retryDuration, err := time.ParseDuration(filter.RetryPeriod)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					logger.Fatalf(fmt.Sprintf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.name, filter.name, err))
 | 
			
		||||
					logger.Fatalf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.name, filter.name, err)
 | 
			
		||||
				}
 | 
			
		||||
				filter.retryDuration = retryDuration
 | 
			
		||||
			}
 | 
			
		||||
@ -115,7 +106,7 @@ func (c *Conf) setup() {
 | 
			
		||||
				}
 | 
			
		||||
				compiledRegex, err := regexp.Compile(regex)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					log.Fatal(fmt.Sprintf("Bad configuration: regex of filter %s.%s: %v", stream.name, filter.name, err))
 | 
			
		||||
					log.Fatal("Bad configuration: regex of filter %s.%s: %v", stream.name, filter.name, err)
 | 
			
		||||
				}
 | 
			
		||||
				filter.compiledRegex = append(filter.compiledRegex, *compiledRegex)
 | 
			
		||||
			}
 | 
			
		||||
@ -145,20 +136,6 @@ func (c *Conf) setup() {
 | 
			
		||||
				if filter.longuestActionDuration == nil || filter.longuestActionDuration.Milliseconds() < action.afterDuration.Milliseconds() {
 | 
			
		||||
					filter.longuestActionDuration = &action.afterDuration
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				if action.Write != nil {
 | 
			
		||||
					found := false
 | 
			
		||||
					for oname := range c.Outputs {
 | 
			
		||||
						if strings.EqualFold(oname, action.Write.OutputName) {
 | 
			
		||||
							action.Write.Output = c.Outputs[oname]
 | 
			
		||||
							found = true
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
					if !found {
 | 
			
		||||
						logger.Fatalln(fmt.Sprintf("Bad configuration: action %s.%s.%s refers to undeclared output %s",
 | 
			
		||||
									   stream.name, filter.name, action.name, action.Write.OutputName))
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										44
									
								
								app/types.go
									
									
									
									
									
								
							
							
						
						
									
										44
									
								
								app/types.go
									
									
									
									
									
								
							@ -9,24 +9,12 @@ import (
 | 
			
		||||
 | 
			
		||||
type Conf struct {
 | 
			
		||||
	Concurrency int                 `json:"concurrency"`
 | 
			
		||||
	Outputs     map[string]*Output  `json:"outputs"`
 | 
			
		||||
	Patterns    map[string]*Pattern `json:"patterns"`
 | 
			
		||||
	Streams     map[string]*Stream  `json:"streams"`
 | 
			
		||||
	Start       [][]string          `json:"start"`
 | 
			
		||||
	Stop        [][]string          `json:"stop"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Output struct {
 | 
			
		||||
	Start   []string `json:"start"`
 | 
			
		||||
	Stop    []string `json:"stop"`
 | 
			
		||||
	// TODO: Restart when lost communication with output
 | 
			
		||||
	//Restart string   `json:"restart"`
 | 
			
		||||
 | 
			
		||||
	name   string   `json:"-"`
 | 
			
		||||
 | 
			
		||||
	Stdin  chan string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Pattern struct {
 | 
			
		||||
	Regex  string   `json:"regex"`
 | 
			
		||||
	Ignore []string `json:"ignore"`
 | 
			
		||||
@ -64,19 +52,11 @@ type Filter struct {
 | 
			
		||||
	longuestActionDuration *time.Duration
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type OutputWrite struct {
 | 
			
		||||
	OutputName string   `json:"output"`
 | 
			
		||||
	Text       []string `json:"text"`
 | 
			
		||||
 | 
			
		||||
	Output *Output
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Action struct {
 | 
			
		||||
	filter *Filter `json:"-"`
 | 
			
		||||
	name   string  `json:"-"`
 | 
			
		||||
 | 
			
		||||
	Cmd         []string     `json:"cmd"`
 | 
			
		||||
	Write       *OutputWrite `json:"write"`
 | 
			
		||||
	Cmd []string `json:"cmd"`
 | 
			
		||||
 | 
			
		||||
	After         string        `json:"after"`
 | 
			
		||||
	afterDuration time.Duration `json:"-"`
 | 
			
		||||
@ -87,7 +67,7 @@ type Action struct {
 | 
			
		||||
type LogEntry struct {
 | 
			
		||||
	T              time.Time
 | 
			
		||||
	S              int64
 | 
			
		||||
	Pattern        []string
 | 
			
		||||
	Pattern        string
 | 
			
		||||
	Stream, Filter string
 | 
			
		||||
	SF             int
 | 
			
		||||
	Exec           bool
 | 
			
		||||
@ -102,43 +82,43 @@ type WriteDB struct {
 | 
			
		||||
	file *os.File
 | 
			
		||||
	enc  *gob.Encoder
 | 
			
		||||
}
 | 
			
		||||
// https://stackoverflow.com/a/69691894
 | 
			
		||||
type MatchesMap map[*PF]map[time.Time]struct{}
 | 
			
		||||
type ActionsMap map[*PA]map[time.Time]struct{}
 | 
			
		||||
 | 
			
		||||
type MatchesMap map[PF]map[time.Time]struct{}
 | 
			
		||||
type ActionsMap map[PA]map[time.Time]struct{}
 | 
			
		||||
 | 
			
		||||
// Helper structs made to carry information
 | 
			
		||||
// Stream, Filter
 | 
			
		||||
type SF struct{ s, f string }
 | 
			
		||||
// Pattern, Stream, Filter
 | 
			
		||||
type PSF struct{
 | 
			
		||||
	p []string
 | 
			
		||||
	p string
 | 
			
		||||
	s string
 | 
			
		||||
	f string
 | 
			
		||||
}
 | 
			
		||||
type PF struct {
 | 
			
		||||
	p []string
 | 
			
		||||
	p string
 | 
			
		||||
	f *Filter
 | 
			
		||||
}
 | 
			
		||||
type PFT struct {
 | 
			
		||||
	p []string
 | 
			
		||||
	p string
 | 
			
		||||
	f *Filter
 | 
			
		||||
	t time.Time
 | 
			
		||||
}
 | 
			
		||||
type PA struct {
 | 
			
		||||
	p []string
 | 
			
		||||
	p string
 | 
			
		||||
	a *Action
 | 
			
		||||
}
 | 
			
		||||
type PAT struct {
 | 
			
		||||
	p []string
 | 
			
		||||
	p string
 | 
			
		||||
	a *Action
 | 
			
		||||
	t time.Time
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type FlushMatchOrder struct {
 | 
			
		||||
	p   []string
 | 
			
		||||
	p   string
 | 
			
		||||
	ret chan MatchesMap
 | 
			
		||||
}
 | 
			
		||||
type FlushActionOrder struct {
 | 
			
		||||
	p   []string
 | 
			
		||||
	p   string
 | 
			
		||||
	ret chan ActionsMap
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -1,59 +0,0 @@
 | 
			
		||||
---
 | 
			
		||||
concurrency: 0
 | 
			
		||||
 | 
			
		||||
# patterns are substitued in regexes.
 | 
			
		||||
# when a filter performs an action, it replaces the found pattern
 | 
			
		||||
patterns:
 | 
			
		||||
  ip:
 | 
			
		||||
    # reaction regex syntax is defined here: https://github.com/google/re2/wiki/Syntax
 | 
			
		||||
    # simple version: regex: '(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F:]{2,90})'
 | 
			
		||||
    regex: '(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?:\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)){3}|(?:(?:[0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,7}:|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}|(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}|(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}|(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:(?:(?::[0-9a-fA-F]{1,4}){1,6})|:(?:(?::[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(?::[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(?:ffff(?::0{1,4}){0,1}:){0,1}(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])|(?:[0-9a-fA-F]{1,4}:){1,4}:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9]))'
 | 
			
		||||
    ignore:
 | 
			
		||||
      - 127.0.0.1
 | 
			
		||||
      - ::1
 | 
			
		||||
    # Patterns can be ignored based on regexes, it will try to match the whole string detected by the pattern
 | 
			
		||||
    # ignoreregex:
 | 
			
		||||
    #   - '10\.0\.[0-9]{1,3}\.[0-9]{1,3}'
 | 
			
		||||
  login:
 | 
			
		||||
    regex: '[a-zA-Z0-9_\-\.]*'
 | 
			
		||||
    
 | 
			
		||||
  method:
 | 
			
		||||
    regex: '.*'
 | 
			
		||||
    
 | 
			
		||||
  port:
 | 
			
		||||
    regex: '[0-9]{1,5}'
 | 
			
		||||
 | 
			
		||||
# Outputs are commands returning stdin you can use in write actions.
 | 
			
		||||
# This can ben used to get a persistent connection to p.e. a KV database you will write into,
 | 
			
		||||
#  eliminating the overhead of executing a process each time action is trigged.
 | 
			
		||||
outputs:
 | 
			
		||||
  redis:
 | 
			
		||||
    start: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis']
 | 
			
		||||
#  tee:
 | 
			
		||||
#    start: ['tee', 'output.log']
 | 
			
		||||
 | 
			
		||||
    
 | 
			
		||||
# streams are commands
 | 
			
		||||
# they are run and their ouptut is captured
 | 
			
		||||
# *example:* `tail -f /var/log/nginx/access.log`
 | 
			
		||||
# their output will be used by one or more filters
 | 
			
		||||
streams:
 | 
			
		||||
  # streams have a user-defined name
 | 
			
		||||
  ssh:
 | 
			
		||||
    # note that if the command is not in environment's `PATH`
 | 
			
		||||
    # its full path must be given.
 | 
			
		||||
    cmd: ['tail', '-f', '/var/log/auth.log']
 | 
			
		||||
    # filters run actions when they match regexes on a stream
 | 
			
		||||
    filters:
 | 
			
		||||
      # filters have a user-defined name
 | 
			
		||||
      acceptedlogin:
 | 
			
		||||
        # reaction's regex syntax is defined here: https://github.com/google/re2/wiki/Syntax
 | 
			
		||||
        regex:
 | 
			
		||||
          - 'Accepted <method> for <login> from <ip> port <port>'
 | 
			
		||||
        # actions are run by the filter when regexes are matched
 | 
			
		||||
        actions:
 | 
			
		||||
          # actions have a user-defined name
 | 
			
		||||
          store2redis:
 | 
			
		||||
            write:
 | 
			
		||||
              output: redis
 | 
			
		||||
              text: ['XADD', 'logins', '*', 'username', '<login>', 'method', '<method>', 'ip', '<ip>', 'port', '<port>']
 | 
			
		||||
@ -1,50 +0,0 @@
 | 
			
		||||
---
 | 
			
		||||
patterns:
 | 
			
		||||
  num:
 | 
			
		||||
    regex: '[0-9]+'
 | 
			
		||||
  idx:
 | 
			
		||||
    regex: '[0-9]+'
 | 
			
		||||
  ip:
 | 
			
		||||
    regex: '(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F:]{2,90})'
 | 
			
		||||
    ignore:
 | 
			
		||||
      - 1.0.0.1
 | 
			
		||||
 | 
			
		||||
concurrency: 0
 | 
			
		||||
 | 
			
		||||
streams:
 | 
			
		||||
  tailDown1:
 | 
			
		||||
    cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo found $(($i % 100)) for test 1; done' ]
 | 
			
		||||
    filters:
 | 
			
		||||
      findIP:
 | 
			
		||||
        regex:
 | 
			
		||||
          - '^found <num> for test <idx>$'
 | 
			
		||||
        actions:
 | 
			
		||||
          store2redis:
 | 
			
		||||
            cmd: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis', 'XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']
 | 
			
		||||
  tailDown2:
 | 
			
		||||
    cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo prout $(($i % 100)) for test 2; done' ]
 | 
			
		||||
    filters:
 | 
			
		||||
      findIP:
 | 
			
		||||
        regex:
 | 
			
		||||
          - '^prout <num> for test <idx>$'
 | 
			
		||||
        actions:
 | 
			
		||||
          store2redis:
 | 
			
		||||
            cmd: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis', 'XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']
 | 
			
		||||
  tailDown3:
 | 
			
		||||
    cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo nanana $(($i % 100)) for test 3; done' ]
 | 
			
		||||
    filters:
 | 
			
		||||
      findIP:
 | 
			
		||||
        regex:
 | 
			
		||||
          - '^nanana <num> for test <idx>$'
 | 
			
		||||
        actions:
 | 
			
		||||
          store2redis:
 | 
			
		||||
            cmd: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis', 'XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']
 | 
			
		||||
  tailDown4:
 | 
			
		||||
    cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo nanana $(($i % 100)) for test 4; done' ]
 | 
			
		||||
    filters:
 | 
			
		||||
      findIP:
 | 
			
		||||
        regex:
 | 
			
		||||
          - '^nomatch <num> for test <idx>$'
 | 
			
		||||
        actions:
 | 
			
		||||
          store2redis:
 | 
			
		||||
            cmd: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis', 'XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']
 | 
			
		||||
@ -1,62 +0,0 @@
 | 
			
		||||
---
 | 
			
		||||
patterns:
 | 
			
		||||
  num:
 | 
			
		||||
    regex: '[0-9]+'
 | 
			
		||||
  idx:
 | 
			
		||||
    regex: '[0-9]+'
 | 
			
		||||
  ip:
 | 
			
		||||
    regex: '(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F:]{2,90})'
 | 
			
		||||
    ignore:
 | 
			
		||||
      - 1.0.0.1
 | 
			
		||||
 | 
			
		||||
concurrency: 0
 | 
			
		||||
 | 
			
		||||
outputs:
 | 
			
		||||
  redis:
 | 
			
		||||
    start: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis']
 | 
			
		||||
 | 
			
		||||
streams:
 | 
			
		||||
  tailDown1:
 | 
			
		||||
    cmd: [ 'sh', '-c', 'seq 100010 | while read i; do echo found $(($i % 100)) for test 1; done' ]
 | 
			
		||||
    filters:
 | 
			
		||||
      findIP:
 | 
			
		||||
        regex:
 | 
			
		||||
          - '^found <num> for test <idx>$'
 | 
			
		||||
        actions:
 | 
			
		||||
          store2redis:
 | 
			
		||||
            write:
 | 
			
		||||
              output: redis
 | 
			
		||||
              text: ['XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']
 | 
			
		||||
  tailDown2:
 | 
			
		||||
    cmd: [ 'sh', '-c', 'seq 100010 | while read i; do echo prout $(($i % 100)) for test 2; done' ]
 | 
			
		||||
    filters:
 | 
			
		||||
      findIP:
 | 
			
		||||
        regex:
 | 
			
		||||
          - '^prout <num> for test <idx>$'
 | 
			
		||||
        actions:
 | 
			
		||||
          store2redis:
 | 
			
		||||
            write:
 | 
			
		||||
              output: redis
 | 
			
		||||
              text: ['XADD', 'teststream', '*', 'prout', '<num>', 'test', '<idx>']
 | 
			
		||||
  tailDown3:
 | 
			
		||||
    cmd: [ 'sh', '-c', 'seq 100010 | while read i; do echo nanana $(($i % 100)) for test 3; done' ]
 | 
			
		||||
    filters:
 | 
			
		||||
      findIP:
 | 
			
		||||
        regex:
 | 
			
		||||
          - '^nanana <num> for test <idx>$'
 | 
			
		||||
        actions:
 | 
			
		||||
          store2redis:
 | 
			
		||||
            write:
 | 
			
		||||
              output: redis
 | 
			
		||||
              text: ['XADD', 'teststream', '*', 'nanana', '<num>', 'test', '<idx>']
 | 
			
		||||
  tailDown4:
 | 
			
		||||
    cmd: [ 'sh', '-c', 'seq 100010 | while read i; do echo nanana $(($i % 100)) for test 4; done' ]
 | 
			
		||||
    filters:
 | 
			
		||||
      findIP:
 | 
			
		||||
        regex:
 | 
			
		||||
          - '^nomatch <num> for test <idx>$'
 | 
			
		||||
        actions:
 | 
			
		||||
          store2redis:
 | 
			
		||||
            write:
 | 
			
		||||
              output: redis
 | 
			
		||||
              text: ['XADD', 'teststream', '*', 'nomatch', '<num>', 'test', '<idx>']
 | 
			
		||||
		Reference in New Issue
	
	Block a user