Skip to content

Commit 59e4295

Browse files
feat: initial conversion to pubsub v2
1 parent 1a90a59 commit 59e4295

13 files changed

Lines changed: 357 additions & 268 deletions

File tree

.vscode/settings.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@
22
"editor.formatOnPaste": true,
33
"editor.formatOnSave": true,
44
"editor.formatOnSaveMode": "file",
5-
"go.buildOnSave": "workspace",
6-
"go.autocompleteUnimportedPackages": true,
75
"go.lintTool": "golangci-lint",
86
"go.generateTestsFlags": ["-template=testify"],
97
"cSpell.flagWords": [],
108
"cSpell.words": [
119
"Acks",
10+
"apiv1",
1211
"cimg",
1312
"createdb",
1413
"dbstats",
@@ -24,6 +23,7 @@
2423
"entsql",
2524
"enttest",
2625
"favicon",
26+
"fieldmaskpb",
2727
"frob",
2828
"frood",
2929
"fxtest",

go.mod

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module go.6river.tech/mmmbbb
33
go 1.24.0
44

55
require (
6-
cloud.google.com/go/pubsub v1.50.0
6+
cloud.google.com/go/pubsub/v2 v2.0.0
77
entgo.io/ent v0.14.5
88
github.com/Depado/ginprom v1.8.1
99
github.com/alecthomas/participle/v2 v2.1.4
@@ -46,7 +46,6 @@ require (
4646
cloud.google.com/go/iam v1.5.2 // indirect
4747
cloud.google.com/go/longrunning v0.6.7 // indirect
4848
cloud.google.com/go/monitoring v1.24.2 // indirect
49-
cloud.google.com/go/pubsub/v2 v2.0.0 // indirect
5049
cloud.google.com/go/spanner v1.83.0 // indirect
5150
cloud.google.com/go/storage v1.55.0 // indirect
5251
dario.cat/mergo v1.0.1 // indirect
@@ -247,6 +246,7 @@ require (
247246
github.com/zclconf/go-cty v1.16.3 // indirect
248247
github.com/zclconf/go-cty-yaml v1.1.0 // indirect
249248
github.com/zeebo/errs v1.4.0 // indirect
249+
go.einride.tech/aip v0.73.0 // indirect
250250
go.mongodb.org/mongo-driver v1.17.4 // indirect
251251
go.opencensus.io v0.24.0 // indirect
252252
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
@@ -283,6 +283,7 @@ require (
283283
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1 // indirect
284284
gopkg.in/yaml.v2 v2.4.0 // indirect
285285
gopkg.in/yaml.v3 v3.0.1 // indirect
286+
gotest.tools/v3 v3.5.2 // indirect
286287
k8s.io/apimachinery v0.33.3 // indirect
287288
k8s.io/klog/v2 v2.130.1 // indirect
288289
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect

go.sum

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -445,8 +445,6 @@ cloud.google.com/go/pubsub v1.26.0/go.mod h1:QgBH3U/jdJy/ftjPhTkyXNj543Tin1pRYcd
445445
cloud.google.com/go/pubsub v1.27.1/go.mod h1:hQN39ymbV9geqBnfQq6Xf63yNhUAhv9CZhzp5O6qsW0=
446446
cloud.google.com/go/pubsub v1.28.0/go.mod h1:vuXFpwaVoIPQMGXqRyUQigu/AX1S3IWugR9xznmcXX8=
447447
cloud.google.com/go/pubsub v1.30.0/go.mod h1:qWi1OPS0B+b5L+Sg6Gmc9zD1Y+HaM0MdUr7LsupY1P4=
448-
cloud.google.com/go/pubsub v1.50.0 h1:hnYpOIxVlgVD1Z8LN7est4DQZK3K6tvZNurZjIVjUe0=
449-
cloud.google.com/go/pubsub v1.50.0/go.mod h1:Di2Y+nqXBpIS+dXUEJPQzLh8PbIQZMLE9IVUFhf2zmM=
450448
cloud.google.com/go/pubsub/v2 v2.0.0 h1:0qS6mRJ41gD1lNmM/vdm6bR7DQu6coQcVwD+VPf0Bz0=
451449
cloud.google.com/go/pubsub/v2 v2.0.0/go.mod h1:0aztFxNzVQIRSZ8vUr79uH2bS3jwLebwK6q1sgEub+E=
452450
cloud.google.com/go/pubsublite v1.5.0/go.mod h1:xapqNQ1CuLfGi23Yda/9l4bBCKz/wC3KIJ5gKcxveZg=
@@ -2275,8 +2273,8 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
22752273
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
22762274
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
22772275
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
2278-
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
2279-
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
2276+
gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q=
2277+
gotest.tools/v3 v3.5.2/go.mod h1:LtdLGcnqToBH83WByAAi/wiwSFCArdFIUV/xxN4pcjA=
22802278
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
22812279
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
22822280
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

grpc/generate.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,5 +65,5 @@ cp -avf \
6565

6666
cp -avf "${td}/google/pubsub/v1/pubsub.swagger.json" "${td}/google/pubsub/v1/schema.swagger.json" .
6767

68-
./gen-types.sh pubsubpb cloud.google.com/go/pubsub/apiv1/pubsubpb "${td}/google/pubsub/v1/pubsub.pb.go" pubsubpb/pubsub-types.go
69-
./gen-types.sh pubsubpb cloud.google.com/go/pubsub/apiv1/pubsubpb "${td}/google/pubsub/v1/schema.pb.go" pubsubpb/schema-types.go
68+
./gen-types.sh pubsubpb cloud.google.com/go/pubsub/v2/apiv1/pubsubpb "${td}/google/pubsub/v1/pubsub.pb.go" pubsubpb/pubsub-types.go
69+
./gen-types.sh pubsubpb cloud.google.com/go/pubsub/v2/apiv1/pubsubpb "${td}/google/pubsub/v1/schema.pb.go" pubsubpb/schema-types.go

grpc/pubsubpb/pubsub-types.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

grpc/pubsubpb/schema-types.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/go-compat/deadletter/deadletter.go

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,13 @@ import (
3030
"sync/atomic"
3131
"time"
3232

33-
"cloud.google.com/go/pubsub" //nolint:staticcheck // https://github.com/6RiverSystems/mmmbbb/issues/521
33+
"cloud.google.com/go/pubsub/v2"
34+
"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
3435
"github.com/google/uuid"
3536
"golang.org/x/sync/errgroup"
37+
"google.golang.org/protobuf/types/known/durationpb"
38+
39+
"go.6river.tech/mmmbbb/internal"
3640
)
3741

3842
func main() {
@@ -46,33 +50,46 @@ func main() {
4650
psc, err := pubsub.NewClient(ctx, projectID)
4751
panicIf(err)
4852
uniq := uuid.NewString()
49-
primaryID := "go-deadletter-primary-" + uniq
50-
deadletterID := "go-deadletter-dead-" + uniq
51-
primaryTopic, err := psc.CreateTopic(ctx, primaryID)
53+
tac := psc.TopicAdminClient
54+
primaryTopicMeta, err := tac.CreateTopic(ctx, &pubsubpb.Topic{
55+
Name: internal.PSTopicName(projectID, "go-deadletter-primary-"+uniq),
56+
})
5257
panicIf(err)
53-
defer func() { panicIf(primaryTopic.Delete(ctx)) }()
54-
deadletterTopic, err := psc.CreateTopic(ctx, deadletterID)
58+
defer func() { panicIf(tac.DeleteTopic(ctx, &pubsubpb.DeleteTopicRequest{Topic: primaryTopicMeta.Name})) }()
59+
deadletterTopicMeta, err := tac.CreateTopic(ctx, &pubsubpb.Topic{
60+
Name: internal.PSTopicName(projectID, "go-deadletter-dead-"+uniq),
61+
})
5562
panicIf(err)
56-
defer func() { panicIf(deadletterTopic.Delete(ctx)) }()
63+
defer func() { panicIf(tac.DeleteTopic(ctx, &pubsubpb.DeleteTopicRequest{Topic: deadletterTopicMeta.Name})) }()
5764

58-
primarySub, err := psc.CreateSubscription(ctx, primaryID, pubsub.SubscriptionConfig{
59-
Topic: primaryTopic,
60-
RetryPolicy: &pubsub.RetryPolicy{
61-
MinimumBackoff: time.Second * 3 / 2,
65+
sac := psc.SubscriptionAdminClient
66+
primarySubMeta, err := sac.CreateSubscription(ctx, &pubsubpb.Subscription{
67+
Name: internal.PSSubName(projectID, "go-deadletter-primary-"+uniq),
68+
Topic: primaryTopicMeta.Name,
69+
RetryPolicy: &pubsubpb.RetryPolicy{
70+
MinimumBackoff: durationpb.New(time.Second * 3 / 2),
6271
},
63-
DeadLetterPolicy: &pubsub.DeadLetterPolicy{
64-
DeadLetterTopic: deadletterTopic.String(),
72+
DeadLetterPolicy: &pubsubpb.DeadLetterPolicy{
73+
DeadLetterTopic: deadletterTopicMeta.String(),
6574
// 5 is the minimum google allows, mmmbbb permits as low as 1
6675
MaxDeliveryAttempts: 5,
6776
},
6877
})
6978
panicIf(err)
70-
defer func() { panicIf(primarySub.Delete(ctx)) }()
71-
deadletterSub, err := psc.CreateSubscription(ctx, deadletterID, pubsub.SubscriptionConfig{
72-
Topic: deadletterTopic,
79+
defer func() {
80+
panicIf(sac.DeleteSubscription(ctx, &pubsubpb.DeleteSubscriptionRequest{Subscription: primarySubMeta.Name}))
81+
}()
82+
deadletterSubMeta, err := sac.CreateSubscription(ctx, &pubsubpb.Subscription{
83+
Name: internal.PSSubName(projectID, "go-deadletter-dead-"+uniq),
84+
Topic: deadletterTopicMeta.Name,
7385
})
7486
panicIf(err)
75-
defer func() { panicIf(deadletterSub.Delete(ctx)) }()
87+
defer func() {
88+
panicIf(sac.DeleteSubscription(ctx, &pubsubpb.DeleteSubscriptionRequest{Subscription: deadletterSubMeta.Name}))
89+
}()
90+
primaryPublisher := psc.Publisher(primaryTopicMeta.Name)
91+
primarySub := psc.Subscriber(primarySubMeta.Name)
92+
deadletterSub := psc.Subscriber(deadletterSubMeta.Name)
7693
primarySub.ReceiveSettings.MaxOutstandingMessages = 20
7794
deadletterSub.ReceiveSettings.MaxOutstandingMessages = 20
7895

@@ -126,7 +143,7 @@ func main() {
126143
case <-egCtx.Done():
127144
fmt.Printf("canceled after queueing %d messages\n", i)
128145
return egCtx.Err()
129-
case pubs <- primaryTopic.Publish(egCtx, &pubsub.Message{
146+
case pubs <- primaryPublisher.Publish(egCtx, &pubsub.Message{
130147
Data: payload,
131148
Attributes: map[string]string{
132149
"i": strconv.Itoa(i),

internal/go-compat/stress/stress.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,16 @@ import (
2929
"sync/atomic"
3030
"time"
3131

32-
"cloud.google.com/go/pubsub" //nolint:staticcheck // https://github.com/6RiverSystems/mmmbbb/issues/521
32+
"cloud.google.com/go/pubsub/v2"
3333
"github.com/google/uuid"
3434
"golang.org/x/sync/errgroup"
3535
"google.golang.org/api/option"
3636
"google.golang.org/grpc"
3737
"google.golang.org/grpc/credentials/insecure"
38+
"google.golang.org/protobuf/types/known/durationpb"
39+
40+
"go.6river.tech/mmmbbb/grpc/pubsubpb"
41+
"go.6river.tech/mmmbbb/internal"
3842
)
3943

4044
func main() {
@@ -61,22 +65,30 @@ func main() {
6165
} else {
6266
os.Setenv("PUBSUB_EMULATOR_HOST", "localhost:8802")
6367
}
64-
psc, err := pubsub.NewClient(ctx, "go-compat-stress", pscOpts...)
68+
const projectID = "go-compat-stress"
69+
psc, err := pubsub.NewClient(ctx, projectID, pscOpts...)
6570
panicIf(err)
66-
id := "go-stress-" + uuid.NewString()
67-
t, err := psc.CreateTopic(ctx, id)
71+
_id := "go-stress-" + uuid.NewString()
72+
tac := psc.TopicAdminClient
73+
tm, err := tac.CreateTopic(ctx, &pubsubpb.Topic{Name: internal.PSTopicName(projectID, _id)})
6874
panicIf(err)
75+
defer func() { panicIf(tac.DeleteTopic(ctx, &pubsubpb.DeleteTopicRequest{Topic: tm.Name})) }()
76+
t := psc.Publisher(tm.Name)
6977
t.EnableMessageOrdering = orderSplit != 0
70-
defer func() { panicIf(t.Delete(ctx)) }()
71-
s, err := psc.CreateSubscription(ctx, id, pubsub.SubscriptionConfig{
72-
Topic: t,
78+
sac := psc.SubscriptionAdminClient
79+
sm, err := sac.CreateSubscription(ctx, &pubsubpb.Subscription{
80+
Name: internal.PSSubName(projectID, _id),
81+
Topic: tm.Name,
7382
EnableMessageOrdering: orderSplit != 0,
74-
RetryPolicy: &pubsub.RetryPolicy{
75-
MinimumBackoff: time.Second * 3 / 2,
83+
RetryPolicy: &pubsubpb.RetryPolicy{
84+
MinimumBackoff: durationpb.New(time.Second * 3 / 2),
7685
},
7786
})
7887
panicIf(err)
79-
defer func() { panicIf(s.Delete(ctx)) }()
88+
defer func() {
89+
panicIf(sac.DeleteSubscription(ctx, &pubsubpb.DeleteSubscriptionRequest{Subscription: sm.Name}))
90+
}()
91+
s := psc.Subscriber(sm.Name)
8092
// configure the flow control same as our typical apps
8193
s.ReceiveSettings.MaxOutstandingMessages = 20
8294
// scaling up from the default of 10 can make it go faster if the emulator can

internal/ps-name.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package internal
2+
3+
func PSTopicName(project, topic string) string {
4+
return "projects/" + project + "/topics/" + topic
5+
}
6+
7+
func PSSubName(project, sub string) string {
8+
return "projects/" + project + "/subscriptions/" + sub
9+
}

0 commit comments

Comments
 (0)