
Commit 1bf2547

andrzej-stencel, dmolenda-sumo, and kasia-kujawa authored
[receiver/sqlquery] add support for logs (open-telemetry#20730)
Fixes open-telemetry#20284 This introduces initial support for retrieving rows from SQL databases into logs. This PR aims to provide an initial, not feature rich, but production ready implementation. The following features are available: - Use `body_column` to select the column to use to fill the Body field of the created log - Use `tracking_start_value` and `tracking_column` properties to track rows that were already ingested - Use `storage` property to persist the tracking value across collector restarts In this state and marked as "development" stability, the component can be used for experimentation and to guide future development. There are definitely more things that need to be implemented for this component to be considered "alpha" quality - like filling in other [log fields](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/logs/data-model.md#log-and-event-record-definition) like Timestamp, ObservedTimestamp and others. I would like to add them in subsequent pull requests, as this pull request is already way too big. --------- Co-authored-by: Dominika Molenda <[email protected]> Co-authored-by: Dominika Molenda <[email protected]> Co-authored-by: Katarzyna Kujawa <[email protected]> Co-authored-by: Katarzyna Kujawa <[email protected]>
1 parent e7608db commit 1bf2547

24 files changed: +954 −46 lines
Lines changed: 16 additions & 0 deletions

```diff
@@ -0,0 +1,16 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: sqlqueryreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add support for logs
+
+# One or more tracking issues related to the change
+issues: [20284]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
```

receiver/sqlqueryreceiver/README.md

Lines changed: 83 additions & 17 deletions

```diff
@@ -4,9 +4,11 @@
 | Status | |
 | ------------- |-----------|
 | Stability | [alpha]: metrics |
+| | [development]: logs |
 | Distributions | [contrib], [observiq], [splunk], [sumo] |

 [alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha
+[development]: https://github.com/open-telemetry/opentelemetry-collector#development
 [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
 [observiq]: https://github.com/observIQ/observiq-otel-collector
 [splunk]: https://github.com/signalfx/splunk-otel-collector
```
````diff
@@ -28,29 +30,88 @@ The configuration supports the following top-level fields:
   a driver-specific string usually consisting of at least a database name and connection information. This is sometimes
   referred to as the "connection string" in driver documentation.
   e.g. _host=localhost port=5432 user=me password=s3cr3t sslmode=disable_
-- `queries`(required): A list of queries, where a query is a sql statement and one or more metrics (details below).
+- `queries`(required): A list of queries, where a query is a sql statement and one or more `logs` and/or `metrics` sections (details below).
 - `collection_interval`(optional): The time interval between query executions. Defaults to _10s_.
+- `storage` (optional, default `""`): The ID of a [storage][storage_extension] extension to be used to [track processed results](#tracking-processed-results).
+
+[storage_extension]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage

 ### Queries

-A _query_ consists of a sql statement and one or more _metrics_, where each metric consists of a
+A _query_ consists of a sql statement and one or more `logs` and/or `metrics` sections.
+At least one `logs` or one `metrics` section is required.
+Note that technically you can put both `logs` and `metrics` sections in a single query section,
+but it's probably not a real-world use case, as the requirements for logs and metrics queries
+are quite different.
+
+Additionally, each `query` section supports the following properties:
+
+- `tracking_column` (optional, default `""`) Applies only to logs. In case of a parameterized query,
+  defines the column to retrieve the value of the parameter on subsequent query runs.
+  See the below section [Tracking processed results](#tracking-processed-results).
+- `tracking_start_value` (optional, default `""`) Applies only to logs. In case of a parameterized query, defines the initial value for the parameter.
+  See the below section [Tracking processed results](#tracking-processed-results).
+
+Example:
+
+```yaml
+receivers:
+  sqlquery:
+    driver: postgres
+    datasource: "host=localhost port=5432 user=postgres password=s3cr3t sslmode=disable"
+    queries:
+      - sql: "select * from my_logs where log_id > $$1"
+        tracking_start_value: "10000"
+        tracking_column: log_id
+        logs:
+          - body_column: log_body
+      - sql: "select count(*) as count, genre from movie group by genre"
+        metrics:
+          - metric_name: movie.genres
+            value_column: "count"
+```
````
```diff
+
+#### Logs Queries
+
+The `logs` section is in development.
+
+- `body_column` (required) defines the column to use as the log record's body.
+
+##### Tracking processed results
+
+With the default configuration and a non-parameterized logs query like `select * from my_logs`,
+the receiver will run the same query every collection interval, which can cause reading the same rows
+over and over again, unless there's an external actor removing the old rows from the `my_logs` table.
+
+To prevent reading the same rows on every collection interval, use a parameterized query like `select * from my_logs where id_column > ?`,
+together with the `tracking_start_value` and `tracking_column` configuration properties.
+The receiver will use the configured `tracking_start_value` as the value for the query parameter when running the query for the first time.
+After each query run, the receiver will store the value of the `tracking_column` from the last row of the result set and use it as the value for the query parameter on the next collection interval. To prevent duplicate log downloads, make sure to sort the query results in ascending order by the `tracking_column` value.
+
+Note that the notation for the parameter depends on the database backend. For example, in MySQL this is `?`, in PostgreSQL this is `$1`, and in Oracle this is any string identifier starting with a colon, for example `:my_parameter`.
+
+Use the `storage` configuration property of the receiver to persist the tracking value across collector restarts.
```
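The tracking behavior described above can be sketched in a few lines of Go. This is a standalone illustration, not the receiver's actual implementation; the `row` type and `nextTrackingValue` helper are hypothetical names introduced here.

```go
// Sketch of the tracking logic: the tracking value starts at
// tracking_start_value, is passed as the query parameter, and is replaced by
// the tracking_column value of the last returned row after each run.
package main

import "fmt"

// row models one result-set row: column name -> value as a string.
type row map[string]string

// nextTrackingValue returns the parameter value for the next collection
// interval: unchanged if no rows came back, otherwise the tracking column of
// the last row (which is why results must be sorted ascending by that column).
func nextTrackingValue(current, trackingColumn string, rows []row) string {
	if len(rows) == 0 {
		return current
	}
	return rows[len(rows)-1][trackingColumn]
}

func main() {
	tracking := "10000" // tracking_start_value
	rows := []row{
		{"log_id": "10001", "log_body": "first"},
		{"log_id": "10002", "log_body": "second"},
	}
	tracking = nextTrackingValue(tracking, "log_id", rows)
	fmt.Println(tracking) // 10002

	// An empty result set leaves the tracking value unchanged.
	tracking = nextTrackingValue(tracking, "log_id", nil)
	fmt.Println(tracking) // 10002
}
```

With the `storage` property set, the receiver persists this value between runs, so a collector restart does not rewind the tracking position.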
```diff
+#### Metrics queries
+
+Each `metrics` section consists of a
 `metric_name`, a `value_column`, and additional optional fields.
 Each _metric_ in the configuration will produce one OTel metric per row returned from its sql query.

-* `metric_name`(required): the name assigned to the OTel metric.
-* `value_column`(required): the column name in the returned dataset used to set the value of the metric's datapoint.
+- `metric_name`(required): the name assigned to the OTel metric.
+- `value_column`(required): the column name in the returned dataset used to set the value of the metric's datapoint.
   This may be case-sensitive, depending on the driver (e.g. Oracle DB).
-* `attribute_columns`(optional): a list of column names in the returned dataset used to set attibutes on the datapoint.
+- `attribute_columns`(optional): a list of column names in the returned dataset used to set attributes on the datapoint.
   These attributes may be case-sensitive, depending on the driver (e.g. Oracle DB).
-* `data_type` (optional): can be `gauge` or `sum`; defaults to `gauge`.
-* `value_type` (optional): can be `int` or `double`; defaults to `int`.
-* `monotonic` (optional): boolean; whether a cumulative sum's value is monotonically increasing (i.e. never rolls over
+- `data_type` (optional): can be `gauge` or `sum`; defaults to `gauge`.
+- `value_type` (optional): can be `int` or `double`; defaults to `int`.
+- `monotonic` (optional): boolean; whether a cumulative sum's value is monotonically increasing (i.e. never rolls over
   or resets); defaults to false.
-* `aggregation` (optional): only applicable for `data_type=sum`; can be `cumulative` or `delta`; defaults
+- `aggregation` (optional): only applicable for `data_type=sum`; can be `cumulative` or `delta`; defaults
   to `cumulative`.
-* `description` (optional): the description applied to the metric.
-* `unit` (optional): the units applied to the metric.
-* `static_attributes` (optional): static attributes applied to the metrics
+- `description` (optional): the description applied to the metric.
+- `unit` (optional): the units applied to the metric.
+- `static_attributes` (optional): static attributes applied to the metrics

 ### Example
```
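The bullet list above maps one result row plus one `metrics` entry to one datapoint. A small, self-contained Go sketch of that mapping, assuming the default `value_type: int`; the `metricCfg` and `dataPoint` types and the `rowToDataPoint` helper are illustrative stand-ins, not the receiver's real types.

```go
// Sketch: value_column supplies the datapoint value, attribute_columns become
// datapoint attributes looked up in the row, and static_attributes are added
// verbatim.
package main

import (
	"fmt"
	"strconv"
)

type metricCfg struct {
	MetricName       string
	ValueColumn      string
	AttributeColumns []string
	StaticAttributes map[string]string
}

type dataPoint struct {
	Metric     string
	Value      int64
	Attributes map[string]string
}

func rowToDataPoint(cfg metricCfg, row map[string]string) (dataPoint, error) {
	// value_type: int -> parse the column's string value as an integer.
	v, err := strconv.ParseInt(row[cfg.ValueColumn], 10, 64)
	if err != nil {
		return dataPoint{}, err
	}
	attrs := map[string]string{}
	for _, col := range cfg.AttributeColumns {
		attrs[col] = row[col]
	}
	for k, val := range cfg.StaticAttributes {
		attrs[k] = val
	}
	return dataPoint{Metric: cfg.MetricName, Value: v, Attributes: attrs}, nil
}

func main() {
	cfg := metricCfg{
		MetricName:       "movie.genres",
		ValueColumn:      "count",
		AttributeColumns: []string{"genre"},
		StaticAttributes: map[string]string{"dbinstance": "mydbinstance"},
	}
	dp, _ := rowToDataPoint(cfg, map[string]string{"count": "2", "genre": "sci-fi"})
	fmt.Printf("%s = %d %v\n", dp.Metric, dp.Value, dp.Attributes)
}
```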

````diff
@@ -59,28 +120,34 @@ receivers:
   sqlquery:
     driver: postgres
     datasource: "host=localhost port=5432 user=postgres password=s3cr3t sslmode=disable"
+    storage: file_storage
     queries:
+      - sql: "select * from my_logs where log_id > $$1"
+        tracking_start_value: "10000"
+        tracking_column: log_id
+        logs:
+          - body_column: log_body
       - sql: "select count(*) as count, genre from movie group by genre"
         metrics:
           - metric_name: movie.genres
             value_column: "count"
-            attribute_columns: [ "genre" ]
+            attribute_columns: ["genre"]
             static_attributes:
               dbinstance: mydbinstance
 ```

 Given a `movie` table with three rows:

 | name | genre |
-|-----------|--------|
+| --------- | ------ |
 | E.T. | sci-fi |
 | Star Wars | sci-fi |
 | Die Hard | action |

 If there are two rows returned from the query `select count(*) as count, genre from movie group by genre`:

 | count | genre |
-|-------|--------|
+| ----- | ------ |
 | 2 | sci-fi |
 | 1 | action |
````
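As a quick sanity check of the example, the two result rows above are just a group-by count over the three movie rows. A purely illustrative Go sketch of that aggregation (the `countByGenre` helper is not part of the receiver):

```go
// Counting occurrences per genre reproduces the expected query result:
// sci-fi -> 2, action -> 1.
package main

import "fmt"

func countByGenre(genres []string) map[string]int {
	counts := map[string]int{}
	for _, g := range genres {
		counts[g]++
	}
	return counts
}

func main() {
	movies := []string{"sci-fi", "sci-fi", "action"} // E.T., Star Wars, Die Hard
	fmt.Println(countByGenre(movies))                // map[action:1 sci-fi:2]
}
```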

```diff
@@ -94,7 +161,7 @@ Descriptor:
 NumberDataPoints #0
 Data point attributes:
      -> genre: STRING(sci-fi)
-     -> dbinstance: STRING(mydbinstance)
+     -> dbinstance: STRING(mydbinstance)
 Value: 2

 Metric #1
@@ -121,4 +188,3 @@ Oracle DB driver to connect and query the same table schema and contents as the
 The Oracle DB driver documentation can be found [here.](https://github.com/sijms/go-ora)
 Another usage example is the `go_ora`
 example [here.](https://blogs.oracle.com/developers/post/connecting-a-go-application-to-oracle-database)
-
```
receiver/sqlqueryreceiver/config.go

Lines changed: 28 additions & 7 deletions

```diff
@@ -17,9 +17,10 @@ import (

 type Config struct {
 	scraperhelper.ScraperControllerSettings `mapstructure:",squash"`
-	Driver     string  `mapstructure:"driver"`
-	DataSource string  `mapstructure:"datasource"`
-	Queries    []Query `mapstructure:"queries"`
+	Driver     string        `mapstructure:"driver"`
+	DataSource string        `mapstructure:"datasource"`
+	Queries    []Query       `mapstructure:"queries"`
+	StorageID  *component.ID `mapstructure:"storage"`
 }

 func (c Config) Validate() error {
@@ -41,17 +42,25 @@ func (c Config) Validate() error {
 }

 type Query struct {
-	SQL     string      `mapstructure:"sql"`
-	Metrics []MetricCfg `mapstructure:"metrics"`
+	SQL                string      `mapstructure:"sql"`
+	Metrics            []MetricCfg `mapstructure:"metrics"`
+	Logs               []LogsCfg   `mapstructure:"logs"`
+	TrackingColumn     string      `mapstructure:"tracking_column"`
+	TrackingStartValue string      `mapstructure:"tracking_start_value"`
 }

 func (q Query) Validate() error {
 	var errs error
 	if q.SQL == "" {
 		errs = multierr.Append(errs, errors.New("'query.sql' cannot be empty"))
 	}
-	if len(q.Metrics) == 0 {
-		errs = multierr.Append(errs, errors.New("'query.metrics' cannot be empty"))
+	if len(q.Logs) == 0 && len(q.Metrics) == 0 {
+		errs = multierr.Append(errs, errors.New("at least one of 'query.logs' and 'query.metrics' must not be empty"))
+	}
+	for _, logs := range q.Logs {
+		if err := logs.Validate(); err != nil {
+			errs = multierr.Append(errs, err)
+		}
 	}
 	for _, metric := range q.Metrics {
 		if err := metric.Validate(); err != nil {
@@ -61,6 +70,18 @@ func (q Query) Validate() error {
 	return errs
 }

+type LogsCfg struct {
+	BodyColumn string `mapstructure:"body_column"`
+}
+
+func (config LogsCfg) Validate() error {
+	var errs error
+	if config.BodyColumn == "" {
+		errs = multierr.Append(errs, errors.New("'body_column' must not be empty"))
+	}
+	return errs
+}
+
 type MetricCfg struct {
 	MetricName  string `mapstructure:"metric_name"`
 	ValueColumn string `mapstructure:"value_column"`
```
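The `Validate` methods above accumulate every problem rather than stopping at the first, using `go.uber.org/multierr`. A standalone sketch of the same accumulate-then-return pattern, substituting the standard library's `errors.Join` (Go 1.20+) for `multierr` and simplified stand-in types for the real config structs:

```go
// errors.Join, like multierr, returns nil when no errors were collected, so
// a fully valid config yields a nil error.
package main

import (
	"errors"
	"fmt"
)

type query struct {
	SQL     string
	Logs    []string // stand-in for []LogsCfg
	Metrics []string // stand-in for []MetricCfg
}

func (q query) validate() error {
	var errs []error
	if q.SQL == "" {
		errs = append(errs, errors.New("'query.sql' cannot be empty"))
	}
	if len(q.Logs) == 0 && len(q.Metrics) == 0 {
		errs = append(errs, errors.New("at least one of 'query.logs' and 'query.metrics' must not be empty"))
	}
	return errors.Join(errs...) // nil if errs is empty
}

func main() {
	fmt.Println(query{SQL: "select 1", Metrics: []string{"m"}}.validate()) // <nil>
	fmt.Println(query{}.validate())                                        // both messages, one per line
}
```

The advantage over early returns is that a user fixing a config sees every validation failure at once instead of one per collector restart.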

receiver/sqlqueryreceiver/config_test.go

Lines changed: 32 additions & 3 deletions

```diff
@@ -96,15 +96,44 @@ func TestLoadConfig(t *testing.T) {
 			errorMessage: "'driver' cannot be empty",
 		},
 		{
-			fname:        "config-invalid-missing-metrics.yaml",
+			fname:        "config-invalid-missing-logs-metrics.yaml",
 			id:           component.NewIDWithName(metadata.Type, ""),
-			errorMessage: "'query.metrics' cannot be empty",
+			errorMessage: "at least one of 'query.logs' and 'query.metrics' must not be empty",
 		},
 		{
 			fname:        "config-invalid-missing-datasource.yaml",
 			id:           component.NewIDWithName(metadata.Type, ""),
 			errorMessage: "'datasource' cannot be empty",
 		},
+		{
+			fname: "config-logs.yaml",
+			id:    component.NewIDWithName(metadata.Type, ""),
+			expected: &Config{
+				ScraperControllerSettings: scraperhelper.ScraperControllerSettings{
+					CollectionInterval: 10 * time.Second,
+					InitialDelay:       time.Second,
+				},
+				Driver:     "mydriver",
+				DataSource: "host=localhost port=5432 user=me password=s3cr3t sslmode=disable",
+				Queries: []Query{
+					{
+						SQL:                "select * from test_logs where log_id > ?",
+						TrackingColumn:     "log_id",
+						TrackingStartValue: "10",
+						Logs: []LogsCfg{
+							{
+								BodyColumn: "log_body",
+							},
+						},
+					},
+				},
+			},
+		},
+		{
+			fname:        "config-logs-missing-body-column.yaml",
+			id:           component.NewIDWithName(metadata.Type, ""),
+			errorMessage: "'body_column' must not be empty",
+		},
 		{
 			fname: "config-unnecessary-aggregation.yaml",
 			id:    component.NewIDWithName(metadata.Type, ""),
@@ -113,7 +142,7 @@ func TestLoadConfig(t *testing.T) {
 	}

 	for _, tt := range tests {
-		t.Run(tt.id.String(), func(t *testing.T) {
+		t.Run(tt.fname, func(t *testing.T) {
 			cm, err := confmaptest.LoadConf(filepath.Join("testdata", tt.fname))
 			require.NoError(t, err)

```
receiver/sqlqueryreceiver/db_client.go

Lines changed: 3 additions & 3 deletions

```diff
@@ -20,7 +20,7 @@ import (
 type stringMap map[string]string

 type dbClient interface {
-	metricRows(ctx context.Context) ([]stringMap, error)
+	queryRows(ctx context.Context, args ...any) ([]stringMap, error)
 }

 type dbSQLClient struct {
@@ -37,8 +37,8 @@ func newDbClient(db db, sql string, logger *zap.Logger) dbClient {
 	}
 }

-func (cl dbSQLClient) metricRows(ctx context.Context) ([]stringMap, error) {
-	sqlRows, err := cl.db.QueryContext(ctx, cl.sql)
+func (cl dbSQLClient) queryRows(ctx context.Context, args ...any) ([]stringMap, error) {
+	sqlRows, err := cl.db.QueryContext(ctx, cl.sql, args...)
 	if err != nil {
 		return nil, err
 	}
```
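The renamed `queryRows` method hands each result row to the logs and metrics builders as a `stringMap` (column name to stringified value). A standalone sketch of that row shape, with no real database involved; `toStringMaps` is a hypothetical helper introduced here, not part of the receiver:

```go
// Sketch: converting raw row values plus column names into the stringMap
// form that downstream log/metric builders consume.
package main

import "fmt"

type stringMap map[string]string

func toStringMaps(columns []string, rows [][]any) []stringMap {
	out := make([]stringMap, 0, len(rows))
	for _, r := range rows {
		m := stringMap{}
		for i, col := range columns {
			m[col] = fmt.Sprint(r[i]) // every value is rendered as a string
		}
		out = append(out, m)
	}
	return out
}

func main() {
	cols := []string{"log_id", "log_body"}
	rows := [][]any{{10001, "first"}, {10002, "second"}}
	for _, m := range toStringMaps(cols, rows) {
		fmt.Println(m["log_id"], m["log_body"])
	}
}
```

Making `args` variadic is what lets the same client serve both plain metrics queries (no arguments) and parameterized logs queries (the tracking value as the argument).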

receiver/sqlqueryreceiver/db_client_test.go

Lines changed: 5 additions & 5 deletions

```diff
@@ -21,7 +21,7 @@ func TestDBSQLClient_SingleRow(t *testing.T) {
 		logger: zap.NewNop(),
 		sql:    "",
 	}
-	rows, err := cl.metricRows(context.Background())
+	rows, err := cl.queryRows(context.Background())
 	require.NoError(t, err)
 	assert.Len(t, rows, 1)
 	assert.EqualValues(t, map[string]string{
@@ -42,7 +42,7 @@ func TestDBSQLClient_MultiRow(t *testing.T) {
 		logger: zap.NewNop(),
 		sql:    "",
 	}
-	rows, err := cl.metricRows(context.Background())
+	rows, err := cl.queryRows(context.Background())
 	require.NoError(t, err)
 	assert.Len(t, rows, 2)
 	assert.EqualValues(t, map[string]string{
@@ -69,7 +69,7 @@ func TestDBSQLClient_Nulls(t *testing.T) {
 		logger: zap.NewNop(),
 		sql:    "",
 	}
-	rows, err := cl.metricRows(context.Background())
+	rows, err := cl.queryRows(context.Background())
 	assert.Error(t, err)
 	assert.True(t, errors.Is(err, errNullValueWarning))
 	assert.Len(t, rows, 1)
@@ -88,7 +88,7 @@ func TestDBSQLClient_Nulls_MultiRow(t *testing.T) {
 		logger: zap.NewNop(),
 		sql:    "",
 	}
-	rows, err := cl.metricRows(context.Background())
+	rows, err := cl.queryRows(context.Background())
 	assert.Error(t, err)
 	errs := multierr.Errors(err)
 	for _, err := range errs {
@@ -152,7 +152,7 @@ type fakeDBClient struct {
 	err error
 }

-func (c *fakeDBClient) metricRows(context.Context) ([]stringMap, error) {
+func (c *fakeDBClient) queryRows(context.Context, ...any) ([]stringMap, error) {
 	if c.err != nil {
 		return nil, c.err
 	}
```

receiver/sqlqueryreceiver/factory.go

Lines changed: 2 additions & 1 deletion

```diff
@@ -15,6 +15,7 @@ func NewFactory() receiver.Factory {
 	return receiver.NewFactory(
 		metadata.Type,
 		createDefaultConfig,
-		receiver.WithMetrics(createReceiverFunc(sql.Open, newDbClient), metadata.MetricsStability),
+		receiver.WithLogs(createLogsReceiverFunc(sql.Open, newDbClient), metadata.LogsStability),
+		receiver.WithMetrics(createMetricsReceiverFunc(sql.Open, newDbClient), metadata.MetricsStability),
 	)
 }
```

receiver/sqlqueryreceiver/factory_test.go

Lines changed: 7 additions & 0 deletions

```diff
@@ -21,4 +21,11 @@ func TestNewFactory(t *testing.T) {
 		consumertest.NewNop(),
 	)
 	require.NoError(t, err)
+	_, err = factory.CreateLogsReceiver(
+		context.Background(),
+		receivertest.NewNopCreateSettings(),
+		factory.CreateDefaultConfig(),
+		consumertest.NewNop(),
+	)
+	require.NoError(t, err)
 }
```
