Skip to content

Moved jaeger/prometheus check logic into reporters and added iterator… #997

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 13 additions & 29 deletions integration/fabric/iou/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@ SPDX-License-Identifier: Apache-2.0
package iou

import (
"context"
"fmt"
"time"

"github.com/hyperledger-labs/fabric-smart-client/integration"
cviews "github.com/hyperledger-labs/fabric-smart-client/integration/fabric/common/views"
"github.com/hyperledger-labs/fabric-smart-client/integration/fabric/iou/views"
"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/common"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/services/logging"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections/iterators"
model2 "github.com/jaegertracing/jaeger-idl/model/v1"
"github.com/jaegertracing/jaeger-idl/proto-gen/api_v2"
"github.com/onsi/gomega"
"github.com/prometheus/common/model"
)

var logger = logging.MustGetLogger()
Expand Down Expand Up @@ -90,16 +88,14 @@ func CheckLocalMetrics(ii *integration.Infrastructure, user string, viewName str
}

func CheckJaegerTraces(ii *integration.Infrastructure, nodeName, viewName string, spanMatcher gomega.OmegaMatcher) {
cli, err := ii.NWO.JaegerAPI()
cli, err := ii.NWO.JaegerReporter()
gomega.Expect(err).NotTo(gomega.HaveOccurred())

findTraces, err := cli.FindTraces(context.Background(), &api_v2.FindTracesRequest{Query: &api_v2.TraceQueryParameters{ServiceName: nodeName, OperationName: viewName}})
it, err := cli.FindTraces(nodeName, viewName)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
spans := make([]model2.Span, 0)
for chunk, err := findTraces.Recv(); chunk != nil; chunk, err = findTraces.Recv() {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
spans = append(spans, chunk.Spans...)
}

spans, err := iterators.ReadAllPointers(iterators.FlattenValues(it, func(c *api_v2.SpansResponseChunk) ([]model2.Span, error) { return c.Spans, nil }))

gomega.Expect(err).NotTo(gomega.HaveOccurred())
logger.Infof("Received jaeger %d spans for [%s:%s]: %s", len(spans), nodeName, viewName, spans)

Expand All @@ -108,30 +104,18 @@ func CheckJaegerTraces(ii *integration.Infrastructure, nodeName, viewName string
return
}

services, err := cli.GetServices(context.Background(), &api_v2.GetServicesRequest{})
services, err := cli.GetServices()
gomega.Expect(err).NotTo(gomega.HaveOccurred())
operations, err := cli.GetOperations(context.Background(), &api_v2.GetOperationsRequest{})
operations, err := cli.GetOperations("")
gomega.Expect(err).NotTo(gomega.HaveOccurred())
logger.Infof("No spans found. %d operations found in %d services: [%v] [%v]", len(operations.GetOperations()), len(services.GetServices()), services.GetServices(), operations.GetOperationNames())
logger.Infof("No spans found. %d operations found in %d services: [%v] [%v]", len(operations), len(services), services, operations)
gomega.Expect(spans).To(spanMatcher)
}

func CheckPrometheusMetrics(ii *integration.Infrastructure, viewName string) {
cli, err := ii.NWO.PrometheusAPI()
cli, err := ii.NWO.PrometheusReporter()
gomega.Expect(err).To(gomega.BeNil())
metric := model.Metric{
"__name__": model.LabelValue("fsc_view_operations"),
"view": model.LabelValue(viewName),
}
val, warnings, err := cli.Query(context.Background(), metric.String(), time.Now())
gomega.Expect(warnings).To(gomega.BeEmpty())
gomega.Expect(err).To(gomega.BeNil())
gomega.Expect(val.Type()).To(gomega.Equal(model.ValVector))

logger.Infof("Received prometheus metrics for view [%s]: %s", viewName, val)

vector, ok := val.(model.Vector)
gomega.Expect(ok).To(gomega.BeTrue())
gomega.Expect(vector).To(gomega.HaveLen(1))
gomega.Expect(vector[0].Value).NotTo(gomega.Equal(model.SampleValue(0)))
ops, err := cli.GetViewOperations("", viewName)
gomega.Expect(err).ToNot(gomega.HaveOccurred())
gomega.Expect(ops).ToNot(gomega.BeZero())
}
2 changes: 2 additions & 0 deletions integration/nwo/monitoring/monitoring/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/onsi/gomega"
)

const PrometheusPort = 9090

