// nodegopher is a Grafana/Prometheus nodeGraph helper // It builds nodegraph structure by merging static data and metrics pulled from prometheus instance // Copyright (c) 2025 yo000 // package main import ( "os" "fmt" "flag" "sync" "time" "regexp" "context" "strconv" "strings" "syscall" "net/http" "os/signal" "gopkg.in/yaml.v2" "github.com/spf13/viper" "github.com/gin-gonic/gin" "github.com/fsnotify/fsnotify" log "github.com/sirupsen/logrus" "golang.org/x/text/language" "golang.org/x/text/message" "github.com/prometheus/common/model" "github.com/prometheus/client_golang/api" v1 "github.com/prometheus/client_golang/api/prometheus/v1" ) const ( gVersion = "0.2.3" ) type PromDataSourceConfig struct { Name string `yaml:"name"` QType string `yaml:"type"` Address string `yaml:"address"` Query string `yaml:"query"` Timeout int `yaml:"timeout"` } type Graph struct { Name string `yaml:"name"` Nodes []Item Edges []Item } type Item interface { GetId() string GetMainStat() string GetMainStatQuery() string GetMainStatFormat() string SetMainStat(string) GetSecondaryStat() string GetSecondaryStatQuery() string GetSecondaryStatFormat() string SetSecondaryStat(string) GetThicknessQuery() string SetThickness(float64) GetHighlightedQuery() string SetHighlighted(bool) } // Query arguments. Based on grafana internal variables. type QueryArgs struct { From int64 `form:"from" binding:"required"` To int64 `form:"to" binding:"required"` Interval string `form:"interval" binding:"required"` } // Query arguments converted to prometheus_client format type MyRange struct { Start time.Time End time.Time Step time.Duration } var ( gPrinter *message.Printer gDebug bool gGraphs []Graph gDataSources []PromDataSourceConfig gDSVarCompRegex *regexp.Regexp gGrafanaIntervalUnitRegex *regexp.Regexp // manipulating In-Ram configuration Mutex. reloadConfig will lock WriteMutex, consuming queries will lock ReadMutex. gCfgMutex sync.RWMutex ) func (d *PromDataSourceConfig) GetData(timeRange *MyRange) (float64, error) { client, err := api.NewClient(api.Config{ Address: d.Address, }) if err != nil { log.Errorf("DataSourceConfig.GetData: Error creating client: %v\n", err) return 0.0, err } v1api := v1.NewAPI(client) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(d.Timeout)*time.Second) defer cancel() var result model.Value var warnings v1.Warnings if d.QType == "query" { result, warnings, err = v1api.Query(ctx, d.Query, time.Now(), v1.WithTimeout(time.Duration(d.Timeout)*time.Second)) } else if d.QType == "query_range" { rng := v1.Range{ Start: timeRange.Start, End: timeRange.End, Step: timeRange.Step, } result, warnings, err = v1api.QueryRange(ctx, d.Query, rng, v1.WithTimeout(time.Duration(d.Timeout)*time.Second)) } if err != nil { log.Errorf("DataSourceConfig.GetData: Error querying Prometheus: %v\n", err) return 0.0, err } if len(warnings) > 0 { log.Warningf("DataSourceConfig.GetData: Warnings: %v\n", warnings) } //log.Debugf("DataSourceConfig.GetData: Result: %v\n", result) switch { case result.Type() == model.ValScalar: log.Errorf("This is a model.ValScalar type, not implemented!\n") //scalarVal := result.(*model.Scalar) // handle scalar stuff case result.Type() == model.ValVector: vectorVal := result.(model.Vector) // FIXME: Is averaging the right thing to do? var total float64 for _, elem := range vectorVal { log.Debugf("DataSourceConfig.GetData: Value: %v\n", elem.Value) total += float64(elem.Value) } log.Debugf("DataSourceConfig.GetData: Total average: %f\n", total/float64(len(vectorVal))) return total/float64(len(vectorVal)), nil // QueryRange return this type case result.Type() == model.ValMatrix: matrixVal := result.(model.Matrix) // FIXME: Is averaging the right thing to do? var total float64 var length int64 for _, val := range matrixVal { for _, v := range val.Values { total += float64(v.Value) } length += int64(len(val.Values)) } log.Debugf("DataSourceConfig.GetData: Total average: %f\n", total/float64(length)) return total/float64(length), nil default: log.Errorf("Prometheus result is an unknown type: %T", result) return 0.0, nil } return 0.0, nil } func (g Graph) BuildMetrics(items *[]Item, timeRange *MyRange) (*[]Item, error) { var err error // Acquire Read Lock. This won't prevent other queries being answered, but reloadConfig WLock will have to wait until we finish gCfgMutex.RLock() defer gCfgMutex.RUnlock() for _, item := range *items { // Handle mainStat if len(item.GetMainStatQuery()) > 0 { log.Debugf("Item %s have mainstatquery: %s\n", item.GetId(), item.GetMainStatQuery()) r := gDSVarCompRegex.FindStringSubmatch(item.GetMainStatQuery()) if len(r) > 1 { var value float64 dsname := strings.TrimSpace(r[1]) log.Debugf("buildMetrics: datasource from mainstatquery : %s\n", dsname) for _, d := range gDataSources { if strings.EqualFold(d.Name, dsname) { value, err = d.GetData(timeRange) if err != nil { return nil, err } break } } format := item.GetMainStatFormat() if len(format) == 0 { format = "%f" } log.Debugf("buildMetrics: Replace %s mainstat with %s\n", item.GetId(), gPrinter.Sprintf(format, value)) item.SetMainStat(gPrinter.Sprintf(format, value)) } else { log.Errorf("buildMetrics: Item %s mainstatquery unparseable: %s\n", item.GetId(), item.GetMainStatQuery()) } } // Handle secondaryStat if len(item.GetSecondaryStatQuery()) > 0 { log.Debugf("Item %s have mainstatquery: %s\n", item.GetId(), item.GetSecondaryStatQuery()) r := gDSVarCompRegex.FindStringSubmatch(item.GetSecondaryStatQuery()) if len(r) > 1 { var value float64 dsname := strings.TrimSpace(r[1]) log.Debugf("buildMetrics: datasource from secondarystatquery : %s\n", dsname) for _, d := range gDataSources { if strings.EqualFold(d.Name, dsname) { value, err = d.GetData(timeRange) if err != nil { return nil, err } break } } format := item.GetSecondaryStatFormat() if len(format) == 0 { format = "%f" } log.Debugf("buildMetrics: Replace %s secondarystat with %s\n", item.GetId(), gPrinter.Sprintf(format, value)) item.SetSecondaryStat(gPrinter.Sprintf(format, value)) } else { log.Errorf("buildMetrics: Item %s secondarystatquery unparseable: %s\n", item.GetId(), item.GetSecondaryStatQuery()) } } // Handle highlighted if len(item.GetHighlightedQuery()) > 0 { log.Debugf("buildMetrics: Item %s have highlightedquery: %s\n", item.GetId(), item.GetHighlightedQuery()) r := gDSVarCompRegex.FindStringSubmatch(item.GetHighlightedQuery()) if len(r) > 1 { var value float64 dsname := strings.TrimSpace(r[1]) log.Debugf("buildMetrics: datasource from highlightedquery : %s\n", dsname) for _, d := range gDataSources { if strings.EqualFold(d.Name, dsname) { value, err = d.GetData(timeRange) if err != nil { return nil, err } break } } highlight := false if value == 0 { highlight = false } else { highlight = true } log.Debugf("buildMetrics: Replace %s highlighted with %t\n", item.GetId(), highlight) item.SetHighlighted(highlight) } else { log.Errorf("buildMetrics: Item %s highlightedquery unparseable: %s\n", item.GetId(), item.GetHighlightedQuery()) } } switch item.(type) { case *Edge: if len(item.GetThicknessQuery()) > 0 { log.Debugf("buildMetrics: Item %s have thicknessquery: %s\n", item.GetId(), item.GetThicknessQuery()) r := gDSVarCompRegex.FindStringSubmatch(item.GetThicknessQuery()) if len(r) > 1 { var value float64 dsname := strings.TrimSpace(r[1]) log.Debugf("buildMetrics: datasource from thicknessquery : %s\n", dsname) for _, d := range gDataSources { if strings.EqualFold(d.Name, dsname) { value, err = d.GetData(timeRange) if err != nil { return nil, err } break } } log.Debugf("buildMetrics: Replace %s thickness with %s\n", item.GetId(), fmt.Sprintf("%0.0f", value)) item.SetThickness(value) } else { log.Errorf("buildMetrics: Item %s thicknessquery unparseable: %s\n", item.GetId(), item.GetThicknessQuery()) } } } } return items, nil } func queryArgsToTimeRange(args QueryArgs) (MyRange, error) { // interval sent by Grafana could end with : ms, s, m, h, d var step time.Duration matches := gGrafanaIntervalUnitRegex.FindStringSubmatch(args.Interval) if len(matches) != 3 { return MyRange{}, fmt.Errorf("Invalid format: interval") } value, err := strconv.ParseInt(matches[gGrafanaIntervalUnitRegex.SubexpIndex("value")], 10, 64) if err != nil { return MyRange{}, fmt.Errorf("Invalid format (can not parse value): interval") } switch matches[gGrafanaIntervalUnitRegex.SubexpIndex("unit")] { case "ms": step = time.Duration(value)*time.Millisecond case "s": step = time.Duration(value)*time.Second case "m": step = time.Duration(value)*time.Minute case "h": step = time.Duration(value)*time.Hour case "d": step = time.Duration(value)*time.Hour*24 default: return MyRange{}, fmt.Errorf("Invalid format (can not parse unit): interval") } return MyRange{Start: time.Unix(args.From/1000, 0), End: time.Unix(args.To/1000, 0), Step: step}, nil } func getGraph(name string) (Graph, error) { // Acquire Read Lock. This won't prevent other queries being answered, but reloadConfig WLock will have to wait until we finish gCfgMutex.RLock() defer gCfgMutex.RUnlock() for _, g := range gGraphs { if strings.EqualFold(g.Name, name) { return g, nil } } return Graph{}, fmt.Errorf("Graph not found: %s", name) } func initRoutes(r *gin.Engine) { r.GET("/ping", func(c *gin.Context) { c.JSON(http.StatusOK, gin.H{ "message": "pong", }) }) // An endpoint to force read of configuration file r.POST("/reload", func(c *gin.Context) { reloadConfigFile() c.JSON(http.StatusOK, gin.H{ "message": "configuration successfully reloaded", }) }) // An endpoint to toggle debug mode r.POST("/debug", func(c *gin.Context) { toggleDebug() if gDebug { c.JSON(http.StatusOK, gin.H{ "message": "debug mode enabled", }) } else { c.JSON(http.StatusOK, gin.H{ "message": "debug mode disabled", }) } }) r.GET("/:graph/nodes", func(c *gin.Context) { // Validate presence and type var args QueryArgs if err := c.ShouldBindQuery(&args); err != nil { log.Errorf("Invalid query arguments\n") c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } timeRange, err := queryArgsToTimeRange(args) if err != nil { log.Errorf("Unable to parse query arguments\n") c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } gName := c.Param("graph") graph, err := getGraph(gName) if err != nil { c.JSON(http.StatusNotFound, gin.H{"error": err.Error()}) return } nodes, err := graph.BuildMetrics(&graph.Nodes, &timeRange) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) } c.JSON(http.StatusOK, nodes) }) r.GET("/:graph/edges", func(c *gin.Context) { // Validate presence and type var args QueryArgs if err := c.ShouldBindQuery(&args); err != nil { log.Errorf("Invalid query arguments\n") c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } timeRange, err := queryArgsToTimeRange(args) if err != nil { log.Errorf("Unable to parse query arguments\n") c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } gName := c.Param("graph") graph, err := getGraph(gName) if err != nil { c.JSON(http.StatusNotFound, gin.H{"error": err.Error()}) return } edges, err := graph.BuildMetrics(&graph.Edges, &timeRange) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) } c.JSON(http.StatusOK, edges) }) } func toggleDebug() { if gDebug { gin.SetMode(gin.ReleaseMode) log.SetLevel(log.InfoLevel) gDebug = false } else { gin.SetMode(gin.DebugMode) log.SetLevel(log.DebugLevel) gDebug = true } } func reloadConfigFile() { // First reread config file if err := viper.ReadInConfig(); err != nil { if _, ok := err.(viper.ConfigFileNotFoundError); ok { log.Fatalf("config file not found") os.Exit(1) } else { log.Fatalf("unknown error looking for config file: %v", err) os.Exit(1) } } switch viper.Get("language").(string) { case "english": gPrinter = message.NewPrinter(language.English) case "french": gPrinter = message.NewPrinter(language.French) case "german": gPrinter = message.NewPrinter(language.German) case "ukrainian": gPrinter = message.NewPrinter(language.Ukrainian) case "arabic": gPrinter = message.NewPrinter(language.Arabic) case "chinese": gPrinter = message.NewPrinter(language.Chinese) default: log.Errorf("Language not implented: %s. Fallback to english\n", viper.Get("language").(string)) gPrinter = message.NewPrinter(language.English) } // then clear current config, after acquiring WriteLock gCfgMutex.Lock() defer gCfgMutex.Unlock() for _, g := range gGraphs { g.Nodes = nil g.Edges = nil } gGraphs = nil gDataSources = nil // Finally unmarshal graphs, nodes, edges and datasources gps := viper.Get("graphs").([]interface{}) for _, g := range gps { yd, _ := yaml.Marshal(g) // Unmarshal on anonymous structs with []Edge and []Node well defined so unmarshaler know how to handle // then we convert to []Items tmp := struct { Name string `yaml:"name"` Nodes []Node `yaml:"nodes"` Edges []Edge `yaml:"edges"` }{} yaml.Unmarshal(yd, &tmp) var graphNodes []Item var graphEdges []Item for _, n := range tmp.Nodes { graphNodes = append(graphNodes, &n) } for _, e := range tmp.Edges { graphEdges = append(graphEdges, &e) } graph := Graph{ Name: tmp.Name, Nodes: graphNodes, Edges: graphEdges, } gGraphs = append(gGraphs, graph) } if viper.Get("datasources") == nil { log.Printf("no datasources found, data will be static") return } dss := viper.Get("datasources").([]interface{}) for _, d := range dss { yd, _ := yaml.Marshal(d) var ds PromDataSourceConfig yaml.Unmarshal(yd, &ds) gDataSources = append(gDataSources, ds) } } func main() { var listen string var confFile string flag.StringVar(&confFile, "config", "", "Path to the config file") flag.StringVar(&listen, "listen-addr", "0.0.0.0:8080", "listen address for server") flag.BoolVar(&gDebug, "debug", false, "Set log level to debug") flag.Parse() if len(confFile) == 0 { log.Fatalf("config is mandatory") os.Exit(1) } viper.SetConfigFile(confFile) if err := viper.ReadInConfig(); err != nil { if _, ok := err.(viper.ConfigFileNotFoundError); ok { log.Fatalf("config file not found") os.Exit(1) } else { log.Fatalf("unknown error looking for config file: %v", err) os.Exit(1) } } if false == gDebug { b := viper.GetBool("debug") if b { gDebug = b } } if gDebug { gin.SetMode(gin.DebugMode) log.SetLevel(log.DebugLevel) } else { log.SetLevel(log.InfoLevel) gin.SetMode(gin.ReleaseMode) } if strings.EqualFold(listen, "0.0.0.0:8080") && len(confFile) > 0 { l := viper.GetString("listen") if len(l) > 0 { listen = l } } // FIXME: Watch config changes. Does not work on FreeBSD. TODO: Test with linux viper.OnConfigChange(func(e fsnotify.Event) { log.Printf("Config file changed, reloading data\n") reloadConfigFile() }) // Lets reload config on SIGHUP sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGHUP) go func() { for { _ = <- sigs log.Infof("SIGHUP received, reloading configuration\n") reloadConfigFile() } }() reloadConfigFile() // Capture variable name. There should be only one variable. Space is tolerated before and after name. gDSVarCompRegex = regexp.MustCompile(`^\{\{(?:\ )?([a-zA-Z0-9\-_]+)(?:\ )?\}\}$`) // Grafana interval parser gGrafanaIntervalUnitRegex = regexp.MustCompile(`^(?P[0-9]+)(?P[mshd]+)$`) log.Printf("Starting NodeGopher v.%s\n", gVersion) r := gin.Default() initRoutes(r) r.Run(listen) }