Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,22 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics
package http

import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"time"

"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
)

// Client is an interface for retrieving the metrics from an endpoint URL.
// Client is an interface for retrieving the data from an endpoint URL.
type Client interface {
Get(ctx context.Context, target *url.URL, ep datalayer.Addressable) (PrometheusMetricMap, error)
Get(ctx context.Context, target *url.URL, ep datalayer.Addressable, parser func(io.Reader) (any, error)) (any, error)
}

const (
Expand Down Expand Up @@ -67,14 +65,15 @@ type client struct {
http.Client
}

func (cl *client) Get(ctx context.Context, target *url.URL, ep datalayer.Addressable) (PrometheusMetricMap, error) {
func (cl *client) Get(ctx context.Context, target *url.URL, ep datalayer.Addressable,
parser func(io.Reader) (any, error)) (any, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, target.String(), nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %v", err)
}
resp, err := defaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", ep.GetNamespacedName(), err)
return nil, fmt.Errorf("failed to fetch data from %s: %w", ep.GetNamespacedName(), err)
}
defer func() {
_ = resp.Body.Close()
Expand All @@ -84,10 +83,5 @@ func (cl *client) Get(ctx context.Context, target *url.URL, ep datalayer.Address
return nil, fmt.Errorf("unexpected status code from %s: %v", ep.GetNamespacedName(), resp.StatusCode)
}

parser := expfmt.NewTextParser(model.LegacyValidation)
metricFamilies, err := parser.TextToMetricFamilies(resp.Body)
if err != nil {
return nil, err
}
return metricFamilies, err
return parser(resp.Body)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,61 +14,67 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics
package http

import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net/url"
"reflect"
"sync"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
)

// DataSource is a Model Server Protocol (MSP) compliant metrics data source,
// returning Prometheus formatted metrics for an endpoint.
type DataSource struct {
typedName plugins.TypedName
metricsScheme string // scheme to use in metrics URL
metricsPath string // path to use in metrics URL
// HTTPDataSource is a data source that receives its data using HTTP client.
type HTTPDataSource struct {
typedName plugins.TypedName
scheme string // scheme to use
path string // path to use

client Client // client (e.g. a wrapped http.Client) used to get metrics
client Client // client (e.g. a wrapped http.Client) used to get data
parser func(io.Reader) (any, error)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: I think there are two high level ways to refactor this:

  • keep a shared implementation and configure it from the outside with implementation specific details (the typed-name, parse callback, output type, etc); or
  • create specific instances (e.g., metrics, v1/models) that build a data source from lower level building blocks (e.g., they would have their own TypedName, call client.Get() and then pass the result to an internal parse method, instead of adding parse to the Get parameters).

To clarify, you could still have an HTTPDataSource under pkg/epp/datalayer/http, but add MetricsSource and ModelsSource Go files next to it or in their own directories. By embedding an HTTPDataSource the specific sources could reuse the shared implementation.

Out of curiosuity, any thoughts on preferring one over the other? Either approach works, they can be thought of as composition vs inheritance.
I would lean towards the second approach (e.g., there is no HTTPDataSource registered factory in the system, only ones for metrics and v1/models so the HTTP bits seem like an implemntation detail to me and can be modeled as function "building blocks" and not reuse the full struct, esp. since v1/models would be in a different repo altogether).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the second approach allows less code reuse comparing to the first one

outputType reflect.Type
extractors sync.Map // key: name, value: extractor
}

// NewMetricsDataSource returns a new MSP compliant metrics data source, configured with
// NewHTTPDataSource returns a new data source, configured with
// the provided scheme, path and certificate verification parameters.
func NewMetricsDataSource(metricsScheme string, metricsPath string, skipCertVerification bool) *DataSource {
if metricsScheme == "https" {
func NewHTTPDataSource(scheme string, path string, skipCertVerification bool, pluginType string,
pluginName string, parser func(io.Reader) (any, error), outputType reflect.Type) *HTTPDataSource {
if scheme == "https" {
httpsTransport := baseTransport.Clone()
httpsTransport.TLSClientConfig = &tls.Config{
InsecureSkipVerify: skipCertVerification,
}
defaultClient.Transport = httpsTransport
}

dataSrc := &DataSource{
dataSrc := &HTTPDataSource{
typedName: plugins.TypedName{
Type: MetricsDataSourceType,
Name: MetricsDataSourceType,
Type: pluginType,
Name: pluginName,
},
metricsScheme: metricsScheme,
metricsPath: metricsPath,
client: defaultClient,
scheme: scheme,
path: path,
client: defaultClient,
parser: parser,
outputType: outputType,
}
return dataSrc
}

// TypedName returns the metrics data source type and name.
func (dataSrc *DataSource) TypedName() plugins.TypedName {
// TypedName returns the data source type and name.
func (dataSrc *HTTPDataSource) TypedName() plugins.TypedName {
return dataSrc.typedName
}

// Extractors returns a list of registered Extractor names.
func (dataSrc *DataSource) Extractors() []string {
func (dataSrc *HTTPDataSource) Extractors() []string {
extractors := []string{}
dataSrc.extractors.Range(func(_, val any) bool {
if ex, ok := val.(datalayer.Extractor); ok {
Expand All @@ -80,9 +86,9 @@ func (dataSrc *DataSource) Extractors() []string {
}

// AddExtractor adds an extractor to the data source, validating it can process
// the metrics' data source output type.
func (dataSrc *DataSource) AddExtractor(extractor datalayer.Extractor) error {
if err := datalayer.ValidateExtractorType(PrometheusMetricType, extractor.ExpectedInputType()); err != nil {
// the data source output type.
func (dataSrc *HTTPDataSource) AddExtractor(extractor datalayer.Extractor) error {
if err := datalayer.ValidateExtractorType(dataSrc.outputType, extractor.ExpectedInputType()); err != nil {
return err
}
if _, loaded := dataSrc.extractors.LoadOrStore(extractor.TypedName().Name, extractor); loaded {
Expand All @@ -92,10 +98,10 @@ func (dataSrc *DataSource) AddExtractor(extractor datalayer.Extractor) error {
}

// Collect is triggered by the data layer framework to fetch potentially new
// MSP metrics data for an endpoint.
func (dataSrc *DataSource) Collect(ctx context.Context, ep datalayer.Endpoint) error {
target := dataSrc.getMetricsEndpoint(ep.GetMetadata())
families, err := dataSrc.client.Get(ctx, target, ep.GetMetadata())
// data for an endpoint.
func (dataSrc *HTTPDataSource) Collect(ctx context.Context, ep datalayer.Endpoint) error {
target := dataSrc.getEndpoint(ep.GetMetadata())
data, err := dataSrc.client.Get(ctx, target, ep.GetMetadata(), dataSrc.parser)

if err != nil {
return err
Expand All @@ -104,7 +110,7 @@ func (dataSrc *DataSource) Collect(ctx context.Context, ep datalayer.Endpoint) e
var errs []error
dataSrc.extractors.Range(func(_, val any) bool {
if ex, ok := val.(datalayer.Extractor); ok {
if err = ex.Extract(ctx, families, ep); err != nil {
if err = ex.Extract(ctx, data, ep); err != nil {
errs = append(errs, err)
}
}
Expand All @@ -117,10 +123,12 @@ func (dataSrc *DataSource) Collect(ctx context.Context, ep datalayer.Endpoint) e
return nil
}

func (dataSrc *DataSource) getMetricsEndpoint(ep datalayer.Addressable) *url.URL {
func (dataSrc *HTTPDataSource) getEndpoint(ep datalayer.Addressable) *url.URL {
return &url.URL{
Scheme: dataSrc.metricsScheme,
Scheme: dataSrc.scheme,
Host: ep.GetMetricsHost(),
Path: dataSrc.metricsPath,
Path: dataSrc.path,
}
}

var _ datalayer.DataSource = (*HTTPDataSource)(nil)
4 changes: 3 additions & 1 deletion pkg/epp/datalayer/metrics/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import (
"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/http"
)

func TestDatasource(t *testing.T) {
source := NewMetricsDataSource("https", "/metrics", true)
source := http.NewHTTPDataSource("https", "/metrics", true, MetricsDataSourceType,
"metrics-data-source", parseMetrics, PrometheusMetricType)
extractor, err := NewModelServerExtractor(defaultTotalQueuedRequestsMetric, "", "", "", "")
assert.Nil(t, err, "failed to create extractor")

Expand Down
13 changes: 11 additions & 2 deletions pkg/epp/datalayer/metrics/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ package metrics
import (
"encoding/json"
"fmt"
"io"
"strconv"

"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
flag "github.com/spf13/pflag"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/http"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
)

Expand Down Expand Up @@ -72,8 +76,8 @@ func MetricsDataSourceFactory(name string, parameters json.RawMessage, handle pl
}
}

ds := NewMetricsDataSource(cfg.Scheme, cfg.Path, cfg.InsecureSkipVerify)
ds.typedName.Name = name
ds := http.NewHTTPDataSource(cfg.Scheme, cfg.Path, cfg.InsecureSkipVerify, MetricsDataSourceType,
name, parseMetrics, PrometheusMetricType)
return ds, nil
}

Expand Down Expand Up @@ -183,3 +187,8 @@ func fromBoolFlag(name string) (bool, error) {
}
return b, nil
}

func parseMetrics(data io.Reader) (any, error) {
parser := expfmt.NewTextParser(model.LegacyValidation)
return parser.TextToMetricFamilies(data)
}