-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathconnection_manager.go
63 lines (51 loc) · 1.86 KB
/
connection_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package clickhouse
import (
"context"
"fmt"
"go.uber.org/zap"
api_common "github.com/ydb-platform/fq-connector-go/api/common"
"github.com/ydb-platform/fq-connector-go/app/config"
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
"github.com/ydb-platform/fq-connector-go/common"
)
// Compile-time assertion that *connectionManager satisfies the
// rdbms_utils.ConnectionManager interface.
var _ rdbms_utils.ConnectionManager = (*connectionManager)(nil)
// connectionManager is the ClickHouse implementation of
// rdbms_utils.ConnectionManager: it creates connections in Make and
// disposes of them in Release.
type connectionManager struct {
// Embedded shared base; Make reaches QueryLoggerFactory through it.
rdbms_utils.ConnectionManagerBase
// cfg is the ClickHouse configuration forwarded to the connection makers.
cfg *config.TClickHouseConfig
}
// Make opens one ClickHouse connection for the data source instance described
// by params and returns it wrapped in a single-element slice.
//
// An error is returned when:
//   - the data source instance carries no basic-auth credentials;
//   - the requested protocol is neither NATIVE nor HTTP;
//   - the protocol-specific connection maker fails (the cause is wrapped
//     with the "make connection" prefix).
func (c *connectionManager) Make(
	params *rdbms_utils.ConnectionParams,
) ([]rdbms_utils.Connection, error) {
	dsi := params.DataSourceInstance

	// The getters are chained so that missing credentials (at any level)
	// are reported as an unsupported auth method rather than dereferenced.
	if dsi.GetCredentials().GetBasic() == nil {
		return nil, fmt.Errorf("currently only basic auth is supported")
	}

	var (
		connection rdbms_utils.Connection
		makeErr    error
	)

	// Dispatch on the wire protocol. The query logger is created only inside
	// the supported branches, so an unsupported protocol does no extra work.
	switch protocol := dsi.Protocol; protocol {
	case api_common.EGenericProtocol_NATIVE:
		queryLogger := c.QueryLoggerFactory.Make(params.Logger)
		connection, makeErr = makeConnectionNative(
			params.Ctx, params.Logger, c.cfg, dsi, params.TableName, queryLogger)
	case api_common.EGenericProtocol_HTTP:
		queryLogger := c.QueryLoggerFactory.Make(params.Logger)
		connection, makeErr = makeConnectionHTTP(
			params.Ctx, params.Logger, c.cfg, dsi, params.TableName, queryLogger)
	default:
		return nil, fmt.Errorf("can not run ClickHouse connection with protocol '%v'", protocol)
	}

	if makeErr != nil {
		return nil, fmt.Errorf("make connection: %w", makeErr)
	}

	result := []rdbms_utils.Connection{connection}

	return result, nil
}
// Release passes every connection in cs to common.LogCloserError together
// with logger. Judging by the helper's name, it closes the connection and
// logs a failure instead of returning it (confirm against the helper).
// The context argument is ignored.
func (*connectionManager) Release(_ context.Context, logger *zap.Logger, cs []rdbms_utils.Connection) {
	for i := range cs {
		common.LogCloserError(logger, cs[i], "close clickhouse connection")
	}
}
// NewConnectionManager builds the ClickHouse connection manager from the
// ClickHouse config and the shared connection manager base, returning it as
// the generic rdbms_utils.ConnectionManager interface.
func NewConnectionManager(
	cfg *config.TClickHouseConfig,
	base rdbms_utils.ConnectionManagerBase,
) rdbms_utils.ConnectionManager {
	manager := new(connectionManager)
	manager.ConnectionManagerBase = base
	manager.cfg = cfg

	return manager
}