506 lines
14 KiB
Go
506 lines
14 KiB
Go
// nodegopher is a Grafana/Prometheus Node Graph helper.
// It builds a node graph structure by merging static data with metrics pulled from a Prometheus instance.
// Copyright (c) 2025 yo000 <johan@nosd.in>
//
|
|
|
|
package main
|
|
|
|
import (
|
|
"os"
|
|
"fmt"
|
|
"flag"
|
|
"sync"
|
|
"time"
|
|
"regexp"
|
|
"context"
|
|
"strconv"
|
|
"strings"
|
|
"syscall"
|
|
"net/http"
|
|
"os/signal"
|
|
|
|
"gopkg.in/yaml.v2"
|
|
"github.com/spf13/viper"
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/fsnotify/fsnotify"
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"github.com/prometheus/common/model"
|
|
"github.com/prometheus/client_golang/api"
|
|
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
|
|
)
|
|
|
|
// gVersion is the NodeGopher version string logged at startup.
const gVersion = "0.2.1"
|
|
|
|
// PromDataSourceConfig describes one Prometheus data source, decoded from an
// entry of the "datasources" list of the configuration file.
type PromDataSourceConfig struct {
	// Name identifies the data source; stat queries reference it as "{{ name }}".
	Name string `yaml:"name"`
	// QType selects the Prometheus API call: "query" or "query_range".
	QType string `yaml:"type"`
	// Address is passed as-is to the Prometheus API client.
	Address string `yaml:"address"`
	// Query is the expression sent to Prometheus.
	Query string `yaml:"query"`
	// Timeout is the query timeout, expressed in seconds.
	Timeout int `yaml:"timeout"`
}
|
|
|
|
type Graph struct {
|
|
Name string `yaml:"name"`
|
|
Nodes []Item
|
|
Edges []Item
|
|
}
|
|
|
|
// Item is the behaviour shared by graph nodes and edges: an identifier plus a
// main and a secondary stat, each with an optional query and output format.
type Item interface {
	// GetId returns the item identifier (used in log messages).
	GetId() string

	// GetMainStat returns the current main stat text.
	GetMainStat() string
	// GetMainStatQuery returns the "{{ datasource }}" reference, or "" when
	// the main stat is static.
	GetMainStatQuery() string
	// GetMainStatFormat returns the fmt verb applied to the queried value;
	// BuildMetrics falls back to "%f" when it is empty.
	GetMainStatFormat() string
	// SetMainStat overwrites the main stat text.
	SetMainStat(string)

	// GetSecondaryStat returns the current secondary stat text.
	GetSecondaryStat() string
	// GetSecondaryStatQuery returns the "{{ datasource }}" reference, or ""
	// when the secondary stat is static.
	GetSecondaryStatQuery() string
	// GetSecondaryStatFormat returns the fmt verb applied to the queried
	// value; BuildMetrics falls back to "%f" when it is empty.
	GetSecondaryStatFormat() string
	// SetSecondaryStat overwrites the secondary stat text.
	SetSecondaryStat(string)
}
|
|
|
|
// QueryArgs holds the mandatory query-string arguments of the nodes/edges
// endpoints. They are modelled on Grafana's internal variables.
type QueryArgs struct {
	// From is the start of the time range; queryArgsToTimeRange divides it by
	// 1000 to obtain Unix seconds.
	From int64 `form:"from" binding:"required"`
	// To is the end of the time range, in the same unit as From.
	To int64 `form:"to" binding:"required"`
	// Interval is a number followed by a unit (ms, s, m, h or d), e.g. "30s".
	Interval string `form:"interval" binding:"required"`
}
|
|
|
|
// MyRange is the QueryArgs time window converted to the types expected by the
// Prometheus client (it mirrors the fields of v1.Range).
type MyRange struct {
	// Start is the beginning of the queried window.
	Start time.Time
	// End is the end of the queried window.
	End time.Time
	// Step is the resolution of a range query.
	Step time.Duration
}
|
|
|
|
var (
|
|
gDebug bool
|
|
gGraphs []Graph
|
|
gDataSources []PromDataSourceConfig
|
|
|
|
gDSVarCompRegex *regexp.Regexp
|
|
gGrafanaIntervalUnitRegex *regexp.Regexp
|
|
|
|
// manipulating In-Ram configuration Mutex. reloadConfig will lock WriteMutex, consuming queries will lock ReadMutex.
|
|
gCfgMutex sync.RWMutex
|
|
)
|
|
|
|
|
|
func (d *PromDataSourceConfig) GetData(timeRange *MyRange) (float64, error) {
|
|
client, err := api.NewClient(api.Config{
|
|
Address: d.Address,
|
|
})
|
|
if err != nil {
|
|
log.Errorf("DataSourceConfig.GetData: Error creating client: %v\n", err)
|
|
return 0.0, err
|
|
}
|
|
|
|
v1api := v1.NewAPI(client)
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(d.Timeout)*time.Second)
|
|
defer cancel()
|
|
var result model.Value
|
|
var warnings v1.Warnings
|
|
if d.QType == "query" {
|
|
result, warnings, err = v1api.Query(ctx, d.Query, time.Now(), v1.WithTimeout(time.Duration(d.Timeout)*time.Second))
|
|
} else if d.QType == "query_range" {
|
|
rng := v1.Range{
|
|
Start: timeRange.Start,
|
|
End: timeRange.End,
|
|
Step: timeRange.Step,
|
|
}
|
|
result, warnings, err = v1api.QueryRange(ctx, d.Query, rng, v1.WithTimeout(time.Duration(d.Timeout)*time.Second))
|
|
}
|
|
if err != nil {
|
|
log.Errorf("DataSourceConfig.GetData: Error querying Prometheus: %v\n", err)
|
|
return 0.0, err
|
|
}
|
|
if len(warnings) > 0 {
|
|
log.Warningf("DataSourceConfig.GetData: Warnings: %v\n", warnings)
|
|
}
|
|
log.Debugf("DataSourceConfig.GetData: Result: %v\n", result)
|
|
|
|
switch {
|
|
case result.Type() == model.ValScalar:
|
|
log.Errorf("This is a model.ValScalar type, not implemented!\n")
|
|
//scalarVal := result.(*model.Scalar)
|
|
// handle scalar stuff
|
|
case result.Type() == model.ValVector:
|
|
vectorVal := result.(model.Vector)
|
|
// FIXME: Is averaging the right thing to do?
|
|
var total float64
|
|
for _, elem := range vectorVal {
|
|
log.Debugf("DataSourceConfig.GetData: Value: %v\n", elem.Value)
|
|
total += float64(elem.Value)
|
|
}
|
|
log.Debugf("DataSourceConfig.GetData: Total average: %f\n", total/float64(len(vectorVal)))
|
|
return total/float64(len(vectorVal)), nil
|
|
// QueryRange return this type
|
|
case result.Type() == model.ValMatrix:
|
|
matrixVal := result.(model.Matrix)
|
|
// FIXME: Is averaging the right thing to do?
|
|
var total float64
|
|
var length int64
|
|
for _, val := range matrixVal {
|
|
for _, v := range val.Values {
|
|
total += float64(v.Value)
|
|
}
|
|
length += int64(len(val.Values))
|
|
}
|
|
log.Debugf("DataSourceConfig.GetData: Total average: %f\n", total/float64(length))
|
|
return total/float64(length), nil
|
|
default:
|
|
log.Errorf("Prometheus result is an unknown type: %T", result)
|
|
return 0.0, nil
|
|
}
|
|
|
|
return 0.0, nil
|
|
}
|
|
|
|
func (g Graph) BuildMetrics(items *[]Item, timeRange *MyRange) (*[]Item, error) {
|
|
var err error
|
|
|
|
// Acquire Read Lock. This won't prevent other queries being answered, but reloadConfig WLock will have to wait until we finish
|
|
gCfgMutex.RLock()
|
|
defer gCfgMutex.RUnlock()
|
|
|
|
for _, item := range *items {
|
|
if len(item.GetMainStatQuery()) > 0 {
|
|
log.Debugf("Item %s have mainstatquery: %s\n", item.GetId(), item.GetMainStatQuery())
|
|
r := gDSVarCompRegex.FindStringSubmatch(item.GetMainStatQuery())
|
|
if len(r) > 1 {
|
|
var value float64
|
|
dsname := strings.TrimSpace(r[1])
|
|
log.Debugf("buildMetrics: datasource from mainstatquery : %s\n", dsname)
|
|
for _, d := range gDataSources {
|
|
if strings.EqualFold(d.Name, dsname) {
|
|
value, err = d.GetData(timeRange)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
break
|
|
}
|
|
}
|
|
format := item.GetMainStatFormat()
|
|
if len(format) == 0 {
|
|
format = "%f"
|
|
}
|
|
log.Debugf("buildMetrics: Replace %s mainstat with %s\n", item.GetId(), fmt.Sprintf(format, value))
|
|
item.SetMainStat(fmt.Sprintf(format, value))
|
|
} else {
|
|
log.Errorf("buildMetrics: Item %s mainstatquery unparseable: %s\n", item.GetId(), item.GetMainStatQuery())
|
|
}
|
|
}
|
|
if len(item.GetSecondaryStatQuery()) > 0 {
|
|
log.Debugf("Item %s have mainstatquery: %s\n", item.GetId(), item.GetSecondaryStatQuery())
|
|
r := gDSVarCompRegex.FindStringSubmatch(item.GetSecondaryStatQuery())
|
|
if len(r) > 1 {
|
|
var value float64
|
|
dsname := strings.TrimSpace(r[1])
|
|
log.Debugf("buildMetrics: datasource from secondarystatquery : %s\n", dsname)
|
|
for _, d := range gDataSources {
|
|
if strings.EqualFold(d.Name, dsname) {
|
|
value, err = d.GetData(timeRange)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
break
|
|
}
|
|
}
|
|
format := item.GetSecondaryStatFormat()
|
|
if len(format) == 0 {
|
|
format = "%f"
|
|
}
|
|
log.Debugf("buildMetrics: Replace %s secondarystat with %s\n", item.GetId(), fmt.Sprintf(format, value))
|
|
item.SetSecondaryStat(fmt.Sprintf(format, value))
|
|
} else {
|
|
log.Errorf("buildMetrics: Item %s secondarystatquery unparseable: %s\n", item.GetId(), item.GetSecondaryStatQuery())
|
|
}
|
|
}
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
func queryArgsToTimeRange(args QueryArgs) (MyRange, error) {
|
|
// interval sent by Grafana could end with : ms, s, m, h, d
|
|
var step time.Duration
|
|
matches := gGrafanaIntervalUnitRegex.FindStringSubmatch(args.Interval)
|
|
if len(matches) != 3 {
|
|
return MyRange{}, fmt.Errorf("Invalid format: interval")
|
|
}
|
|
value, err := strconv.ParseInt(matches[gGrafanaIntervalUnitRegex.SubexpIndex("value")], 10, 64)
|
|
if err != nil {
|
|
return MyRange{}, fmt.Errorf("Invalid format (can not parse value): interval")
|
|
}
|
|
switch matches[gGrafanaIntervalUnitRegex.SubexpIndex("unit")] {
|
|
case "ms":
|
|
step = time.Duration(value)*time.Millisecond
|
|
case "s":
|
|
step = time.Duration(value)*time.Second
|
|
case "m":
|
|
step = time.Duration(value)*time.Minute
|
|
case "h":
|
|
step = time.Duration(value)*time.Hour
|
|
case "d":
|
|
step = time.Duration(value)*time.Hour*24
|
|
default:
|
|
return MyRange{}, fmt.Errorf("Invalid format (can not parse unit): interval")
|
|
}
|
|
|
|
return MyRange{Start: time.Unix(args.From/1000, 0), End: time.Unix(args.To/1000, 0), Step: step}, nil
|
|
}
|
|
|
|
func getGraph(name string) (Graph, error) {
|
|
// Acquire Read Lock. This won't prevent other queries being answered, but reloadConfig WLock will have to wait until we finish
|
|
gCfgMutex.RLock()
|
|
defer gCfgMutex.RUnlock()
|
|
|
|
for _, g := range gGraphs {
|
|
if strings.EqualFold(g.Name, name) {
|
|
return g, nil
|
|
}
|
|
}
|
|
return Graph{}, fmt.Errorf("Graph not found: %s", name)
|
|
}
|
|
|
|
func initRoutes(r *gin.Engine) {
|
|
r.GET("/ping", func(c *gin.Context) {
|
|
c.JSON(http.StatusOK, gin.H{
|
|
"message": "pong",
|
|
})
|
|
})
|
|
|
|
// An endpoint to force read of configuration file
|
|
r.POST("/reload", func(c *gin.Context) {
|
|
reloadConfigFile()
|
|
c.JSON(http.StatusOK, gin.H{
|
|
"message": "configuration successfully reloaded",
|
|
})
|
|
})
|
|
|
|
// An endpoint to toggle debug mode
|
|
r.POST("/debug", func(c *gin.Context) {
|
|
toggleDebug()
|
|
if gDebug {
|
|
c.JSON(http.StatusOK, gin.H{
|
|
"message": "debug mode enabled",
|
|
})
|
|
} else {
|
|
c.JSON(http.StatusOK, gin.H{
|
|
"message": "debug mode disabled",
|
|
})
|
|
}
|
|
})
|
|
|
|
r.GET("/:graph/nodes", func(c *gin.Context) {
|
|
// Validate presence and type
|
|
var args QueryArgs
|
|
if err := c.ShouldBindQuery(&args); err != nil {
|
|
log.Errorf("Invalid query arguments\n")
|
|
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
timeRange, err := queryArgsToTimeRange(args)
|
|
if err != nil {
|
|
log.Errorf("Unable to parse query arguments\n")
|
|
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
gName := c.Param("graph")
|
|
graph, err := getGraph(gName)
|
|
if err != nil {
|
|
c.JSON(http.StatusNotFound, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
nodes, err := graph.BuildMetrics(&graph.Nodes, &timeRange)
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
}
|
|
c.JSON(http.StatusOK, nodes)
|
|
})
|
|
|
|
r.GET("/:graph/edges", func(c *gin.Context) {
|
|
// Validate presence and type
|
|
var args QueryArgs
|
|
if err := c.ShouldBindQuery(&args); err != nil {
|
|
log.Errorf("Invalid query arguments\n")
|
|
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
timeRange, err := queryArgsToTimeRange(args)
|
|
if err != nil {
|
|
log.Errorf("Unable to parse query arguments\n")
|
|
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
gName := c.Param("graph")
|
|
graph, err := getGraph(gName)
|
|
if err != nil {
|
|
c.JSON(http.StatusNotFound, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
edges, err := graph.BuildMetrics(&graph.Edges, &timeRange)
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
}
|
|
c.JSON(http.StatusOK, edges)
|
|
})
|
|
}
|
|
|
|
func toggleDebug() {
|
|
if gDebug {
|
|
gin.SetMode(gin.ReleaseMode)
|
|
log.SetLevel(log.InfoLevel)
|
|
gDebug = false
|
|
} else {
|
|
gin.SetMode(gin.DebugMode)
|
|
log.SetLevel(log.DebugLevel)
|
|
gDebug = true
|
|
}
|
|
}
|
|
|
|
func reloadConfigFile() {
|
|
// First reread config file
|
|
if err := viper.ReadInConfig(); err != nil {
|
|
if _, ok := err.(viper.ConfigFileNotFoundError); ok {
|
|
log.Fatalf("config file not found")
|
|
os.Exit(1)
|
|
} else {
|
|
log.Fatalf("unknown error looking for config file: %v", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
// then clear current config, after acquiring WriteLock
|
|
gCfgMutex.Lock()
|
|
defer gCfgMutex.Unlock()
|
|
|
|
for _, g := range gGraphs {
|
|
g.Nodes = nil
|
|
g.Edges = nil
|
|
}
|
|
gGraphs = nil
|
|
gDataSources = nil
|
|
|
|
// Finally unmarshal graphs, nodes, edges and datasources
|
|
gps := viper.Get("graphs").([]interface{})
|
|
for _, g := range gps {
|
|
yd, _ := yaml.Marshal(g)
|
|
// Unmarshal on anonymous structs with []Edge and []Node well defined so unmarshaler know how to handle
|
|
// then we convert to []Items
|
|
tmp := struct {
|
|
Name string `yaml:"name"`
|
|
Nodes []Node `yaml:"nodes"`
|
|
Edges []Edge `yaml:"edges"`
|
|
}{}
|
|
yaml.Unmarshal(yd, &tmp)
|
|
var graphNodes []Item
|
|
var graphEdges []Item
|
|
for _, n := range tmp.Nodes {
|
|
graphNodes = append(graphNodes, &n)
|
|
}
|
|
for _, e := range tmp.Edges {
|
|
graphEdges = append(graphEdges, &e)
|
|
}
|
|
|
|
graph := Graph{
|
|
Name: tmp.Name,
|
|
Nodes: graphNodes,
|
|
Edges: graphEdges,
|
|
}
|
|
gGraphs = append(gGraphs, graph)
|
|
}
|
|
|
|
if viper.Get("datasources") == nil {
|
|
log.Printf("no datasources found, data will be static")
|
|
return
|
|
}
|
|
dss := viper.Get("datasources").([]interface{})
|
|
for _, d := range dss {
|
|
yd, _ := yaml.Marshal(d)
|
|
var ds PromDataSourceConfig
|
|
yaml.Unmarshal(yd, &ds)
|
|
gDataSources = append(gDataSources, ds)
|
|
}
|
|
}
|
|
|
|
func main() {
|
|
var listen string
|
|
var confFile string
|
|
|
|
flag.StringVar(&confFile, "config", "", "Path to the config file")
|
|
flag.StringVar(&listen, "listen-addr", "0.0.0.0:8080", "listen address for server")
|
|
flag.BoolVar(&gDebug, "debug", false, "Set log level to debug")
|
|
flag.Parse()
|
|
|
|
if len(confFile) == 0 {
|
|
log.Fatalf("config is mandatory")
|
|
os.Exit(1)
|
|
}
|
|
|
|
viper.SetConfigFile(confFile)
|
|
if err := viper.ReadInConfig(); err != nil {
|
|
if _, ok := err.(viper.ConfigFileNotFoundError); ok {
|
|
log.Fatalf("config file not found")
|
|
os.Exit(1)
|
|
} else {
|
|
log.Fatalf("unknown error looking for config file: %v", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
if false == gDebug {
|
|
b := viper.GetBool("debug")
|
|
if b {
|
|
gDebug = b
|
|
}
|
|
}
|
|
|
|
if gDebug {
|
|
gin.SetMode(gin.DebugMode)
|
|
log.SetLevel(log.DebugLevel)
|
|
} else {
|
|
log.SetLevel(log.InfoLevel)
|
|
gin.SetMode(gin.ReleaseMode)
|
|
}
|
|
|
|
if strings.EqualFold(listen, "0.0.0.0:8080") && len(confFile) > 0 {
|
|
l := viper.GetString("listen")
|
|
if len(l) > 0 {
|
|
listen = l
|
|
}
|
|
}
|
|
|
|
// FIXME: Watch config changes. Does not work on FreeBSD. TODO: Test with linux
|
|
viper.OnConfigChange(func(e fsnotify.Event) {
|
|
log.Printf("Config file changed, reloading data\n")
|
|
reloadConfigFile()
|
|
})
|
|
|
|
// Lets reload config on SIGHUP
|
|
sigs := make(chan os.Signal, 1)
|
|
signal.Notify(sigs, syscall.SIGHUP)
|
|
go func() {
|
|
for {
|
|
_ = <- sigs
|
|
log.Infof("SIGHUP received, reloading configuration\n")
|
|
reloadConfigFile()
|
|
}
|
|
}()
|
|
|
|
reloadConfigFile()
|
|
|
|
// Capture variable name. There should be only one variable. Space is tolerated before and after name.
|
|
gDSVarCompRegex = regexp.MustCompile(`^\{\{(?:\ )?([a-zA-Z0-9\-_]+)(?:\ )?\}\}$`)
|
|
|
|
// Grafana interval parser
|
|
gGrafanaIntervalUnitRegex = regexp.MustCompile(`^(?P<value>[0-9]+)(?P<unit>[mshd]+)$`)
|
|
|
|
log.Printf("Starting NodeGopher v.%s\n", gVersion)
|
|
|
|
r := gin.Default()
|
|
initRoutes(r)
|
|
r.Run(listen)
|
|
}
|