nodegopher/main.go

635 lines
18 KiB
Go

// nodegopher is a Grafana/Prometheus nodeGraph helper
// It builds a node graph structure by merging static data with metrics pulled from a Prometheus instance.
// Copyright (c) 2025 yo000 <johan@nosd.in>
//
package main
import (
"os"
"fmt"
"flag"
"sync"
"time"
"regexp"
"context"
"strconv"
"strings"
"syscall"
"net/http"
"os/signal"
"gopkg.in/yaml.v2"
"github.com/spf13/viper"
"github.com/gin-gonic/gin"
"github.com/fsnotify/fsnotify"
log "github.com/sirupsen/logrus"
"golang.org/x/text/language"
"golang.org/x/text/message"
"github.com/prometheus/common/model"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
)
// gVersion is the NodeGopher release printed in the startup log line.
const gVersion = "0.2.4"

// gDefaultDSTimeout is the timeout, in seconds, given to a datasource whose
// configuration entry does not set one (or sets it to 0).
const gDefaultDSTimeout = 10
// PromDataSourceConfig describes one Prometheus datasource, decoded from an entry
// of the "datasources" list of the configuration file.
type PromDataSourceConfig struct {
// Name is compared case-insensitively with the "{{ name }}" reference found in item queries.
Name string `yaml:"name"`
// QType selects the API call made by GetData: "query" (instant query) or "query_range".
QType string `yaml:"type"`
// Address of the Prometheus server, passed to the client as api.Config.Address.
Address string `yaml:"address"`
// Query is the expression sent to Prometheus.
Query string `yaml:"query"`
// Timeout in seconds; reloadConfigFile replaces 0 with gDefaultDSTimeout.
Timeout int `yaml:"timeout"`
}
// Graph is a named node graph, served under GET /<name>/nodes and GET /<name>/edges.
type Graph struct {
// Name is matched case-insensitively by getGraph.
// NOTE: reloadConfigFile decodes graphs through its own anonymous struct, so this
// yaml tag is not what populates the field there.
Name string `yaml:"name"`
// Nodes holds the *Node items and Edges the *Edge items built by reloadConfigFile.
Nodes []Item
Edges []Item
}
// Item is the common interface of graph nodes and edges (reloadConfigFile stores
// *Node and *Edge values behind it). BuildMetrics reads the *Query getters to find
// "{{ datasource }}" references and stores the resolved values with the setters.
type Item interface {
// GetId returns the item identifier (used in this file's log messages).
GetId() string
GetMainStat() string
// GetMainStatQuery returns the datasource reference for the main stat, or "" for a static stat.
GetMainStatQuery() string
// GetMainStatFormat returns the printf-style format applied through gPrinter ("" means "%f").
GetMainStatFormat() string
SetMainStat(string)
GetSecondaryStat() string
// GetSecondaryStatQuery returns the datasource reference for the secondary stat, or "".
GetSecondaryStatQuery() string
// GetSecondaryStatFormat returns the printf-style format for the secondary stat ("" means "%f").
GetSecondaryStatFormat() string
SetSecondaryStat(string)
// GetThicknessQuery is only consulted by BuildMetrics for *Edge items.
GetThicknessQuery() string
SetThickness(float64)
// GetHighlightedQuery returns the datasource reference whose non-zero value highlights the item.
GetHighlightedQuery() string
SetHighlighted(bool)
}
// QueryArgs holds the URL query arguments of GET /:graph/nodes and GET /:graph/edges,
// bound with gin's ShouldBindQuery; all three are required. They are modeled on
// Grafana's internal variables — presumably filled from $__from, $__to and
// $__interval in the datasource URL (TODO confirm).
type QueryArgs struct {
// From and To are epoch timestamps in milliseconds, converted by queryArgsToTimeRange.
From int64 `form:"from" binding:"required"`
To int64 `form:"to" binding:"required"`
// Interval is the range-query step such as "30s"; queryArgsToTimeRange lists the accepted units.
Interval string `form:"interval" binding:"required"`
}
// MyRange is QueryArgs converted to the Prometheus client's types; GetData copies
// it into a v1.Range for "query_range" datasources.
type MyRange struct {
Start time.Time
End time.Time
Step time.Duration
}
var (
gPrinter *message.Printer
gDebug bool
gGraphs []Graph
gDataSources []PromDataSourceConfig
gDSVarCompRegex *regexp.Regexp
gGrafanaIntervalUnitRegex *regexp.Regexp
// manipulating In-Ram configuration Mutex. reloadConfig will lock WriteMutex, consuming queries will lock ReadMutex.
gCfgMutex sync.RWMutex
)
// GetData runs the datasource query against Prometheus and reduces the result to
// a single number (see promValueToFloat).
//
// QType "query" runs an instant query at the current time (timeRange is unused);
// "query_range" runs a range query over timeRange. Any other QType is reported as
// an error instead of panicking on a nil result. Both the request context and the
// server-side evaluation are limited to d.Timeout seconds.
func (d *PromDataSourceConfig) GetData(timeRange *MyRange) (float64, error) {
	client, err := api.NewClient(api.Config{
		Address: d.Address,
	})
	if err != nil {
		log.Errorf("DataSourceConfig.GetData: Error creating client: %v\n", err)
		return 0.0, err
	}
	v1api := v1.NewAPI(client)
	timeout := time.Duration(d.Timeout) * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	var result model.Value
	var warnings v1.Warnings
	switch d.QType {
	case "query":
		result, warnings, err = v1api.Query(ctx, d.Query, time.Now(), v1.WithTimeout(timeout))
	case "query_range":
		if timeRange == nil {
			return 0.0, fmt.Errorf("datasource %s: query_range requires a time range", d.Name)
		}
		rng := v1.Range{
			Start: timeRange.Start,
			End:   timeRange.End,
			Step:  timeRange.Step,
		}
		result, warnings, err = v1api.QueryRange(ctx, d.Query, rng, v1.WithTimeout(timeout))
	default:
		err = fmt.Errorf("datasource %s: unsupported query type %q (expected \"query\" or \"query_range\")", d.Name, d.QType)
		log.Errorf("DataSourceConfig.GetData: %v\n", err)
		return 0.0, err
	}
	if err != nil {
		log.Errorf("DataSourceConfig.GetData: Error querying Prometheus: %v. Query is: %s\n", err, d.Query)
		return 0.0, err
	}
	if len(warnings) > 0 {
		log.Warningf("DataSourceConfig.GetData: Warnings: %v\n", warnings)
	}
	return promValueToFloat(result), nil
}

