Skip to content

Commit 707023d

Browse files
committed
Use bytes instead of UUIDs for persistence interfaces
1 parent 5c790eb commit 707023d

File tree

7 files changed

+66
-60
lines changed

7 files changed

+66
-60
lines changed

common/membership/ringpop/monitor.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ type monitor struct {
6464
logger log.Logger
6565
metadataManager persistence.ClusterMetadataManager
6666
broadcastHostPortResolver func() (string, error)
67-
hostID uuid.UUID
67+
hostID []byte
6868
initialized *future.FutureImpl[struct{}]
6969
}
7070

@@ -87,6 +87,8 @@ func newMonitor(
8787
lifecycleCtx,
8888
headers.SystemBackgroundHighCallerInfo,
8989
)
90+
hostID, _ := uuid.New().MarshalBinary()
91+
// MarshalBinary should never error.
9092

9193
rpo := &monitor{
9294
status: common.DaemonStatusInitialized,
@@ -101,7 +103,7 @@ func newMonitor(
101103
logger: logger,
102104
metadataManager: metadataManager,
103105
broadcastHostPortResolver: broadcastHostPortResolver,
104-
hostID: uuid.New(),
106+
hostID: hostID,
105107
initialized: future.NewFuture[struct{}](),
106108
maxJoinDuration: maxJoinDuration,
107109
propagationTime: propagationTime,
@@ -251,10 +253,14 @@ func (rpo *monitor) upsertMyMembership(
251253
err := rpo.metadataManager.UpsertClusterMembership(ctx, request)
252254

253255
if err == nil {
256+
hostID, err := uuid.FromBytes(request.HostID)
257+
if err != nil {
258+
return err
259+
}
254260
rpo.logger.Debug("Membership heartbeat upserted successfully",
255261
tag.Address(request.RPCAddress.String()),
256262
tag.Port(int(request.RPCPort)),
257-
tag.HostID(request.HostID.String()))
263+
tag.HostID(hostID.String()))
258264
}
259265

260266
return err
@@ -313,10 +319,14 @@ func (rpo *monitor) startHeartbeat(broadcastHostport string) error {
313319
// read side by filtering on the last time a heartbeat was seen.
314320
err = rpo.upsertMyMembership(rpo.lifecycleCtx, req)
315321
if err == nil {
322+
hostID, err := uuid.FromBytes(rpo.hostID)
323+
if err != nil {
324+
return err
325+
}
316326
rpo.logger.Info("Membership heartbeat upserted successfully",
317327
tag.Address(broadcastAddress.String()),
318328
tag.Port(int(broadcastPort)),
319-
tag.HostID(rpo.hostID.String()))
329+
tag.HostID(hostID.String()))
320330

321331
rpo.startHeartbeatUpsertLoop(req)
322332
}
@@ -350,7 +360,7 @@ func (rpo *monitor) fetchCurrentBootstrapHostports() ([]string, error) {
350360
nextPageToken = resp.NextPageToken
351361

352362
// Stop iterating once we have either 500 unique ip:port combos or there is no more results.
353-
if nextPageToken == nil || len(set) >= 500 {
363+
if len(nextPageToken) == 0 || len(set) >= 500 {
354364
bootstrapHostPorts := make([]string, 0, len(set))
355365
for k := range set {
356366
bootstrapHostPorts = append(bootstrapHostPorts, k)

common/membership/ringpop/test_cluster.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,11 @@ func newTestCluster(
9191
logger.Error("unable to split host port", tag.Error(err))
9292
return nil
9393
}
94+
// MarshalBinary never fails for UUIDs
95+
hostID, _ := uuid.New().MarshalBinary()
96+
9497
seedMember := &persistence.ClusterMember{
95-
HostID: uuid.New(),
98+
HostID: hostID,
9699
RPCAddress: seedAddress,
97100
RPCPort: seedPort,
98101
SessionStart: time.Now().UTC(),
@@ -104,12 +107,13 @@ func newTestCluster(
104107
func(_ context.Context, _ *persistence.GetClusterMembersRequest) (*persistence.GetClusterMembersResponse, error) {
105108
res := &persistence.GetClusterMembersResponse{ActiveMembers: []*persistence.ClusterMember{seedMember}}
106109

110+
hostID, _ := uuid.New().MarshalBinary()
107111
if firstGetClusterMemberCall {
108112
// The first time GetClusterMembers is invoked, we simulate returning a stale/bad heartbeat.
109113
// All subsequent calls only return the single "good" seed member
110114
// This ensures that we exercise the retry path in bootstrap properly.
111115
badSeedMember := &persistence.ClusterMember{
112-
HostID: uuid.New(),
116+
HostID: hostID,
113117
RPCAddress: seedAddress,
114118
RPCPort: seedPort + 1,
115119
SessionStart: time.Now().UTC(),

common/persistence/cassandra/cluster_metadata_store.go

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@ package cassandra
22

33
import (
44
"context"
5-
"fmt"
65
"net"
76
"strings"
87
"time"
98

10-
"github.com/google/uuid"
119
"go.temporal.io/api/serviceerror"
1210
"go.temporal.io/server/common/log"
1311
p "go.temporal.io/server/common/persistence"
@@ -177,13 +175,9 @@ func (m *ClusterMetadataStore) GetClusterMembers(
177175
queryString.WriteString(templateGetClusterMembership)
178176
operands = append(operands, constMembershipPartition)
179177

180-
if request.HostIDEquals != uuid.Nil {
178+
if len(request.HostIDEquals) != 0 {
181179
queryString.WriteString(templateWithHostIDSuffix)
182-
hostIDEqualsBytes, err := request.HostIDEquals.MarshalBinary()
183-
if err != nil {
184-
return nil, gocql.ConvertError("GetClusterMembers", fmt.Errorf("failed to marshal HostIDEquals: %w", err))
185-
}
186-
operands = append(operands, hostIDEqualsBytes)
180+
operands = append(operands, request.HostIDEquals)
187181
}
188182

189183
if request.RPCAddressEquals != nil {
@@ -225,7 +219,7 @@ func (m *ClusterMetadataStore) GetClusterMembers(
225219

226220
for iter.Scan(&cqlHostID, &rpcAddress, &rpcPort, &role, &sessionStart, &lastHeartbeat, &cassNow, &ttl) {
227221
member := p.ClusterMember{
228-
HostID: uuid.UUID(cqlHostID),
222+
HostID: cqlHostID,
229223
RPCAddress: rpcAddress,
230224
RPCPort: rpcPort,
231225
Role: role,
@@ -253,22 +247,18 @@ func (m *ClusterMetadataStore) UpsertClusterMembership(
253247
ctx context.Context,
254248
request *p.UpsertClusterMembershipRequest,
255249
) error {
256-
hostIDBytes, err := request.HostID.MarshalBinary()
257-
if err != nil {
258-
return gocql.ConvertError("UpsertClusterMembership", fmt.Errorf("failed to marshal HostID: %w", err))
259-
}
260250
query := m.session.Query(
261251
templateUpsertActiveClusterMembership,
262252
constMembershipPartition,
263-
hostIDBytes,
253+
request.HostID,
264254
request.RPCAddress,
265255
request.RPCPort,
266256
request.Role,
267257
request.SessionStart,
268258
time.Now().UTC(),
269259
int64(request.RecordExpiry.Seconds()),
270260
).WithContext(ctx)
271-
err = query.Exec()
261+
err := query.Exec()
272262

273263
if err != nil {
274264
return gocql.ConvertError("UpsertClusterMembership", err)

common/persistence/data_interfaces.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"strings"
1010
"time"
1111

12-
"github.com/google/uuid"
1312
commonpb "go.temporal.io/api/common/v1"
1413
enumspb "go.temporal.io/api/enums/v1"
1514
historypb "go.temporal.io/api/history/v1"
@@ -995,7 +994,7 @@ type (
995994
GetClusterMembersRequest struct {
996995
LastHeartbeatWithin time.Duration
997996
RPCAddressEquals net.IP
998-
HostIDEquals uuid.UUID
997+
HostIDEquals []byte
999998
RoleEquals ServiceType
1000999
SessionStartedAfter time.Time
10011000
NextPageToken []byte
@@ -1011,7 +1010,7 @@ type (
10111010
// ClusterMember is used as a response to GetClusterMembers
10121011
ClusterMember struct {
10131012
Role ServiceType
1014-
HostID uuid.UUID
1013+
HostID []byte
10151014
RPCAddress net.IP
10161015
RPCPort uint16
10171016
SessionStart time.Time
@@ -1022,7 +1021,7 @@ type (
10221021
// UpsertClusterMembershipRequest is the request to UpsertClusterMembership
10231022
UpsertClusterMembershipRequest struct {
10241023
Role ServiceType
1025-
HostID uuid.UUID
1024+
HostID []byte
10261025
RPCAddress net.IP
10271026
RPCPort uint16
10281027
SessionStart time.Time

common/persistence/persistence-tests/cluster_metadata_manager.go

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,18 @@ func (s *ClusterMetadataManagerSuite) TestClusterMembershipEmptyInitially() {
6161

6262
// TestClusterMembershipUpsertCanReadAny verifies that we can UpsertClusterMembership and read our result
6363
func (s *ClusterMetadataManagerSuite) TestClusterMembershipUpsertCanReadAny() {
64-
64+
hostID, err := uuid.New().MarshalBinary()
65+
s.NoError(err)
6566
req := &p.UpsertClusterMembershipRequest{
66-
HostID: uuid.New(),
67+
HostID: hostID,
6768
RPCAddress: net.ParseIP("127.0.0.2"),
6869
RPCPort: 123,
6970
Role: p.Frontend,
7071
SessionStart: time.Now().UTC(),
7172
RecordExpiry: time.Second,
7273
}
7374

74-
err := s.ClusterMetadataManager.UpsertClusterMembership(s.ctx, req)
75+
err = s.ClusterMetadataManager.UpsertClusterMembership(s.ctx, req)
7576
s.Nil(err)
7677

7778
resp, err := s.ClusterMetadataManager.GetClusterMembers(s.ctx, &p.GetClusterMembersRequest{})
@@ -90,17 +91,19 @@ func (s *ClusterMetadataManagerSuite) TestClusterMembershipUpsertCanPageRead() {
9091
hostID := uuid.New()
9192

9293
expectedIds[hostID.String()]++
94+
hostIDBytes, err := hostID.MarshalBinary()
95+
s.NoError(err)
9396

9497
req := &p.UpsertClusterMembershipRequest{
95-
HostID: hostID,
98+
HostID: hostIDBytes,
9699
RPCAddress: net.ParseIP("127.0.0.2"),
97100
RPCPort: 123,
98101
Role: p.Frontend,
99102
SessionStart: time.Now().UTC(),
100103
RecordExpiry: 3 * time.Second,
101104
}
102105

103-
err := s.ClusterMetadataManager.UpsertClusterMembership(s.ctx, req)
106+
err = s.ClusterMetadataManager.UpsertClusterMembership(s.ctx, req)
104107
s.NoError(err)
105108
}
106109

@@ -111,7 +114,9 @@ func (s *ClusterMetadataManagerSuite) TestClusterMembershipUpsertCanPageRead() {
111114
s.NoError(err)
112115
nextPageToken = resp.NextPageToken
113116
for _, member := range resp.ActiveMembers {
114-
expectedIds[member.HostID.String()]--
117+
hostID, err := uuid.FromBytes(member.HostID)
118+
s.NoError(err)
119+
expectedIds[hostID.String()]--
115120
hostCount++
116121
}
117122

@@ -145,16 +150,18 @@ func (s *ClusterMetadataManagerSuite) validateUpsert(req *p.UpsertClusterMembers
145150
// TestClusterMembershipReadFiltersCorrectly verifies that we can UpsertClusterMembership and read our result using filters
146151
func (s *ClusterMetadataManagerSuite) TestClusterMembershipReadFiltersCorrectly() {
147152
now := time.Now().UTC()
153+
hostID, err := uuid.New().MarshalBinary()
154+
s.NoError(err)
148155
req := &p.UpsertClusterMembershipRequest{
149-
HostID: uuid.New(),
156+
HostID: hostID,
150157
RPCAddress: net.ParseIP("127.0.0.2"),
151158
RPCPort: 123,
152159
Role: p.Frontend,
153160
SessionStart: now,
154161
RecordExpiry: time.Second * 4,
155162
}
156163

157-
err := s.ClusterMetadataManager.UpsertClusterMembership(s.ctx, req)
164+
err = s.ClusterMetadataManager.UpsertClusterMembership(s.ctx, req)
158165
s.Nil(err)
159166

160167
resp, err := s.ClusterMetadataManager.GetClusterMembers(
@@ -203,16 +210,18 @@ func (s *ClusterMetadataManagerSuite) TestClusterMembershipReadFiltersCorrectly(
203210

204211
// TestClusterMembershipUpsertExpiresCorrectly verifies RecordExpiry functions properly for ClusterMembership records
205212
func (s *ClusterMetadataManagerSuite) TestClusterMembershipUpsertExpiresCorrectly() {
213+
hostID, err := uuid.New().MarshalBinary()
214+
s.NoError(err)
206215
req := &p.UpsertClusterMembershipRequest{
207-
HostID: uuid.New(),
216+
HostID: hostID,
208217
RPCAddress: net.ParseIP("127.0.0.2"),
209218
RPCPort: 123,
210219
Role: p.Frontend,
211220
SessionStart: time.Now().UTC(),
212221
RecordExpiry: time.Second,
213222
}
214223

215-
err := s.ClusterMetadataManager.UpsertClusterMembership(s.ctx, req)
224+
err = s.ClusterMetadataManager.UpsertClusterMembership(s.ctx, req)
216225
s.NoError(err)
217226

218227
err = s.ClusterMetadataManager.PruneClusterMembership(s.ctx, &p.PruneClusterMembershipRequest{MaxRecordsPruned: 100})
@@ -260,16 +269,18 @@ func (s *ClusterMetadataManagerSuite) waitForPrune(waitFor time.Duration) {
260269

261270
// TestClusterMembershipUpsertInvalidExpiry verifies we cannot specify a non-positive RecordExpiry duration
262271
func (s *ClusterMetadataManagerSuite) TestClusterMembershipUpsertInvalidExpiry() {
272+
hostID, err := uuid.New().MarshalBinary()
273+
s.NoError(err)
263274
req := &p.UpsertClusterMembershipRequest{
264-
HostID: uuid.New(),
275+
HostID: hostID,
265276
RPCAddress: net.ParseIP("127.0.0.2"),
266277
RPCPort: 123,
267278
Role: p.Frontend,
268279
SessionStart: time.Now().UTC(),
269280
RecordExpiry: time.Second * 0,
270281
}
271282

272-
err := s.ClusterMetadataManager.UpsertClusterMembership(s.ctx, req)
283+
err = s.ClusterMetadataManager.UpsertClusterMembership(s.ctx, req)
273284
s.NotNil(err)
274285
s.IsType(err, p.ErrInvalidMembershipExpiry)
275286
}

common/persistence/sql/cluster_metadata.go

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"net"
77
"time"
88

9-
"github.com/google/uuid"
109
"go.temporal.io/api/serviceerror"
1110
"go.temporal.io/server/common/log"
1211
p "go.temporal.io/server/common/persistence"
@@ -136,14 +135,9 @@ func (s *sqlClusterMetadataManager) GetClusterMembers(
136135
return nil, serviceerror.NewInternal("page token is corrupted.")
137136
}
138137

139-
hostIDEqualsBytes, err := request.HostIDEquals.MarshalBinary()
140-
if err != nil {
141-
return nil, serviceerror.NewInvalidArgumentf("unable to marshal HostIDEquals: %v", err)
142-
}
143-
144138
now := time.Now().UTC()
145139
filter := &sqlplugin.ClusterMembershipFilter{
146-
HostIDEquals: hostIDEqualsBytes,
140+
HostIDEquals: request.HostIDEquals,
147141
RoleEquals: request.RoleEquals,
148142
RecordExpiryAfter: now,
149143
SessionStartedAfter: request.SessionStartedAfter,
@@ -170,13 +164,8 @@ func (s *sqlClusterMetadataManager) GetClusterMembers(
170164

171165
convertedRows := make([]*p.ClusterMember, 0, len(rows))
172166
for _, row := range rows {
173-
hostIDUuid, err := uuid.ParseBytes(row.HostID)
174-
if err != nil {
175-
return nil, serviceerror.NewInternalf("unable to parse HostID: %v", err)
176-
}
177-
178167
convertedRows = append(convertedRows, &p.ClusterMember{
179-
HostID: hostIDUuid,
168+
HostID: row.HostID,
180169
Role: row.Role,
181170
RPCAddress: net.ParseIP(row.RPCAddress),
182171
RPCPort: row.RPCPort,
@@ -201,14 +190,9 @@ func (s *sqlClusterMetadataManager) UpsertClusterMembership(
201190
) error {
202191
now := time.Now().UTC()
203192
recordExpiry := now.Add(request.RecordExpiry)
204-
hostIDBytes, err := request.HostID.MarshalBinary()
205-
if err != nil {
206-
return serviceerror.NewInvalidArgumentf("unable to marshal HostID: %v", err)
207-
}
208-
209-
_, err = s.DB.UpsertClusterMembership(ctx, &sqlplugin.ClusterMembershipRow{
193+
_, err := s.DB.UpsertClusterMembership(ctx, &sqlplugin.ClusterMembershipRow{
210194
Role: request.Role,
211-
HostID: hostIDBytes,
195+
HostID: request.HostID,
212196
RPCAddress: request.RPCAddress.String(),
213197
RPCPort: request.RPCPort,
214198
SessionStart: request.SessionStart,

service/frontend/admin_handler.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,11 +1134,15 @@ func (adh *AdminHandler) ListClusterMembers(
11341134
if err != nil {
11351135
return nil, serviceerror.NewInvalidArgumentf("host ID %q is not a valid UUID: %v", request.GetHostId(), err)
11361136
}
1137+
hostIDEqualBytes, err := hostIDEqual.MarshalBinary()
1138+
if err != nil {
1139+
return nil, serviceerror.NewInternalf("unable to marshal host ID %q to bytes: %v", request.GetHostId(), err)
1140+
}
11371141

11381142
resp, err := metadataMgr.GetClusterMembers(ctx, &persistence.GetClusterMembersRequest{
11391143
LastHeartbeatWithin: heartbit,
11401144
RPCAddressEquals: net.ParseIP(request.GetRpcAddress()),
1141-
HostIDEquals: hostIDEqual,
1145+
HostIDEquals: hostIDEqualBytes,
11421146
RoleEquals: persistence.ServiceType(request.GetRole()),
11431147
SessionStartedAfter: startedTime,
11441148
PageSize: int(request.GetPageSize()),
@@ -1150,9 +1154,13 @@ func (adh *AdminHandler) ListClusterMembers(
11501154

11511155
var activeMembers []*clusterspb.ClusterMember
11521156
for _, member := range resp.ActiveMembers {
1157+
u, err := uuid.FromBytes(member.HostID)
1158+
if err != nil {
1159+
return nil, serviceerror.NewInternalf("unable to parse host ID bytes to UUID: %v", err)
1160+
}
11531161
activeMembers = append(activeMembers, &clusterspb.ClusterMember{
11541162
Role: enumsspb.ClusterMemberRole(member.Role),
1155-
HostId: member.HostID.String(),
1163+
HostId: u.String(),
11561164
RpcAddress: member.RPCAddress.String(),
11571165
RpcPort: int32(member.RPCPort),
11581166
SessionStartTime: timestamppb.New(member.SessionStart),

0 commit comments

Comments
 (0)