Compare commits
6 Commits
persistent
...
multipatte
Author | SHA1 | Date | |
---|---|---|---|
9ef2ae585e | |||
61bf6f92b9 | |||
463c5b709f | |||
4c18161c9c | |||
833ffde474 | |||
00f1647aa6 |
@ -8,6 +8,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"framagit.org/ppom/reaction/logger"
|
"framagit.org/ppom/reaction/logger"
|
||||||
"sigs.k8s.io/yaml"
|
"sigs.k8s.io/yaml"
|
||||||
@ -20,7 +21,7 @@ const (
|
|||||||
|
|
||||||
type Request struct {
|
type Request struct {
|
||||||
Request int
|
Request int
|
||||||
Pattern []string
|
Pattern string
|
||||||
}
|
}
|
||||||
|
|
||||||
type Response struct {
|
type Response struct {
|
||||||
@ -85,7 +86,7 @@ func usage(err string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func ClientShow(format, stream, filter string, regex *regexp.Regexp) {
|
func ClientShow(format, stream, filter string, regex *regexp.Regexp) {
|
||||||
response := SendAndRetrieve(Request{Show, []string{""}})
|
response := SendAndRetrieve(Request{Show, ""})
|
||||||
if response.Err != nil {
|
if response.Err != nil {
|
||||||
logger.Fatalln("Received error from daemon:", response.Err)
|
logger.Fatalln("Received error from daemon:", response.Err)
|
||||||
}
|
}
|
||||||
@ -137,9 +138,15 @@ func ClientShow(format, stream, filter string, regex *regexp.Regexp) {
|
|||||||
if regex != nil {
|
if regex != nil {
|
||||||
for streamName := range response.ClientStatus {
|
for streamName := range response.ClientStatus {
|
||||||
for filterName := range response.ClientStatus[streamName] {
|
for filterName := range response.ClientStatus[streamName] {
|
||||||
for patternName := range response.ClientStatus[streamName][filterName] {
|
for patterns := range response.ClientStatus[streamName][filterName] {
|
||||||
if !regex.MatchString(patternName) {
|
pmatch := false
|
||||||
delete(response.ClientStatus[streamName][filterName], patternName)
|
for _, p := range strings.Split(patterns, "\x00") {
|
||||||
|
if regex.MatchString(p) {
|
||||||
|
pmatch = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !pmatch {
|
||||||
|
delete(response.ClientStatus[streamName][filterName], patterns)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(response.ClientStatus[streamName][filterName]) == 0 {
|
if len(response.ClientStatus[streamName][filterName]) == 0 {
|
||||||
@ -162,12 +169,22 @@ func ClientShow(format, stream, filter string, regex *regexp.Regexp) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalln("Failed to convert daemon binary response to text format:", err)
|
logger.Fatalln("Failed to convert daemon binary response to text format:", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Replace \0 joined string with space joined string ("1.2.3.4\0root" -> "1.2.3.4 root")
|
||||||
|
for streamName := range response.ClientStatus {
|
||||||
|
for filterName := range response.ClientStatus[streamName] {
|
||||||
|
for patterns := range response.ClientStatus[streamName][filterName] {
|
||||||
|
text = []byte(strings.ReplaceAll(string(text), strings.Join(strings.Split(patterns, "\x00"), "\\0"), strings.Join(strings.Split(patterns, "\x00"), " ")))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
fmt.Println(string(text))
|
fmt.Println(string(text))
|
||||||
|
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func ClientFlush(pattern []string, streamfilter, format string) {
|
func ClientFlush(patterns []string, streamfilter, format string) {
|
||||||
response := SendAndRetrieve(Request{Flush, pattern})
|
response := SendAndRetrieve(Request{Flush, strings.Join(patterns, "\x00")})
|
||||||
if response.Err != nil {
|
if response.Err != nil {
|
||||||
logger.Fatalln("Received error from daemon:", response.Err)
|
logger.Fatalln("Received error from daemon:", response.Err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
|
174
app/daemon.go
174
app/daemon.go
@ -2,8 +2,6 @@ package app
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
|
||||||
"fmt"
|
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
@ -15,71 +13,6 @@ import (
|
|||||||
"framagit.org/ppom/reaction/logger"
|
"framagit.org/ppom/reaction/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Compare content and ordering. Case sensitive.
|
|
||||||
func IsStringArrayEqual(one, two []string) bool {
|
|
||||||
for i, a := range one {
|
|
||||||
if a != two[i] {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// Executes a command and write to its stdin via input channel until command, or reaction, dies
|
|
||||||
func cmdStdin(commandline []string, input <-chan string) {
|
|
||||||
cmd := exec.Command(commandline[0], commandline[1:]...)
|
|
||||||
stdin, err := cmd.StdinPipe()
|
|
||||||
if err != nil {
|
|
||||||
logger.Fatalln("couldn't open stdin on command:", err)
|
|
||||||
}
|
|
||||||
stdout, err := cmd.StdoutPipe()
|
|
||||||
if err != nil {
|
|
||||||
logger.Fatalln("couldn't open stdout on command:", err)
|
|
||||||
}
|
|
||||||
if err := cmd.Start(); err != nil {
|
|
||||||
logger.Fatalln("couldn't start command:", err)
|
|
||||||
}
|
|
||||||
defer stdin.Close()
|
|
||||||
|
|
||||||
logger.Printf(logger.INFO, fmt.Sprintf("Output started with %v\n", commandline))
|
|
||||||
|
|
||||||
// stdout displaying thread
|
|
||||||
go func() {
|
|
||||||
// FIXME
|
|
||||||
tmp := make([]byte, 1024)
|
|
||||||
for {
|
|
||||||
_, err := stdout.Read(tmp)
|
|
||||||
if len(bytes.Trim(tmp, "\x00")) > 0 {
|
|
||||||
for _, line := range strings.Split(strings.ReplaceAll(string(bytes.Trim(tmp, "\x00")), "\r\n", "\n"), "\n") {
|
|
||||||
if len(line) > 0 {
|
|
||||||
logger.Printf(logger.INFO, fmt.Sprintf("Output returned %s", line))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
logger.Printf(logger.ERROR, fmt.Sprintf("Reading output error: %v\n", err))
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Stdin writing thread
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
in := <-input
|
|
||||||
_, err := stdin.Write([]byte(in))
|
|
||||||
if err != nil {
|
|
||||||
logger.Printf(logger.ERROR, fmt.Sprintf("Writing to output error: %v\n", err))
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
err = cmd.Wait()
|
|
||||||
logger.Fatalln("command %v stopped: %v", cmd, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Executes a command and channel-send its stdout
|
// Executes a command and channel-send its stdout
|
||||||
func cmdStdout(commandline []string) chan *string {
|
func cmdStdout(commandline []string) chan *string {
|
||||||
lines := make(chan *string)
|
lines := make(chan *string)
|
||||||
@ -144,8 +77,8 @@ func (p *Pattern) notAnIgnore(match *string) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Whether one of the filter's regexes is matched on a line
|
// Whether one of the filter's regexes is matched on a line
|
||||||
func (f *Filter) match(line *string) []string {
|
func (f *Filter) match(line *string) string {
|
||||||
var result []string
|
var result string
|
||||||
for _, regex := range f.compiledRegex {
|
for _, regex := range f.compiledRegex {
|
||||||
|
|
||||||
if matches := regex.FindStringSubmatch(*line); matches != nil {
|
if matches := regex.FindStringSubmatch(*line); matches != nil {
|
||||||
@ -158,54 +91,47 @@ func (f *Filter) match(line *string) []string {
|
|||||||
match := matches[regex.SubexpIndex(p.name)]
|
match := matches[regex.SubexpIndex(p.name)]
|
||||||
if p.notAnIgnore(&match) {
|
if p.notAnIgnore(&match) {
|
||||||
logger.Printf(logger.INFO, "%s.%s: match [%v]\n", f.stream.name, f.name, match)
|
logger.Printf(logger.INFO, "%s.%s: match [%v]\n", f.stream.name, f.name, match)
|
||||||
result = append(result, match)
|
if len(result) == 0 {
|
||||||
|
result = match
|
||||||
|
} else {
|
||||||
|
result = strings.Join([]string{result, match}, "\x00")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if f.pattern == nil {
|
if f.pattern == nil {
|
||||||
// No pattern, so this match will never actually be used
|
// No pattern, so this match will never actually be used
|
||||||
return nil
|
return ""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(result) == len(f.pattern) {
|
if len(strings.Split(result, "\x00")) == len(f.pattern) {
|
||||||
return result
|
return result
|
||||||
} else {
|
} else {
|
||||||
// Incomplete match = no match.
|
// Incomplete match = no match
|
||||||
return nil
|
return ""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Filter) sendActions(match []string, at time.Time) {
|
func (f *Filter) sendActions(match string, at time.Time) {
|
||||||
for _, a := range f.Actions {
|
for _, a := range f.Actions {
|
||||||
actionsC <- PAT{match, a, at.Add(a.afterDuration)}
|
actionsC <- PAT{match, a, at.Add(a.afterDuration)}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Action) exec(match []string) {
|
func (a *Action) exec(match string) {
|
||||||
defer wgActions.Done()
|
defer wgActions.Done()
|
||||||
|
|
||||||
if len(a.Cmd) > 0 {
|
|
||||||
a.execCmd(match)
|
|
||||||
}
|
|
||||||
|
|
||||||
if a.Write != nil {
|
|
||||||
a.execWrite(match)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *Action) execCmd(match []string) {
|
|
||||||
var computedCommand []string
|
var computedCommand []string
|
||||||
var cmdItem string
|
|
||||||
|
|
||||||
if a.filter.pattern != nil {
|
if a.filter.pattern != nil {
|
||||||
computedCommand = make([]string, 0, len(a.Cmd))
|
computedCommand = make([]string, 0, len(a.Cmd))
|
||||||
|
matches := strings.Split(match, "\x00")
|
||||||
|
|
||||||
for _, item := range a.Cmd {
|
for _, item := range a.Cmd {
|
||||||
cmdItem = strings.Clone(item)
|
|
||||||
for i, p := range a.filter.pattern {
|
for i, p := range a.filter.pattern {
|
||||||
cmdItem = strings.ReplaceAll(cmdItem, p.nameWithBraces, match[i])
|
item = strings.ReplaceAll(item, p.nameWithBraces, matches[i])
|
||||||
}
|
}
|
||||||
computedCommand = append(computedCommand, cmdItem)
|
computedCommand = append(computedCommand, item)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
computedCommand = a.Cmd
|
computedCommand = a.Cmd
|
||||||
@ -220,29 +146,6 @@ func (a *Action) execCmd(match []string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Action) execWrite(match []string) {
|
|
||||||
var computedWrite string
|
|
||||||
var writeItem string
|
|
||||||
|
|
||||||
if a.filter.pattern != nil {
|
|
||||||
for _, item := range a.Write.Text {
|
|
||||||
writeItem = strings.Clone(item)
|
|
||||||
for i, p := range a.filter.pattern {
|
|
||||||
writeItem = strings.ReplaceAll(writeItem, p.nameWithBraces, match[i])
|
|
||||||
}
|
|
||||||
if len(computedWrite) > 0 {
|
|
||||||
computedWrite = computedWrite + " " + writeItem
|
|
||||||
} else {
|
|
||||||
computedWrite = writeItem
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
computedWrite = strings.Join(a.Write.Text, " ")
|
|
||||||
}
|
|
||||||
|
|
||||||
a.Write.Output.Stdin <- fmt.Sprintf("%s\n", computedWrite)
|
|
||||||
}
|
|
||||||
|
|
||||||
func ActionsManager(concurrency int) {
|
func ActionsManager(concurrency int) {
|
||||||
// concurrency init
|
// concurrency init
|
||||||
execActionsC := make(chan PA)
|
execActionsC := make(chan PA)
|
||||||
@ -267,7 +170,7 @@ func ActionsManager(concurrency int) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
execAction := func(a *Action, p []string) {
|
execAction := func(a *Action, p string) {
|
||||||
wgActions.Add(1)
|
wgActions.Add(1)
|
||||||
execActionsC <- PA{p, a}
|
execActionsC <- PA{p, a}
|
||||||
}
|
}
|
||||||
@ -285,10 +188,10 @@ func ActionsManager(concurrency int) {
|
|||||||
execAction(action, pattern)
|
execAction(action, pattern)
|
||||||
} else {
|
} else {
|
||||||
actionsLock.Lock()
|
actionsLock.Lock()
|
||||||
if actions[&pa] == nil {
|
if actions[pa] == nil {
|
||||||
actions[&pa] = make(map[time.Time]struct{})
|
actions[pa] = make(map[time.Time]struct{})
|
||||||
}
|
}
|
||||||
actions[&pa][then] = struct{}{}
|
actions[pa][then] = struct{}{}
|
||||||
actionsLock.Unlock()
|
actionsLock.Unlock()
|
||||||
go func(insidePat PAT, insideNow time.Time) {
|
go func(insidePat PAT, insideNow time.Time) {
|
||||||
time.Sleep(insidePat.t.Sub(insideNow))
|
time.Sleep(insidePat.t.Sub(insideNow))
|
||||||
@ -299,8 +202,8 @@ func ActionsManager(concurrency int) {
|
|||||||
pa := PA{pat.p, pat.a}
|
pa := PA{pat.p, pat.a}
|
||||||
pattern, action, then := pat.p, pat.a, pat.t
|
pattern, action, then := pat.p, pat.a, pat.t
|
||||||
actionsLock.Lock()
|
actionsLock.Lock()
|
||||||
if actions[&pa] != nil {
|
if actions[pa] != nil {
|
||||||
delete(actions[&pa], then)
|
delete(actions[pa], then)
|
||||||
}
|
}
|
||||||
actionsLock.Unlock()
|
actionsLock.Unlock()
|
||||||
execAction(action, pattern)
|
execAction(action, pattern)
|
||||||
@ -308,7 +211,7 @@ func ActionsManager(concurrency int) {
|
|||||||
ret := make(ActionsMap)
|
ret := make(ActionsMap)
|
||||||
actionsLock.Lock()
|
actionsLock.Lock()
|
||||||
for pa := range actions {
|
for pa := range actions {
|
||||||
if IsStringArrayEqual(pa.p, fo.p) {
|
if pa.p == fo.p {
|
||||||
for range actions[pa] {
|
for range actions[pa] {
|
||||||
execAction(pa.a, pa.p)
|
execAction(pa.a, pa.p)
|
||||||
}
|
}
|
||||||
@ -358,7 +261,7 @@ func MatchesManager() {
|
|||||||
matchesManagerHandleFlush(fo)
|
matchesManagerHandleFlush(fo)
|
||||||
case pft = <-matchesC:
|
case pft = <-matchesC:
|
||||||
|
|
||||||
entry := LogEntry{pft.t, 0, pft.p, pft.f.stream.name, pft.f.name, 0, false}
|
entry := LogEntry{pft.t, 0, strings.Join(strings.Split(pft.p, "\x00"), " / "), pft.f.stream.name, pft.f.name, 0, false}
|
||||||
|
|
||||||
entry.Exec = matchesManagerHandleMatch(pft)
|
entry.Exec = matchesManagerHandleMatch(pft)
|
||||||
|
|
||||||
@ -371,7 +274,7 @@ func matchesManagerHandleFlush(fo FlushMatchOrder) {
|
|||||||
ret := make(MatchesMap)
|
ret := make(MatchesMap)
|
||||||
matchesLock.Lock()
|
matchesLock.Lock()
|
||||||
for pf := range matches {
|
for pf := range matches {
|
||||||
if IsStringArrayEqual(fo.p, pf.p) {
|
if fo.p == pf.p {
|
||||||
if fo.ret != nil {
|
if fo.ret != nil {
|
||||||
ret[pf] = matches[pf]
|
ret[pf] = matches[pf]
|
||||||
}
|
}
|
||||||
@ -388,32 +291,32 @@ func matchesManagerHandleMatch(pft PFT) bool {
|
|||||||
matchesLock.Lock()
|
matchesLock.Lock()
|
||||||
defer matchesLock.Unlock()
|
defer matchesLock.Unlock()
|
||||||
|
|
||||||
filter, pattern, then := pft.f, pft.p, pft.t
|
filter, patterns, then := pft.f, pft.p, pft.t
|
||||||
pf := PF{pft.p, pft.f}
|
pf := PF{pft.p, pft.f}
|
||||||
|
|
||||||
if filter.Retry > 1 {
|
if filter.Retry > 1 {
|
||||||
// make sure map exists
|
// make sure map exists
|
||||||
if matches[&pf] == nil {
|
if matches[pf] == nil {
|
||||||
matches[&pf] = make(map[time.Time]struct{})
|
matches[pf] = make(map[time.Time]struct{})
|
||||||
}
|
}
|
||||||
// add new match
|
// add new match
|
||||||
matches[&pf][then] = struct{}{}
|
matches[pf][then] = struct{}{}
|
||||||
// remove match when expired
|
// remove match when expired
|
||||||
go func(pf PF, then time.Time) {
|
go func(pf PF, then time.Time) {
|
||||||
time.Sleep(then.Sub(time.Now()) + filter.retryDuration)
|
time.Sleep(then.Sub(time.Now()) + filter.retryDuration)
|
||||||
matchesLock.Lock()
|
matchesLock.Lock()
|
||||||
if matches[&pf] != nil {
|
if matches[pf] != nil {
|
||||||
// FIXME replace this and all similar occurences
|
// FIXME replace this and all similar occurences
|
||||||
// by clear() when switching to go 1.21
|
// by clear() when switching to go 1.21
|
||||||
delete(matches[&pf], then)
|
delete(matches[pf], then)
|
||||||
}
|
}
|
||||||
matchesLock.Unlock()
|
matchesLock.Unlock()
|
||||||
}(pf, then)
|
}(pf, then)
|
||||||
}
|
}
|
||||||
|
|
||||||
if filter.Retry <= 1 || len(matches[&pf]) >= filter.Retry {
|
if filter.Retry <= 1 || len(matches[pf]) >= filter.Retry {
|
||||||
delete(matches, &pf)
|
delete(matches, pf)
|
||||||
filter.sendActions(pattern, then)
|
filter.sendActions(patterns, then)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
@ -443,14 +346,6 @@ func StreamManager(s *Stream, endedSignal chan *Stream) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func OutputsManager(c *Conf) {
|
|
||||||
for outputName := range c.Outputs {
|
|
||||||
output := c.Outputs[outputName]
|
|
||||||
output.Stdin = make(chan string)
|
|
||||||
cmdStdin(output.Start, output.Stdin)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var actions ActionsMap
|
var actions ActionsMap
|
||||||
var matches MatchesMap
|
var matches MatchesMap
|
||||||
var actionsLock sync.Mutex
|
var actionsLock sync.Mutex
|
||||||
@ -514,7 +409,6 @@ func Daemon(confFilename string) {
|
|||||||
_ = runCommands(conf.Start, "start")
|
_ = runCommands(conf.Start, "start")
|
||||||
|
|
||||||
go DatabaseManager(conf)
|
go DatabaseManager(conf)
|
||||||
go OutputsManager(conf)
|
|
||||||
go MatchesManager()
|
go MatchesManager()
|
||||||
go ActionsManager(conf.Concurrency)
|
go ActionsManager(conf.Concurrency)
|
||||||
|
|
||||||
|
11
app/main.go
11
app/main.go
@ -60,7 +60,8 @@ func subCommandParse(f *flag.FlagSet, maxRemainingArgs int) {
|
|||||||
basicUsage()
|
basicUsage()
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
}
|
}
|
||||||
if len(f.Args()) > maxRemainingArgs {
|
// -1 = no limit to remaining args
|
||||||
|
if maxRemainingArgs > -1 && len(f.Args()) > maxRemainingArgs {
|
||||||
fmt.Printf("ERROR unrecognized argument(s): %v\n", f.Args()[maxRemainingArgs:])
|
fmt.Printf("ERROR unrecognized argument(s): %v\n", f.Args()[maxRemainingArgs:])
|
||||||
basicUsage()
|
basicUsage()
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
@ -102,9 +103,7 @@ func basicUsage() {
|
|||||||
` + bold + `reaction flush` + reset + ` TARGET
|
` + bold + `reaction flush` + reset + ` TARGET
|
||||||
# remove currently active matches and run currently pending actions for the specified TARGET
|
# remove currently active matches and run currently pending actions for the specified TARGET
|
||||||
# (then show flushed matches and actions)
|
# (then show flushed matches and actions)
|
||||||
# e.g. reaction flush 192.168.1.1
|
# e.g. reaction flush 192.168.1.1 root
|
||||||
# Concatenate patterns with " / " if several patterns in TARGET
|
|
||||||
# e.g. reaction flush "192.168.1.1 / root"
|
|
||||||
|
|
||||||
# options:
|
# options:
|
||||||
-s/--socket SOCKET # path to the client-daemon communication socket
|
-s/--socket SOCKET # path to the client-daemon communication socket
|
||||||
@ -196,7 +195,7 @@ func Main(version, commit string) {
|
|||||||
SocketPath = addSocketFlag(f)
|
SocketPath = addSocketFlag(f)
|
||||||
queryFormat := addFormatFlag(f)
|
queryFormat := addFormatFlag(f)
|
||||||
limit := addLimitFlag(f)
|
limit := addLimitFlag(f)
|
||||||
subCommandParse(f, 1)
|
subCommandParse(f, -1)
|
||||||
if *queryFormat != "yaml" && *queryFormat != "json" {
|
if *queryFormat != "yaml" && *queryFormat != "json" {
|
||||||
logger.Fatalln("only yaml and json formats are supported")
|
logger.Fatalln("only yaml and json formats are supported")
|
||||||
f.PrintDefaults()
|
f.PrintDefaults()
|
||||||
@ -211,7 +210,7 @@ func Main(version, commit string) {
|
|||||||
logger.Fatalln("for now, -l/--limit is not supported")
|
logger.Fatalln("for now, -l/--limit is not supported")
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
ClientFlush(strings.Split(f.Arg(0), " / "), *limit, *queryFormat)
|
ClientFlush(f.Args(), *limit, *queryFormat)
|
||||||
|
|
||||||
case "test-regex":
|
case "test-regex":
|
||||||
// socket not needed, no interaction with the daemon
|
// socket not needed, no interaction with the daemon
|
||||||
|
13
app/pipe.go
13
app/pipe.go
@ -7,7 +7,6 @@ import (
|
|||||||
"path"
|
"path"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
"strings"
|
|
||||||
|
|
||||||
"framagit.org/ppom/reaction/logger"
|
"framagit.org/ppom/reaction/logger"
|
||||||
)
|
)
|
||||||
@ -18,14 +17,14 @@ func genClientStatus(local_actions ActionsMap, local_matches MatchesMap, local_a
|
|||||||
|
|
||||||
// Painful data manipulation
|
// Painful data manipulation
|
||||||
for pf, times := range local_matches {
|
for pf, times := range local_matches {
|
||||||
pattern, filter := pf.p, pf.f
|
patterns, filter := pf.p, pf.f
|
||||||
if cs[filter.stream.name] == nil {
|
if cs[filter.stream.name] == nil {
|
||||||
cs[filter.stream.name] = make(map[string]MapPatternStatus)
|
cs[filter.stream.name] = make(map[string]MapPatternStatus)
|
||||||
}
|
}
|
||||||
if cs[filter.stream.name][filter.name] == nil {
|
if cs[filter.stream.name][filter.name] == nil {
|
||||||
cs[filter.stream.name][filter.name] = make(MapPatternStatus)
|
cs[filter.stream.name][filter.name] = make(MapPatternStatus)
|
||||||
}
|
}
|
||||||
cs[filter.stream.name][filter.name][strings.Join(pattern, " / ")] = &PatternStatus{len(times), nil}
|
cs[filter.stream.name][filter.name][patterns] = &PatternStatus{len(times), nil}
|
||||||
}
|
}
|
||||||
|
|
||||||
local_matchesLock.Unlock()
|
local_matchesLock.Unlock()
|
||||||
@ -33,17 +32,17 @@ func genClientStatus(local_actions ActionsMap, local_matches MatchesMap, local_a
|
|||||||
|
|
||||||
// Painful data manipulation
|
// Painful data manipulation
|
||||||
for pa, times := range local_actions {
|
for pa, times := range local_actions {
|
||||||
pattern, action := pa.p, pa.a
|
patterns, action := pa.p, pa.a
|
||||||
if cs[action.filter.stream.name] == nil {
|
if cs[action.filter.stream.name] == nil {
|
||||||
cs[action.filter.stream.name] = make(map[string]MapPatternStatus)
|
cs[action.filter.stream.name] = make(map[string]MapPatternStatus)
|
||||||
}
|
}
|
||||||
if cs[action.filter.stream.name][action.filter.name] == nil {
|
if cs[action.filter.stream.name][action.filter.name] == nil {
|
||||||
cs[action.filter.stream.name][action.filter.name] = make(MapPatternStatus)
|
cs[action.filter.stream.name][action.filter.name] = make(MapPatternStatus)
|
||||||
}
|
}
|
||||||
if cs[action.filter.stream.name][action.filter.name][strings.Join(pattern, " / ")] == nil {
|
if cs[action.filter.stream.name][action.filter.name][patterns] == nil {
|
||||||
cs[action.filter.stream.name][action.filter.name][strings.Join(pattern, " / ")] = new(PatternStatus)
|
cs[action.filter.stream.name][action.filter.name][patterns] = new(PatternStatus)
|
||||||
}
|
}
|
||||||
ps := cs[action.filter.stream.name][action.filter.name][strings.Join(pattern, " / ")]
|
ps := cs[action.filter.stream.name][action.filter.name][patterns]
|
||||||
if ps.Actions == nil {
|
if ps.Actions == nil {
|
||||||
ps.Actions = make(map[string][]string)
|
ps.Actions = make(map[string][]string)
|
||||||
}
|
}
|
||||||
|
@ -21,15 +21,6 @@ func (c *Conf) setup() {
|
|||||||
c.Concurrency = runtime.NumCPU()
|
c.Concurrency = runtime.NumCPU()
|
||||||
}
|
}
|
||||||
|
|
||||||
for outputName := range c.Outputs {
|
|
||||||
output := c.Outputs[outputName]
|
|
||||||
output.name = outputName
|
|
||||||
|
|
||||||
if len(output.Start) == 0 {
|
|
||||||
logger.Fatalf("Bad configuration: output's start %v is empty!", outputName)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for patternName := range c.Patterns {
|
for patternName := range c.Patterns {
|
||||||
pattern := c.Patterns[patternName]
|
pattern := c.Patterns[patternName]
|
||||||
pattern.name = patternName
|
pattern.name = patternName
|
||||||
@ -84,17 +75,17 @@ func (c *Conf) setup() {
|
|||||||
filter.name = filterName
|
filter.name = filterName
|
||||||
|
|
||||||
if strings.Contains(filter.name, ".") {
|
if strings.Contains(filter.name, ".") {
|
||||||
logger.Fatalf(fmt.Sprintf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.name))
|
logger.Fatalf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.name)
|
||||||
}
|
}
|
||||||
// Parse Duration
|
// Parse Duration
|
||||||
if filter.RetryPeriod == "" {
|
if filter.RetryPeriod == "" {
|
||||||
if filter.Retry > 1 {
|
if filter.Retry > 1 {
|
||||||
logger.Fatalf(fmt.Sprintf("Bad configuration: retry but no retryperiod in %v.%v", stream.name, filter.name))
|
logger.Fatalf("Bad configuration: retry but no retryperiod in %v.%v", stream.name, filter.name)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
retryDuration, err := time.ParseDuration(filter.RetryPeriod)
|
retryDuration, err := time.ParseDuration(filter.RetryPeriod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf(fmt.Sprintf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.name, filter.name, err))
|
logger.Fatalf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.name, filter.name, err)
|
||||||
}
|
}
|
||||||
filter.retryDuration = retryDuration
|
filter.retryDuration = retryDuration
|
||||||
}
|
}
|
||||||
@ -115,7 +106,7 @@ func (c *Conf) setup() {
|
|||||||
}
|
}
|
||||||
compiledRegex, err := regexp.Compile(regex)
|
compiledRegex, err := regexp.Compile(regex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(fmt.Sprintf("Bad configuration: regex of filter %s.%s: %v", stream.name, filter.name, err))
|
log.Fatal("Bad configuration: regex of filter %s.%s: %v", stream.name, filter.name, err)
|
||||||
}
|
}
|
||||||
filter.compiledRegex = append(filter.compiledRegex, *compiledRegex)
|
filter.compiledRegex = append(filter.compiledRegex, *compiledRegex)
|
||||||
}
|
}
|
||||||
@ -145,20 +136,6 @@ func (c *Conf) setup() {
|
|||||||
if filter.longuestActionDuration == nil || filter.longuestActionDuration.Milliseconds() < action.afterDuration.Milliseconds() {
|
if filter.longuestActionDuration == nil || filter.longuestActionDuration.Milliseconds() < action.afterDuration.Milliseconds() {
|
||||||
filter.longuestActionDuration = &action.afterDuration
|
filter.longuestActionDuration = &action.afterDuration
|
||||||
}
|
}
|
||||||
|
|
||||||
if action.Write != nil {
|
|
||||||
found := false
|
|
||||||
for oname := range c.Outputs {
|
|
||||||
if strings.EqualFold(oname, action.Write.OutputName) {
|
|
||||||
action.Write.Output = c.Outputs[oname]
|
|
||||||
found = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !found {
|
|
||||||
logger.Fatalln(fmt.Sprintf("Bad configuration: action %s.%s.%s refers to undeclared output %s",
|
|
||||||
stream.name, filter.name, action.name, action.Write.OutputName))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
44
app/types.go
44
app/types.go
@ -9,24 +9,12 @@ import (
|
|||||||
|
|
||||||
type Conf struct {
|
type Conf struct {
|
||||||
Concurrency int `json:"concurrency"`
|
Concurrency int `json:"concurrency"`
|
||||||
Outputs map[string]*Output `json:"outputs"`
|
|
||||||
Patterns map[string]*Pattern `json:"patterns"`
|
Patterns map[string]*Pattern `json:"patterns"`
|
||||||
Streams map[string]*Stream `json:"streams"`
|
Streams map[string]*Stream `json:"streams"`
|
||||||
Start [][]string `json:"start"`
|
Start [][]string `json:"start"`
|
||||||
Stop [][]string `json:"stop"`
|
Stop [][]string `json:"stop"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Output struct {
|
|
||||||
Start []string `json:"start"`
|
|
||||||
Stop []string `json:"stop"`
|
|
||||||
// TODO: Restart when lost communication with output
|
|
||||||
//Restart string `json:"restart"`
|
|
||||||
|
|
||||||
name string `json:"-"`
|
|
||||||
|
|
||||||
Stdin chan string
|
|
||||||
}
|
|
||||||
|
|
||||||
type Pattern struct {
|
type Pattern struct {
|
||||||
Regex string `json:"regex"`
|
Regex string `json:"regex"`
|
||||||
Ignore []string `json:"ignore"`
|
Ignore []string `json:"ignore"`
|
||||||
@ -64,19 +52,11 @@ type Filter struct {
|
|||||||
longuestActionDuration *time.Duration
|
longuestActionDuration *time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type OutputWrite struct {
|
|
||||||
OutputName string `json:"output"`
|
|
||||||
Text []string `json:"text"`
|
|
||||||
|
|
||||||
Output *Output
|
|
||||||
}
|
|
||||||
|
|
||||||
type Action struct {
|
type Action struct {
|
||||||
filter *Filter `json:"-"`
|
filter *Filter `json:"-"`
|
||||||
name string `json:"-"`
|
name string `json:"-"`
|
||||||
|
|
||||||
Cmd []string `json:"cmd"`
|
Cmd []string `json:"cmd"`
|
||||||
Write *OutputWrite `json:"write"`
|
|
||||||
|
|
||||||
After string `json:"after"`
|
After string `json:"after"`
|
||||||
afterDuration time.Duration `json:"-"`
|
afterDuration time.Duration `json:"-"`
|
||||||
@ -87,7 +67,7 @@ type Action struct {
|
|||||||
type LogEntry struct {
|
type LogEntry struct {
|
||||||
T time.Time
|
T time.Time
|
||||||
S int64
|
S int64
|
||||||
Pattern []string
|
Pattern string
|
||||||
Stream, Filter string
|
Stream, Filter string
|
||||||
SF int
|
SF int
|
||||||
Exec bool
|
Exec bool
|
||||||
@ -102,43 +82,43 @@ type WriteDB struct {
|
|||||||
file *os.File
|
file *os.File
|
||||||
enc *gob.Encoder
|
enc *gob.Encoder
|
||||||
}
|
}
|
||||||
// https://stackoverflow.com/a/69691894
|
|
||||||
type MatchesMap map[*PF]map[time.Time]struct{}
|
type MatchesMap map[PF]map[time.Time]struct{}
|
||||||
type ActionsMap map[*PA]map[time.Time]struct{}
|
type ActionsMap map[PA]map[time.Time]struct{}
|
||||||
|
|
||||||
// Helper structs made to carry information
|
// Helper structs made to carry information
|
||||||
// Stream, Filter
|
// Stream, Filter
|
||||||
type SF struct{ s, f string }
|
type SF struct{ s, f string }
|
||||||
// Pattern, Stream, Filter
|
// Pattern, Stream, Filter
|
||||||
type PSF struct{
|
type PSF struct{
|
||||||
p []string
|
p string
|
||||||
s string
|
s string
|
||||||
f string
|
f string
|
||||||
}
|
}
|
||||||
type PF struct {
|
type PF struct {
|
||||||
p []string
|
p string
|
||||||
f *Filter
|
f *Filter
|
||||||
}
|
}
|
||||||
type PFT struct {
|
type PFT struct {
|
||||||
p []string
|
p string
|
||||||
f *Filter
|
f *Filter
|
||||||
t time.Time
|
t time.Time
|
||||||
}
|
}
|
||||||
type PA struct {
|
type PA struct {
|
||||||
p []string
|
p string
|
||||||
a *Action
|
a *Action
|
||||||
}
|
}
|
||||||
type PAT struct {
|
type PAT struct {
|
||||||
p []string
|
p string
|
||||||
a *Action
|
a *Action
|
||||||
t time.Time
|
t time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
type FlushMatchOrder struct {
|
type FlushMatchOrder struct {
|
||||||
p []string
|
p string
|
||||||
ret chan MatchesMap
|
ret chan MatchesMap
|
||||||
}
|
}
|
||||||
type FlushActionOrder struct {
|
type FlushActionOrder struct {
|
||||||
p []string
|
p string
|
||||||
ret chan ActionsMap
|
ret chan ActionsMap
|
||||||
}
|
}
|
||||||
|
@ -1,59 +0,0 @@
|
|||||||
---
|
|
||||||
concurrency: 0
|
|
||||||
|
|
||||||
# patterns are substitued in regexes.
|
|
||||||
# when a filter performs an action, it replaces the found pattern
|
|
||||||
patterns:
|
|
||||||
ip:
|
|
||||||
# reaction regex syntax is defined here: https://github.com/google/re2/wiki/Syntax
|
|
||||||
# simple version: regex: '(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F:]{2,90})'
|
|
||||||
regex: '(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?:\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)){3}|(?:(?:[0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,7}:|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}|(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}|(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}|(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:(?:(?::[0-9a-fA-F]{1,4}){1,6})|:(?:(?::[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(?::[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(?:ffff(?::0{1,4}){0,1}:){0,1}(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])|(?:[0-9a-fA-F]{1,4}:){1,4}:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9]))'
|
|
||||||
ignore:
|
|
||||||
- 127.0.0.1
|
|
||||||
- ::1
|
|
||||||
# Patterns can be ignored based on regexes, it will try to match the whole string detected by the pattern
|
|
||||||
# ignoreregex:
|
|
||||||
# - '10\.0\.[0-9]{1,3}\.[0-9]{1,3}'
|
|
||||||
login:
|
|
||||||
regex: '[a-zA-Z0-9_\-\.]*'
|
|
||||||
|
|
||||||
method:
|
|
||||||
regex: '.*'
|
|
||||||
|
|
||||||
port:
|
|
||||||
regex: '[0-9]{1,5}'
|
|
||||||
|
|
||||||
# Outputs are commands returning stdin you can use in write actions.
|
|
||||||
# This can ben used to get a persistent connection to p.e. a KV database you will write into,
|
|
||||||
# eliminating the overhead of executing a process each time action is trigged.
|
|
||||||
outputs:
|
|
||||||
redis:
|
|
||||||
start: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis']
|
|
||||||
# tee:
|
|
||||||
# start: ['tee', 'output.log']
|
|
||||||
|
|
||||||
|
|
||||||
# streams are commands
|
|
||||||
# they are run and their ouptut is captured
|
|
||||||
# *example:* `tail -f /var/log/nginx/access.log`
|
|
||||||
# their output will be used by one or more filters
|
|
||||||
streams:
|
|
||||||
# streams have a user-defined name
|
|
||||||
ssh:
|
|
||||||
# note that if the command is not in environment's `PATH`
|
|
||||||
# its full path must be given.
|
|
||||||
cmd: ['tail', '-f', '/var/log/auth.log']
|
|
||||||
# filters run actions when they match regexes on a stream
|
|
||||||
filters:
|
|
||||||
# filters have a user-defined name
|
|
||||||
acceptedlogin:
|
|
||||||
# reaction's regex syntax is defined here: https://github.com/google/re2/wiki/Syntax
|
|
||||||
regex:
|
|
||||||
- 'Accepted <method> for <login> from <ip> port <port>'
|
|
||||||
# actions are run by the filter when regexes are matched
|
|
||||||
actions:
|
|
||||||
# actions have a user-defined name
|
|
||||||
store2redis:
|
|
||||||
write:
|
|
||||||
output: redis
|
|
||||||
text: ['XADD', 'logins', '*', 'username', '<login>', 'method', '<method>', 'ip', '<ip>', 'port', '<port>']
|
|
@ -1,50 +0,0 @@
|
|||||||
---
|
|
||||||
patterns:
|
|
||||||
num:
|
|
||||||
regex: '[0-9]+'
|
|
||||||
idx:
|
|
||||||
regex: '[0-9]+'
|
|
||||||
ip:
|
|
||||||
regex: '(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F:]{2,90})'
|
|
||||||
ignore:
|
|
||||||
- 1.0.0.1
|
|
||||||
|
|
||||||
concurrency: 0
|
|
||||||
|
|
||||||
streams:
|
|
||||||
tailDown1:
|
|
||||||
cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo found $(($i % 100)) for test 1; done' ]
|
|
||||||
filters:
|
|
||||||
findIP:
|
|
||||||
regex:
|
|
||||||
- '^found <num> for test <idx>$'
|
|
||||||
actions:
|
|
||||||
store2redis:
|
|
||||||
cmd: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis', 'XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']
|
|
||||||
tailDown2:
|
|
||||||
cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo prout $(($i % 100)) for test 2; done' ]
|
|
||||||
filters:
|
|
||||||
findIP:
|
|
||||||
regex:
|
|
||||||
- '^prout <num> for test <idx>$'
|
|
||||||
actions:
|
|
||||||
store2redis:
|
|
||||||
cmd: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis', 'XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']
|
|
||||||
tailDown3:
|
|
||||||
cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo nanana $(($i % 100)) for test 3; done' ]
|
|
||||||
filters:
|
|
||||||
findIP:
|
|
||||||
regex:
|
|
||||||
- '^nanana <num> for test <idx>$'
|
|
||||||
actions:
|
|
||||||
store2redis:
|
|
||||||
cmd: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis', 'XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']
|
|
||||||
tailDown4:
|
|
||||||
cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo nanana $(($i % 100)) for test 4; done' ]
|
|
||||||
filters:
|
|
||||||
findIP:
|
|
||||||
regex:
|
|
||||||
- '^nomatch <num> for test <idx>$'
|
|
||||||
actions:
|
|
||||||
store2redis:
|
|
||||||
cmd: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis', 'XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']
|
|
@ -1,62 +0,0 @@
|
|||||||
---
|
|
||||||
patterns:
|
|
||||||
num:
|
|
||||||
regex: '[0-9]+'
|
|
||||||
idx:
|
|
||||||
regex: '[0-9]+'
|
|
||||||
ip:
|
|
||||||
regex: '(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F:]{2,90})'
|
|
||||||
ignore:
|
|
||||||
- 1.0.0.1
|
|
||||||
|
|
||||||
concurrency: 0
|
|
||||||
|
|
||||||
outputs:
|
|
||||||
redis:
|
|
||||||
start: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis']
|
|
||||||
|
|
||||||
streams:
|
|
||||||
tailDown1:
|
|
||||||
cmd: [ 'sh', '-c', 'seq 100010 | while read i; do echo found $(($i % 100)) for test 1; done' ]
|
|
||||||
filters:
|
|
||||||
findIP:
|
|
||||||
regex:
|
|
||||||
- '^found <num> for test <idx>$'
|
|
||||||
actions:
|
|
||||||
store2redis:
|
|
||||||
write:
|
|
||||||
output: redis
|
|
||||||
text: ['XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']
|
|
||||||
tailDown2:
|
|
||||||
cmd: [ 'sh', '-c', 'seq 100010 | while read i; do echo prout $(($i % 100)) for test 2; done' ]
|
|
||||||
filters:
|
|
||||||
findIP:
|
|
||||||
regex:
|
|
||||||
- '^prout <num> for test <idx>$'
|
|
||||||
actions:
|
|
||||||
store2redis:
|
|
||||||
write:
|
|
||||||
output: redis
|
|
||||||
text: ['XADD', 'teststream', '*', 'prout', '<num>', 'test', '<idx>']
|
|
||||||
tailDown3:
|
|
||||||
cmd: [ 'sh', '-c', 'seq 100010 | while read i; do echo nanana $(($i % 100)) for test 3; done' ]
|
|
||||||
filters:
|
|
||||||
findIP:
|
|
||||||
regex:
|
|
||||||
- '^nanana <num> for test <idx>$'
|
|
||||||
actions:
|
|
||||||
store2redis:
|
|
||||||
write:
|
|
||||||
output: redis
|
|
||||||
text: ['XADD', 'teststream', '*', 'nanana', '<num>', 'test', '<idx>']
|
|
||||||
tailDown4:
|
|
||||||
cmd: [ 'sh', '-c', 'seq 100010 | while read i; do echo nanana $(($i % 100)) for test 4; done' ]
|
|
||||||
filters:
|
|
||||||
findIP:
|
|
||||||
regex:
|
|
||||||
- '^nomatch <num> for test <idx>$'
|
|
||||||
actions:
|
|
||||||
store2redis:
|
|
||||||
write:
|
|
||||||
output: redis
|
|
||||||
text: ['XADD', 'teststream', '*', 'nomatch', '<num>', 'test', '<idx>']
|
|
Loading…
Reference in New Issue
Block a user