-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathdata_source.go
136 lines (112 loc) · 3.71 KB
/
data_source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package rdbms
import (
"context"
"fmt"
"go.uber.org/zap"
api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
"github.com/ydb-platform/fq-connector-go/app/server/conversion"
"github.com/ydb-platform/fq-connector-go/app/server/datasource"
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
"github.com/ydb-platform/fq-connector-go/app/server/paging"
"github.com/ydb-platform/fq-connector-go/common"
)
// Preset bundles the pluggable components that NewDataSource copies into a
// data source instance.
type Preset struct {
	// SQLFormatter is handed to the query builder when a split is read.
	SQLFormatter rdbms_utils.SQLFormatter
	// ConnectionManager makes and releases database connections.
	ConnectionManager rdbms_utils.ConnectionManager
	// TypeMapper is stored on the data source; it is not used by the methods
	// in this file (presumably consumed elsewhere — verify against callers).
	TypeMapper datasource.TypeMapper
	// SchemaProvider retrieves the table schema for DescribeTable requests.
	SchemaProvider rdbms_utils.SchemaProvider
}
// Compile-time check that *dataSourceImpl satisfies datasource.DataSource[any].
var _ datasource.DataSource[any] = (*dataSourceImpl)(nil)

// dataSourceImpl is the data source implementation assembled by NewDataSource
// from a Preset and a converter collection.
type dataSourceImpl struct {
	// typeMapper is kept from the preset; not read by the methods in this file.
	typeMapper datasource.TypeMapper
	// sqlFormatter is passed to the read-split query builder.
	sqlFormatter rdbms_utils.SQLFormatter
	// connectionManager makes and releases connections for each request.
	connectionManager rdbms_utils.ConnectionManager
	// schemaProvider fetches table schemas for DescribeTable.
	schemaProvider rdbms_utils.SchemaProvider
	// converterCollection is supplied to the row transformer factory.
	converterCollection conversion.Collection
	// logger is the logger given at construction time; the methods in this
	// file use the per-call logger argument instead.
	logger *zap.Logger
}
// DescribeTable opens a connection to the data source instance named in the
// request, asks the schema provider for the table schema and returns it
// wrapped in a TDescribeTableResponse.
//
// The connection is released when the method returns. Errors from making the
// connection or fetching the schema are wrapped and returned with a nil
// response.
func (ds *dataSourceImpl) DescribeTable(
	ctx context.Context,
	logger *zap.Logger,
	request *api_service_protos.TDescribeTableRequest,
) (*api_service_protos.TDescribeTableResponse, error) {
	connection, err := ds.connectionManager.Make(ctx, logger, request.DataSourceInstance)
	if err != nil {
		return nil, fmt.Errorf("make connection: %w", err)
	}

	defer ds.connectionManager.Release(logger, connection)

	tableSchema, err := ds.schemaProvider.GetSchema(ctx, logger, connection, request)
	if err != nil {
		return nil, fmt.Errorf("get schema: %w", err)
	}

	response := &api_service_protos.TDescribeTableResponse{Schema: tableSchema}

	return response, nil
}
// doReadSplit executes the SELECT described by the split and feeds every
// resulting row into the sink.
//
// Steps, in order: build the query text and arguments, make a connection,
// run the query, derive the YDB types from Select.What, create a row
// transformer, then scan each row into the transformer's acceptors and pass
// the transformer to the sink. The first failure at any step is wrapped and
// returned; nil is returned once the rows are exhausted without error.
func (ds *dataSourceImpl) doReadSplit(
	ctx context.Context,
	logger *zap.Logger,
	split *api_service_protos.TSplit,
	sink paging.Sink[any],
) error {
	queryText, queryArgs, err := rdbms_utils.MakeReadSplitQuery(logger, ds.sqlFormatter, split.Select)
	if err != nil {
		return fmt.Errorf("make read split query: %w", err)
	}

	connection, err := ds.connectionManager.Make(ctx, logger, split.Select.DataSourceInstance)
	if err != nil {
		return fmt.Errorf("make connection: %w", err)
	}

	// Deferred calls run in reverse order, so the rows registered below are
	// closed before this connection is released.
	defer ds.connectionManager.Release(logger, connection)

	resultRows, err := connection.Query(ctx, queryText, queryArgs...)
	if err != nil {
		return fmt.Errorf("query '%s' error: %w", queryText, err)
	}

	defer common.LogCloserError(logger, resultRows, "close rows")

	columnTypes, err := common.SelectWhatToYDBTypes(split.Select.What)
	if err != nil {
		return fmt.Errorf("convert Select.What to Ydb types: %w", err)
	}

	rowTransformer, err := resultRows.MakeTransformer(columnTypes, ds.converterCollection)
	if err != nil {
		return fmt.Errorf("make transformer: %w", err)
	}

	// FIXME: use https://pkg.go.dev/database/sql#Rows.NextResultSet
	// Very important! Possible data loss.
	for resultRows.Next() {
		// Scan fills the transformer's acceptors with the current row.
		err = resultRows.Scan(rowTransformer.GetAcceptors()...)
		if err != nil {
			return fmt.Errorf("rows scan error: %w", err)
		}

		err = sink.AddRow(rowTransformer)
		if err != nil {
			return fmt.Errorf("add row to paging writer: %w", err)
		}
	}

	// Next returning false may mean an iteration error rather than the end
	// of the data, so the rows' error state is checked explicitly.
	err = resultRows.Err()
	if err != nil {
		return fmt.Errorf("rows error: %w", err)
	}

	return nil
}
// ReadSplit reads the given split into the sink. It returns nothing: a
// failure from doReadSplit is reported through sink.AddError, and
// sink.Finish is always called afterwards, whether or not an error occurred.
func (ds *dataSourceImpl) ReadSplit(
	ctx context.Context,
	logger *zap.Logger,
	split *api_service_protos.TSplit,
	sink paging.Sink[any],
) {
	if err := ds.doReadSplit(ctx, logger, split, sink); err != nil {
		sink.AddError(err)
	}

	sink.Finish()
}
// NewDataSource builds a data source from the given logger, the components
// of preset and the converter collection, and returns it as a
// datasource.DataSource[any]. preset is dereferenced, so it must be non-nil.
func NewDataSource(
	logger *zap.Logger,
	preset *Preset,
	converterCollection conversion.Collection,
) datasource.DataSource[any] {
	ds := new(dataSourceImpl)

	ds.logger = logger
	ds.sqlFormatter = preset.SQLFormatter
	ds.connectionManager = preset.ConnectionManager
	ds.typeMapper = preset.TypeMapper
	ds.schemaProvider = preset.SchemaProvider
	ds.converterCollection = converterCollection

	return ds
}