From b56ccffd3bf2b678c78c997f31f44fd646e3a995 Mon Sep 17 00:00:00 2001 From: ppom <> Date: Fri, 20 Oct 2023 12:00:00 +0200 Subject: [PATCH] Fix persistence bug --- app/persist.go | 51 +++++++++++++++----------------------- config/persistence.jsonnet | 50 +++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 31 deletions(-) create mode 100644 config/persistence.jsonnet diff --git a/app/persist.go b/app/persist.go index 87127f4..237d4a3 100644 --- a/app/persist.go +++ b/app/persist.go @@ -133,15 +133,13 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E } }() - var err error - var entry LogEntry - var filter *Filter - // pattern, stream, fitler → last flush flushes := make(map[PSF]time.Time) for { + var entry LogEntry + var filter *Filter // decode entry - err = flushDec.Decode(&entry) + err := flushDec.Decode(&entry) if err != nil { if err == io.EOF { break @@ -167,9 +165,11 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E now := time.Now() for { + var entry LogEntry + var filter *Filter // decode entry - err = logDec.Decode(&entry) + err := logDec.Decode(&entry) if err != nil { if err == io.EOF { break @@ -180,34 +180,23 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E // retrieve related stream & filter if entry.Stream == "" && entry.Filter == "" { - if entry.SF != 0 { - sf, ok := readSF2int[entry.SF] - if ok { - entry.Stream = sf.s - entry.Filter = sf.f - } else { - discardedEntries[SF{"", ""}]++ - continue - } - } else { + sf, ok := readSF2int[entry.SF] + if !ok { discardedEntries[SF{"", ""}]++ continue } - // Only one of Stream, Filter is non-empty - // } else if entry.Stream == "" || entry.Filter == "" { - // discardedEntries[SF{"", ""}]++ - // continue - } else { - if stream := c.Streams[entry.Stream]; stream != nil { - filter = stream.Filters[entry.Filter] - } - if filter == nil { - discardedEntries[SF{entry.Stream, entry.Filter}]++ - continue - } - if entry.SF != 0 { - readSF2int[entry.SF] = SF{entry.Stream, entry.Filter} - } + entry.Stream = sf.s + entry.Filter = sf.f + } + if stream := c.Streams[entry.Stream]; stream != nil { + filter = stream.Filters[entry.Filter] + } + if filter == nil { + discardedEntries[SF{entry.Stream, entry.Filter}]++ + continue + } + if entry.SF != 0 { + readSF2int[entry.SF] = SF{entry.Stream, entry.Filter} } // check if it hasn't been flushed diff --git a/config/persistence.jsonnet b/config/persistence.jsonnet new file mode 100644 index 0000000..f3f58c1 --- /dev/null +++ b/config/persistence.jsonnet @@ -0,0 +1,50 @@ +{ + patterns: { + num: { + regex: '[0-9]+', + }, + }, + + streams: { + tailDown1: { + cmd: ['sh', '-c', "echo 01 02 03 04 05 | tr ' ' '\n' | while read i; do sleep 0.5; echo found $i; done"], + filters: { + findIP1: { + regex: ['^found $'], + retry: 1, + retryperiod: '2m', + actions: { + damn: { + cmd: ['echo', ''], + }, + undamn: { + cmd: ['echo', 'undamn', ''], + after: '1m', + onexit: true, + }, + }, + }, + }, + }, + tailDown2: { + cmd: ['sh', '-c', "echo 11 12 13 14 15 11 13 15 | tr ' ' '\n' | while read i; do sleep 0.3; echo found $i; done"], + filters: { + findIP2: { + regex: ['^found $'], + retry: 2, + retryperiod: '2m', + actions: { + damn: { + cmd: ['echo', ''], + }, + undamn: { + cmd: ['echo', 'undamn', ''], + after: '1m', + onexit: true, + }, + }, + }, + }, + }, + }, +}