Reduce database size

...and it's not a breaking change!
fix #36

Benchmark: using config/heavy-load.yml
From 14 MB to 8 MB → roughly a 40% size reduction!
ppom 2023-10-20 12:00:00 +02:00
parent 345dd94b17
commit ed3cd4384c
4 changed files with 57 additions and 15 deletions
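
The idea behind the change, as a minimal write-side sketch (the trimmed-down types, the helper encodeInterned, and the sample stream/filter names are illustrative stand-ins, not reaction's actual code): the first time a (stream, filter) pair is written, the entry keeps its strings and registers a small integer for the pair; every later entry with the same pair ships only that integer, so the two names are stored once per database generation instead of once per match.

// Illustrative sketch only — simplified stand-ins for reaction's types.
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"log"
	"time"
)

type SF struct{ s, f string }

type LogEntry struct {
	T              time.Time
	Pattern        string
	Stream, Filter string
	SF             int // 0 means "not interned" (old-style entry)
	Exec           bool
}

// encodeInterned mirrors the write-side trick: known pairs are replaced by
// their small integer, unknown pairs keep their strings and get registered.
func encodeInterned(enc *gob.Encoder, e LogEntry, sf2int map[SF]int, counter *int) error {
	if n, ok := sf2int[SF{e.Stream, e.Filter}]; ok {
		e.SF, e.Stream, e.Filter = n, "", ""
	} else {
		e.SF = *counter
		sf2int[SF{e.Stream, e.Filter}] = *counter
		*counter++
	}
	return enc.Encode(e)
}

func main() {
	var buf bytes.Buffer
	enc := gob.NewEncoder(&buf)
	sf2int, counter := map[SF]int{}, 1
	for i := 0; i < 3; i++ {
		e := LogEntry{T: time.Now(), Pattern: "192.0.2.1", Stream: "nginx", Filter: "login-failed"}
		if err := encodeInterned(enc, e, sf2int, &counter); err != nil {
			log.Fatal(err)
		}
	}
	fmt.Println("bytes for 3 identical matches:", buf.Len())
}

Since gob omits zero-valued struct fields, an interned entry whose Stream and Filter are empty costs only its time, its pattern and the small SF integer.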

View File

@@ -187,7 +187,7 @@ func MatchesManager() {
 			matchesManagerHandleFlush(fo)
 		case pft = <-matchesC:
-			entry := LogEntry{pft.t, pft.p, pft.f.stream.name, pft.f.name, false}
+			entry := LogEntry{pft.t, pft.p, pft.f.stream.name, pft.f.name, 0, false}
 			entry.Exec = matchesManagerHandleMatch(pft)

View File

@@ -43,16 +43,18 @@ func DatabaseManager(c *Conf) {
 }
 
 func (c *Conf) manageLogs(logDB *WriteDB, flushDB *WriteDB) {
-	var cpt int
+	cpt := 0
+	writeSF2int := make(map[SF]int)
+	writeCpt := 1
 	for {
 		select {
 		case entry := <-flushToDatabaseC:
 			flushDB.enc.Encode(entry)
 		case entry := <-logsC:
-			logDB.enc.Encode(entry)
+			encodeOrFatal(logDB.enc, entry, writeSF2int, &writeCpt)
 			cpt++
 			// let's say 100 000 entries ~ 10 MB
-			if cpt == 100_000 {
+			if cpt == 500_000 {
 				cpt = 0
 				logger.Printf(logger.INFO, "Rotating database...")
 				logDB.file.Close()
@@ -111,6 +113,12 @@ func (c *Conf) RotateDB(startup bool) (*WriteDB, *WriteDB) {
 }
 
 func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.Encoder, startup bool) {
+	// This mapping is a space optimization feature
+	// It permits to compress stream+filter to a small number (which is a byte in gob)
+	// We do this only for matches, not for flushes
+	readSF2int := make(map[int]SF)
+	writeSF2int := make(map[SF]int)
+	writeCounter := 1
 	// This extra code is made to warn only one time for each non-existant filter
 	discardedEntries := make(map[SF]int)
 	malformedEntries := 0
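
To put a number on the "a byte in gob" remark above: encoding/gob uses a variable-length integer encoding and skips zero-valued struct fields, so an entry carrying only a small SF integer is much smaller on the wire than one carrying both name strings. A rough, self-contained measurement sketch (the field names follow the patch; streamSize and the sample values are made up for illustration):

// Rough size comparison between full and interned entries in one gob stream.
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"time"
)

type LogEntry struct {
	T              time.Time
	Pattern        string
	Stream, Filter string
	SF             int
	Exec           bool
}

// streamSize gob-encodes the entries back to back and returns the byte count.
func streamSize(entries []LogEntry) int {
	var buf bytes.Buffer
	enc := gob.NewEncoder(&buf)
	for _, e := range entries {
		if err := enc.Encode(e); err != nil {
			panic(err)
		}
	}
	return buf.Len()
}

func main() {
	t := time.Now()
	withStrings := LogEntry{T: t, Pattern: "192.0.2.1", Stream: "nginx", Filter: "login-failed"}
	firstSeen := withStrings
	firstSeen.SF = 1 // the first occurrence still carries the strings plus its number
	interned := LogEntry{T: t, Pattern: "192.0.2.1", SF: 1}

	// Before: every entry repeats both strings. After: only the first one does.
	before := streamSize([]LogEntry{withStrings, withStrings, withStrings})
	after := streamSize([]LogEntry{firstSeen, interned, interned})
	fmt.Printf("3 entries, strings every time: %d bytes; interned: %d bytes\n", before, after)
}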
@@ -170,13 +178,36 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E
 			continue
 		}
 
-		// retrieve related filter
-		if stream := c.Streams[entry.Stream]; stream != nil {
-			filter = stream.Filters[entry.Filter]
-		}
-		if filter == nil {
-			discardedEntries[SF{entry.Stream, entry.Filter}]++
-			continue
+		// retrieve related stream & filter
+		if entry.Stream == "" && entry.Filter == "" {
+			if entry.SF != 0 {
+				sf, ok := readSF2int[entry.SF]
+				if ok {
+					entry.Stream = sf.s
+					entry.Filter = sf.f
+				} else {
+					discardedEntries[SF{"", ""}]++
+					continue
+				}
+			} else {
+				discardedEntries[SF{"", ""}]++
+				continue
+			}
+		// Only one of Stream, Filter is non-empty
+		// } else if entry.Stream == "" || entry.Filter == "" {
+		// 	discardedEntries[SF{"", ""}]++
+		// 	continue
+		} else {
+			if stream := c.Streams[entry.Stream]; stream != nil {
+				filter = stream.Filters[entry.Filter]
+			}
+			if filter == nil {
+				discardedEntries[SF{entry.Stream, entry.Filter}]++
+				continue
+			}
+			if entry.SF != 0 {
+				readSF2int[entry.SF] = SF{entry.Stream, entry.Filter}
+			}
 		}
 
 		// check if it hasn't been flushed
@@ -193,7 +224,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E
 				startupMatchesC <- PFT{entry.Pattern, filter, entry.T}
 			}
-			encodeOrFatal(logEnc, entry)
+			encodeOrFatal(logEnc, entry, writeSF2int, &writeCounter)
 		}
 
 	// replay executions
@@ -203,12 +234,22 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E
 				filter.sendActions(entry.Pattern, entry.T)
 			}
-			encodeOrFatal(logEnc, entry)
+			encodeOrFatal(logEnc, entry, writeSF2int, &writeCounter)
 		}
 	}
 }
 
-func encodeOrFatal(enc *gob.Encoder, entry LogEntry) {
+func encodeOrFatal(enc *gob.Encoder, entry LogEntry, writeSF2int map[SF]int, writeCounter *int) {
+	sf, ok := writeSF2int[SF{entry.Stream, entry.Filter}]
+	if ok {
+		entry.SF = sf
+		entry.Stream = ""
+		entry.Filter = ""
+	} else {
+		entry.SF = *writeCounter
+		writeSF2int[SF{entry.Stream, entry.Filter}] = *writeCounter
+		*writeCounter++
+	}
 	err := enc.Encode(entry)
 	if err != nil {
 		logger.Fatalln("Failed to write to new DB:", err)
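
One detail in encodeOrFatal that can trip up readers coming from C: `*writeCounter++` increments the integer behind the pointer, because `++` in Go is a statement applied to the whole operand, and Go has no pointer arithmetic. A tiny standalone check (not part of the patch):

package main

import "fmt"

func main() {
	n := 1
	p := &n
	// Increments the pointed-to value, i.e. (*p)++ — not the pointer itself.
	*p++
	fmt.Println(n) // 2
}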

View File

@@ -99,7 +99,7 @@ func SocketManager(streams map[string]*Stream) {
 		case Show:
 			response.ClientStatus = genClientStatus(actions, matches, &actionsLock, &matchesLock)
 		case Flush:
-			le := LogEntry{time.Now(), request.Pattern, "", "", false}
+			le := LogEntry{time.Now(), request.Pattern, "", "", 0, false}
 			matchesC := FlushMatchOrder{request.Pattern, make(chan MatchesMap)}
 			actionsC := FlushActionOrder{request.Pattern, make(chan ActionsMap)}
 			flushToMatchesC <- matchesC

View File

@@ -64,6 +64,7 @@ type LogEntry struct {
 	T              time.Time
 	Pattern        string
 	Stream, Filter string
+	SF             int
 	Exec           bool
 }
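
Why the new field does not break existing databases (a plausible reading based on how encoding/gob handles struct fields, beyond what the commit message states): entries written before this change simply decode with SF == 0 and their Stream/Filter strings intact, so rotateDB treats them exactly as before; only entries written by the new code take the SF lookup path. A decode-side sketch with the same simplified types as above (the sample entries are invented):

// Read-side sketch: mixing an old-style entry, a new full entry and an
// interned entry in one stream, then resolving names via an int→pair map
// the way rotateDB's readSF2int does.
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"time"
)

type SF struct{ s, f string }

type LogEntry struct {
	T              time.Time
	Pattern        string
	Stream, Filter string
	SF             int
	Exec           bool
}

func main() {
	var buf bytes.Buffer
	enc := gob.NewEncoder(&buf)
	entries := []LogEntry{
		{T: time.Now(), Pattern: "192.0.2.1", Stream: "ssh", Filter: "bruteforce"},            // written before the change
		{T: time.Now(), Pattern: "192.0.2.1", Stream: "nginx", Filter: "login-failed", SF: 1}, // first occurrence, new code
		{T: time.Now(), Pattern: "192.0.2.1", SF: 1},                                          // interned repeat
	}
	for _, e := range entries {
		if err := enc.Encode(e); err != nil {
			panic(err)
		}
	}

	dec := gob.NewDecoder(&buf)
	readSF2int := make(map[int]SF)
	for {
		var e LogEntry
		if err := dec.Decode(&e); err != nil {
			break // io.EOF once the stream is exhausted
		}
		if e.Stream == "" && e.Filter == "" {
			// Interned entry: recover the names registered earlier.
			pair := readSF2int[e.SF]
			e.Stream, e.Filter = pair.s, pair.f
		} else if e.SF != 0 {
			// Full entry written by the new code: remember its number.
			readSF2int[e.SF] = SF{e.Stream, e.Filter}
		}
		fmt.Println(e.Stream, e.Filter, e.Pattern)
	}
}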