// promValueToFloat reduces a Prometheus query result to one number:
//   - scalar: its value;
//   - vector (instant query): the average of its samples;
//   - matrix (range query): the average of every point of every series.
//
// NaN samples (e.g. a 0/0 ratio) are skipped: one NaN would make the whole average
// NaN, which is meaningless as a stat and cannot be JSON-encoded as a thickness.
// An empty result, a result holding only NaN samples, and any other result type
// (string, nil) yield 0 instead of NaN.
// NOTE(review): averaging is a choice inherited from the original code; confirm it
// fits every configured query.
func promValueToFloat(result model.Value) float64 {
	var total float64
	var count int
	add := func(v model.SampleValue) {
		f := float64(v)
		if f != f { // true only for NaN
			return
		}
		total += f
		count++
	}

	switch val := result.(type) {
	case *model.Scalar:
		add(val.Value)
	case model.Vector:
		for _, sample := range val {
			log.Debugf("DataSourceConfig.GetData: Value: %v\n", sample.Value)
			add(sample.Value)
		}
	case model.Matrix:
		for _, series := range val {
			for _, point := range series.Values {
				add(point.Value)
			}
		}
	default:
		log.Errorf("Prometheus result is an unknown type: %T", result)
		return 0.0
	}

	if count == 0 {
		log.Debugf("DataSourceConfig.GetData: empty result, using 0\n")
		return 0.0
	}
	avg := total / float64(count)
	log.Debugf("DataSourceConfig.GetData: Total average: %f\n", avg)
	return avg
}
// BuildMetrics resolves the dynamic fields of items and returns the resolved items.
//
// Each non-empty *Query field of an item must be a "{{ datasource }}" reference
// (gDSVarCompRegex). The datasource is queried for timeRange, and the result replaces
// the matching static value:
//   - mainStat / secondaryStat: formatted with the item's format (default "%f") via gPrinter;
//   - highlighted: true for any non-zero value;
//   - thickness (edges only): the raw value.
//
// An unparseable reference, or one naming no configured datasource, is logged and
// the static value is kept.
//
// items itself is never modified: the returned slice holds per-request clones, so
// concurrent requests for the same graph no longer write into the shared
// configuration while holding only a read lock. The first datasource error aborts
// the build and is returned unchanged.
func (g Graph) BuildMetrics(items *[]Item, timeRange *MyRange) (*[]Item, error) {
	// Acquire Read Lock. This won't prevent other queries being answered, but reloadConfig WLock will have to wait until we finish
	gCfgMutex.RLock()
	defer gCfgMutex.RUnlock()

	if items == nil {
		return nil, nil
	}
	resolved := make([]Item, 0, len(*items))
	for _, shared := range *items {
		item := cloneGraphItem(shared)
		id := item.GetId()

		// Handle mainStat
		if query := item.GetMainStatQuery(); len(query) > 0 {
			log.Debugf("Item %s have mainstatquery: %s\n", id, query)
			value, ok, err := queryItemValue(id, "mainstatquery", query, timeRange)
			if err != nil {
				return nil, err
			}
			if ok {
				format := item.GetMainStatFormat()
				if len(format) == 0 {
					format = "%f"
				}
				stat := gPrinter.Sprintf(format, value)
				log.Debugf("buildMetrics: Replace %s mainstat with %s\n", id, stat)
				item.SetMainStat(stat)
			}
		}

		// Handle secondaryStat
		if query := item.GetSecondaryStatQuery(); len(query) > 0 {
			log.Debugf("Item %s have secondarystatquery: %s\n", id, query)
			value, ok, err := queryItemValue(id, "secondarystatquery", query, timeRange)
			if err != nil {
				return nil, err
			}
			if ok {
				format := item.GetSecondaryStatFormat()
				if len(format) == 0 {
					format = "%f"
				}
				stat := gPrinter.Sprintf(format, value)
				log.Debugf("buildMetrics: Replace %s secondarystat with %s\n", id, stat)
				item.SetSecondaryStat(stat)
			}
		}

		// Handle highlighted: any non-zero value highlights the item
		if query := item.GetHighlightedQuery(); len(query) > 0 {
			log.Debugf("buildMetrics: Item %s have highlightedquery: %s\n", id, query)
			value, ok, err := queryItemValue(id, "highlightedquery", query, timeRange)
			if err != nil {
				return nil, err
			}
			if ok {
				highlight := value != 0
				log.Debugf("buildMetrics: Replace %s highlighted with %t\n", id, highlight)
				item.SetHighlighted(highlight)
			}
		}

		// Handle thickness, which only edges support
		if _, isEdge := item.(*Edge); isEdge {
			if query := item.GetThicknessQuery(); len(query) > 0 {
				log.Debugf("buildMetrics: Item %s have thicknessquery: %s\n", id, query)
				value, ok, err := queryItemValue(id, "thicknessquery", query, timeRange)
				if err != nil {
					return nil, err
				}
				if ok {
					log.Debugf("buildMetrics: Replace %s thickness with %s\n", id, fmt.Sprintf("%0.0f", value))
					item.SetThickness(value)
				}
			}
		}

		resolved = append(resolved, item)
	}
	return &resolved, nil
}

