Skip to content

Commit 543d5a1

Browse files
craig[bot]gh-casper
craig[bot]
andcommitted
Merge #84394
84394: streamingccl: switch PartitionedStreamClient to use pgx library r=gh-casper a=gh-casper We have been switching to pgx from lib/pq across teams in CRL. This PR continues this journey and makes PartitionedStreamClient to use pgx and also in a thread-safe way. To avoid overhead of creating connection, APIs except Subscribe share the same connection guarded by a mutex, and Subcribe owns a private connection which makes sense as two Subscribe calls don't want to share a connection. Fixes data race we have been seen in the past: Closes: #83694 Also unskip stream_replication_e2e_test under race. Release note: None Co-authored-by: Casper <[email protected]>
2 parents 293c43c + c212fbd commit 543d5a1

14 files changed

+126
-184
lines changed

pkg/ccl/streamingccl/streamclient/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ go_library(
3535
"//pkg/util/timeutil",
3636
"//pkg/util/tracing",
3737
"@com_github_cockroachdb_errors//:errors",
38+
"@com_github_jackc_pgx_v4//:pgx",
3839
],
3940
)
4041

@@ -67,7 +68,6 @@ go_test(
6768
"//pkg/streaming",
6869
"//pkg/testutils",
6970
"//pkg/testutils/serverutils",
70-
"//pkg/testutils/skip",
7171
"//pkg/testutils/testcluster",
7272
"//pkg/util/cancelchecker",
7373
"//pkg/util/ctxgroup",

pkg/ccl/streamingccl/streamclient/client.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ type Client interface {
7878
) (Subscription, error)
7979

8080
// Close releases all the resources used by this client.
81-
Close() error
81+
Close(ctx context.Context) error
8282

8383
// Complete completes a replication stream consumption.
8484
Complete(ctx context.Context, streamID streaming.StreamID) error
@@ -123,7 +123,9 @@ type Subscription interface {
123123
}
124124

125125
// NewStreamClient creates a new stream client based on the stream address.
126-
func NewStreamClient(streamAddress streamingccl.StreamAddress) (Client, error) {
126+
func NewStreamClient(
127+
ctx context.Context, streamAddress streamingccl.StreamAddress,
128+
) (Client, error) {
127129
var streamClient Client
128130
streamURL, err := streamAddress.URL()
129131
if err != nil {
@@ -134,7 +136,7 @@ func NewStreamClient(streamAddress streamingccl.StreamAddress) (Client, error) {
134136
case "postgres", "postgresql":
135137
// The canonical PostgreSQL URL scheme is "postgresql", however our
136138
// own client commands also accept "postgres".
137-
return newPartitionedStreamClient(streamURL)
139+
return newPartitionedStreamClient(ctx, streamURL)
138140
case RandomGenScheme:
139141
streamClient, err = newRandomStreamClient(streamURL)
140142
if err != nil {

pkg/ccl/streamingccl/streamclient/client_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (sc testStreamClient) Heartbeat(
5050
}
5151

5252
// Close implements the Client interface.
53-
func (sc testStreamClient) Close() error {
53+
func (sc testStreamClient) Close(ctx context.Context) error {
5454
return nil
5555
}
5656

@@ -109,11 +109,12 @@ func (t testStreamSubscription) Err() error {
109109
// client could be used.
110110
func ExampleClient() {
111111
client := testStreamClient{}
112+
ctx := context.Background()
112113
defer func() {
113-
_ = client.Close()
114+
_ = client.Close(ctx)
114115
}()
115116

116-
id, err := client.Create(context.Background(), roachpb.MakeTenantID(1))
117+
id, err := client.Create(ctx, roachpb.MakeTenantID(1))
117118
if err != nil {
118119
panic(err)
119120
}
@@ -125,7 +126,7 @@ func ExampleClient() {
125126

126127
done := make(chan struct{})
127128

128-
grp := ctxgroup.WithContext(context.Background())
129+
grp := ctxgroup.WithContext(ctx)
129130
grp.GoCtx(func(ctx context.Context) error {
130131
ticker := time.NewTicker(time.Second * 30)
131132
for {
@@ -150,14 +151,14 @@ func ExampleClient() {
150151
ts := ingested.ts
151152
ingested.Unlock()
152153

153-
topology, err := client.Plan(context.Background(), id)
154+
topology, err := client.Plan(ctx, id)
154155
if err != nil {
155156
panic(err)
156157
}
157158

158159
for _, partition := range topology {
159160
// TODO(dt): use Subscribe helper and partition.SrcAddr
160-
sub, err := client.Subscribe(context.Background(), id, partition.SubscriptionToken, ts)
161+
sub, err := client.Subscribe(ctx, id, partition.SubscriptionToken, ts)
161162
if err != nil {
162163
panic(err)
163164
}

pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go

Lines changed: 62 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ package streamclient
1010

1111
import (
1212
"context"
13-
gosql "database/sql"
1413
"net"
1514
"net/url"
1615

@@ -19,35 +18,45 @@ import (
1918
"github.com/cockroachdb/cockroach/pkg/roachpb"
2019
"github.com/cockroachdb/cockroach/pkg/streaming"
2120
"github.com/cockroachdb/cockroach/pkg/util/hlc"
21+
"github.com/cockroachdb/cockroach/pkg/util/log"
2222
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
2323
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
2424
"github.com/cockroachdb/cockroach/pkg/util/tracing"
2525
"github.com/cockroachdb/errors"
26+
"github.com/jackc/pgx/v4"
2627
)
2728

2829
type partitionedStreamClient struct {
29-
srcDB *gosql.DB // DB handle to the source cluster
3030
urlPlaceholder url.URL
31+
pgxConfig *pgx.ConnConfig
3132

3233
mu struct {
3334
syncutil.Mutex
3435

3536
closed bool
3637
activeSubscriptions map[*partitionedStreamSubscription]struct{}
38+
srcConn *pgx.Conn // pgx connection to the source cluster
3739
}
3840
}
3941

40-
func newPartitionedStreamClient(remote *url.URL) (*partitionedStreamClient, error) {
41-
db, err := gosql.Open("postgres", remote.String())
42+
func newPartitionedStreamClient(
43+
ctx context.Context, remote *url.URL,
44+
) (*partitionedStreamClient, error) {
45+
config, err := pgx.ParseConfig(remote.String())
46+
if err != nil {
47+
return nil, err
48+
}
49+
conn, err := pgx.ConnectConfig(ctx, config)
4250
if err != nil {
4351
return nil, err
4452
}
4553

4654
client := &partitionedStreamClient{
47-
srcDB: db,
4855
urlPlaceholder: *remote,
56+
pgxConfig: config,
4957
}
5058
client.mu.activeSubscriptions = make(map[*partitionedStreamSubscription]struct{})
59+
client.mu.srcConn = conn
5160
return client, nil
5261
}
5362

@@ -60,19 +69,16 @@ func (p *partitionedStreamClient) Create(
6069
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create")
6170
defer sp.Finish()
6271

63-
streamID := streaming.InvalidStreamID
64-
65-
conn, err := p.srcDB.Conn(ctx)
72+
p.mu.Lock()
73+
defer p.mu.Unlock()
74+
var streamID streaming.StreamID
75+
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantID.ToUint64())
76+
err := row.Scan(&streamID)
6677
if err != nil {
67-
return streamID, err
68-
}
69-
70-
row := conn.QueryRowContext(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantID.ToUint64())
71-
if row.Err() != nil {
72-
return streamID, errors.Wrapf(row.Err(), "error creating replication stream for tenant %s", tenantID.String())
78+
return streaming.InvalidStreamID,
79+
errors.Wrapf(err, "error creating replication stream for tenant %s", tenantID.String())
7380
}
7481

75-
err = row.Scan(&streamID)
7682
return streamID, err
7783
}
7884

@@ -83,21 +89,14 @@ func (p *partitionedStreamClient) Heartbeat(
8389
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Heartbeat")
8490
defer sp.Finish()
8591

86-
conn, err := p.srcDB.Conn(ctx)
87-
if err != nil {
88-
return streampb.StreamReplicationStatus{}, err
89-
}
90-
91-
row := conn.QueryRowContext(ctx,
92+
p.mu.Lock()
93+
defer p.mu.Unlock()
94+
row := p.mu.srcConn.QueryRow(ctx,
9295
`SELECT crdb_internal.replication_stream_progress($1, $2)`, streamID, consumed.String())
93-
if row.Err() != nil {
94-
return streampb.StreamReplicationStatus{},
95-
errors.Wrapf(row.Err(), "error sending heartbeat to replication stream %d", streamID)
96-
}
97-
9896
var rawStatus []byte
9997
if err := row.Scan(&rawStatus); err != nil {
100-
return streampb.StreamReplicationStatus{}, err
98+
return streampb.StreamReplicationStatus{},
99+
errors.Wrapf(err, "error sending heartbeat to replication stream %d", streamID)
101100
}
102101
var status streampb.StreamReplicationStatus
103102
if err := protoutil.Unmarshal(rawStatus, &status); err != nil {
@@ -121,23 +120,19 @@ func (p *partitionedStreamClient) postgresURL(servingAddr string) (url.URL, erro
121120
func (p *partitionedStreamClient) Plan(
122121
ctx context.Context, streamID streaming.StreamID,
123122
) (Topology, error) {
124-
conn, err := p.srcDB.Conn(ctx)
125-
if err != nil {
126-
return nil, err
127-
}
128-
129-
row := conn.QueryRowContext(ctx, `SELECT crdb_internal.replication_stream_spec($1)`, streamID)
130-
if row.Err() != nil {
131-
return nil, errors.Wrapf(row.Err(), "error planning replication stream %d", streamID)
132-
}
133-
134-
var rawSpec []byte
135-
if err := row.Scan(&rawSpec); err != nil {
136-
return nil, err
137-
}
138123
var spec streampb.ReplicationStreamSpec
139-
if err := protoutil.Unmarshal(rawSpec, &spec); err != nil {
140-
return nil, err
124+
{
125+
p.mu.Lock()
126+
defer p.mu.Unlock()
127+
128+
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.replication_stream_spec($1)`, streamID)
129+
var rawSpec []byte
130+
if err := row.Scan(&rawSpec); err != nil {
131+
return nil, errors.Wrapf(err, "error planning replication stream %d", streamID)
132+
}
133+
if err := protoutil.Unmarshal(rawSpec, &spec); err != nil {
134+
return nil, err
135+
}
141136
}
142137

143138
topology := Topology{}
@@ -163,7 +158,7 @@ func (p *partitionedStreamClient) Plan(
163158
}
164159

165160
// Close implements Client interface.
166-
func (p *partitionedStreamClient) Close() error {
161+
func (p *partitionedStreamClient) Close(ctx context.Context) error {
167162
p.mu.Lock()
168163
defer p.mu.Unlock()
169164

@@ -176,7 +171,7 @@ func (p *partitionedStreamClient) Close() error {
176171
close(sub.closeChan)
177172
delete(p.mu.activeSubscriptions, sub)
178173
}
179-
return p.srcDB.Close()
174+
return p.mu.srcConn.Close(ctx)
180175
}
181176

182177
// Subscribe implements Client interface.
@@ -198,11 +193,11 @@ func (p *partitionedStreamClient) Subscribe(
198193
}
199194

200195
res := &partitionedStreamSubscription{
201-
eventsChan: make(chan streamingccl.Event),
202-
db: p.srcDB,
203-
specBytes: specBytes,
204-
streamID: stream,
205-
closeChan: make(chan struct{}),
196+
eventsChan: make(chan streamingccl.Event),
197+
srcConnConfig: p.pgxConfig,
198+
specBytes: specBytes,
199+
streamID: stream,
200+
closeChan: make(chan struct{}),
206201
}
207202
p.mu.Lock()
208203
defer p.mu.Unlock()
@@ -215,21 +210,19 @@ func (p *partitionedStreamClient) Complete(ctx context.Context, streamID streami
215210
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Complete")
216211
defer sp.Finish()
217212

218-
conn, err := p.srcDB.Conn(ctx)
219-
if err != nil {
220-
return err
221-
}
222-
row := conn.QueryRowContext(ctx, `SELECT crdb_internal.complete_replication_stream($1)`, streamID)
223-
if row.Err() != nil {
224-
return errors.Wrapf(row.Err(), "error completing replication stream %d", streamID)
213+
p.mu.Lock()
214+
defer p.mu.Unlock()
215+
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.complete_replication_stream($1)`, streamID)
216+
if err := row.Scan(&streamID); err != nil {
217+
return errors.Wrapf(err, "error completing replication stream %d", streamID)
225218
}
226219
return nil
227220
}
228221

229222
type partitionedStreamSubscription struct {
230-
err error
231-
db *gosql.DB
232-
eventsChan chan streamingccl.Event
223+
err error
224+
srcConnConfig *pgx.ConnConfig
225+
eventsChan chan streamingccl.Event
233226
// Channel to send signal to close the subscription.
234227
closeChan chan struct{}
235228

@@ -274,16 +267,22 @@ func (p *partitionedStreamSubscription) Subscribe(ctx context.Context) error {
274267
defer sp.Finish()
275268

276269
defer close(p.eventsChan)
277-
conn, err := p.db.Conn(ctx)
270+
// Each subscription has its own pgx connection.
271+
srcConn, err := pgx.ConnectConfig(ctx, p.srcConnConfig)
278272
if err != nil {
279273
return err
280274
}
275+
defer func() {
276+
if err != nil {
277+
log.Warningf(ctx, "error when closing subscription connection: %v", err)
278+
}
279+
}()
281280

282-
_, err = conn.ExecContext(ctx, `SET avoid_buffering = true`)
281+
_, err = srcConn.Exec(ctx, `SET avoid_buffering = true`)
283282
if err != nil {
284283
return err
285284
}
286-
rows, err := conn.QueryContext(ctx, `SELECT * FROM crdb_internal.stream_partition($1, $2)`,
285+
rows, err := srcConn.Query(ctx, `SELECT * FROM crdb_internal.stream_partition($1, $2)`,
287286
p.streamID, p.specBytes)
288287
if err != nil {
289288
return err

pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"github.com/cockroachdb/cockroach/pkg/streaming"
2929
"github.com/cockroachdb/cockroach/pkg/testutils"
3030
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
31-
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
3231
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
3332
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
3433
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -58,7 +57,6 @@ func (f *subscriptionFeedSource) Close(ctx context.Context) {}
5857

5958
func TestPartitionedStreamReplicationClient(t *testing.T) {
6059
defer leaktest.AfterTest(t)()
61-
skip.UnderRaceWithIssue(t, 83694)
6260
defer log.Scope(t).Close(t)
6361

6462
h, cleanup := streamingtest.NewReplicationHelper(t,
@@ -90,9 +88,9 @@ INSERT INTO d.t1 (i) VALUES (42);
9088
INSERT INTO d.t2 VALUES (2);
9189
`)
9290

93-
client, err := newPartitionedStreamClient(&h.PGUrl)
91+
client, err := newPartitionedStreamClient(ctx, &h.PGUrl)
9492
defer func() {
95-
require.NoError(t, client.Close())
93+
require.NoError(t, client.Close(ctx))
9694
}()
9795
require.NoError(t, err)
9896
expectStreamState := func(streamID streaming.StreamID, status jobs.Status) {
@@ -166,9 +164,9 @@ INSERT INTO d.t2 VALUES (2);
166164
url, err := streamingccl.StreamAddress(top[0].SrcAddr).URL()
167165
require.NoError(t, err)
168166
// Create a new stream client with the given partition address.
169-
subClient, err := newPartitionedStreamClient(url)
167+
subClient, err := newPartitionedStreamClient(ctx, url)
170168
defer func() {
171-
require.NoError(t, subClient.Close())
169+
require.NoError(t, subClient.Close(ctx))
172170
}()
173171
require.NoError(t, err)
174172
sub, err := subClient.Subscribe(ctx, streamID, encodeSpec("t1"), hlc.Timestamp{})

pkg/ccl/streamingccl/streamclient/random_stream_client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ func (m *randomStreamClient) getDescriptorAndNamespaceKVForTableID(
339339
}
340340

341341
// Close implements the Client interface.
342-
func (m *randomStreamClient) Close() error {
342+
func (m *randomStreamClient) Close(ctx context.Context) error {
343343
return nil
344344
}
345345

0 commit comments

Comments
 (0)