Compare commits

...

3 Commits

6 changed files with 82 additions and 74 deletions

View File

@ -8,6 +8,7 @@ import (
"net" "net"
"os" "os"
"regexp" "regexp"
"strings"
"framagit.org/ppom/reaction/logger" "framagit.org/ppom/reaction/logger"
"sigs.k8s.io/yaml" "sigs.k8s.io/yaml"
@ -20,7 +21,7 @@ const (
type Request struct { type Request struct {
Request int Request int
Pattern []string Pattern string
} }
type Response struct { type Response struct {
@ -85,7 +86,7 @@ func usage(err string) {
} }
func ClientShow(format, stream, filter string, regex *regexp.Regexp) { func ClientShow(format, stream, filter string, regex *regexp.Regexp) {
response := SendAndRetrieve(Request{Show, []string{""}}) response := SendAndRetrieve(Request{Show, ""})
if response.Err != nil { if response.Err != nil {
logger.Fatalln("Received error from daemon:", response.Err) logger.Fatalln("Received error from daemon:", response.Err)
} }
@ -137,9 +138,15 @@ func ClientShow(format, stream, filter string, regex *regexp.Regexp) {
if regex != nil { if regex != nil {
for streamName := range response.ClientStatus { for streamName := range response.ClientStatus {
for filterName := range response.ClientStatus[streamName] { for filterName := range response.ClientStatus[streamName] {
for patternName := range response.ClientStatus[streamName][filterName] { for patterns := range response.ClientStatus[streamName][filterName] {
if !regex.MatchString(patternName) { pmatch := false
delete(response.ClientStatus[streamName][filterName], patternName) for _, p := range strings.Split(patterns, "\x00") {
if regex.MatchString(p) {
pmatch = true
}
}
if !pmatch {
delete(response.ClientStatus[streamName][filterName], patterns)
} }
} }
if len(response.ClientStatus[streamName][filterName]) == 0 { if len(response.ClientStatus[streamName][filterName]) == 0 {
@ -162,12 +169,22 @@ func ClientShow(format, stream, filter string, regex *regexp.Regexp) {
if err != nil { if err != nil {
logger.Fatalln("Failed to convert daemon binary response to text format:", err) logger.Fatalln("Failed to convert daemon binary response to text format:", err)
} }
// Replace \0 joined string with space joined string ("1.2.3.4\0root" -> "1.2.3.4 root")
for streamName := range response.ClientStatus {
for filterName := range response.ClientStatus[streamName] {
for patterns := range response.ClientStatus[streamName][filterName] {
text = []byte(strings.ReplaceAll(string(text), strings.Join(strings.Split(patterns, "\x00"), "\\0"), strings.Join(strings.Split(patterns, "\x00"), " ")))
}
}
}
fmt.Println(string(text)) fmt.Println(string(text))
os.Exit(0) os.Exit(0)
} }
func ClientFlush(pattern []string, streamfilter, format string) { func ClientFlush(patterns []string, streamfilter, format string) {
response := SendAndRetrieve(Request{Flush, pattern}) response := SendAndRetrieve(Request{Flush, strings.Join(patterns, "\x00")})
if response.Err != nil { if response.Err != nil {
logger.Fatalln("Received error from daemon:", response.Err) logger.Fatalln("Received error from daemon:", response.Err)
os.Exit(1) os.Exit(1)

View File

@ -13,16 +13,6 @@ import (
"framagit.org/ppom/reaction/logger" "framagit.org/ppom/reaction/logger"
) )
// Compare content and ordering. Case sensitive.
func IsStringArrayEqual(one, two []string) bool {
for i, a := range one {
if a != two[i] {
return false
}
}
return true
}
// Executes a command and channel-send its stdout // Executes a command and channel-send its stdout
func cmdStdout(commandline []string) chan *string { func cmdStdout(commandline []string) chan *string {
lines := make(chan *string) lines := make(chan *string)
@ -87,8 +77,8 @@ func (p *Pattern) notAnIgnore(match *string) bool {
} }
// Whether one of the filter's regexes is matched on a line // Whether one of the filter's regexes is matched on a line
func (f *Filter) match(line *string) []string { func (f *Filter) match(line *string) string {
var result []string var result string
for _, regex := range f.compiledRegex { for _, regex := range f.compiledRegex {
if matches := regex.FindStringSubmatch(*line); matches != nil { if matches := regex.FindStringSubmatch(*line); matches != nil {
@ -101,44 +91,47 @@ func (f *Filter) match(line *string) []string {
match := matches[regex.SubexpIndex(p.name)] match := matches[regex.SubexpIndex(p.name)]
if p.notAnIgnore(&match) { if p.notAnIgnore(&match) {
logger.Printf(logger.INFO, "%s.%s: match [%v]\n", f.stream.name, f.name, match) logger.Printf(logger.INFO, "%s.%s: match [%v]\n", f.stream.name, f.name, match)
result = append(result, match) if len(result) == 0 {
result = match
} else {
result = strings.Join([]string{result, match}, "\x00")
}
} }
} }
if f.pattern == nil { if f.pattern == nil {
// No pattern, so this match will never actually be used // No pattern, so this match will never actually be used
return nil return ""
} }
} }
} }
if len(result) == len(f.pattern) { if len(strings.Split(result, "\x00")) == len(f.pattern) {
return result return result
} else { } else {
// Incomplete match = no match // Incomplete match = no match
return nil return ""
} }
} }
func (f *Filter) sendActions(match []string, at time.Time) { func (f *Filter) sendActions(match string, at time.Time) {
for _, a := range f.Actions { for _, a := range f.Actions {
actionsC <- PAT{match, a, at.Add(a.afterDuration)} actionsC <- PAT{match, a, at.Add(a.afterDuration)}
} }
} }
func (a *Action) exec(match []string) { func (a *Action) exec(match string) {
defer wgActions.Done() defer wgActions.Done()
var computedCommand []string var computedCommand []string
var cmdItem string
if a.filter.pattern != nil { if a.filter.pattern != nil {
computedCommand = make([]string, 0, len(a.Cmd)) computedCommand = make([]string, 0, len(a.Cmd))
matches := strings.Split(match, "\x00")
for _, item := range a.Cmd { for _, item := range a.Cmd {
cmdItem = strings.Clone(item)
for i, p := range a.filter.pattern { for i, p := range a.filter.pattern {
cmdItem = strings.ReplaceAll(cmdItem, p.nameWithBraces, match[i]) item = strings.ReplaceAll(item, p.nameWithBraces, matches[i])
} }
computedCommand = append(computedCommand, cmdItem) computedCommand = append(computedCommand, item)
} }
} else { } else {
computedCommand = a.Cmd computedCommand = a.Cmd
@ -177,7 +170,7 @@ func ActionsManager(concurrency int) {
} }
}() }()
} }
execAction := func(a *Action, p []string) { execAction := func(a *Action, p string) {
wgActions.Add(1) wgActions.Add(1)
execActionsC <- PA{p, a} execActionsC <- PA{p, a}
} }
@ -195,10 +188,10 @@ func ActionsManager(concurrency int) {
execAction(action, pattern) execAction(action, pattern)
} else { } else {
actionsLock.Lock() actionsLock.Lock()
if actions[&pa] == nil { if actions[pa] == nil {
actions[&pa] = make(map[time.Time]struct{}) actions[pa] = make(map[time.Time]struct{})
} }
actions[&pa][then] = struct{}{} actions[pa][then] = struct{}{}
actionsLock.Unlock() actionsLock.Unlock()
go func(insidePat PAT, insideNow time.Time) { go func(insidePat PAT, insideNow time.Time) {
time.Sleep(insidePat.t.Sub(insideNow)) time.Sleep(insidePat.t.Sub(insideNow))
@ -209,8 +202,8 @@ func ActionsManager(concurrency int) {
pa := PA{pat.p, pat.a} pa := PA{pat.p, pat.a}
pattern, action, then := pat.p, pat.a, pat.t pattern, action, then := pat.p, pat.a, pat.t
actionsLock.Lock() actionsLock.Lock()
if actions[&pa] != nil { if actions[pa] != nil {
delete(actions[&pa], then) delete(actions[pa], then)
} }
actionsLock.Unlock() actionsLock.Unlock()
execAction(action, pattern) execAction(action, pattern)
@ -218,7 +211,7 @@ func ActionsManager(concurrency int) {
ret := make(ActionsMap) ret := make(ActionsMap)
actionsLock.Lock() actionsLock.Lock()
for pa := range actions { for pa := range actions {
if IsStringArrayEqual(pa.p, fo.p) { if pa.p == fo.p {
for range actions[pa] { for range actions[pa] {
execAction(pa.a, pa.p) execAction(pa.a, pa.p)
} }
@ -268,7 +261,7 @@ func MatchesManager() {
matchesManagerHandleFlush(fo) matchesManagerHandleFlush(fo)
case pft = <-matchesC: case pft = <-matchesC:
entry := LogEntry{pft.t, 0, pft.p, pft.f.stream.name, pft.f.name, 0, false} entry := LogEntry{pft.t, 0, strings.Join(strings.Split(pft.p, "\x00"), " / "), pft.f.stream.name, pft.f.name, 0, false}
entry.Exec = matchesManagerHandleMatch(pft) entry.Exec = matchesManagerHandleMatch(pft)
@ -281,7 +274,7 @@ func matchesManagerHandleFlush(fo FlushMatchOrder) {
ret := make(MatchesMap) ret := make(MatchesMap)
matchesLock.Lock() matchesLock.Lock()
for pf := range matches { for pf := range matches {
if IsStringArrayEqual(fo.p, pf.p) { if fo.p == pf.p {
if fo.ret != nil { if fo.ret != nil {
ret[pf] = matches[pf] ret[pf] = matches[pf]
} }
@ -298,32 +291,32 @@ func matchesManagerHandleMatch(pft PFT) bool {
matchesLock.Lock() matchesLock.Lock()
defer matchesLock.Unlock() defer matchesLock.Unlock()
filter, pattern, then := pft.f, pft.p, pft.t filter, patterns, then := pft.f, pft.p, pft.t
pf := PF{pft.p, pft.f} pf := PF{pft.p, pft.f}
if filter.Retry > 1 { if filter.Retry > 1 {
// make sure map exists // make sure map exists
if matches[&pf] == nil { if matches[pf] == nil {
matches[&pf] = make(map[time.Time]struct{}) matches[pf] = make(map[time.Time]struct{})
} }
// add new match // add new match
matches[&pf][then] = struct{}{} matches[pf][then] = struct{}{}
// remove match when expired // remove match when expired
go func(pf PF, then time.Time) { go func(pf PF, then time.Time) {
time.Sleep(then.Sub(time.Now()) + filter.retryDuration) time.Sleep(then.Sub(time.Now()) + filter.retryDuration)
matchesLock.Lock() matchesLock.Lock()
if matches[&pf] != nil { if matches[pf] != nil {
// FIXME replace this and all similar occurences // FIXME replace this and all similar occurences
// by clear() when switching to go 1.21 // by clear() when switching to go 1.21
delete(matches[&pf], then) delete(matches[pf], then)
} }
matchesLock.Unlock() matchesLock.Unlock()
}(pf, then) }(pf, then)
} }
if filter.Retry <= 1 || len(matches[&pf]) >= filter.Retry { if filter.Retry <= 1 || len(matches[pf]) >= filter.Retry {
delete(matches, &pf) delete(matches, pf)
filter.sendActions(pattern, then) filter.sendActions(patterns, then)
return true return true
} }
return false return false

View File

@ -60,7 +60,8 @@ func subCommandParse(f *flag.FlagSet, maxRemainingArgs int) {
basicUsage() basicUsage()
os.Exit(0) os.Exit(0)
} }
if len(f.Args()) > maxRemainingArgs { // -1 = no limit to remaining args
if maxRemainingArgs > -1 && len(f.Args()) > maxRemainingArgs {
fmt.Printf("ERROR unrecognized argument(s): %v\n", f.Args()[maxRemainingArgs:]) fmt.Printf("ERROR unrecognized argument(s): %v\n", f.Args()[maxRemainingArgs:])
basicUsage() basicUsage()
os.Exit(1) os.Exit(1)
@ -102,9 +103,7 @@ func basicUsage() {
` + bold + `reaction flush` + reset + ` TARGET ` + bold + `reaction flush` + reset + ` TARGET
# remove currently active matches and run currently pending actions for the specified TARGET # remove currently active matches and run currently pending actions for the specified TARGET
# (then show flushed matches and actions) # (then show flushed matches and actions)
# e.g. reaction flush 192.168.1.1 # e.g. reaction flush 192.168.1.1 root
# Concatenate patterns with " / " if several patterns in TARGET
# e.g. reaction flush "192.168.1.1 / root"
# options: # options:
-s/--socket SOCKET # path to the client-daemon communication socket -s/--socket SOCKET # path to the client-daemon communication socket
@ -196,7 +195,7 @@ func Main(version, commit string) {
SocketPath = addSocketFlag(f) SocketPath = addSocketFlag(f)
queryFormat := addFormatFlag(f) queryFormat := addFormatFlag(f)
limit := addLimitFlag(f) limit := addLimitFlag(f)
subCommandParse(f, 1) subCommandParse(f, -1)
if *queryFormat != "yaml" && *queryFormat != "json" { if *queryFormat != "yaml" && *queryFormat != "json" {
logger.Fatalln("only yaml and json formats are supported") logger.Fatalln("only yaml and json formats are supported")
f.PrintDefaults() f.PrintDefaults()
@ -211,7 +210,7 @@ func Main(version, commit string) {
logger.Fatalln("for now, -l/--limit is not supported") logger.Fatalln("for now, -l/--limit is not supported")
os.Exit(1) os.Exit(1)
} }
ClientFlush(strings.Split(f.Arg(0), " / "), *limit, *queryFormat) ClientFlush(f.Args(), *limit, *queryFormat)
case "test-regex": case "test-regex":
// socket not needed, no interaction with the daemon // socket not needed, no interaction with the daemon

View File

@ -7,7 +7,6 @@ import (
"path" "path"
"sync" "sync"
"time" "time"
"strings"
"framagit.org/ppom/reaction/logger" "framagit.org/ppom/reaction/logger"
) )
@ -18,14 +17,14 @@ func genClientStatus(local_actions ActionsMap, local_matches MatchesMap, local_a
// Painful data manipulation // Painful data manipulation
for pf, times := range local_matches { for pf, times := range local_matches {
pattern, filter := pf.p, pf.f patterns, filter := pf.p, pf.f
if cs[filter.stream.name] == nil { if cs[filter.stream.name] == nil {
cs[filter.stream.name] = make(map[string]MapPatternStatus) cs[filter.stream.name] = make(map[string]MapPatternStatus)
} }
if cs[filter.stream.name][filter.name] == nil { if cs[filter.stream.name][filter.name] == nil {
cs[filter.stream.name][filter.name] = make(MapPatternStatus) cs[filter.stream.name][filter.name] = make(MapPatternStatus)
} }
cs[filter.stream.name][filter.name][strings.Join(pattern, " / ")] = &PatternStatus{len(times), nil} cs[filter.stream.name][filter.name][patterns] = &PatternStatus{len(times), nil}
} }
local_matchesLock.Unlock() local_matchesLock.Unlock()
@ -33,17 +32,17 @@ func genClientStatus(local_actions ActionsMap, local_matches MatchesMap, local_a
// Painful data manipulation // Painful data manipulation
for pa, times := range local_actions { for pa, times := range local_actions {
pattern, action := pa.p, pa.a patterns, action := pa.p, pa.a
if cs[action.filter.stream.name] == nil { if cs[action.filter.stream.name] == nil {
cs[action.filter.stream.name] = make(map[string]MapPatternStatus) cs[action.filter.stream.name] = make(map[string]MapPatternStatus)
} }
if cs[action.filter.stream.name][action.filter.name] == nil { if cs[action.filter.stream.name][action.filter.name] == nil {
cs[action.filter.stream.name][action.filter.name] = make(MapPatternStatus) cs[action.filter.stream.name][action.filter.name] = make(MapPatternStatus)
} }
if cs[action.filter.stream.name][action.filter.name][strings.Join(pattern, " / ")] == nil { if cs[action.filter.stream.name][action.filter.name][patterns] == nil {
cs[action.filter.stream.name][action.filter.name][strings.Join(pattern, " / ")] = new(PatternStatus) cs[action.filter.stream.name][action.filter.name][patterns] = new(PatternStatus)
} }
ps := cs[action.filter.stream.name][action.filter.name][strings.Join(pattern, " / ")] ps := cs[action.filter.stream.name][action.filter.name][patterns]
if ps.Actions == nil { if ps.Actions == nil {
ps.Actions = make(map[string][]string) ps.Actions = make(map[string][]string)
} }

View File

@ -75,17 +75,17 @@ func (c *Conf) setup() {
filter.name = filterName filter.name = filterName
if strings.Contains(filter.name, ".") { if strings.Contains(filter.name, ".") {
logger.Fatalf(fmt.Sprintf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.name)) logger.Fatalf("Bad configuration: character '.' is not allowed in filter names: '%v'", filter.name)
} }
// Parse Duration // Parse Duration
if filter.RetryPeriod == "" { if filter.RetryPeriod == "" {
if filter.Retry > 1 { if filter.Retry > 1 {
logger.Fatalf(fmt.Sprintf("Bad configuration: retry but no retryperiod in %v.%v", stream.name, filter.name)) logger.Fatalf("Bad configuration: retry but no retryperiod in %v.%v", stream.name, filter.name)
} }
} else { } else {
retryDuration, err := time.ParseDuration(filter.RetryPeriod) retryDuration, err := time.ParseDuration(filter.RetryPeriod)
if err != nil { if err != nil {
logger.Fatalf(fmt.Sprintf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.name, filter.name, err)) logger.Fatalf("Bad configuration: Failed to parse retry time in %v.%v: %v", stream.name, filter.name, err)
} }
filter.retryDuration = retryDuration filter.retryDuration = retryDuration
} }
@ -106,7 +106,7 @@ func (c *Conf) setup() {
} }
compiledRegex, err := regexp.Compile(regex) compiledRegex, err := regexp.Compile(regex)
if err != nil { if err != nil {
log.Fatal(fmt.Sprintf("Bad configuration: regex of filter %s.%s: %v", stream.name, filter.name, err)) log.Fatal("Bad configuration: regex of filter %s.%s: %v", stream.name, filter.name, err)
} }
filter.compiledRegex = append(filter.compiledRegex, *compiledRegex) filter.compiledRegex = append(filter.compiledRegex, *compiledRegex)
} }

View File

@ -67,7 +67,7 @@ type Action struct {
type LogEntry struct { type LogEntry struct {
T time.Time T time.Time
S int64 S int64
Pattern []string Pattern string
Stream, Filter string Stream, Filter string
SF int SF int
Exec bool Exec bool
@ -83,42 +83,42 @@ type WriteDB struct {
enc *gob.Encoder enc *gob.Encoder
} }
// https://stackoverflow.com/a/69691894 // https://stackoverflow.com/a/69691894
type MatchesMap map[*PF]map[time.Time]struct{} type MatchesMap map[PF]map[time.Time]struct{}
type ActionsMap map[*PA]map[time.Time]struct{} type ActionsMap map[PA]map[time.Time]struct{}
// Helper structs made to carry information // Helper structs made to carry information
// Stream, Filter // Stream, Filter
type SF struct{ s, f string } type SF struct{ s, f string }
// Pattern, Stream, Filter // Pattern, Stream, Filter
type PSF struct{ type PSF struct{
p []string p string
s string s string
f string f string
} }
type PF struct { type PF struct {
p []string p string
f *Filter f *Filter
} }
type PFT struct { type PFT struct {
p []string p string
f *Filter f *Filter
t time.Time t time.Time
} }
type PA struct { type PA struct {
p []string p string
a *Action a *Action
} }
type PAT struct { type PAT struct {
p []string p string
a *Action a *Action
t time.Time t time.Time
} }
type FlushMatchOrder struct { type FlushMatchOrder struct {
p []string p string
ret chan MatchesMap ret chan MatchesMap
} }
type FlushActionOrder struct { type FlushActionOrder struct {
p []string p string
ret chan ActionsMap ret chan ActionsMap
} }