// cloneGraphItem returns a private copy of a configuration item, so per-request
// values are not written into the shared configuration. Items of any other concrete
// type cannot be cloned here and are returned unchanged (still shared).
func cloneGraphItem(item Item) Item {
	switch v := item.(type) {
	case *Node:
		return newNodeClone(v)
	case *Edge:
		return newEdgeClone(v)
	default:
		return item
	}
}

// queryItemValue resolves the "{{ datasource }}" reference held in the kind query
// (e.g. "mainstatquery") of item itemID and returns that datasource's value.
// ok is false, after logging an error, when the reference cannot be parsed or names
// no configured datasource. err is the datasource error, if any.
// The caller must hold gCfgMutex (read), because gDataSources is walked.
func queryItemValue(itemID, kind, query string, timeRange *MyRange) (value float64, ok bool, err error) {
	r := gDSVarCompRegex.FindStringSubmatch(query)
	if len(r) < 2 {
		log.Errorf("buildMetrics: Item %s %s unparseable: %s\n", itemID, kind, query)
		return 0, false, nil
	}
	dsname := strings.TrimSpace(r[1])
	log.Debugf("buildMetrics: datasource from %s : %s\n", kind, dsname)
	for i := range gDataSources {
		if !strings.EqualFold(gDataSources[i].Name, dsname) {
			continue
		}
		value, err = gDataSources[i].GetData(timeRange)
		if err != nil {
			return 0, false, err
		}
		return value, true, nil
	}
	log.Errorf("buildMetrics: Item %s %s references unknown datasource: %s\n", itemID, kind, dsname)
	return 0, false, nil
}
// queryArgsToTimeRange converts the endpoint query arguments into the range used by
// Prometheus range queries.
//
// From and To are epoch timestamps in milliseconds; millisecond precision is kept.
// Interval must be "<digits><unit>" with unit one of ms, s, m, h, d (e.g. "30s").
// An error is returned for a malformed interval, a zero interval (Prometheus
// rejects a zero step), an interval too large for time.Duration, or To < From.
func queryArgsToTimeRange(args QueryArgs) (MyRange, error) {
	matches := gGrafanaIntervalUnitRegex.FindStringSubmatch(args.Interval)
	if len(matches) != 3 {
		return MyRange{}, fmt.Errorf("Invalid format: interval")
	}
	value, err := strconv.ParseInt(matches[gGrafanaIntervalUnitRegex.SubexpIndex("value")], 10, 64)
	if err != nil {
		return MyRange{}, fmt.Errorf("Invalid format (can not parse value): interval")
	}

	var unit time.Duration
	switch matches[gGrafanaIntervalUnitRegex.SubexpIndex("unit")] {
	case "ms":
		unit = time.Millisecond
	case "s":
		unit = time.Second
	case "m":
		unit = time.Minute
	case "h":
		unit = time.Hour
	case "d":
		unit = 24 * time.Hour
	default:
		return MyRange{}, fmt.Errorf("Invalid format (can not parse unit): interval")
	}

	// value*unit must be positive and must not overflow time.Duration.
	const maxDuration = time.Duration(1<<63 - 1)
	if value <= 0 || value > int64(maxDuration/unit) {
		return MyRange{}, fmt.Errorf("Invalid value (out of range): interval")
	}
	if args.To < args.From {
		return MyRange{}, fmt.Errorf("Invalid range: from is after to")
	}

	// Milliseconds since the epoch -> time.Time, keeping the sub-second part.
	fromMillis := func(ms int64) time.Time {
		return time.Unix(ms/1000, (ms%1000)*int64(time.Millisecond))
	}
	return MyRange{Start: fromMillis(args.From), End: fromMillis(args.To), Step: time.Duration(value) * unit}, nil
}
// getGraph looks up a configured graph by name, ignoring case. The Graph struct is
// returned by value, but its Nodes and Edges slices still share their elements with
// the loaded configuration. The read lock keeps a concurrent reload from swapping
// gGraphs during the lookup.
func getGraph(name string) (Graph, error) {
	gCfgMutex.RLock()
	defer gCfgMutex.RUnlock()

	found := -1
	for i := range gGraphs {
		if strings.EqualFold(gGraphs[i].Name, name) {
			found = i
			break
		}
	}
	if found < 0 {
		return Graph{}, fmt.Errorf("Graph not found: %s", name)
	}
	return gGraphs[found], nil
}
// initRoutes registers every HTTP endpoint on r:
//   - GET  /ping          liveness probe
//   - POST /reload        re-read the configuration file
//   - POST /debug         toggle debug mode
//   - GET  /:graph/nodes  nodes of a graph with their metrics resolved
//   - GET  /:graph/edges  edges of a graph with their metrics resolved
func initRoutes(r *gin.Engine) {
	r.GET("/ping", func(c *gin.Context) {
		c.JSON(http.StatusOK, gin.H{
			"message": "pong",
		})
	})
	// An endpoint to force read of configuration file.
	// NOTE(review): reloadConfigFile returns no error, so this always reports success.
	r.POST("/reload", func(c *gin.Context) {
		reloadConfigFile()
		c.JSON(http.StatusOK, gin.H{
			"message": "configuration successfully reloaded",
		})
	})
	// An endpoint to toggle debug mode
	r.POST("/debug", func(c *gin.Context) {
		toggleDebug()
		msg := "debug mode disabled"
		if gDebug {
			msg = "debug mode enabled"
		}
		c.JSON(http.StatusOK, gin.H{
			"message": msg,
		})
	})
	r.GET("/:graph/nodes", graphItemsHandler(func(g *Graph) *[]Item { return &g.Nodes }))
	r.GET("/:graph/edges", graphItemsHandler(func(g *Graph) *[]Item { return &g.Edges }))
}

