rename package
This commit is contained in:
		
							
								
								
									
										136
									
								
								reaction/conf.go
									
									
									
									
									
								
							
							
						
						
									
										136
									
								
								reaction/conf.go
									
									
									
									
									
								
							@ -1,136 +0,0 @@
 | 
			
		||||
package reaction
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"log"
 | 
			
		||||
	"os"
 | 
			
		||||
	"regexp"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"gopkg.in/yaml.v3"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Conf struct {
 | 
			
		||||
	Patterns map[string]string
 | 
			
		||||
	Streams  map[string]*Stream
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Stream struct {
 | 
			
		||||
	name string
 | 
			
		||||
 | 
			
		||||
	Cmd     []string
 | 
			
		||||
	Filters map[string]*Filter
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Filter struct {
 | 
			
		||||
	stream *Stream
 | 
			
		||||
	name   string
 | 
			
		||||
 | 
			
		||||
	Regex                          []string
 | 
			
		||||
	compiledRegex                  []regexp.Regexp
 | 
			
		||||
	patternName, patternWithBraces string
 | 
			
		||||
 | 
			
		||||
	Retry         uint
 | 
			
		||||
	RetryPeriod   string `yaml:"retry-period"`
 | 
			
		||||
	retryDuration time.Duration
 | 
			
		||||
 | 
			
		||||
	Actions map[string]*Action
 | 
			
		||||
 | 
			
		||||
	matches map[string][]time.Time
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Action struct {
 | 
			
		||||
	filter *Filter
 | 
			
		||||
	name   string
 | 
			
		||||
 | 
			
		||||
	Cmd []string
 | 
			
		||||
 | 
			
		||||
	After         string `yaml:",omitempty"`
 | 
			
		||||
	afterDuration time.Duration
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *Conf) setup() {
 | 
			
		||||
	for patternName, pattern := range c.Patterns {
 | 
			
		||||
		c.Patterns[patternName] = fmt.Sprintf("(?P<%s>%s)", patternName, pattern)
 | 
			
		||||
	}
 | 
			
		||||
	for streamName := range c.Streams {
 | 
			
		||||
 | 
			
		||||
		stream := c.Streams[streamName]
 | 
			
		||||
		stream.name = streamName
 | 
			
		||||
 | 
			
		||||
		for filterName := range stream.Filters {
 | 
			
		||||
 | 
			
		||||
			filter := stream.Filters[filterName]
 | 
			
		||||
			filter.stream = stream
 | 
			
		||||
			filter.name = filterName
 | 
			
		||||
 | 
			
		||||
			// Parse Duration
 | 
			
		||||
			retryDuration, err := time.ParseDuration(filter.RetryPeriod)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				log.Fatalln("Failed to parse time in configuration file:", err)
 | 
			
		||||
			}
 | 
			
		||||
			filter.retryDuration = retryDuration
 | 
			
		||||
 | 
			
		||||
			// Compute Regexes
 | 
			
		||||
			// Look for Patterns inside Regexes
 | 
			
		||||
			for _, regex := range filter.Regex {
 | 
			
		||||
				for patternName, pattern := range c.Patterns {
 | 
			
		||||
					if strings.Contains(regex, patternName) {
 | 
			
		||||
 | 
			
		||||
						switch filter.patternName {
 | 
			
		||||
						case "":
 | 
			
		||||
							filter.patternName = patternName
 | 
			
		||||
							filter.patternWithBraces = fmt.Sprintf("<%s>", patternName)
 | 
			
		||||
						case patternName:
 | 
			
		||||
							// no op
 | 
			
		||||
						default:
 | 
			
		||||
							log.Fatalf(
 | 
			
		||||
								"ERROR Can't mix different patterns (%s, %s) in same filter (%s.%s)\n",
 | 
			
		||||
								filter.patternName, patternName, streamName, filterName,
 | 
			
		||||
							)
 | 
			
		||||
						}
 | 
			
		||||
 | 
			
		||||
						regex = strings.Replace(regex, fmt.Sprintf("<%s>", patternName), pattern, 1)
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				filter.compiledRegex = append(filter.compiledRegex, *regexp.MustCompile(regex))
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			for actionName := range filter.Actions {
 | 
			
		||||
 | 
			
		||||
				action := filter.Actions[actionName]
 | 
			
		||||
				action.filter = filter
 | 
			
		||||
				action.name = actionName
 | 
			
		||||
 | 
			
		||||
				// Parse Duration
 | 
			
		||||
				if action.After != "" {
 | 
			
		||||
					afterDuration, err := time.ParseDuration(action.After)
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						log.Fatalln("Failed to parse time in configuration file:", err)
 | 
			
		||||
					}
 | 
			
		||||
					action.afterDuration = afterDuration
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func parseConf(filename string) *Conf {
 | 
			
		||||
 | 
			
		||||
	data, err := os.ReadFile(filename)
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Fatalln("Failed to read configuration file:", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var conf Conf
 | 
			
		||||
	err = yaml.Unmarshal(data, &conf)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Fatalln("Failed to parse configuration file:", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	conf.setup()
 | 
			
		||||
 | 
			
		||||
	return &conf
 | 
			
		||||
}
 | 
			
		||||
@ -1,156 +0,0 @@
 | 
			
		||||
package reaction
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bufio"
 | 
			
		||||
	"flag"
 | 
			
		||||
	// "fmt"
 | 
			
		||||
	"log"
 | 
			
		||||
	"os"
 | 
			
		||||
	"os/exec"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Executes a command and channel-send its stdout
 | 
			
		||||
func cmdStdout(commandline []string) chan *string {
 | 
			
		||||
	lines := make(chan *string)
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		cmd := exec.Command(commandline[0], commandline[1:]...)
 | 
			
		||||
		stdout, err := cmd.StdoutPipe()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Fatal("couldn't open stdout on command:", err)
 | 
			
		||||
		}
 | 
			
		||||
		if err := cmd.Start(); err != nil {
 | 
			
		||||
			log.Fatal("couldn't start command:", err)
 | 
			
		||||
		}
 | 
			
		||||
		defer stdout.Close()
 | 
			
		||||
 | 
			
		||||
		scanner := bufio.NewScanner(stdout)
 | 
			
		||||
		for scanner.Scan() {
 | 
			
		||||
			line := scanner.Text()
 | 
			
		||||
			lines <- &line
 | 
			
		||||
		}
 | 
			
		||||
		close(lines)
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return lines
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Whether one of the filter's regexes is matched on a line
 | 
			
		||||
func (f *Filter) match(line *string) string {
 | 
			
		||||
	for _, regex := range f.compiledRegex {
 | 
			
		||||
 | 
			
		||||
		if matches := regex.FindStringSubmatch(*line); matches != nil {
 | 
			
		||||
 | 
			
		||||
			match := matches[regex.SubexpIndex(f.patternName)]
 | 
			
		||||
 | 
			
		||||
			log.Printf("INFO %s.%s: match [%v]\n", f.stream.name, f.name, match)
 | 
			
		||||
			return match
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return ""
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *Filter) execActions(match string) {
 | 
			
		||||
	for _, a := range f.Actions {
 | 
			
		||||
		wgActions.Add(1)
 | 
			
		||||
		go a.exec(match)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *Action) exec(match string) {
 | 
			
		||||
	defer wgActions.Done()
 | 
			
		||||
	if a.afterDuration != 0 {
 | 
			
		||||
		time.Sleep(a.afterDuration)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	computedCommand := make([]string, 0, len(a.Cmd))
 | 
			
		||||
	for _, item := range a.Cmd {
 | 
			
		||||
		computedCommand = append(computedCommand, strings.ReplaceAll(item, a.filter.patternWithBraces, match))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	log.Printf("INFO %s.%s.%s: run %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand)
 | 
			
		||||
 | 
			
		||||
	cmd := exec.Command(computedCommand[0], computedCommand[1:]...)
 | 
			
		||||
 | 
			
		||||
	if ret := cmd.Run(); ret != nil {
 | 
			
		||||
		log.Printf("ERR  %s.%s.%s: run %s, code %s\n", a.filter.stream.name, a.filter.name, a.name, computedCommand, ret)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *Filter) handle() chan *string {
 | 
			
		||||
	lines := make(chan *string)
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		for line := range lines {
 | 
			
		||||
			if match := f.match(line); match != "" {
 | 
			
		||||
				f.execActions(match)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return lines
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func multiplex(input chan *string, outputs []chan *string) {
 | 
			
		||||
	var wg sync.WaitGroup
 | 
			
		||||
	for item := range input {
 | 
			
		||||
		for _, output := range outputs {
 | 
			
		||||
			wg.Add(1)
 | 
			
		||||
			go func(s *string) {
 | 
			
		||||
				output <- s
 | 
			
		||||
				wg.Done()
 | 
			
		||||
			}(item)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for _, output := range outputs {
 | 
			
		||||
		wg.Wait()
 | 
			
		||||
		close(output)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Stream) handle(signal chan *Stream) {
 | 
			
		||||
	log.Printf("INFO %s: start %s\n", s.name, s.Cmd)
 | 
			
		||||
 | 
			
		||||
	lines := cmdStdout(s.Cmd)
 | 
			
		||||
 | 
			
		||||
	var filterInputs = make([]chan *string, 0, len(s.Filters))
 | 
			
		||||
	for _, filter := range s.Filters {
 | 
			
		||||
		filterInputs = append(filterInputs, filter.handle())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	multiplex(lines, filterInputs)
 | 
			
		||||
 | 
			
		||||
	signal <- s
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var wgActions sync.WaitGroup
 | 
			
		||||
 | 
			
		||||
func Main() {
 | 
			
		||||
	confFilename := flag.String("c", "", "configuration file. see an example at https://framagit.org/ppom/reaction/-/blob/main/reaction.yml")
 | 
			
		||||
	flag.Parse()
 | 
			
		||||
 | 
			
		||||
	if *confFilename == "" {
 | 
			
		||||
		flag.PrintDefaults()
 | 
			
		||||
		os.Exit(2)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	conf := parseConf(*confFilename)
 | 
			
		||||
 | 
			
		||||
	endSignals := make(chan *Stream)
 | 
			
		||||
 | 
			
		||||
	for _, stream := range conf.Streams {
 | 
			
		||||
		go stream.handle(endSignals)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for i := 0; i < len(conf.Streams); i++ {
 | 
			
		||||
		finishedStream := <-endSignals
 | 
			
		||||
		log.Printf("ERR  %s stream finished", finishedStream.name)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	wgActions.Wait()
 | 
			
		||||
 | 
			
		||||
	os.Exit(3)
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user