const (
PrometheusImg = "prom/prometheus:latest"
GrafanaImg = "grafana/grafana:latest"
Expand Down
57 changes: 23 additions & 34 deletions integration/nwo/monitoring/platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ SPDX-License-Identifier: Apache-2.0
package monitoring

import (
"fmt"
"path/filepath"
"strings"
"time"
Expand All @@ -18,14 +17,12 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/monitoring/hle"
"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/monitoring/monitoring"
"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/monitoring/optl"
"github.com/hyperledger-labs/fabric-smart-client/integration/reporting/jaeger"
"github.com/hyperledger-labs/fabric-smart-client/integration/reporting/prometheus"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/services/logging"
"github.com/jaegertracing/jaeger-idl/proto-gen/api_v2"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils"
"github.com/onsi/gomega"
prom_api "github.com/prometheus/client_golang/api"
prom_v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/tedsuo/ifrit/grouper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

var logger = logging.MustGetLogger()
Expand Down Expand Up @@ -55,34 +52,26 @@ type Extension interface {
}

type Platform struct {
Context api.Context
topology *Topology
RootDir string
Prefix string
Extensions []Extension
networkID string
promAPI prom_v1.API
jaegerAPI api_v2.QueryServiceClient
Context api.Context
topology *Topology
RootDir string
Prefix string
Extensions []Extension
networkID string
prometheusReporter prometheus.Reporter
jaegerReporter jaeger.Reporter
}

func New(reg api.Context, topology *Topology) *Platform {
promClient, err := prom_api.NewClient(prom_api.Config{Address: fmt.Sprintf("http://0.0.0.0:%d", topology.PrometheusPort)})
if err != nil {
panic(err)
}
jaegerClientConn, err := grpc.NewClient(fmt.Sprintf("0.0.0.0:%d", topology.JaegerQueryPort), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(err)
}
p := &Platform{
Context: reg,
RootDir: reg.RootDir(),
Prefix: topology.Name(),
topology: topology,
Extensions: []Extension{},
networkID: common.UniqueName(),
promAPI: prom_v1.NewAPI(promClient),
jaegerAPI: api_v2.NewQueryServiceClient(jaegerClientConn),
Context: reg,
RootDir: reg.RootDir(),
Prefix: topology.Name(),
topology: topology,
Extensions: []Extension{},
networkID: common.UniqueName(),
prometheusReporter: utils.MustGet(prometheus.NewLocalReporter()),
jaegerReporter: utils.MustGet(jaeger.NewLocalReporter()),
}
p.AddExtension(hle.NewExtension(p))
p.AddExtension(monitoring.NewExtension(p))
Expand Down Expand Up @@ -147,12 +136,12 @@ func (p *Platform) Cleanup() {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

func (p *Platform) PrometheusAPI() prom_v1.API {
return p.promAPI
func (p *Platform) PrometheusReporter() prometheus.Reporter {
return p.prometheusReporter
}

func (p *Platform) JaegerAPI() api_v2.QueryServiceClient {
return p.jaegerAPI
func (p *Platform) JaegerReporter() jaeger.Reporter {
return p.jaegerReporter
}

func (p *Platform) AddExtension(ex Extension) {
Expand Down
7 changes: 5 additions & 2 deletions integration/nwo/monitoring/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ SPDX-License-Identifier: Apache-2.0

package monitoring

import "github.com/hyperledger-labs/fabric-smart-client/integration/nwo/monitoring/optl"
import (
"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/monitoring/monitoring"
"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/monitoring/optl"
)

type Topology struct {
TopologyName string `yaml:"name,omitempty"`
Expand All @@ -26,7 +29,7 @@ func NewTopology() *Topology {
TopologyName: TopologyName,
TopologyType: TopologyName,
HyperledgerExplorerPort: 8080,
PrometheusPort: 9090,
PrometheusPort: monitoring.PrometheusPort,
GrafanaPort: 3000,
OPTLPort: 4319,
JaegerQueryPort: optl.JaegerQueryPort,
Expand Down
12 changes: 6 additions & 6 deletions integration/nwo/nwo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/common/context"
"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/common/runner"
"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/monitoring"
"github.com/hyperledger-labs/fabric-smart-client/integration/reporting/jaeger"
"github.com/hyperledger-labs/fabric-smart-client/integration/reporting/prometheus"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/services/logging"
"github.com/jaegertracing/jaeger-idl/proto-gen/api_v2"
"github.com/onsi/gomega"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/tedsuo/ifrit"
"github.com/tedsuo/ifrit/grouper"
)
Expand Down Expand Up @@ -244,19 +244,19 @@ func (n *NWO) storePIDs(f *os.File, members grouper.Members) {
}
}

func (n *NWO) PrometheusAPI() (v1.API, error) {
func (n *NWO) PrometheusReporter() (prometheus.Reporter, error) {
for _, platform := range n.Platforms {
if metricsPlatform, ok := platform.(*monitoring.Platform); ok {
return metricsPlatform.PrometheusAPI(), nil
return metricsPlatform.PrometheusReporter(), nil
}
}
return nil, fmt.Errorf("no Prometheus API available on any platform")
}

func (n *NWO) JaegerAPI() (api_v2.QueryServiceClient, error) {
func (n *NWO) JaegerReporter() (jaeger.Reporter, error) {
for _, platform := range n.Platforms {
if metricsPlatform, ok := platform.(*monitoring.Platform); ok {
return metricsPlatform.JaegerAPI(), nil
return metricsPlatform.JaegerReporter(), nil
}
}
return nil, fmt.Errorf("no Jaeger API available on any platform")
Expand Down
80 changes: 80 additions & 0 deletions integration/reporting/jaeger/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package jaeger

import (
"context"
"fmt"

"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/monitoring/optl"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections/iterators"
"github.com/jaegertracing/jaeger-idl/proto-gen/api_v2"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type Reporter interface {
FindTraces(nodeName, operationName string) (iterators.Iterator[*api_v2.SpansResponseChunk], error)
GetServices() ([]string, error)
GetOperations(nodeName string) ([]string, error)
}

func NewLocalReporter() (*reporter, error) {
return NewReporter(fmt.Sprintf("0.0.0.0:%d", optl.JaegerQueryPort))
}

func NewReporter(address string) (*reporter, error) {
conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
return &reporter{ClientConn: conn}, nil
}

type reporter struct {
*grpc.ClientConn
}

func (c *reporter) GetServices() ([]string, error) {
resp, err := c.queryService().GetServices(context.Background(), &api_v2.GetServicesRequest{})
if err != nil {
return nil, err
}
return resp.GetServices(), nil
}

func (c *reporter) GetOperations(nodeName string) ([]string, error) {
req := &api_v2.GetOperationsRequest{}
if len(nodeName) > 0 {
req.Service = nodeName
}
resp, err := c.queryService().GetOperations(context.Background(), req)
if err != nil {
return nil, err
}
return resp.GetOperationNames(), nil
}

func (c *reporter) FindTraces(nodeName, operationName string) (iterators.Iterator[*api_v2.SpansResponseChunk], error) {
if len(nodeName) == 0 {
return nil, errors.New("no node name passed")
}
params := &api_v2.TraceQueryParameters{ServiceName: nodeName, RawTraces: true}
if len(operationName) > 0 {
params.OperationName = operationName
}
findTraces, err := c.queryService().FindTraces(context.Background(), &api_v2.FindTracesRequest{Query: params})
if err != nil {
return nil, err
}
return iterators.Stream[*api_v2.SpansResponseChunk](findTraces), nil
}

func (c *reporter) queryService() api_v2.QueryServiceClient {
return api_v2.NewQueryServiceClient(c.ClientConn)
}
29 changes: 29 additions & 0 deletions integration/reporting/jaeger/predicates.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package jaeger

import (
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections/iterators"
"github.com/jaegertracing/jaeger-idl/proto-gen/api_v2"
)

var (
IsTransfer = ContainsSpanWithName("/idap/assets/currencies/transfers")
IsWithdrawal = ContainsSpanWithName("/idap/assets/currencies/withdrawals")
IsTransaction = iterators.Or(IsTransfer, IsWithdrawal)
)

func ContainsSpanWithName(name string) func(t *api_v2.SpansResponseChunk) bool {
return func(t *api_v2.SpansResponseChunk) bool {
for _, s := range t.Spans {
if s.OperationName == name {
return true
}
}
return false
}
}
41 changes: 41 additions & 0 deletions integration/reporting/jaeger/transformers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package jaeger

import (
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections/slices"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
)

func LargestEventGap(t *v1.TracesData) (time.Duration, error) {
events := slices.SortedSlice[uint64]{}
if t == nil {
return 0, nil
}
for _, rs := range t.ResourceSpans {
for _, ss := range rs.ScopeSpans {
for _, s := range ss.Spans {
for _, e := range s.Events {
events.Add(e.TimeUnixNano)
}
}
}
}
if len(events) <= 1 {
return 0, nil
}

maxGap := uint64(0)
for i := range events[1:] {
if gap := events[i+1] - events[i]; gap > maxGap {
maxGap = gap
}
}
return time.Duration(maxGap), nil
}
Loading
Loading