Skip to content

Commit c49f264

Browse files
authored
Datalayer refactoring: HTTP datasource and client (#2120)
* HTTP datasource and client Signed-off-by: irar2 <irar@il.ibm.com> * Typo fix Signed-off-by: irar2 <irar@il.ibm.com> * Changed copyright year Signed-off-by: irar2 <irar@il.ibm.com> * Renamed package to http Signed-off-by: irar2 <irar@il.ibm.com> --------- Signed-off-by: irar2 <irar@il.ibm.com>
1 parent 9db1e7e commit c49f264

File tree

4 files changed

+61
-48
lines changed

4 files changed

+61
-48
lines changed
Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,22 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package metrics
17+
package http
1818

1919
import (
2020
"context"
2121
"fmt"
22+
"io"
2223
"net/http"
2324
"net/url"
2425
"time"
2526

26-
"github.com/prometheus/common/expfmt"
27-
"github.com/prometheus/common/model"
28-
2927
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
3028
)
3129

32-
// Client is an interface for retrieving the metrics from an endpoint URL.
30+
// Client is an interface for retrieving the data from an endpoint URL.
3331
type Client interface {
34-
Get(ctx context.Context, target *url.URL, ep datalayer.Addressable) (PrometheusMetricMap, error)
32+
Get(ctx context.Context, target *url.URL, ep datalayer.Addressable, parser func(io.Reader) (any, error)) (any, error)
3533
}
3634

3735
const (
@@ -67,14 +65,15 @@ type client struct {
6765
http.Client
6866
}
6967

70-
func (cl *client) Get(ctx context.Context, target *url.URL, ep datalayer.Addressable) (PrometheusMetricMap, error) {
68+
func (cl *client) Get(ctx context.Context, target *url.URL, ep datalayer.Addressable,
69+
parser func(io.Reader) (any, error)) (any, error) {
7170
req, err := http.NewRequestWithContext(ctx, http.MethodGet, target.String(), nil)
7271
if err != nil {
7372
return nil, fmt.Errorf("failed to create request: %v", err)
7473
}
7574
resp, err := defaultClient.Do(req)
7675
if err != nil {
77-
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", ep.GetNamespacedName(), err)
76+
return nil, fmt.Errorf("failed to fetch data from %s: %w", ep.GetNamespacedName(), err)
7877
}
7978
defer func() {
8079
_ = resp.Body.Close()
@@ -84,10 +83,5 @@ func (cl *client) Get(ctx context.Context, target *url.URL, ep datalayer.Address
8483
return nil, fmt.Errorf("unexpected status code from %s: %v", ep.GetNamespacedName(), resp.StatusCode)
8584
}
8685

87-
parser := expfmt.NewTextParser(model.LegacyValidation)
88-
metricFamilies, err := parser.TextToMetricFamilies(resp.Body)
89-
if err != nil {
90-
return nil, err
91-
}
92-
return metricFamilies, err
86+
return parser(resp.Body)
9387
}
Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -14,61 +14,67 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package metrics
17+
package http
1818

1919
import (
2020
"context"
2121
"crypto/tls"
2222
"errors"
2323
"fmt"
24+
"io"
2425
"net/url"
26+
"reflect"
2527
"sync"
2628

2729
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
2830
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
2931
)
3032

31-
// DataSource is a Model Server Protocol (MSP) compliant metrics data source,
32-
// returning Prometheus formatted metrics for an endpoint.
33-
type DataSource struct {
34-
typedName plugins.TypedName
35-
metricsScheme string // scheme to use in metrics URL
36-
metricsPath string // path to use in metrics URL
33+
// HTTPDataSource is a data source that receives its data using HTTP client.
34+
type HTTPDataSource struct {
35+
typedName plugins.TypedName
36+
scheme string // scheme to use
37+
path string // path to use
3738

38-
client Client // client (e.g. a wrapped http.Client) used to get metrics
39+
client Client // client (e.g. a wrapped http.Client) used to get data
40+
parser func(io.Reader) (any, error)
41+
outputType reflect.Type
3942
extractors sync.Map // key: name, value: extractor
4043
}
4144

42-
// NewMetricsDataSource returns a new MSP compliant metrics data source, configured with
45+
// NewHTTPDataSource returns a new data source, configured with
4346
// the provided scheme, path and certificate verification parameters.
44-
func NewMetricsDataSource(metricsScheme string, metricsPath string, skipCertVerification bool) *DataSource {
45-
if metricsScheme == "https" {
47+
func NewHTTPDataSource(scheme string, path string, skipCertVerification bool, pluginType string,
48+
pluginName string, parser func(io.Reader) (any, error), outputType reflect.Type) *HTTPDataSource {
49+
if scheme == "https" {
4650
httpsTransport := baseTransport.Clone()
4751
httpsTransport.TLSClientConfig = &tls.Config{
4852
InsecureSkipVerify: skipCertVerification,
4953
}
5054
defaultClient.Transport = httpsTransport
5155
}
5256

53-
dataSrc := &DataSource{
57+
dataSrc := &HTTPDataSource{
5458
typedName: plugins.TypedName{
55-
Type: MetricsDataSourceType,
56-
Name: MetricsDataSourceType,
59+
Type: pluginType,
60+
Name: pluginName,
5761
},
58-
metricsScheme: metricsScheme,
59-
metricsPath: metricsPath,
60-
client: defaultClient,
62+
scheme: scheme,
63+
path: path,
64+
client: defaultClient,
65+
parser: parser,
66+
outputType: outputType,
6167
}
6268
return dataSrc
6369
}
6470

65-
// TypedName returns the metrics data source type and name.
66-
func (dataSrc *DataSource) TypedName() plugins.TypedName {
71+
// TypedName returns the data source type and name.
72+
func (dataSrc *HTTPDataSource) TypedName() plugins.TypedName {
6773
return dataSrc.typedName
6874
}
6975

7076
// Extractors returns a list of registered Extractor names.
71-
func (dataSrc *DataSource) Extractors() []string {
77+
func (dataSrc *HTTPDataSource) Extractors() []string {
7278
extractors := []string{}
7379
dataSrc.extractors.Range(func(_, val any) bool {
7480
if ex, ok := val.(datalayer.Extractor); ok {
@@ -80,9 +86,9 @@ func (dataSrc *DataSource) Extractors() []string {
8086
}
8187

8288
// AddExtractor adds an extractor to the data source, validating it can process
83-
// the metrics' data source output type.
84-
func (dataSrc *DataSource) AddExtractor(extractor datalayer.Extractor) error {
85-
if err := datalayer.ValidateExtractorType(PrometheusMetricType, extractor.ExpectedInputType()); err != nil {
89+
// the data source output type.
90+
func (dataSrc *HTTPDataSource) AddExtractor(extractor datalayer.Extractor) error {
91+
if err := datalayer.ValidateExtractorType(dataSrc.outputType, extractor.ExpectedInputType()); err != nil {
8692
return err
8793
}
8894
if _, loaded := dataSrc.extractors.LoadOrStore(extractor.TypedName().Name, extractor); loaded {
@@ -92,10 +98,10 @@ func (dataSrc *DataSource) AddExtractor(extractor datalayer.Extractor) error {
9298
}
9399

94100
// Collect is triggered by the data layer framework to fetch potentially new
95-
// MSP metrics data for an endpoint.
96-
func (dataSrc *DataSource) Collect(ctx context.Context, ep datalayer.Endpoint) error {
97-
target := dataSrc.getMetricsEndpoint(ep.GetMetadata())
98-
families, err := dataSrc.client.Get(ctx, target, ep.GetMetadata())
101+
// data for an endpoint.
102+
func (dataSrc *HTTPDataSource) Collect(ctx context.Context, ep datalayer.Endpoint) error {
103+
target := dataSrc.getEndpoint(ep.GetMetadata())
104+
data, err := dataSrc.client.Get(ctx, target, ep.GetMetadata(), dataSrc.parser)
99105

100106
if err != nil {
101107
return err
@@ -104,7 +110,7 @@ func (dataSrc *DataSource) Collect(ctx context.Context, ep datalayer.Endpoint) e
104110
var errs []error
105111
dataSrc.extractors.Range(func(_, val any) bool {
106112
if ex, ok := val.(datalayer.Extractor); ok {
107-
if err = ex.Extract(ctx, families, ep); err != nil {
113+
if err = ex.Extract(ctx, data, ep); err != nil {
108114
errs = append(errs, err)
109115
}
110116
}
@@ -117,10 +123,12 @@ func (dataSrc *DataSource) Collect(ctx context.Context, ep datalayer.Endpoint) e
117123
return nil
118124
}
119125

120-
func (dataSrc *DataSource) getMetricsEndpoint(ep datalayer.Addressable) *url.URL {
126+
func (dataSrc *HTTPDataSource) getEndpoint(ep datalayer.Addressable) *url.URL {
121127
return &url.URL{
122-
Scheme: dataSrc.metricsScheme,
128+
Scheme: dataSrc.scheme,
123129
Host: ep.GetMetricsHost(),
124-
Path: dataSrc.metricsPath,
130+
Path: dataSrc.path,
125131
}
126132
}
133+
134+
var _ datalayer.DataSource = (*HTTPDataSource)(nil)

pkg/epp/datalayer/metrics/datasource_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ import (
2525
"k8s.io/apimachinery/pkg/types"
2626

2727
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
28+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/http"
2829
)
2930

3031
func TestDatasource(t *testing.T) {
31-
source := NewMetricsDataSource("https", "/metrics", true)
32+
source := http.NewHTTPDataSource("https", "/metrics", true, MetricsDataSourceType,
33+
"metrics-data-source", parseMetrics, PrometheusMetricType)
3234
extractor, err := NewModelServerExtractor(defaultTotalQueuedRequestsMetric, "", "", "", "")
3335
assert.Nil(t, err, "failed to create extractor")
3436

pkg/epp/datalayer/metrics/factories.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,14 @@ package metrics
1919
import (
2020
"encoding/json"
2121
"fmt"
22+
"io"
2223
"strconv"
2324

25+
"github.com/prometheus/common/expfmt"
26+
"github.com/prometheus/common/model"
2427
flag "github.com/spf13/pflag"
2528

29+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/http"
2630
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
2731
)
2832

@@ -72,8 +76,8 @@ func MetricsDataSourceFactory(name string, parameters json.RawMessage, handle pl
7276
}
7377
}
7478

75-
ds := NewMetricsDataSource(cfg.Scheme, cfg.Path, cfg.InsecureSkipVerify)
76-
ds.typedName.Name = name
79+
ds := http.NewHTTPDataSource(cfg.Scheme, cfg.Path, cfg.InsecureSkipVerify, MetricsDataSourceType,
80+
name, parseMetrics, PrometheusMetricType)
7781
return ds, nil
7882
}
7983

@@ -183,3 +187,8 @@ func fromBoolFlag(name string) (bool, error) {
183187
}
184188
return b, nil
185189
}
190+
191+
func parseMetrics(data io.Reader) (any, error) {
192+
parser := expfmt.NewTextParser(model.LegacyValidation)
193+
return parser.TextToMetricFamilies(data)
194+
}

0 commit comments

Comments
 (0)