// graphItemsHandler builds the handler shared by /:graph/nodes and /:graph/edges.
// pick selects which item list of the requested graph is resolved and returned.
// Responses: 400 on invalid query arguments, 404 for an unknown graph, 500 when a
// datasource query fails, otherwise 200 with the resolved items. Every error path
// returns right after writing its response, so only one response is ever sent.
func graphItemsHandler(pick func(*Graph) *[]Item) gin.HandlerFunc {
	return func(c *gin.Context) {
		// Validate presence and type
		var args QueryArgs
		if err := c.ShouldBindQuery(&args); err != nil {
			log.Errorf("Invalid query arguments\n")
			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
			return
		}
		timeRange, err := queryArgsToTimeRange(args)
		if err != nil {
			log.Errorf("Unable to parse query arguments\n")
			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
			return
		}
		graph, err := getGraph(c.Param("graph"))
		if err != nil {
			c.JSON(http.StatusNotFound, gin.H{"error": err.Error()})
			return
		}
		items, err := graph.BuildMetrics(pick(&graph), &timeRange)
		if err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
			return
		}
		c.JSON(http.StatusOK, items)
	}
}
// toggleDebug flips gDebug and sets gin's mode and the log level to match:
// debug mode with debug logging when enabled, release mode with info logging otherwise.
func toggleDebug() {
	gDebug = !gDebug
	if !gDebug {
		gin.SetMode(gin.ReleaseMode)
		log.SetLevel(log.InfoLevel)
		return
	}
	gin.SetMode(gin.DebugMode)
	log.SetLevel(log.DebugLevel)
}
// newNodeClone returns a newly allocated Node whose listed fields are copied from
// src, so the result does not alias *src. Each field is copied by plain assignment;
// any Node field not copied below keeps its zero value.
func newNodeClone(src *Node) *Node {
	dst := new(Node)
	// Identity and labels
	dst.Name = src.Name
	dst.Id = src.Id
	dst.Title = src.Title
	dst.Subtitle = src.Subtitle
	// Main stat
	dst.MainStat = src.MainStat
	dst.MainStatQuery = src.MainStatQuery
	dst.MainStatFormat = src.MainStatFormat
	// Secondary stat
	dst.SecondaryStat = src.SecondaryStat
	dst.SecondaryStatQuery = src.SecondaryStatQuery
	dst.SecondaryStatFormat = src.SecondaryStatFormat
	// Rendering
	dst.Color = src.Color
	dst.Icon = src.Icon
	dst.NodeRadius = src.NodeRadius
	dst.Highlighted = src.Highlighted
	dst.HighlightedQuery = src.HighlightedQuery
	return dst
}
// newEdgeClone returns a newly allocated Edge whose listed fields are copied from
// src, so the result does not alias *src. Each field is copied by plain assignment;
// any Edge field not copied below keeps its zero value.
func newEdgeClone(src *Edge) *Edge {
	dst := new(Edge)
	// Identity and endpoints
	dst.Id = src.Id
	dst.Source = src.Source
	dst.Target = src.Target
	// Main stat
	dst.MainStat = src.MainStat
	dst.MainStatQuery = src.MainStatQuery
	dst.MainStatFormat = src.MainStatFormat
	// Secondary stat
	dst.SecondaryStat = src.SecondaryStat
	dst.SecondaryStatQuery = src.SecondaryStatQuery
	dst.SecondaryStatFormat = src.SecondaryStatFormat
	// Rendering
	dst.Color = src.Color
	dst.Thickness = src.Thickness
	dst.ThicknessQuery = src.ThicknessQuery
	dst.Highlighted = src.Highlighted
	dst.HighlightedQuery = src.HighlightedQuery
	dst.StrokeDashArray = src.StrokeDashArray
	return dst
}
func reloadConfigFile() {
// First reread config file
if err := viper.ReadInConfig(); err != nil {
if _, ok := err.(viper.ConfigFileNotFoundError); ok {
log.Fatalf("config file not found")
os.Exit(1)
} else {
log.Fatalf("unknown error looking for config file: %v", err)
os.Exit(1)
}
}
switch viper.Get("language").(string) {
case "english":
gPrinter = message.NewPrinter(language.English)
case "french":
gPrinter = message.NewPrinter(language.French)
case "german":
gPrinter = message.NewPrinter(language.German)
case "ukrainian":
gPrinter = message.NewPrinter(language.Ukrainian)
case "arabic":
gPrinter = message.NewPrinter(language.Arabic)
case "chinese":
gPrinter = message.NewPrinter(language.Chinese)
default:
log.Errorf("Language not implemented: %s. Fallback to english\n", viper.Get("language").(string))
gPrinter = message.NewPrinter(language.English)
}
// then clear current config, after acquiring WriteLock
gCfgMutex.Lock()
defer gCfgMutex.Unlock()
for _, g := range gGraphs {
g.Nodes = nil
g.Edges = nil
}
gGraphs = nil
gDataSources = nil
// Finally unmarshal graphs, nodes, edges and datasources
gps := viper.Get("graphs").([]interface{})
for _, g := range gps {
yd, _ := yaml.Marshal(g)
// Unmarshal on anonymous structs with []Edge and []Node well defined so unmarshaler know how to handle
// then we convert to []Items
tmp := struct {
Name string `yaml:"name"`
Nodes []Node `yaml:"nodes"`
Edges []Edge `yaml:"edges"`
}{}
yaml.Unmarshal(yd, &tmp)
graph := Graph{
Name: tmp.Name,
}
for _, n := range tmp.Nodes {
// Deep copy Node so garbage collecting tmp won't pull the carpet under our feet
graph.Nodes = append(graph.Nodes, newNodeClone(&n))
}
for _, e := range tmp.Edges {
// Deep copy Edge
graph.Edges = append(graph.Edges, newEdgeClone(&e))
}
gGraphs = append(gGraphs, graph)
}
if viper.Get("datasources") == nil {
log.Printf("no datasources found, data will be static")
return
}
dss := viper.Get("datasources").([]interface{})
for _, d := range dss {
yd, _ := yaml.Marshal(d)
var ds PromDataSourceConfig
yaml.Unmarshal(yd, &ds)
// Set default Values
if ds.Timeout == 0 {
ds.Timeout = gDefaultDSTimeout
}
gDataSources = append(gDataSources, ds)
}
}
// main does the following, in order:
//   - parses the command line and loads the configuration file;
//   - wires the reload triggers (SIGHUP and config-file changes);
//   - serves the HTTP API, exiting with an error if the server cannot run.
//
// Command-line flags win over the configuration file. -debug can only enable debug
// mode: the file's "debug" key is consulted when the flag is off. The file's "listen"
// key is used only when -listen-addr was not given explicitly.
func main() {
	var listen string
	var confFile string
	flag.StringVar(&confFile, "config", "", "Path to the config file")
	flag.StringVar(&listen, "listen-addr", "0.0.0.0:8080", "listen address for server")
	flag.BoolVar(&gDebug, "debug", false, "Set log level to debug")
	flag.Parse()
	if len(confFile) == 0 {
		log.Fatalf("config is mandatory")
	}
	// Remember whether -listen-addr was passed, so the config file cannot override it.
	listenFromFlag := false
	flag.Visit(func(f *flag.Flag) {
		if f.Name == "listen-addr" {
			listenFromFlag = true
		}
	})

	viper.SetConfigFile(confFile)
	if err := viper.ReadInConfig(); err != nil {
		if _, ok := err.(viper.ConfigFileNotFoundError); ok {
			log.Fatalf("config file not found")
		}
		log.Fatalf("unknown error looking for config file: %v", err)
	}
	if !gDebug && viper.GetBool("debug") {
		gDebug = true
	}
	if gDebug {
		gin.SetMode(gin.DebugMode)
		log.SetLevel(log.DebugLevel)
	} else {
		log.SetLevel(log.InfoLevel)
		gin.SetMode(gin.ReleaseMode)
	}
	if !listenFromFlag {
		if l := viper.GetString("listen"); len(l) > 0 {
			listen = l
		}
	}

	// Compile the request parsers before any goroutine or handler can run.
	// Capture variable name. There should be only one variable. Space is tolerated before and after name.
	gDSVarCompRegex = regexp.MustCompile(`^\{\{(?:\ )?([a-zA-Z0-9\-_]+)(?:\ )?\}\}$`)
	// Grafana interval parser
	gGrafanaIntervalUnitRegex = regexp.MustCompile(`^(?P<value>[0-9]+)(?P<unit>[mshd]+)$`)

	// Reload config on SIGHUP
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGHUP)
	go func() {
		for range sigs {
			log.Infof("SIGHUP received, reloading configuration\n")
			reloadConfigFile()
		}
	}()
	reloadConfigFile()

	// Reload when the config file changes. OnConfigChange only registers the
	// callback: viper does not watch the file until WatchConfig is called, which is
	// why the reload-on-change never fired before.
	viper.OnConfigChange(func(e fsnotify.Event) {
		log.Printf("Config file changed, reloading data\n")
		reloadConfigFile()
	})
	viper.WatchConfig()

	log.Printf("Starting NodeGopher v.%s\n", gVersion)
	r := gin.Default()
	initRoutes(r)
	if err := r.Run(listen); err != nil {
		log.Fatalf("HTTP server stopped: %v", err)
	}
}