Telemetry
For share telemetry we need to use opentelemetry
. It's a standard for telemetry data collection and export.
Never use other libraries directly in code!
go get github.com/worldline-go/tell
Check example in here https://github.com/worldline-go/telemetry_example
Installation
I created a opentelemetry library for helper telemetry initialization so I will continue to use it.
import (
"github.com/worldline-go/tell"
)
type Config struct {
// ...
Telemetry tell.Config
}
Inside in the main function after load the config, we need to initialize the tell library to make connection with otel-collector.
collector, err := tell.New(ctx, cfg.Telemetry)
if err != nil {
return fmt.Errorf("failed to init telemetry; %w", err)
}
// flush metrics on failure
defer collector.Shutdown()
You can now use any of telemetry library or own telemetry data.
Metric
For metrics you can create one and add lots of attributes to it but attributes cannot be dynamic! Dynamic attributes killing memory in otel-collector and hard to investigate them.
Custom
package telemetry
import (
"fmt"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
var (
GlobalAttr []attribute.KeyValue
GlobalMeter *Meter
)
type Meter struct {
Error metric.Int64Counter
Processed metric.Int64Counter
Rules metric.Int64Counter
}
func AddGlobalAttr(v ...attribute.KeyValue) {
GlobalAttr = append(GlobalAttr, v...)
}
func ExtendGlobalAttr(v ...attribute.KeyValue) []attribute.KeyValue {
return append(GlobalAttr, v...)
}
func SetGlobalMeter() error {
mp := otel.GetMeterProvider()
m := &Meter{}
var err error
meter := mp.Meter("")
//nolint:lll // description
m.Processed, err = meter.Int64Counter("transaction_processed_total", metric.WithDescription("number of successfully validated count"))
if err != nil {
return fmt.Errorf("failed to initialize transaction_processed_total; %w", err)
}
m.Error, err = meter.Int64Counter("transaction_error_total", metric.WithDescription("number of error on validation count"))
if err != nil {
return fmt.Errorf("failed to initialize transaction_error_total; %w", err)
}
m.Rules, err = meter.Int64Counter("transaction_rules_total", metric.WithDescription("number of used rule on validation count"))
if err != nil {
return fmt.Errorf("failed to initialize transaction_error_total; %w", err)
}
GlobalMeter = m
return nil
}
//nolint:gochecknoinits // set noop
func init() {
_ = SetGlobalMeter()
}
Echo
// import "github.com/worldline-go/tell/metric/metricecho"
// add echo metrics
e.Use(metricecho.HTTPMetrics())
Trace
Span
Span creating a trace for a specific operation like for function.
Please check the https://opentelemetry.io/docs/specs/semconv/ for adding attributes for general thing.
Echo
go get go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho
Add echo's middleware to the service.
// add echo otel tracing
e.Use(otelecho.Middleware(config.ServiceName))
Http Request
So we get ctx from request's context but if we use that directly there cloud be timeout issue, so we need to create a new context with timeout.
ctx := context.WithoutCancel(ctx)
And use that one to create new span to measure http time but don't forget to add span kind as client.
This is important for generating service-graph!
ctx, spanCall := tracer.Start(ctx, "get-transaction", trace.WithSpanKind(trace.SpanKindClient))
defer spanCall.End()
// add context propagation or use klient's inject option to do it automatically
otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(request.Header))
Inject to http header of traceID can done with klient's option.
klient.WithInject(func(ctx context.Context, req *http.Request) {
otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header))
})
Database
ctx, span := otel.Tracer("").Start(ctx,
"add_product",
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(attribute.String("db.name", "postgres")),
)
defer span.End()
Kafka
wkafka uses franz-go library and it is help propogation.
go get github.com/twmb/franz-go/plugin/kotel
Use that with initializing the kafka client.
kafkaTracer = kotel.NewTracer()
kafkaClient, err = wkafka.New(ctx,
config.Application.KafkaConfig,
wkafka.WithConsumer(config.Application.KafkaConsumer),
wkafka.WithClientInfo(config.ServiceName, config.ServiceVersion),
wkafka.WithKGOOptions(kgo.WithHooks(kotel.NewKotel(kotel.WithTracer(kafkaTracer)).Hooks()...)),
)
Produce message
Important to have span kind as producer.
ctx, spanKafka := otel.Tracer("").Start(ctx, "produce_message", trace.WithSpanKind(trace.SpanKindProducer))
defer spanKafka.End()
if err := h.KafkaProducer.Produce(ctx, product); err != nil {
spanKafka.SetStatus(codes.Error, err.Error())
return c.JSON(http.StatusBadRequest, model.Message{
Message: err.Error(),
})
}
Consume message
k.Tracer
is we initialized on kafka client (kotel.NewTracer()).
func (k *Kafka) Consume(ctx context.Context, product model.Product) error {
// use tracer's returned ctx for next spans
_, span := k.Tracer.WithProcessSpan(wkafka.CtxRecord(ctx))
defer span.End()
span.SetAttributes(attribute.String("product.name", product.Name))
log.Info().Str("product", product.Name).Str("description", product.Description).Msg("consume message")
return nil
}
If using on batch consumer then use wkafka.CtxRecordWithIndex function to get record from the batch.
_, span := s.tracer.WithProcessSpan(wkafka.CtxRecordWithIndex(ctx, 123))
defer span.End()