Clean mqueue task now show before/after

This commit is contained in:
yo 2022-11-13 10:58:00 +01:00
parent 145d061c52
commit aff1c5af75

View File

@ -125,7 +125,7 @@ var (
File os.File File os.File
Writer *bufio.Writer Writer *bufio.Writer
Version = "0.6.9" Version = "0.6.10"
BuildInfo = promauto.NewGaugeVec(prometheus.GaugeOpts{ BuildInfo = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "openldaplogparser_build_info", Name: "openldaplogparser_build_info",
@ -321,29 +321,37 @@ func writeOut(msg string, filename string) error {
return nil return nil
} }
// Every 24H, remove connections closed more than 24 hours ago func cleanMQueue(mqueue map[string]*OpenLdapConnection, mqMtx *sync.Mutex, age time.Duration) {
func periodicallyCleanMQueue(mqueue map[string]*OpenLdapConnection, mqMtx *sync.Mutex) { var ok bool
var ok bool
for range time.Tick(time.Hour * 24) { log.Printf("Start cleaning queue task: %d items in queue", len(mqueue))
// Do we need read lock?
for _, ldcon := range mqueue { // Do we need read lock?
ok = false for uid, ldcon := range mqueue {
// Check if a close operation exist ok = false
for _, op := range ldcon.Operations { // Check if a close operation exist
if op.OpType == "close" { for _, op := range ldcon.Operations {
if op.Time.Add(time.Hour * 1 * 24).Before(time.Now()) { if op.OpType == "close" {
ok = true if op.Time.Add(age).Before(time.Now()) {
} ok = true
} }
} }
if ok == true {
mqMtx.Lock()
//delete(mqueue, ldcon.ConnId)
delete(mqueue, fmt.Sprintf("%s:%d", ldcon.Hostname, ldcon.ConnId))
mqMtx.Unlock()
}
} }
if ok == true {
mqMtx.Lock()
delete(mqueue, uid)
mqMtx.Unlock()
}
}
log.Printf("Finished cleaning queue task: %d items in queue", len(mqueue))
}
// Every 24H, remove connections closed more than 24 hours ago
func periodicallyCleanMQueue(mqueue map[string]*OpenLdapConnection, mqMtx *sync.Mutex) {
for range time.Tick(time.Hour * 24) {
cleanMQueue(mqueue, mqMtx, 24 * time.Hour)
} }
} }
@ -973,6 +981,17 @@ func processLogs(cmd *cobra.Command, args []string) {
// Cleaner thread // Cleaner thread
go periodicallyCleanMQueue(mQueue, &mqMtx) go periodicallyCleanMQueue(mQueue, &mqMtx)
// On demand Mqueue cleaning... For debug, dont try this at home, kids!
/* sig2 := make(chan os.Signal)
signal.Notify(sig2, syscall.SIGUSR2)
go func() {
for {
<-sig2
cleanMQueue(mQueue, &mqMtx, 5 * time.Minute)
}
}()
*/
// Initialize Stdin input... // Initialize Stdin input...
if true == strings.EqualFold(gSyslogListenAddress, "do-not-listen") { if true == strings.EqualFold(gSyslogListenAddress, "do-not-listen") {
useStdin = true useStdin = true