Compare commits

..

5 Commits

6 changed files with 315 additions and 2 deletions

View File

@ -2,6 +2,8 @@ package app
import (
"bufio"
"bytes"
"fmt"
"os"
"os/exec"
"os/signal"
@ -23,6 +25,61 @@ func IsStringArrayEqual(one, two []string) bool {
return true
}
// Executes a command and write to its stdin via input channel until command, or reaction, dies
func cmdStdin(commandline []string, input <-chan string) {
cmd := exec.Command(commandline[0], commandline[1:]...)
stdin, err := cmd.StdinPipe()
if err != nil {
logger.Fatalln("couldn't open stdin on command:", err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
logger.Fatalln("couldn't open stdout on command:", err)
}
if err := cmd.Start(); err != nil {
logger.Fatalln("couldn't start command:", err)
}
defer stdin.Close()
logger.Printf(logger.INFO, fmt.Sprintf("Output started with %v\n", commandline))
// stdout displaying thread
go func() {
// FIXME
tmp := make([]byte, 1024)
for {
_, err := stdout.Read(tmp)
if len(bytes.Trim(tmp, "\x00")) > 0 {
for _, line := range strings.Split(strings.ReplaceAll(string(bytes.Trim(tmp, "\x00")), "\r\n", "\n"), "\n") {
if len(line) > 0 {
logger.Printf(logger.INFO, fmt.Sprintf("Output returned %s", line))
}
}
}
if err != nil {
logger.Printf(logger.ERROR, fmt.Sprintf("Reading output error: %v\n", err))
break
}
}
}()
// Stdin writing thread
go func() {
for {
in := <-input
_, err := stdin.Write([]byte(in))
if err != nil {
logger.Printf(logger.ERROR, fmt.Sprintf("Writing to output error: %v\n", err))
break
}
}
}()
err = cmd.Wait()
logger.Fatalln("command %v stopped: %v", cmd, err)
}
// Executes a command and channel-send its stdout
func cmdStdout(commandline []string) chan *string {
lines := make(chan *string)
@ -113,7 +170,7 @@ func (f *Filter) match(line *string) []string {
if len(result) == len(f.pattern) {
return result
} else {
// Incomplete match = no match
// Incomplete match = no match.
return nil
}
}
@ -127,6 +184,16 @@ func (f *Filter) sendActions(match []string, at time.Time) {
func (a *Action) exec(match []string) {
defer wgActions.Done()
if len(a.Cmd) > 0 {
a.execCmd(match)
}
if a.Write != nil {
a.execWrite(match)
}
}
func (a *Action) execCmd(match []string) {
var computedCommand []string
var cmdItem string
@ -153,6 +220,29 @@ func (a *Action) exec(match []string) {
}
}
func (a *Action) execWrite(match []string) {
var computedWrite string
var writeItem string
if a.filter.pattern != nil {
for _, item := range a.Write.Text {
writeItem = strings.Clone(item)
for i, p := range a.filter.pattern {
writeItem = strings.ReplaceAll(writeItem, p.nameWithBraces, match[i])
}
if len(computedWrite) > 0 {
computedWrite = computedWrite + " " + writeItem
} else {
computedWrite = writeItem
}
}
} else {
computedWrite = strings.Join(a.Write.Text, " ")
}
a.Write.Output.Stdin <- fmt.Sprintf("%s\n", computedWrite)
}
func ActionsManager(concurrency int) {
// concurrency init
execActionsC := make(chan PA)
@ -353,6 +443,14 @@ func StreamManager(s *Stream, endedSignal chan *Stream) {
}
func OutputsManager(c *Conf) {
for outputName := range c.Outputs {
output := c.Outputs[outputName]
output.Stdin = make(chan string)
cmdStdin(output.Start, output.Stdin)
}
}
var actions ActionsMap
var matches MatchesMap
var actionsLock sync.Mutex
@ -416,6 +514,7 @@ func Daemon(confFilename string) {
_ = runCommands(conf.Start, "start")
go DatabaseManager(conf)
go OutputsManager(conf)
go MatchesManager()
go ActionsManager(conf.Concurrency)

View File

@ -21,6 +21,15 @@ func (c *Conf) setup() {
c.Concurrency = runtime.NumCPU()
}
for outputName := range c.Outputs {
output := c.Outputs[outputName]
output.name = outputName
if len(output.Start) == 0 {
logger.Fatalf("Bad configuration: output's start %v is empty!", outputName)
}
}
for patternName := range c.Patterns {
pattern := c.Patterns[patternName]
pattern.name = patternName
@ -136,6 +145,20 @@ func (c *Conf) setup() {
if filter.longuestActionDuration == nil || filter.longuestActionDuration.Milliseconds() < action.afterDuration.Milliseconds() {
filter.longuestActionDuration = &action.afterDuration
}
if action.Write != nil {
found := false
for oname := range c.Outputs {
if strings.EqualFold(oname, action.Write.OutputName) {
action.Write.Output = c.Outputs[oname]
found = true
}
}
if !found {
logger.Fatalln(fmt.Sprintf("Bad configuration: action %s.%s.%s refers to undeclared output %s",
stream.name, filter.name, action.name, action.Write.OutputName))
}
}
}
}
}

View File

@ -9,12 +9,24 @@ import (
type Conf struct {
Concurrency int `json:"concurrency"`
Outputs map[string]*Output `json:"outputs"`
Patterns map[string]*Pattern `json:"patterns"`
Streams map[string]*Stream `json:"streams"`
Start [][]string `json:"start"`
Stop [][]string `json:"stop"`
}
type Output struct {
Start []string `json:"start"`
Stop []string `json:"stop"`
// TODO: Restart when lost communication with output
//Restart string `json:"restart"`
name string `json:"-"`
Stdin chan string
}
type Pattern struct {
Regex string `json:"regex"`
Ignore []string `json:"ignore"`
@ -52,11 +64,19 @@ type Filter struct {
longuestActionDuration *time.Duration
}
type OutputWrite struct {
OutputName string `json:"output"`
Text []string `json:"text"`
Output *Output
}
type Action struct {
filter *Filter `json:"-"`
name string `json:"-"`
Cmd []string `json:"cmd"`
Cmd []string `json:"cmd"`
Write *OutputWrite `json:"write"`
After string `json:"after"`
afterDuration time.Duration `json:"-"`

View File

@ -0,0 +1,59 @@
---
concurrency: 0
# patterns are substitued in regexes.
# when a filter performs an action, it replaces the found pattern
patterns:
ip:
# reaction regex syntax is defined here: https://github.com/google/re2/wiki/Syntax
# simple version: regex: '(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F:]{2,90})'
regex: '(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?:\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)){3}|(?:(?:[0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,7}:|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}|(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}|(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}|(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:(?:(?::[0-9a-fA-F]{1,4}){1,6})|:(?:(?::[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(?::[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(?:ffff(?::0{1,4}){0,1}:){0,1}(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])|(?:[0-9a-fA-F]{1,4}:){1,4}:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9]))'
ignore:
- 127.0.0.1
- ::1
# Patterns can be ignored based on regexes, it will try to match the whole string detected by the pattern
# ignoreregex:
# - '10\.0\.[0-9]{1,3}\.[0-9]{1,3}'
login:
regex: '[a-zA-Z0-9_\-\.]*'
method:
regex: '.*'
port:
regex: '[0-9]{1,5}'
# Outputs are commands returning stdin you can use in write actions.
# This can ben used to get a persistent connection to p.e. a KV database you will write into,
# eliminating the overhead of executing a process each time action is trigged.
outputs:
redis:
start: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis']
# tee:
# start: ['tee', 'output.log']
# streams are commands
# they are run and their ouptut is captured
# *example:* `tail -f /var/log/nginx/access.log`
# their output will be used by one or more filters
streams:
# streams have a user-defined name
ssh:
# note that if the command is not in environment's `PATH`
# its full path must be given.
cmd: ['tail', '-f', '/var/log/auth.log']
# filters run actions when they match regexes on a stream
filters:
# filters have a user-defined name
acceptedlogin:
# reaction's regex syntax is defined here: https://github.com/google/re2/wiki/Syntax
regex:
- 'Accepted <method> for <login> from <ip> port <port>'
# actions are run by the filter when regexes are matched
actions:
# actions have a user-defined name
store2redis:
write:
output: redis
text: ['XADD', 'logins', '*', 'username', '<login>', 'method', '<method>', 'ip', '<ip>', 'port', '<port>']

View File

@ -0,0 +1,50 @@
---
patterns:
num:
regex: '[0-9]+'
idx:
regex: '[0-9]+'
ip:
regex: '(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F:]{2,90})'
ignore:
- 1.0.0.1
concurrency: 0
streams:
tailDown1:
cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo found $(($i % 100)) for test 1; done' ]
filters:
findIP:
regex:
- '^found <num> for test <idx>$'
actions:
store2redis:
cmd: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis', 'XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']
tailDown2:
cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo prout $(($i % 100)) for test 2; done' ]
filters:
findIP:
regex:
- '^prout <num> for test <idx>$'
actions:
store2redis:
cmd: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis', 'XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']
tailDown3:
cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo nanana $(($i % 100)) for test 3; done' ]
filters:
findIP:
regex:
- '^nanana <num> for test <idx>$'
actions:
store2redis:
cmd: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis', 'XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']
tailDown4:
cmd: [ 'sh', '-c', 'sleep 2; seq 100010 | while read i; do echo nanana $(($i % 100)) for test 4; done' ]
filters:
findIP:
regex:
- '^nomatch <num> for test <idx>$'
actions:
store2redis:
cmd: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis', 'XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']

View File

@ -0,0 +1,62 @@
---
patterns:
num:
regex: '[0-9]+'
idx:
regex: '[0-9]+'
ip:
regex: '(?:(?:[0-9]{1,3}\.){3}[0-9]{1,3})|(?:[0-9a-fA-F:]{2,90})'
ignore:
- 1.0.0.1
concurrency: 0
outputs:
redis:
start: ['redis-cli', '-h', 'redis.example.org', '-a', 'mypasswordoncmdlinedontdothis']
streams:
tailDown1:
cmd: [ 'sh', '-c', 'seq 100010 | while read i; do echo found $(($i % 100)) for test 1; done' ]
filters:
findIP:
regex:
- '^found <num> for test <idx>$'
actions:
store2redis:
write:
output: redis
text: ['XADD', 'teststream', '*', 'found', '<num>', 'test', '<idx>']
tailDown2:
cmd: [ 'sh', '-c', 'seq 100010 | while read i; do echo prout $(($i % 100)) for test 2; done' ]
filters:
findIP:
regex:
- '^prout <num> for test <idx>$'
actions:
store2redis:
write:
output: redis
text: ['XADD', 'teststream', '*', 'prout', '<num>', 'test', '<idx>']
tailDown3:
cmd: [ 'sh', '-c', 'seq 100010 | while read i; do echo nanana $(($i % 100)) for test 3; done' ]
filters:
findIP:
regex:
- '^nanana <num> for test <idx>$'
actions:
store2redis:
write:
output: redis
text: ['XADD', 'teststream', '*', 'nanana', '<num>', 'test', '<idx>']
tailDown4:
cmd: [ 'sh', '-c', 'seq 100010 | while read i; do echo nanana $(($i % 100)) for test 4; done' ]
filters:
findIP:
regex:
- '^nomatch <num> for test <idx>$'
actions:
store2redis:
write:
output: redis
text: ['XADD', 'teststream', '*', 'nomatch', '<num>', 'test', '<idx>']