@ -2,15 +2,15 @@ package app
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"syscall"
|
||||
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"framagit.org/ppom/reaction/logger"
|
||||
)
|
||||
|
||||
// Executes a command and channel-send its stdout
|
||||
@ -21,17 +21,29 @@ func cmdStdout(commandline []string) chan *string {
|
||||
cmd := exec.Command(commandline[0], commandline[1:]...)
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
log.Fatalln("FATAL couldn't open stdout on command:", err)
|
||||
logger.Fatalln("couldn't open stdout on command:", err)
|
||||
}
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
logger.Fatalln("couldn't open stderr on command:", err)
|
||||
}
|
||||
if err := cmd.Start(); err != nil {
|
||||
log.Fatalln("FATAL couldn't start command:", err)
|
||||
logger.Fatalln("couldn't start command:", err)
|
||||
}
|
||||
defer stdout.Close()
|
||||
|
||||
defer stderr.Close()
|
||||
func() {
|
||||
errscan := bufio.NewScanner(stderr)
|
||||
for errscan.Scan() {
|
||||
line := errscan.Text()
|
||||
logger.Println(logger.WARN, "stderr:", line)
|
||||
}
|
||||
}()
|
||||
scanner := bufio.NewScanner(stdout)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
lines <- &line
|
||||
logger.Println(logger.DEBUG, "stdout:", line)
|
||||
}
|
||||
close(lines)
|
||||
}()
|
||||
@ -57,7 +69,7 @@ func (f *Filter) match(line *string) string {
|
||||
match := matches[regex.SubexpIndex(f.pattern.name)]
|
||||
|
||||
if f.pattern.notAnIgnore(&match) {
|
||||
log.Printf("INFO %s.%s: match [%v]\n", f.stream.name, f.name, match)
|
||||
logger.Printf(logger.INFO, "%s.%s: match [%v]\n", f.stream.name, f.name, match)
|
||||
return match
|
||||
}
|
||||
}
|
||||
@ -81,12 +93,12 @@ func (a *Action) exec(match string) {
|
||||
computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.pattern.nameWithBraces, match))
|
||||
}
|
||||
|
||||
log.Printf("INFO %s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand)
|
||||
logger.Printf(logger.INFO, "%s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand)
|
||||
|
||||
cmd := exec.Command(computedCommand[0], computedCommand[1:]...)
|
||||
|
||||
if ret := cmd.Run(); ret != nil {
|
||||
log.Printf("ERROR %s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret)
|
||||
logger.Printf(logger.ERROR, "%s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret)
|
||||
}
|
||||
}()
|
||||
}
|
||||
@ -241,7 +253,7 @@ func matchesManagerHandleMatch(pft PFT) bool {
|
||||
|
||||
func StreamManager(s *Stream, endedSignal chan *Stream) {
|
||||
defer wgStreams.Done()
|
||||
log.Printf("INFO %s: start %s\n", s.name, s.Cmd)
|
||||
logger.Printf(logger.INFO, "%s: start %s\n", s.name, s.Cmd)
|
||||
|
||||
lines := cmdStdout(s.Cmd)
|
||||
for {
|
||||
@ -345,13 +357,13 @@ func Daemon(confFilename string) {
|
||||
for {
|
||||
select {
|
||||
case finishedStream := <-endSignals:
|
||||
log.Printf("ERROR %s stream finished", finishedStream.name)
|
||||
logger.Printf(logger.ERROR, "%s stream finished", finishedStream.name)
|
||||
nbStreamsInExecution--
|
||||
if nbStreamsInExecution == 0 {
|
||||
quit()
|
||||
}
|
||||
case <-sigs:
|
||||
log.Printf("INFO Received SIGINT/SIGTERM, exiting")
|
||||
logger.Printf(logger.INFO, "Received SIGINT/SIGTERM, exiting")
|
||||
quit()
|
||||
}
|
||||
}
|
||||
@ -360,19 +372,19 @@ func Daemon(confFilename string) {
|
||||
func quit() {
|
||||
// send stop to StreamManager·s
|
||||
close(stopStreams)
|
||||
log.Println("INFO Waiting for Streams to finish...")
|
||||
logger.Println(logger.INFO, "Waiting for Streams to finish...")
|
||||
wgStreams.Wait()
|
||||
// ActionsManager calls wgActions.Done() when it has launched all pending actions
|
||||
wgActions.Add(1)
|
||||
// send stop to ActionsManager
|
||||
close(stopActions)
|
||||
// stop all actions
|
||||
log.Println("INFO Waiting for Actions to finish...")
|
||||
logger.Println(logger.INFO, "Waiting for Actions to finish...")
|
||||
wgActions.Wait()
|
||||
// delete pipe
|
||||
err := os.Remove(*SocketPath)
|
||||
if err != nil {
|
||||
log.Println("Failed to remove socket:", err)
|
||||
logger.Println(logger.ERROR, "Failed to remove socket:", err)
|
||||
}
|
||||
|
||||
os.Exit(3)
|
||||
|
||||
Reference in New Issue
Block a user