Compare commits
4 Commits
8a4ba4486d
...
v0.6.10
Author | SHA1 | Date | |
---|---|---|---|
aff1c5af75 | |||
145d061c52 | |||
ea673b809c | |||
b82615a974 |
@ -125,7 +125,7 @@ var (
|
||||
File os.File
|
||||
Writer *bufio.Writer
|
||||
|
||||
Version = "0.6.6"
|
||||
Version = "0.6.10"
|
||||
|
||||
BuildInfo = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "openldaplogparser_build_info",
|
||||
@ -217,7 +217,7 @@ func OlcToFlat(olc *OpenLdapConnection) []OpenLdapConnectionFlat {
|
||||
|
||||
for i := range olc.Operations {
|
||||
olcf[i] = OpenLdapConnectionFlat{
|
||||
Time: olc.Time,
|
||||
Time: olc.Operations[i].Time,
|
||||
Hostname: olc.Hostname,
|
||||
Process: olc.Process,
|
||||
ClientIp: olc.ClientIp,
|
||||
@ -321,35 +321,39 @@ func writeOut(msg string, filename string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Every 24H, remove sent, milter-rejected and deferred that entered queue more than 5 days ago
|
||||
/*
|
||||
func periodicallyCleanMQueue(mqueue map[int]*PostfixLogParser, mqMtx *sync.Mutex) {
|
||||
var ok int
|
||||
func cleanMQueue(mqueue map[string]*OpenLdapConnection, mqMtx *sync.Mutex, age time.Duration) {
|
||||
var ok bool
|
||||
|
||||
for range time.Tick(time.Hour * 24) {
|
||||
// Do we need read lock?
|
||||
for _, inmail := range mqueue {
|
||||
ok = 0
|
||||
// Check all mails were sent (multiple destinations mails)
|
||||
// or rejected
|
||||
for _, outmail := range inmail.Messages {
|
||||
if outmail.Status == "sent" || outmail.Status == "milter-reject" {
|
||||
ok += 1
|
||||
} else if outmail.Status == "deferred" {
|
||||
if inmail.Time.Add(time.Hour * 5 * 24).Before(time.Now()) {
|
||||
ok += 1
|
||||
}
|
||||
log.Printf("Start cleaning queue task: %d items in queue", len(mqueue))
|
||||
|
||||
// Do we need read lock?
|
||||
for uid, ldcon := range mqueue {
|
||||
ok = false
|
||||
// Check if a close operation exist
|
||||
for _, op := range ldcon.Operations {
|
||||
if op.OpType == "close" {
|
||||
if op.Time.Add(age).Before(time.Now()) {
|
||||
ok = true
|
||||
}
|
||||
}
|
||||
if ok == len(inmail.Messages) {
|
||||
mqMtx.Lock()
|
||||
delete(mqueue, inmail.MessageId)
|
||||
mqMtx.Unlock()
|
||||
}
|
||||
}
|
||||
if ok == true {
|
||||
mqMtx.Lock()
|
||||
delete(mqueue, uid)
|
||||
mqMtx.Unlock()
|
||||
}
|
||||
}
|
||||
log.Printf("Finished cleaning queue task: %d items in queue", len(mqueue))
|
||||
}
|
||||
|
||||
|
||||
|
||||
// Every 24H, remove connections closed more than 24 hours ago
|
||||
func periodicallyCleanMQueue(mqueue map[string]*OpenLdapConnection, mqMtx *sync.Mutex) {
|
||||
for range time.Tick(time.Hour * 24) {
|
||||
cleanMQueue(mqueue, mqMtx, 24 * time.Hour)
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
func initConfig() {}
|
||||
|
||||
@ -975,7 +979,18 @@ func processLogs(cmd *cobra.Command, args []string) {
|
||||
}
|
||||
|
||||
// Cleaner thread
|
||||
//go periodicallyCleanMQueue(mQueue, &mqMtx)
|
||||
go periodicallyCleanMQueue(mQueue, &mqMtx)
|
||||
|
||||
// On demand Mqueue cleaning... For debug, dont try this at home, kids!
|
||||
/* sig2 := make(chan os.Signal)
|
||||
signal.Notify(sig2, syscall.SIGUSR2)
|
||||
go func() {
|
||||
for {
|
||||
<-sig2
|
||||
cleanMQueue(mQueue, &mqMtx, 5 * time.Minute)
|
||||
}
|
||||
}()
|
||||
*/
|
||||
|
||||
// Initialize Stdin input...
|
||||
if true == strings.EqualFold(gSyslogListenAddress, "do-not-listen") {
|
||||
|
Reference in New Issue
Block a user