Fix persistence bug
This commit is contained in:
		| @ -133,15 +133,13 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E | |||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
|  |  | ||||||
| 	var err error |  | ||||||
| 	var entry LogEntry |  | ||||||
| 	var filter *Filter |  | ||||||
|  |  | ||||||
| 	// pattern, stream, fitler → last flush | 	// pattern, stream, fitler → last flush | ||||||
| 	flushes := make(map[PSF]time.Time) | 	flushes := make(map[PSF]time.Time) | ||||||
| 	for { | 	for { | ||||||
|  | 		var entry LogEntry | ||||||
|  | 		var filter *Filter | ||||||
| 		// decode entry | 		// decode entry | ||||||
| 		err = flushDec.Decode(&entry) | 		err := flushDec.Decode(&entry) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			if err == io.EOF { | 			if err == io.EOF { | ||||||
| 				break | 				break | ||||||
| @ -167,9 +165,11 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E | |||||||
|  |  | ||||||
| 	now := time.Now() | 	now := time.Now() | ||||||
| 	for { | 	for { | ||||||
|  | 		var entry LogEntry | ||||||
|  | 		var filter *Filter | ||||||
|  |  | ||||||
| 		// decode entry | 		// decode entry | ||||||
| 		err = logDec.Decode(&entry) | 		err := logDec.Decode(&entry) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			if err == io.EOF { | 			if err == io.EOF { | ||||||
| 				break | 				break | ||||||
| @ -180,24 +180,14 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E | |||||||
|  |  | ||||||
| 		// retrieve related stream & filter | 		// retrieve related stream & filter | ||||||
| 		if entry.Stream == "" && entry.Filter == "" { | 		if entry.Stream == "" && entry.Filter == "" { | ||||||
| 			if entry.SF != 0 { |  | ||||||
| 			sf, ok := readSF2int[entry.SF] | 			sf, ok := readSF2int[entry.SF] | ||||||
| 				if ok { | 			if !ok { | ||||||
|  | 				discardedEntries[SF{"", ""}]++ | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
| 			entry.Stream = sf.s | 			entry.Stream = sf.s | ||||||
| 			entry.Filter = sf.f | 			entry.Filter = sf.f | ||||||
| 				} else { |  | ||||||
| 					discardedEntries[SF{"", ""}]++ |  | ||||||
| 					continue |  | ||||||
| 		} | 		} | ||||||
| 			} else { |  | ||||||
| 				discardedEntries[SF{"", ""}]++ |  | ||||||
| 				continue |  | ||||||
| 			} |  | ||||||
| 			// Only one of Stream, Filter is non-empty |  | ||||||
| 			// } else if entry.Stream == "" || entry.Filter == "" { |  | ||||||
| 			// discardedEntries[SF{"", ""}]++ |  | ||||||
| 			// continue |  | ||||||
| 		} else { |  | ||||||
| 		if stream := c.Streams[entry.Stream]; stream != nil { | 		if stream := c.Streams[entry.Stream]; stream != nil { | ||||||
| 			filter = stream.Filters[entry.Filter] | 			filter = stream.Filters[entry.Filter] | ||||||
| 		} | 		} | ||||||
| @ -208,7 +198,6 @@ func rotateDB(c *Conf, logDec *gob.Decoder, flushDec *gob.Decoder, logEnc *gob.E | |||||||
| 		if entry.SF != 0 { | 		if entry.SF != 0 { | ||||||
| 			readSF2int[entry.SF] = SF{entry.Stream, entry.Filter} | 			readSF2int[entry.SF] = SF{entry.Stream, entry.Filter} | ||||||
| 		} | 		} | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		// check if it hasn't been flushed | 		// check if it hasn't been flushed | ||||||
| 		lastGlobalFlush := flushes[PSF{entry.Pattern, "", ""}].Unix() | 		lastGlobalFlush := flushes[PSF{entry.Pattern, "", ""}].Unix() | ||||||
|  | |||||||
							
								
								
									
										50
									
								
								config/persistence.jsonnet
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										50
									
								
								config/persistence.jsonnet
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,50 @@ | |||||||
|  | { | ||||||
|  |   patterns: { | ||||||
|  |     num: { | ||||||
|  |       regex: '[0-9]+', | ||||||
|  |     }, | ||||||
|  |   }, | ||||||
|  |  | ||||||
|  |   streams: { | ||||||
|  |     tailDown1: { | ||||||
|  |       cmd: ['sh', '-c', "echo 01 02 03 04 05 | tr ' ' '\n' | while read i; do sleep 0.5; echo found $i; done"], | ||||||
|  |       filters: { | ||||||
|  |         findIP1: { | ||||||
|  |           regex: ['^found <num>$'], | ||||||
|  |           retry: 1, | ||||||
|  |           retryperiod: '2m', | ||||||
|  |           actions: { | ||||||
|  |             damn: { | ||||||
|  |               cmd: ['echo', '<num>'], | ||||||
|  |             }, | ||||||
|  |             undamn: { | ||||||
|  |               cmd: ['echo', 'undamn', '<num>'], | ||||||
|  |               after: '1m', | ||||||
|  |               onexit: true, | ||||||
|  |             }, | ||||||
|  |           }, | ||||||
|  |         }, | ||||||
|  |       }, | ||||||
|  |     }, | ||||||
|  |     tailDown2: { | ||||||
|  |       cmd: ['sh', '-c', "echo 11 12 13 14 15 11 13 15 | tr ' ' '\n' | while read i; do sleep 0.3; echo found $i; done"], | ||||||
|  |       filters: { | ||||||
|  |         findIP2: { | ||||||
|  |           regex: ['^found <num>$'], | ||||||
|  |           retry: 2, | ||||||
|  |           retryperiod: '2m', | ||||||
|  |           actions: { | ||||||
|  |             damn: { | ||||||
|  |               cmd: ['echo', '<num>'], | ||||||
|  |             }, | ||||||
|  |             undamn: { | ||||||
|  |               cmd: ['echo', 'undamn', '<num>'], | ||||||
|  |               after: '1m', | ||||||
|  |               onexit: true, | ||||||
|  |             }, | ||||||
|  |           }, | ||||||
|  |         }, | ||||||
|  |       }, | ||||||
|  |     }, | ||||||
|  |   }, | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user
	 ppom
					ppom