Compare commits

...

4 Commits

12 changed files with 412 additions and 72 deletions

View File

@ -20,7 +20,7 @@ const (
type Request struct { type Request struct {
Request int Request int
Pattern string Pattern []string
} }
type Response struct { type Response struct {
@ -85,7 +85,7 @@ func usage(err string) {
} }
func ClientShow(format, stream, filter string, regex *regexp.Regexp) { func ClientShow(format, stream, filter string, regex *regexp.Regexp) {
response := SendAndRetrieve(Request{Show, ""}) response := SendAndRetrieve(Request{Show, []string{""}})
if response.Err != nil { if response.Err != nil {
logger.Fatalln("Received error from daemon:", response.Err) logger.Fatalln("Received error from daemon:", response.Err)
} }
@ -166,7 +166,7 @@ func ClientShow(format, stream, filter string, regex *regexp.Regexp) {
os.Exit(0) os.Exit(0)
} }
func ClientFlush(pattern, streamfilter, format string) { func ClientFlush(pattern []string, streamfilter, format string) {
response := SendAndRetrieve(Request{Flush, pattern}) response := SendAndRetrieve(Request{Flush, pattern})
if response.Err != nil { if response.Err != nil {
logger.Fatalln("Received error from daemon:", response.Err) logger.Fatalln("Received error from daemon:", response.Err)

View File

@ -2,6 +2,8 @@ package app
import ( import (
"bufio" "bufio"
"bytes"
"fmt"
"os" "os"
"os/exec" "os/exec"
"os/signal" "os/signal"
@ -13,6 +15,71 @@ import (
"framagit.org/ppom/reaction/logger" "framagit.org/ppom/reaction/logger"
) )
// Compare content and ordering. Case sensitive.
func IsStringArrayEqual(one, two []string) bool {
for i, a := range one {
if a != two[i] {
return false
}
}
return true
}
// Executes a command and write to its stdin via input channel until command, or reaction, dies
func cmdStdin(commandline []string, input <-chan string) {
cmd := exec.Command(commandline[0], commandline[1:]...)
stdin, err := cmd.StdinPipe()
if err != nil {
logger.Fatalln("couldn't open stdin on command:", err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
logger.Fatalln("couldn't open stdout on command:", err)
}
if err := cmd.Start(); err != nil {
logger.Fatalln("couldn't start command:", err)
}
defer stdin.Close()
logger.Printf(logger.INFO, fmt.Sprintf("Output started with %v\n", commandline))
// stdout displaying thread
go func() {
for {
// FIXME
tmp := make([]byte, 1024)
_, err := stdout.Read(tmp)
if len(bytes.Trim(tmp, "\x00")) > 0 {
for _, line := range strings.Split(strings.ReplaceAll(string(bytes.Trim(tmp, "\x00")), "\r\n", "\n"), "\n") {
if len(line) > 0 {
logger.Printf(logger.INFO, fmt.Sprintf("Output returned %s", line))
}
}
}
if err != nil {
logger.Printf(logger.ERROR, fmt.Sprintf("Reading output error: %v\n", err))
break
}
}
}()
// Stdin writing thread
go func() {
for {
in := <-input
_, err := stdin.Write([]byte(in))
if err != nil {
logger.Printf(logger.ERROR, fmt.Sprintf("Writing to output error: %v\n", err))
break
}
}
}()
err = cmd.Wait()
logger.Fatalln("command %v stopped: %v", cmd, err)
}
// Executes a command and channel-send its stdout // Executes a command and channel-send its stdout
func cmdStdout(commandline []string) chan *string { func cmdStdout(commandline []string) chan *string {
lines := make(chan *string) lines := make(chan *string)
@ -77,44 +144,68 @@ func (p *Pattern) notAnIgnore(match *string) bool {
} }
// Whether one of the filter's regexes is matched on a line // Whether one of the filter's regexes is matched on a line
func (f *Filter) match(line *string) string { func (f *Filter) match(line *string) []string {
var result []string
for _, regex := range f.compiledRegex { for _, regex := range f.compiledRegex {
if matches := regex.FindStringSubmatch(*line); matches != nil { if matches := regex.FindStringSubmatch(*line); matches != nil {
var pnames []string
for _, p := range f.pattern {
pnames = append(pnames, p.name)
}
if f.pattern != nil { for _, p := range f.pattern {
match := matches[regex.SubexpIndex(f.pattern.name)] match := matches[regex.SubexpIndex(p.name)]
if p.notAnIgnore(&match) {
if f.pattern.notAnIgnore(&match) {
logger.Printf(logger.INFO, "%s.%s: match [%v]\n", f.stream.name, f.name, match) logger.Printf(logger.INFO, "%s.%s: match [%v]\n", f.stream.name, f.name, match)
return match result = append(result, match)
} }
} else { }
logger.Printf(logger.INFO, "%s.%s: match [.]\n", f.stream.name, f.name) if f.pattern == nil {
// No pattern, so this match will never actually be used // No pattern, so this match will never actually be used
return "." return nil
} }
} }
} }
return "" if len(result) == len(f.pattern) {
return result
} else {
// Incomplete match = no match.
return nil
}
} }
func (f *Filter) sendActions(match string, at time.Time) { func (f *Filter) sendActions(match []string, at time.Time) {
for _, a := range f.Actions { for _, a := range f.Actions {
actionsC <- PAT{match, a, at.Add(a.afterDuration)} actionsC <- PAT{match, a, at.Add(a.afterDuration)}
} }
} }
func (a *Action) exec(match string) { func (a *Action) exec(match []string) {
defer wgActions.Done() defer wgActions.Done()
if len(a.Cmd) > 0 {
a.execCmd(match)
}
if a.Write != nil {
a.execWrite(match)
}
}
func (a *Action) execCmd(match []string) {
var computedCommand []string var computedCommand []string
var cmdItem string
if a.filter.pattern != nil { if a.filter.pattern != nil {
computedCommand = make([]string, 0, len(a.Cmd)) computedCommand = make([]string, 0, len(a.Cmd))
for _, item := range a.Cmd { for _, item := range a.Cmd {
computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match)) cmdItem = strings.Clone(item)
for i, p := range a.filter.pattern {
cmdItem = strings.ReplaceAll(cmdItem, p.nameWithBraces, match[i])
}
computedCommand = append(computedCommand, cmdItem)
} }
} else { } else {
computedCommand = a.Cmd computedCommand = a.Cmd
@ -129,6 +220,29 @@ func (a *Action) exec(match string) {
} }
} }
func (a *Action) execWrite(match []string) {
var computedWrite string
var writeItem string
if a.filter.pattern != nil {
for _, item := range a.Write.Text {
writeItem = strings.Clone(item)
for i, p := range a.filter.pattern {
writeItem = strings.ReplaceAll(writeItem, p.nameWithBraces, match[i])
}
if len(computedWrite) > 0 {
computedWrite = computedWrite + " " + writeItem
} else {
computedWrite = writeItem
}
}
} else {
computedWrite = strings.Join(a.Write.Text, " ")
}
a.Write.Output.Stdin <- fmt.Sprintf("%s\n", computedWrite)
}
func ActionsManager(concurrency int) { func ActionsManager(concurrency int) {
// concurrency init // concurrency init
execActionsC := make(chan PA) execActionsC := make(chan PA)
@ -153,7 +267,7 @@ func ActionsManager(concurrency int) {
} }
}() }()
} }
execAction := func(a *Action, p string) { execAction := func(a *Action, p []string) {
wgActions.Add(1) wgActions.Add(1)
execActionsC <- PA{p, a} execActionsC <- PA{p, a}
} }
@ -171,10 +285,10 @@ func ActionsManager(concurrency int) {
execAction(action, pattern) execAction(action, pattern)
} else { } else {
actionsLock.Lock() actionsLock.Lock()
if actions[pa] == nil { if actions[&pa] == nil {
actions[pa] = make(map[time.Time]struct{}) actions[&pa] = make(map[time.Time]struct{})
} }
actions[pa][then] = struct{}{} actions[&pa][then] = struct{}{}
actionsLock.Unlock() actionsLock.Unlock()
go func(insidePat PAT, insideNow time.Time) { go func(insidePat PAT, insideNow time.Time) {
time.Sleep(insidePat.t.Sub(insideNow)) time.Sleep(insidePat.t.Sub(insideNow))
@ -185,8 +299,8 @@ func ActionsManager(concurrency int) {
pa := PA{pat.p, pat.a} pa := PA{pat.p, pat.a}
pattern, action, then := pat.p, pat.a, pat.t pattern, action, then := pat.p, pat.a, pat.t
actionsLock.Lock() actionsLock.Lock()
if actions[pa] != nil { if actions[&pa] != nil {
delete(actions[pa], then) delete(actions[&pa], then)
} }
actionsLock.Unlock() actionsLock.Unlock()
execAction(action, pattern) execAction(action, pattern)
@ -194,7 +308,7 @@ func ActionsManager(concurrency int) {
ret := make(ActionsMap) ret := make(ActionsMap)
actionsLock.Lock() actionsLock.Lock()
for pa := range actions { for pa := range actions {
if pa.p == fo.p { if IsStringArrayEqual(pa.p, fo.p) {
for range actions[pa] { for range actions[pa] {
execAction(pa.a, pa.p) execAction(pa.a, pa.p)
} }
@ -257,7 +371,7 @@ func matchesManagerHandleFlush(fo FlushMatchOrder) {
ret := make(MatchesMap) ret := make(MatchesMap)
matchesLock.Lock() matchesLock.Lock()
for pf := range matches { for pf := range matches {
if fo.p == pf.p { if IsStringArrayEqual(fo.p, pf.p) {
if fo.ret != nil { if fo.ret != nil {
ret[pf] = matches[pf] ret[pf] = matches[pf]
} }
@ -279,26 +393,26 @@ func matchesManagerHandleMatch(pft PFT) bool {
if filter.Retry > 1 { if filter.Retry > 1 {
// make sure map exists // make sure map exists
if matches[pf] == nil { if matches[&pf] == nil {
matches[pf] = make(map[time.Time]struct{}) matches[&pf] = make(map[time.Time]struct{})
} }
// add new match // add new match
matches[pf][then] = struct{}{} matches[&pf][then] = struct{}{}
// remove match when expired // remove match when expired
go func(pf PF, then time.Time) { go func(pf PF, then time.Time) {
time.Sleep(then.Sub(time.Now()) + filter.retryDuration) time.Sleep(then.Sub(time.Now()) + filter.retryDuration)
matchesLock.Lock() matchesLock.Lock()
if matches[pf] != nil { if matches[&pf] != nil {
// FIXME replace this and all similar occurences // FIXME replace this and all similar occurences
// by clear() when switching to go 1.21 // by clear() when switching to go 1.21
delete(matches[pf], then) delete(matches[&pf], then)
} }
matchesLock.Unlock() matchesLock.Unlock()
}(pf, then) }(pf, then)
} }
if filter.Retry <= 1 || len(matches[pf]) >= filter.Retry { if filter.Retry <= 1 || len(matches[&pf]) >= filter.Retry {
delete(matches, pf) delete(matches, &pf)
filter.sendActions(pattern, then) filter.sendActions(pattern, then)
return true return true
} }
@ -318,7 +432,7 @@ func StreamManager(s *Stream, endedSignal chan *Stream) {
return return
} }
for _, filter := range s.Filters { for _, filter := range s.Filters {
if match := filter.match(line); match != "" { if match := filter.match(line); len(match) > 0 {
matchesC <- PFT{match, filter, time.Now()} matchesC <- PFT{match, filter, time.Now()}
} }
} }
@ -329,6 +443,14 @@ func StreamManager(s *Stream, endedSignal chan *Stream) {
} }
func OutputsManager(c *Conf) {
for outputName := range c.Outputs {
output := c.Outputs[outputName]
output.Stdin = make(chan string)
cmdStdin(output.Start, output.Stdin)
}
}
var actions ActionsMap var actions ActionsMap
var matches MatchesMap var matches MatchesMap
var actionsLock sync.Mutex var actionsLock sync.Mutex
@ -392,6 +514,7 @@ func Daemon(confFilename string) {
_ = runCommands(conf.Start, "start") _ = runCommands(conf.Start, "start")
go DatabaseManager(conf) go DatabaseManager(conf)
go OutputsManager(conf)
go MatchesManager() go MatchesManager()
go ActionsManager(conf.Concurrency) go ActionsManager(conf.Concurrency)

View File

@ -103,6 +103,8 @@ func basicUsage() {
# remove currently active matches and run currently pending actions for the specified TARGET # remove currently active matches and run currently pending actions for the specified TARGET
# (then show flushed matches and actions) # (then show flushed matches and actions)
# e.g. reaction flush 192.168.1.1 # e.g. reaction flush 192.168.1.1
# Concatenate patterns with " / " if several patterns in TARGET
# e.g. reaction flush "192.168.1.1 / root"
# options: # options:
-s/--socket SOCKET # path to the client-daemon communication socket -s/--socket SOCKET # path to the client-daemon communication socket
@ -209,7 +211,7 @@ func Main(version, commit string) {
logger.Fatalln("for now, -l/--limit is not supported") logger.Fatalln("for now, -l/--limit is not supported")
os.Exit(1) os.Exit(1)
} }
ClientFlush(f.Arg(0), *limit, *queryFormat) ClientFlush(strings.Split(f.Arg(0), " / "), *limit, *queryFormat)
case "test-regex": case "test-regex":
// socket not needed, no interaction with the daemon // socket not needed, no interaction with the daemon

View File

@ -134,7 +134,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E
}() }()
// pattern, stream, fitler → last flush // pattern, stream, fitler → last flush
flushes := make(map[PSF]time.Time) flushes := make(map[*PSF]time.Time)
for { for {
var entry LogEntry var entry LogEntry
var filter *Filter var filter *Filter
@ -160,7 +160,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E
} }
// store // store
flushes[PSF{entry.Pattern, entry.Stream, entry.Filter}] = entry.T flushes[&PSF{entry.Pattern, entry.Stream, entry.Filter}] = entry.T
} }
lastTimeCpt := int64(0) lastTimeCpt := int64(0)
@ -201,8 +201,8 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E
} }
// check if it hasn't been flushed // check if it hasn't been flushed
lastGlobalFlush := flushes[PSF{entry.Pattern, "", ""}].Unix() lastGlobalFlush := flushes[&PSF{entry.Pattern, "", ""}].Unix()
lastLocalFlush := flushes[PSF{entry.Pattern, entry.Stream, entry.Filter}].Unix() lastLocalFlush := flushes[&PSF{entry.Pattern, entry.Stream, entry.Filter}].Unix()
entryTime := entry.T.Unix() entryTime := entry.T.Unix()
if lastLocalFlush > entryTime || lastGlobalFlush > entryTime { if lastLocalFlush > entryTime || lastGlobalFlush > entryTime {
continue continue

View File

@ -7,6 +7,7 @@ import (
"path" "path"
"sync" "sync"
"time" "time"
"strings"
"framagit.org/ppom/reaction/logger" "framagit.org/ppom/reaction/logger"
) )
@ -24,7 +25,7 @@ func genClientStatus(local_actions ActionsMap, local_matches MatchesMap, local_a
if cs[filter.stream.name][filter.name] == nil { if cs[filter.stream.name][filter.name] == nil {
cs[filter.stream.name][filter.name] = make(MapPatternStatus) cs[filter.stream.name][filter.name] = make(MapPatternStatus)
} }
cs[filter.stream.name][filter.name][pattern] = &PatternStatus{len(times), nil} cs[filter.stream.name][filter.name][strings.Join(pattern, " / ")] = &PatternStatus{len(times), nil}
} }
local_matchesLock.Unlock() local_matchesLock.Unlock()
@ -39,10 +40,10 @@ func genClientStatus(local_actions ActionsMap, local_matches MatchesMap, local_a
if cs[action.filter.stream.name][action.filter.name] == nil { if cs[action.filter.stream.name][action.filter.name] == nil {
cs[action.filter.stream.name][action.filter.name] = make(MapPatternStatus) cs[action.filter.stream.name][action.filter.name] = make(MapPatternStatus)
} }
if cs[action.filter.stream.name][action.filter.name][pattern] == nil { if cs[action.filter.stream.name][action.filter.name][strings.Join(pattern, " / ")] == nil {
cs[action.filter.stream.name][action.filter.name][pattern] = new(PatternStatus) cs[action.filter.stream.name][action.filter.name][strings.Join(pattern, " / ")] = new(PatternStatus)
} }
ps := cs[action.filter.stream.name][action.filter.name][pattern] ps := cs[action.filter.stream.name][action.filter.name][strings.Join(pattern, " / ")]
if ps.Actions == nil { if ps.Actions == nil {
ps.Actions = make(map[string][]string) ps.Actions = make(map[string][]string)
} }

View File

@ -13,6 +13,7 @@ import (
"framagit.org/ppom/reaction/logger" "framagit.org/ppom/reaction/logger"
"github.com/google/go-jsonnet" "github.com/google/go-jsonnet"
"golang.org/x/exp/slices"
) )
func (c *Conf) setup() { func (c *Conf) setup() {
@ -20,6 +21,15 @@ func (c *Conf) setup() {
c.Concurrency = runtime.NumCPU() c.Concurrency = runtime.NumCPU()
} }
for outputName := range c.Outputs {
output := c.Outputs[outputName]
output.name = outputName
if len(output.Start) == 0 {
logger.Fatalf("Bad configuration: output's start %v is empty!", outputName)
}
}
for patternName := range c.Patterns { for patternName := range c.Patterns {
pattern := c.Patterns[patternName] pattern := c.Patterns[patternName]
pattern.name = patternName pattern.name = patternName
@ -74,17 +84,17 @@ func (c *Conf) setup() {
filter.name = filterName filter.name = filterName
if strings.Contains(filter.name, ".") { if strings.Contains(filter.name, ".") {
logger.Fatalf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.name) logger.Fatalf(fmt.Sprintf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.name))
} }
// Parse Duration // Parse Duration
if filter.RetryPeriod == "" { if filter.RetryPeriod == "" {
if filter.Retry > 1 { if filter.Retry > 1 {
logger.Fatalf("Bad configuration: retry but no retryperiod in %v.%v", stream.name, filter.name) logger.Fatalf(fmt.Sprintf("Bad configuration: retry but no retryperiod in %v.%v", stream.name, filter.name))
} }
} else { } else {
retryDuration, err := time.ParseDuration(filter.RetryPeriod) retryDuration, err := time.ParseDuration(filter.RetryPeriod)
if err != nil { if err != nil {
logger.Fatalf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.name, filter.name, err) logger.Fatalf(fmt.Sprintf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.name, filter.name, err))
} }
filter.retryDuration = retryDuration filter.retryDuration = retryDuration
} }
@ -95,27 +105,17 @@ func (c *Conf) setup() {
// Compute Regexes // Compute Regexes
// Look for Patterns inside Regexes // Look for Patterns inside Regexes
for _, regex := range filter.Regex { for _, regex := range filter.Regex {
for patternName, pattern := range c.Patterns { for _, pattern := range c.Patterns {
if strings.Contains(regex, pattern.nameWithBraces) { if strings.Contains(regex, pattern.nameWithBraces) {
if !slices.Contains(filter.pattern, pattern) {
if filter.pattern == nil { filter.pattern = append(filter.pattern, pattern)
filter.pattern = pattern
} else if filter.pattern == pattern {
// no op
} else {
logger.Fatalf(
"Bad configuration: Can't mix different patterns (%s, %s) in same filter (%s.%s)\n",
filter.pattern.name, patternName, streamName, filterName,
)
} }
// FIXME should go in the `if filter.pattern == nil`?
regex = strings.Replace(regex, pattern.nameWithBraces, pattern.Regex, 1) regex = strings.Replace(regex, pattern.nameWithBraces, pattern.Regex, 1)
} }
} }
compiledRegex, err := regexp.Compile(regex) compiledRegex, err := regexp.Compile(regex)
if err != nil { if err != nil {
log.Fatalf("%vBad configuration: regex of filter %s.%s: %v", logger.FATAL, stream.name, filter.name, err) log.Fatal(fmt.Sprintf("Bad configuration: regex of filter %s.%s: %v", stream.name, filter.name, err))
} }
filter.compiledRegex = append(filter.compiledRegex, *compiledRegex) filter.compiledRegex = append(filter.compiledRegex, *compiledRegex)
} }
@ -125,7 +125,7 @@ func (c *Conf) setup() {
} }
for actionName := range filter.Actions { for actionName := range filter.Actions {
action := filter.Actions[actionName] action := filter.Actions[actionName]
action.filter = filter action.filter = filter
action.name = actionName action.name = actionName
@ -145,6 +145,20 @@ func (c *Conf) setup() {
if filter.longuestActionDuration == nil || filter.longuestActionDuration.Milliseconds() < action.afterDuration.Milliseconds() { if filter.longuestActionDuration == nil || filter.longuestActionDuration.Milliseconds() < action.afterDuration.Milliseconds() {
filter.longuestActionDuration = &action.afterDuration filter.longuestActionDuration = &action.afterDuration
} }
if action.Write != nil {
found := false
for oname := range c.Outputs {
if strings.EqualFold(oname, action.Write.OutputName) {
action.Write.Output = c.Outputs[oname]
found = true
}
}
if !found {
logger.Fatalln(fmt.Sprintf("Bad configuration: action %s.%s.%s refers to undeclared output %s",
stream.name, filter.name, action.name, action.Write.OutputName))
}
}
} }
} }
} }

View File

@ -9,12 +9,24 @@ import (
type Conf struct { type Conf struct {
Concurrency int `json:"concurrency"` Concurrency int `json:"concurrency"`
Outputs map[string]*Output `json:"outputs"`
Patterns map[string]*Pattern `json:"patterns"` Patterns map[string]*Pattern `json:"patterns"`
Streams map[string]*Stream `json:"streams"` Streams map[string]*Stream `json:"streams"`
Start [][]string `json:"start"` Start [][]string `json:"start"`
Stop [][]string `json:"stop"` Stop [][]string `json:"stop"`
} }
type Output struct {
Start []string `json:"start"`
Stop []string `json:"stop"`
// TODO: Restart when lost communication with output
//Restart string `json:"restart"`
name string `json:"-"`
Stdin chan string
}
type Pattern struct { type Pattern struct {
Regex string `json:"regex"` Regex string `json:"regex"`
Ignore []string `json:"ignore"` Ignore []string `json:"ignore"`
@ -42,7 +54,7 @@ type Filter struct {
Regex []string `json:"regex"` Regex []string `json:"regex"`
compiledRegex []regexp.Regexp `json:"-"` compiledRegex []regexp.Regexp `json:"-"`
pattern *Pattern `json:"-"` pattern []*Pattern `json:"-"`
Retry int `json:"retry"` Retry int `json:"retry"`
RetryPeriod string `json:"retryperiod"` RetryPeriod string `json:"retryperiod"`
@ -52,11 +64,19 @@ type Filter struct {
longuestActionDuration *time.Duration longuestActionDuration *time.Duration
} }
type OutputWrite struct {
OutputName string `json:"output"`
Text []string `json:"text"`
Output *Output
}
type Action struct { type Action struct {
filter *Filter `json:"-"` filter *Filter `json:"-"`
name string `json:"-"` name string `json:"-"`
Cmd []string `json:"cmd"` Cmd []string `json:"cmd"`
Write *OutputWrite `json:"write"`
After string `json:"after"` After string `json:"after"`
afterDuration time.Duration `json:"-"` afterDuration time.Duration `json:"-"`
@ -67,7 +87,7 @@ type Action struct {
type LogEntry struct { type LogEntry struct {
T time.Time T time.Time
S int64 S int64
Pattern string Pattern []string
Stream, Filter string Stream, Filter string
SF int SF int
Exec bool Exec bool
@ -82,37 +102,43 @@ type WriteDB struct {
file *os.File file *os.File
enc *gob.Encoder enc *gob.Encoder
} }
// https://stackoverflow.com/a/69691894
type MatchesMap map[PF]map[time.Time]struct{} type MatchesMap map[*PF]map[time.Time]struct{}
type ActionsMap map[PA]map[time.Time]struct{} type ActionsMap map[*PA]map[time.Time]struct{}
// Helper structs made to carry information // Helper structs made to carry information
// Stream, Filter
type SF struct{ s, f string } type SF struct{ s, f string }
type PSF struct{ p, s, f string } // Pattern, Stream, Filter
type PSF struct{
p []string
s string
f string
}
type PF struct { type PF struct {
p string p []string
f *Filter f *Filter
} }
type PFT struct { type PFT struct {
p string p []string
f *Filter f *Filter
t time.Time t time.Time
} }
type PA struct { type PA struct {
p string p []string
a *Action a *Action
} }
type PAT struct { type PAT struct {
p string p []string
a *Action a *Action
t time.Time t time.Time
} }
type FlushMatchOrder struct { type FlushMatchOrder struct {
p string p []string
ret chan MatchesMap ret chan MatchesMap
} }
type FlushActionOrder struct { type FlushActionOrder struct {
p string p []string
ret chan ActionsMap ret chan ActionsMap
} }

View File

@ -0,0 +1,59 @@
---
concurrency: 0
# patterns are substitued in regexes.
# when a filter performs an action, it replaces the found pattern
patterns:
ip:
# reaction regex syntax is defined here: https://github.com/google/re2/wiki/Syntax
# simple version: regex: '(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F:]{2,90})'
regex: '(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?:\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)){3}|(?:(?:[0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,7}:|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}|(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}|(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}|(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:(?:(?::[0-9a-fA-F]{1,4}){1,6})|:(?:(?::[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(?::[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(?:ffff(?::0{1,4}){0,1}:){0,1}(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])|(?:[0-9a-fA-F]{1,4}:){1,4}:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9]))'
ignore:
- 127.0.0.1
- ::1
# Patterns can be ignored based on regexes, it will try to match the whole string detected by the pattern
# ignoreregex:
# - '10\.0\.[0-9]{1,3}\.[0-9]{1,3}'
login:
regex: '[a-zA-Z0-9_\-\.]*'
method:
regex: '.*'
port:
regex: '[0-9]{1,5}'
# Outputs are commands returning stdin you can use in write actions.
# This can ben used to get a persistent connection to p.e. a KV database you will write into,
# eliminating the overhead of executing a process each time action is trigged.
outputs:
redis:
start: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis']
# tee:
# start: ['tee', 'output.log']
# streams are commands
# they are run and their ouptut is captured
# *example:* `tail -f /var/log/nginx/access.log`
# their output will be used by one or more filters
streams:
# streams have a user-defined name
ssh:
# note that if the command is not in environment's `PATH`
# its full path must be given.
cmd: ['tail', '-f', '/var/log/auth.log']
# filters run actions when they match regexes on a stream
filters:
# filters have a user-defined name
acceptedlogin:
# reaction's regex syntax is defined here: https://github.com/google/re2/wiki/Syntax
regex:
- 'Accepted <method> for <login> from <ip> port <port>'
# actions are run by the filter when regexes are matched
actions:
# actions have a user-defined name
store2redis:
write:
output: redis
text: ['XADD', 'logins', '*', 'username', '<login>', 'method', '<method>', 'ip', '<ip>', 'port', '<port>']

View File

@ -0,0 +1,50 @@
---
patterns:
num:
regex: '[0-9]+'
idx:
regex: '[0-9]+'
ip:
regex: '(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F:]{2,90})'
ignore:
- 1.0.0.1
concurrency: 0
streams:
tailDown1:
cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo found $(($i % 100)) for test 1; done' ]
filters:
findIP:
regex:
- '^found <num> for test <idx>$'
actions:
store2redis:
cmd: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis', 'XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']
tailDown2:
cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo prout $(($i % 100)) for test 2; done' ]
filters:
findIP:
regex:
- '^prout <num> for test <idx>$'
actions:
store2redis:
cmd: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis', 'XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']
tailDown3:
cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo nanana $(($i % 100)) for test 3; done' ]
filters:
findIP:
regex:
- '^nanana <num> for test <idx>$'
actions:
store2redis:
cmd: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis', 'XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']
tailDown4:
cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo nanana $(($i % 100)) for test 4; done' ]
filters:
findIP:
regex:
- '^nomatch <num> for test <idx>$'
actions:
store2redis:
cmd: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis', 'XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']

View File

@ -0,0 +1,62 @@
---
patterns:
num:
regex: '[0-9]+'
idx:
regex: '[0-9]+'
ip:
regex: '(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F:]{2,90})'
ignore:
- 1.0.0.1
concurrency: 0
outputs:
redis:
start: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis']
streams:
tailDown1:
cmd: [ 'sh', '-c', 'seq 100010 | while read i; do echo found $(($i % 100)) for test 1; done' ]
filters:
findIP:
regex:
- '^found <num> for test <idx>$'
actions:
store2redis:
write:
output: redis
text: ['XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']
tailDown2:
cmd: [ 'sh', '-c', 'seq 100010 | while read i; do echo prout $(($i % 100)) for test 2; done' ]
filters:
findIP:
regex:
- '^prout <num> for test <idx>$'
actions:
store2redis:
write:
output: redis
text: ['XADD', 'teststream', '*', 'prout', '<num>', 'test', '<idx>']
tailDown3:
cmd: [ 'sh', '-c', 'seq 100010 | while read i; do echo nanana $(($i % 100)) for test 3; done' ]
filters:
findIP:
regex:
- '^nanana <num> for test <idx>$'
actions:
store2redis:
write:
output: redis
text: ['XADD', 'teststream', '*', 'nanana', '<num>', 'test', '<idx>']
tailDown4:
cmd: [ 'sh', '-c', 'seq 100010 | while read i; do echo nanana $(($i % 100)) for test 4; done' ]
filters:
findIP:
regex:
- '^nomatch <num> for test <idx>$'
actions:
store2redis:
write:
output: redis
text: ['XADD', 'teststream', '*', 'nomatch', '<num>', 'test', '<idx>']

1
go.mod
View File

@ -4,6 +4,7 @@ go 1.20
require ( require (
github.com/google/go-jsonnet v0.20.0 github.com/google/go-jsonnet v0.20.0
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a
sigs.k8s.io/yaml v1.1.0 sigs.k8s.io/yaml v1.1.0
) )

2
go.sum
View File

@ -1,6 +1,8 @@
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-jsonnet v0.20.0 h1:WG4TTSARuV7bSm4PMB4ohjxe33IHT5WVTrJSU33uT4g= github.com/google/go-jsonnet v0.20.0 h1:WG4TTSARuV7bSm4PMB4ohjxe33IHT5WVTrJSU33uT4g=
github.com/google/go-jsonnet v0.20.0/go.mod h1:VbgWF9JX7ztlv770x/TolZNGGFfiHEVx9G6ca2eUmeA= github.com/google/go-jsonnet v0.20.0/go.mod h1:VbgWF9JX7ztlv770x/TolZNGGFfiHEVx9G6ca2eUmeA=
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a h1:HinSgX1tJRX3KsL//Gxynpw5CTOAIPhgL4W8PNiIpVE=
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo= gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=