diff --git a/app/daemon.go b/app/daemon.go index 4ff70c8..c8ddcbc 100644 --- a/app/daemon.go +++ b/app/daemon.go @@ -187,7 +187,7 @@ func MatchesManager() { matchesManagerHandleFlush(fo) case pft = <-matchesC: - entry := LogEntry{pft.t, pft.p, pft.f.stream.name, pft.f.name, false} + entry := LogEntry{pft.t, pft.p, pft.f.stream.name, pft.f.name, 0, false} entry.Exec = matchesManagerHandleMatch(pft) diff --git a/app/persist.go b/app/persist.go index f1646c9..87127f4 100644 --- a/app/persist.go +++ b/app/persist.go @@ -43,16 +43,18 @@ func DatabaseManager(c *Conf) { } func (c *Conf) manageLogs(logDB *WriteDB, flushDB *WriteDB) { - var cpt int + cpt := 0 + writeSF2int := make(map[SF]int) + writeCpt := 1 for { select { case entry := <-flushToDatabaseC: flushDB.enc.Encode(entry) case entry := <-logsC: - logDB.enc.Encode(entry) + encodeOrFatal(logDB.enc, entry, writeSF2int, &writeCpt) cpt++ // let's say 100 000 entries ~ 10 MB - if cpt == 100_000 { + if cpt == 500_000 { cpt = 0 logger.Printf(logger.INFO, "Rotating database...") logDB.file.Close() @@ -111,6 +113,12 @@ func (c *Conf) RotateDB(startup bool) (*WriteDB, *WriteDB) { } func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.Encoder, startup bool) { + // This mapping is a space optimization feature + // It permits to compress stream+filter to a small number (which is a byte in gob) + // We do this only for matches, not for flushes + readSF2int := make(map[int]SF) + writeSF2int := make(map[SF]int) + writeCounter := 1 // This extra code is made to warn only one time for each non-existant filter discardedEntries := make(map[SF]int) malformedEntries := 0 @@ -170,13 +178,36 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E continue } - // retrieve related filter - if stream := c.Streams[entry.Stream]; stream != nil { - filter = stream.Filters[entry.Filter] - } - if filter == nil { - discardedEntries[SF{entry.Stream, entry.Filter}]++ - continue + // retrieve related stream & filter + if entry.Stream == "" && entry.Filter == "" { + if entry.SF != 0 { + sf, ok := readSF2int[entry.SF] + if ok { + entry.Stream = sf.s + entry.Filter = sf.f + } else { + discardedEntries[SF{"", ""}]++ + continue + } + } else { + discardedEntries[SF{"", ""}]++ + continue + } + // Only one of Stream, Filter is non-empty + // } else if entry.Stream == "" || entry.Filter == "" { + // discardedEntries[SF{"", ""}]++ + // continue + } else { + if stream := c.Streams[entry.Stream]; stream != nil { + filter = stream.Filters[entry.Filter] + } + if filter == nil { + discardedEntries[SF{entry.Stream, entry.Filter}]++ + continue + } + if entry.SF != 0 { + readSF2int[entry.SF] = SF{entry.Stream, entry.Filter} + } } // check if it hasn't been flushed @@ -193,7 +224,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E startupMatchesC <- PFT{entry.Pattern, filter, entry.T} } - encodeOrFatal(logEnc, entry) + encodeOrFatal(logEnc, entry, writeSF2int, &writeCounter) } // replay executions @@ -203,12 +234,22 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E filter.sendActions(entry.Pattern, entry.T) } - encodeOrFatal(logEnc, entry) + encodeOrFatal(logEnc, entry, writeSF2int, &writeCounter) } } } -func encodeOrFatal(enc *gob.Encoder, entry LogEntry) { +func encodeOrFatal(enc *gob.Encoder, entry LogEntry, writeSF2int map[SF]int, writeCounter *int) { + sf, ok := writeSF2int[SF{entry.Stream, entry.Filter}] + if ok { + entry.SF = sf + entry.Stream = "" + entry.Filter = "" + } else { + entry.SF = *writeCounter + writeSF2int[SF{entry.Stream, entry.Filter}] = *writeCounter + *writeCounter++ + } err := enc.Encode(entry) if err != nil { logger.Fatalln("Failed to write to new DB:", err) diff --git a/app/pipe.go b/app/pipe.go index 602fdec..637ca86 100644 --- a/app/pipe.go +++ b/app/pipe.go @@ -99,7 +99,7 @@ func SocketManager(streams map[string]*Stream) { case Show: response.ClientStatus = genClientStatus(actions, matches, &actionsLock, &matchesLock) case Flush: - le := LogEntry{time.Now(), request.Pattern, "", "", false} + le := LogEntry{time.Now(), request.Pattern, "", "", 0, false} matchesC := FlushMatchOrder{request.Pattern, make(chan MatchesMap)} actionsC := FlushActionOrder{request.Pattern, make(chan ActionsMap)} flushToMatchesC <- matchesC diff --git a/app/types.go b/app/types.go index cd196f3..b8e67b7 100644 --- a/app/types.go +++ b/app/types.go @@ -64,6 +64,7 @@ type LogEntry struct { T time.Time Pattern string Stream, Filter string + SF int Exec bool }