Rename global channels
This commit is contained in:
parent
f29b5ec87f
commit
83e760ed72
@ -147,7 +147,7 @@ func MatchesManager() {
|
||||
|
||||
for !end {
|
||||
select {
|
||||
case pf = <-cleanMatchesC:
|
||||
case pf = <-flushToMatchesC:
|
||||
delete(matches, pf)
|
||||
case pft, ok := <-startupMatchesC:
|
||||
if !ok {
|
||||
@ -160,7 +160,7 @@ func MatchesManager() {
|
||||
|
||||
for {
|
||||
select {
|
||||
case pf = <-cleanMatchesC:
|
||||
case pf = <-flushToMatchesC:
|
||||
matchesLock.Lock()
|
||||
delete(matches, pf)
|
||||
matchesLock.Unlock()
|
||||
@ -243,11 +243,19 @@ var stopActions chan bool
|
||||
var wgActions sync.WaitGroup
|
||||
var wgStreams sync.WaitGroup
|
||||
|
||||
// MatchesManager → DatabaseManager
|
||||
var logsC chan LogEntry
|
||||
|
||||
// SocketManager → DatabaseManager
|
||||
var flushesC chan LogEntry
|
||||
/*
|
||||
<streamcmds>
|
||||
↓
|
||||
StreamManager onstartup:matches
|
||||
↓ ↓ ↑
|
||||
matches→ MatchesManager →logs→ DatabaseManager ←·
|
||||
↑ ↓ ↑
|
||||
↑ actions→ ActionsManager ↑
|
||||
↑ ↑ ↑
|
||||
SocketManager →flushes→→→→→→→→→→·→→→→→→→→→→→→→→→→·
|
||||
↑
|
||||
<clients>
|
||||
*/
|
||||
|
||||
// DatabaseManager → MatchesManager
|
||||
var startupMatchesC chan PFT
|
||||
@ -255,21 +263,31 @@ var startupMatchesC chan PFT
|
||||
// StreamManager → MatchesManager
|
||||
var matchesC chan PFT
|
||||
|
||||
// StreamManager, DatabaseManager → MatchesManager
|
||||
var cleanMatchesC chan PF
|
||||
// MatchesManager → DatabaseManager
|
||||
var logsC chan LogEntry
|
||||
|
||||
// MatchesManager → ExecsManager
|
||||
// MatchesManager → ActionsManager
|
||||
var actionsC chan PAT
|
||||
|
||||
// SocketManager, DatabaseManager → MatchesManager
|
||||
var flushToMatchesC chan PF
|
||||
|
||||
// SocketManager → ActionsManager
|
||||
var flushToActionsC chan PF
|
||||
|
||||
// SocketManager → DatabaseManager
|
||||
var flushToDatabaseC chan LogEntry
|
||||
|
||||
func Daemon(confFilename string) {
|
||||
conf := parseConf(confFilename)
|
||||
|
||||
logsC = make(chan LogEntry)
|
||||
flushesC = make(chan LogEntry)
|
||||
matchesC = make(chan PFT)
|
||||
startupMatchesC = make(chan PFT)
|
||||
cleanMatchesC = make(chan PF)
|
||||
matchesC = make(chan PFT)
|
||||
logsC = make(chan LogEntry)
|
||||
actionsC = make(chan PAT)
|
||||
flushToMatchesC = make(chan PF)
|
||||
flushToActionsC = make(chan PF)
|
||||
flushToDatabaseC = make(chan LogEntry)
|
||||
stopActions = make(chan bool)
|
||||
stopStreams = make(chan bool)
|
||||
actions = make(ActionsMap)
|
||||
|
@ -45,7 +45,7 @@ func (c *Conf) manageLogs(logDB *WriteDB, flushDB *WriteDB) {
|
||||
var cpt int
|
||||
for {
|
||||
select {
|
||||
case entry := <-flushesC:
|
||||
case entry := <-flushToDatabaseC:
|
||||
flushDB.enc.Encode(entry)
|
||||
case entry := <-logsC:
|
||||
logDB.enc.Encode(entry)
|
||||
@ -201,7 +201,7 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E
|
||||
// replay executions
|
||||
if entry.Exec && entry.T.Add(*filter.longuestActionDuration).Unix() > now.Unix() {
|
||||
if startup {
|
||||
cleanMatchesC <- PF{entry.Pattern, filter}
|
||||
flushToMatchesC <- PF{entry.Pattern, filter}
|
||||
filter.sendActions(entry.Pattern, entry.T)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user