// nodegopher is a Grafana/Prometheus nodeGraph helper // It builds nodegraph structure by merging static data and metrics pulled from prometheus instance // Copyright (c) 2025 yo000 // package main import ( "os" "fmt" "flag" "sync" "time" "regexp" "context" "strconv" "strings" "syscall" "net/http" "os/signal" "gopkg.in/yaml.v2" "github.com/spf13/viper" "github.com/gin-gonic/gin" "github.com/fsnotify/fsnotify" log "github.com/sirupsen/logrus" "golang.org/x/text/language" "golang.org/x/text/message" "github.com/prometheus/common/model" "github.com/prometheus/client_golang/api" v1 "github.com/prometheus/client_golang/api/prometheus/v1" ) const ( gVersion = "0.2.5" // Default datasource timeout is 10 seconds gDefaultDSTimeout = 10 ) type PromDataSourceConfig struct { Name string `yaml:"name"` QType string `yaml:"type"` Address string `yaml:"address"` Query string `yaml:"query"` Timeout int `yaml:"timeout"` } type Graph struct { Name string `yaml:"name"` Nodes []Item Edges []Item } type Item interface { GetId() string GetMainStat() string GetMainStatQuery() string GetMainStatFormat() string SetMainStat(string) GetSecondaryStat() string GetSecondaryStatQuery() string GetSecondaryStatFormat() string SetSecondaryStat(string) GetThicknessQuery() string SetThickness(float64) GetHighlightedQuery() string SetHighlighted(bool) } // Query arguments. Based on grafana internal variables. type QueryArgs struct { From int64 `form:"from" binding:"required"` To int64 `form:"to" binding:"required"` Interval string `form:"interval" binding:"required"` } // Query arguments converted to prometheus_client format type MyRange struct { Start time.Time End time.Time Step time.Duration } var ( gPrinter *message.Printer gDebug bool gGraphs []Graph gDataSources []PromDataSourceConfig gDSVarCompRegex *regexp.Regexp gGrafanaIntervalUnitRegex *regexp.Regexp // manipulating In-Ram configuration Mutex. reloadConfig will lock WriteMutex, consuming queries will lock ReadMutex. gCfgMutex sync.RWMutex ) func (d *PromDataSourceConfig) GetData(timeRange *MyRange) (float64, error) { client, err := api.NewClient(api.Config{ Address: d.Address, }) if err != nil { log.Errorf("DataSourceConfig.GetData: Error creating client: %v\n", err) return 0.0, err } v1api := v1.NewAPI(client) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(d.Timeout)*time.Second) defer cancel() var result model.Value var warnings v1.Warnings if d.QType == "query" { result, warnings, err = v1api.Query(ctx, d.Query, time.Now(), v1.WithTimeout(time.Duration(d.Timeout)*time.Second)) } else if d.QType == "query_range" { rng := v1.Range{ Start: timeRange.Start, End: timeRange.End, Step: timeRange.Step, } result, warnings, err = v1api.QueryRange(ctx, d.Query, rng, v1.WithTimeout(time.Duration(d.Timeout)*time.Second)) } if err != nil { log.Errorf("DataSourceConfig.GetData: Error querying Prometheus: %v. Query is: %s\n", err, d.Query) return 0.0, err } if len(warnings) > 0 { log.Warningf("DataSourceConfig.GetData: Warnings: %v\n", warnings) } //log.Debugf("DataSourceConfig.GetData: Result: %v\n", result) switch { case result.Type() == model.ValScalar: log.Errorf("This is a model.ValScalar type, not implemented!\n") //scalarVal := result.(*model.Scalar) // handle scalar stuff case result.Type() == model.ValVector: vectorVal := result.(model.Vector) // FIXME: Is averaging the right thing to do? var total float64 for _, elem := range vectorVal { log.Debugf("DataSourceConfig.GetData: Value: %v\n", elem.Value) total += float64(elem.Value) } log.Debugf("DataSourceConfig.GetData: Total average: %f\n", total/float64(len(vectorVal))) return total/float64(len(vectorVal)), nil // QueryRange return this type case result.Type() == model.ValMatrix: matrixVal := result.(model.Matrix) // FIXME: Is averaging the right thing to do? var total float64 var length int64 for _, val := range matrixVal { for _, v := range val.Values { total += float64(v.Value) } length += int64(len(val.Values)) } log.Debugf("DataSourceConfig.GetData: Total average: %f\n", total/float64(length)) return total/float64(length), nil default: log.Errorf("Prometheus result is an unknown type: %T", result) return 0.0, nil } return 0.0, nil } func (g Graph) BuildMetrics(items *[]Item, timeRange *MyRange) (*[]Item, error) { var err error // Acquire Read Lock. This won't prevent other queries being answered, but reloadConfig WLock will have to wait until we finish gCfgMutex.RLock() defer gCfgMutex.RUnlock() for _, item := range *items { // Handle mainStat if len(item.GetMainStatQuery()) > 0 { log.Debugf("Item %s have mainstatquery: %s\n", item.GetId(), item.GetMainStatQuery()) r := gDSVarCompRegex.FindStringSubmatch(item.GetMainStatQuery()) if len(r) > 1 { var value float64 dsname := strings.TrimSpace(r[1]) log.Debugf("buildMetrics: datasource from mainstatquery : %s\n", dsname) for _, d := range gDataSources { if strings.EqualFold(d.Name, dsname) { value, err = d.GetData(timeRange) if err != nil { return nil, err } break } } format := item.GetMainStatFormat() if len(format) == 0 { format = "%f" } log.Debugf("buildMetrics: Replace %s mainstat with %s\n", item.GetId(), gPrinter.Sprintf(format, value)) item.SetMainStat(gPrinter.Sprintf(format, value)) } else { log.Errorf("buildMetrics: Item %s mainstatquery unparseable: %s\n", item.GetId(), item.GetMainStatQuery()) } } // Handle secondaryStat if len(item.GetSecondaryStatQuery()) > 0 { log.Debugf("Item %s have mainstatquery: %s\n", item.GetId(), item.GetSecondaryStatQuery()) r := gDSVarCompRegex.FindStringSubmatch(item.GetSecondaryStatQuery()) if len(r) > 1 { var value float64 dsname := strings.TrimSpace(r[1]) log.Debugf("buildMetrics: datasource from secondarystatquery : %s\n", dsname) for _, d := range gDataSources { if strings.EqualFold(d.Name, dsname) { value, err = d.GetData(timeRange) if err != nil { return nil, err } break } } format := item.GetSecondaryStatFormat() if len(format) == 0 { format = "%f" } log.Debugf("buildMetrics: Replace %s secondarystat with %s\n", item.GetId(), gPrinter.Sprintf(format, value)) item.SetSecondaryStat(gPrinter.Sprintf(format, value)) } else { log.Errorf("buildMetrics: Item %s secondarystatquery unparseable: %s\n", item.GetId(), item.GetSecondaryStatQuery()) } } // Handle highlighted if len(item.GetHighlightedQuery()) > 0 { log.Debugf("buildMetrics: Item %s have highlightedquery: %s\n", item.GetId(), item.GetHighlightedQuery()) r := gDSVarCompRegex.FindStringSubmatch(item.GetHighlightedQuery()) if len(r) > 1 { var value float64 dsname := strings.TrimSpace(r[1]) log.Debugf("buildMetrics: datasource from highlightedquery : %s\n", dsname) for _, d := range gDataSources { if strings.EqualFold(d.Name, dsname) { value, err = d.GetData(timeRange) if err != nil { return nil, err } break } } highlight := false if value == 0 { highlight = false } else { highlight = true } log.Debugf("buildMetrics: Replace %s highlighted with %t\n", item.GetId(), highlight) item.SetHighlighted(highlight) } else { log.Errorf("buildMetrics: Item %s highlightedquery unparseable: %s\n", item.GetId(), item.GetHighlightedQuery()) } } switch item.(type) { case *Edge: if len(item.GetThicknessQuery()) > 0 { log.Debugf("buildMetrics: Item %s have thicknessquery: %s\n", item.GetId(), item.GetThicknessQuery()) r := gDSVarCompRegex.FindStringSubmatch(item.GetThicknessQuery()) if len(r) > 1 { var value float64 dsname := strings.TrimSpace(r[1]) log.Debugf("buildMetrics: datasource from thicknessquery : %s\n", dsname) for _, d := range gDataSources { if strings.EqualFold(d.Name, dsname) { value, err = d.GetData(timeRange) if err != nil { return nil, err } break } } log.Debugf("buildMetrics: Replace %s thickness with %s\n", item.GetId(), fmt.Sprintf("%0.0f", value)) item.SetThickness(value) } else { log.Errorf("buildMetrics: Item %s thicknessquery unparseable: %s\n", item.GetId(), item.GetThicknessQuery()) } } } } return items, nil } func queryArgsToTimeRange(args QueryArgs) (MyRange, error) { // interval sent by Grafana could end with : ms, s, m, h, d var step time.Duration matches := gGrafanaIntervalUnitRegex.FindStringSubmatch(args.Interval) if len(matches) != 3 { return MyRange{}, fmt.Errorf("Invalid format: interval") } value, err := strconv.ParseInt(matches[gGrafanaIntervalUnitRegex.SubexpIndex("value")], 10, 64) if err != nil { return MyRange{}, fmt.Errorf("Invalid format (can not parse value): interval") } switch matches[gGrafanaIntervalUnitRegex.SubexpIndex("unit")] { case "ms": step = time.Duration(value)*time.Millisecond case "s": step = time.Duration(value)*time.Second case "m": step = time.Duration(value)*time.Minute case "h": step = time.Duration(value)*time.Hour case "d": step = time.Duration(value)*time.Hour*24 default: return MyRange{}, fmt.Errorf("Invalid format (can not parse unit): interval") } return MyRange{Start: time.Unix(args.From/1000, 0), End: time.Unix(args.To/1000, 0), Step: step}, nil } func getGraph(name string) (Graph, error) { // Acquire Read Lock. This won't prevent other queries being answered, but reloadConfig WLock will have to wait until we finish gCfgMutex.RLock() defer gCfgMutex.RUnlock() for _, g := range gGraphs { if strings.EqualFold(g.Name, name) { return g, nil } } return Graph{}, fmt.Errorf("Graph not found: %s", name) } func initRoutes(r *gin.Engine, confFile string) { r.GET("/ping", func(c *gin.Context) { c.JSON(http.StatusOK, gin.H{ "message": "pong", }) }) // An endpoint to force read of configuration file r.POST("/reload", func(c *gin.Context) { if err := reloadConfigFile(confFile); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } c.JSON(http.StatusOK, gin.H{ "message": "configuration successfully reloaded", }) }) // An endpoint to toggle debug mode r.POST("/debug", func(c *gin.Context) { toggleDebug() if gDebug { c.JSON(http.StatusOK, gin.H{ "message": "debug mode enabled", }) } else { c.JSON(http.StatusOK, gin.H{ "message": "debug mode disabled", }) } }) r.GET("/:graph/nodes", func(c *gin.Context) { // Validate presence and type var args QueryArgs if err := c.ShouldBindQuery(&args); err != nil { log.Errorf("Invalid query arguments\n") c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } timeRange, err := queryArgsToTimeRange(args) if err != nil { log.Errorf("Unable to parse query arguments\n") c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } gName := c.Param("graph") graph, err := getGraph(gName) if err != nil { c.JSON(http.StatusNotFound, gin.H{"error": err.Error()}) return } nodes, err := graph.BuildMetrics(&graph.Nodes, &timeRange) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) } c.JSON(http.StatusOK, nodes) }) r.GET("/:graph/edges", func(c *gin.Context) { // Validate presence and type var args QueryArgs if err := c.ShouldBindQuery(&args); err != nil { log.Errorf("Invalid query arguments\n") c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } timeRange, err := queryArgsToTimeRange(args) if err != nil { log.Errorf("Unable to parse query arguments\n") c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } gName := c.Param("graph") graph, err := getGraph(gName) if err != nil { c.JSON(http.StatusNotFound, gin.H{"error": err.Error()}) return } edges, err := graph.BuildMetrics(&graph.Edges, &timeRange) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) } c.JSON(http.StatusOK, edges) }) } func toggleDebug() { if gDebug { gin.SetMode(gin.ReleaseMode) log.SetLevel(log.InfoLevel) gDebug = false } else { gin.SetMode(gin.DebugMode) log.SetLevel(log.DebugLevel) gDebug = true } } // Deep copy src Node into a new memory space func newNodeClone(src *Node) *Node { return &Node{ Name: src.Name, Id: src.Id, Title: src.Title, Subtitle: src.Subtitle, MainStat: src.MainStat, MainStatQuery: src.MainStatQuery, MainStatFormat: src.MainStatFormat, SecondaryStat: src.SecondaryStat, SecondaryStatQuery: src.SecondaryStatQuery, SecondaryStatFormat: src.SecondaryStatFormat, Color: src.Color, Icon: src.Icon, NodeRadius: src.NodeRadius, Highlighted: src.Highlighted, HighlightedQuery: src.HighlightedQuery, } } // Deep copy src Edge into a new memory space func newEdgeClone(src *Edge) *Edge { return &Edge{ Id: src.Id, Source: src.Source, Target: src.Target, MainStat: src.MainStat, MainStatQuery: src.MainStatQuery, MainStatFormat: src.MainStatFormat, SecondaryStat: src.SecondaryStat, SecondaryStatQuery: src.SecondaryStatQuery, SecondaryStatFormat: src.SecondaryStatFormat, Color: src.Color, Thickness: src.Thickness, ThicknessQuery: src.ThicknessQuery, Highlighted: src.Highlighted, HighlightedQuery: src.HighlightedQuery, StrokeDashArray: src.StrokeDashArray, } } // This function assume we already have a running configuration. func reloadConfigFile(confFile string) error { oldConfigRestored := false // We need to keep this config, incase the new one is b0rken fname := fmt.Sprintf("/tmp/nodegopher.%d.yaml", os.Getpid()) if err := viper.WriteConfigAs(fname); err != nil { log.Errorf("Unable to save current running config to %s, wont reload configuration.\n", fname) return fmt.Errorf("Unable to save current configuration, configuration not reloaded. See logs.") } defer os.Remove(fname) // Reread config file if oldErr := viper.ReadInConfig(); oldErr != nil { if _, ok := oldErr.(viper.ConfigFileNotFoundError); ok { log.Errorf("config file not found") } else { log.Errorf("unknown error looking for config file: %v", oldErr) } // Restore old configuration and notify. log.Debugf("Fallback on previous configuration.\n") viper.SetConfigFile(fname) if err := viper.ReadInConfig(); err != nil { log.Fatalf("Unable to restore configuration, and new is invalid. fix it now.\n") } viper.SetConfigFile(confFile) oldConfigRestored = true } switch viper.Get("language").(string) { case "english": gPrinter = message.NewPrinter(language.English) case "french": gPrinter = message.NewPrinter(language.French) case "german": gPrinter = message.NewPrinter(language.German) case "ukrainian": gPrinter = message.NewPrinter(language.Ukrainian) case "arabic": gPrinter = message.NewPrinter(language.Arabic) case "chinese": gPrinter = message.NewPrinter(language.Chinese) default: log.Errorf("Language not implemented: %s. Fallback to english\n", viper.Get("language").(string)) gPrinter = message.NewPrinter(language.English) } // then clear current config, after acquiring WriteLock gCfgMutex.Lock() defer gCfgMutex.Unlock() // We need to keep this config, incase the new one is b0rken for _, g := range gGraphs { g.Nodes = nil g.Edges = nil } gGraphs = nil gDataSources = nil // Finally unmarshal graphs, nodes, edges and datasources gps := viper.Get("graphs").([]interface{}) for _, g := range gps { yd, _ := yaml.Marshal(g) // Unmarshal on anonymous structs with []Edge and []Node well defined so unmarshaler know how to handle // then we convert to []Items tmp := struct { Name string `yaml:"name"` Nodes []Node `yaml:"nodes"` Edges []Edge `yaml:"edges"` }{} yaml.Unmarshal(yd, &tmp) graph := Graph{ Name: tmp.Name, } for _, n := range tmp.Nodes { // Deep copy Node so garbage collecting tmp won't pull the carpet under our feet graph.Nodes = append(graph.Nodes, newNodeClone(&n)) } for _, e := range tmp.Edges { // Deep copy Edge graph.Edges = append(graph.Edges, newEdgeClone(&e)) } gGraphs = append(gGraphs, graph) } if viper.Get("datasources") == nil { log.Warningf("no datasources found, data will be static") return nil } dss := viper.Get("datasources").([]interface{}) for _, d := range dss { yd, _ := yaml.Marshal(d) var ds PromDataSourceConfig yaml.Unmarshal(yd, &ds) // Set default Values if ds.Timeout == 0 { ds.Timeout = gDefaultDSTimeout } gDataSources = append(gDataSources, ds) } if oldConfigRestored { return fmt.Errorf("Unable to load new configuration, keeping old one. See logs.") } return nil } func main() { var listen string var confFile string flag.StringVar(&confFile, "config", "", "Path to the config file") flag.StringVar(&listen, "listen-addr", "0.0.0.0:8080", "listen address for server") flag.BoolVar(&gDebug, "debug", false, "Set log level to debug") flag.Parse() if len(confFile) == 0 { log.Fatalf("config is mandatory") os.Exit(1) } viper.SetConfigFile(confFile) if err := viper.ReadInConfig(); err != nil { if _, ok := err.(viper.ConfigFileNotFoundError); ok { log.Fatalf("config file not found") os.Exit(1) } else { log.Fatalf("unknown error looking for config file: %v", err) os.Exit(1) } } if false == gDebug { b := viper.GetBool("debug") if b { gDebug = b } } if gDebug { gin.SetMode(gin.DebugMode) log.SetLevel(log.DebugLevel) } else { log.SetLevel(log.InfoLevel) gin.SetMode(gin.ReleaseMode) } if strings.EqualFold(listen, "0.0.0.0:8080") && len(confFile) > 0 { l := viper.GetString("listen") if len(l) > 0 { listen = l } } // FIXME: Watch config changes. Does not work on FreeBSD. TODO: Test with linux viper.OnConfigChange(func(e fsnotify.Event) { log.Printf("Config file changed, reloading data\n") reloadConfigFile(confFile) }) // Lets reload config on SIGHUP sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGHUP) go func() { for { _ = <- sigs log.Infof("SIGHUP received, reloading configuration\n") reloadConfigFile(confFile) } }() reloadConfigFile(confFile) // Capture variable name. There should be only one variable. Space is tolerated before and after name. gDSVarCompRegex = regexp.MustCompile(`^\{\{(?:\ )?([a-zA-Z0-9\-_]+)(?:\ )?\}\}$`) // Grafana interval parser gGrafanaIntervalUnitRegex = regexp.MustCompile(`^(?P[0-9]+)(?P[mshd]+)$`) log.Printf("Starting NodeGopher v.%s\n", gVersion) r := gin.Default() initRoutes(r, confFile) r.Run(listen) }