Skip to content

Commit 1667973

Browse files
cherry: cherry: feat: adding bulk call for alpha to inform zero about the tablets (#8… (#8100)
* feat: adding bulk call for alpha to inform zero about the tablets (#8089) * adding bulk call for alpha to inform zero about the tablets (#8088) (cherry picked from commit c204d0a) * fix(zero): fix update membership to make bulk tablet proposal instead of multiple small (#8090) Converting the smaller tablet proposals into a single bulk call makes the execution faster. Else, this update becomes of the order of O(n^2), where n is the number of tablet updates. (cherry picked from commit 695c6d7) Co-authored-by: aman bansal <[email protected]>
1 parent ebe64f7 commit 1667973

File tree

5 files changed

+1031
-393
lines changed

5 files changed

+1031
-393
lines changed

dgraph/cmd/zero/raft.go

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -298,12 +298,25 @@ func (n *node) regenerateChecksum() {
298298
}
299299
}
300300

301-
func (n *node) handleTabletProposal(tablet *pb.Tablet) error {
301+
func (n *node) handleBulkTabletProposal(tablets []*pb.Tablet) error {
302302
n.server.AssertLock()
303-
state := n.server.state
304-
305303
defer n.regenerateChecksum()
304+
for _, tablet := range tablets {
305+
if err := n.handleTablet(tablet); err != nil {
306+
glog.Warningf("not able to handle tablet %s. Got err: %+v", tablet.GetPredicate(), err)
307+
}
308+
}
306309

310+
return nil
311+
}
312+
313+
// handleTablet will check if the given tablet is served by any group.
314+
// If not, the tablet will be added to the current group predicate list
315+
//
316+
// This function doesn't take any locks.
317+
// It is the calling function's responsibility to manage the concurrency.
318+
func (n *node) handleTablet(tablet *pb.Tablet) error {
319+
state := n.server.state
307320
if tablet.GroupId == 0 {
308321
return errors.Errorf("Tablet group id is zero: %+v", tablet)
309322
}
@@ -339,6 +352,12 @@ func (n *node) handleTabletProposal(tablet *pb.Tablet) error {
339352
return nil
340353
}
341354

355+
func (n *node) handleTabletProposal(tablet *pb.Tablet) error {
356+
n.server.AssertLock()
357+
defer n.regenerateChecksum()
358+
return n.handleTablet(tablet)
359+
}
360+
342361
func (n *node) deleteNamespace(delNs uint64) error {
343362
n.server.AssertLock()
344363
state := n.server.state
@@ -431,6 +450,15 @@ func (n *node) applyProposal(e raftpb.Entry) (uint64, error) {
431450
return key, err
432451
}
433452
}
453+
454+
if p.Tablets != nil && len(p.Tablets) > 0 {
455+
if err := n.handleBulkTabletProposal(p.Tablets); err != nil {
456+
span.Annotatef(nil, "While applying bulk tablet proposal: %v", err)
457+
glog.Errorf("While applying bulk tablet proposal: %v", err)
458+
return key, err
459+
}
460+
}
461+
434462
if p.License != nil {
435463
// Check that the number of nodes in the cluster should be less than MaxNodes, otherwise
436464
// reject the proposal.

dgraph/cmd/zero/zero.go

Lines changed: 76 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -316,10 +316,8 @@ func (s *Server) ServingTablet(tablet string) *pb.Tablet {
316316
defer s.RUnlock()
317317

318318
for _, group := range s.state.Groups {
319-
for key, tab := range group.Tablets {
320-
if key == tablet {
321-
return tab
322-
}
319+
if tab, ok := group.Tablets[tablet]; ok {
320+
return tab
323321
}
324322
}
325323
return nil
@@ -341,10 +339,8 @@ func (s *Server) servingTablet(tablet string) *pb.Tablet {
341339
s.AssertRLock()
342340

343341
for _, group := range s.state.Groups {
344-
for key, tab := range group.Tablets {
345-
if key == tablet {
346-
return tab
347-
}
342+
if tab, ok := group.Tablets[tablet]; ok {
343+
return tab
348344
}
349345
}
350346
return nil
@@ -386,6 +382,8 @@ func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
386382
})
387383
}
388384
}
385+
386+
var tablets []*pb.Tablet
389387
for key, dstTablet := range dst.Tablets {
390388
group, has := s.state.Groups[dstTablet.GroupId]
391389
if !has {
@@ -401,15 +399,81 @@ func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
401399
d := float64(dstTablet.OnDiskBytes)
402400
if dstTablet.Remove || (s == 0 && d > 0) || (s > 0 && math.Abs(d/s-1) > 0.1) {
403401
dstTablet.Force = false
404-
proposal := &pb.ZeroProposal{
405-
Tablet: dstTablet,
406-
}
407-
res = append(res, proposal)
402+
tablets = append(tablets, dstTablet)
408403
}
409404
}
405+
406+
if len(tablets) > 0 {
407+
res = append(res, &pb.ZeroProposal{Tablets: tablets})
408+
}
410409
return res, nil
411410
}
412411

412+
func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletResponse, error) {
413+
ctx, span := otrace.StartSpan(ctx, "Zero.Inform")
414+
defer span.End()
415+
if req == nil || len(req.Tablets) == 0 {
416+
return nil, errors.Errorf("Tablets are empty in %+v", req)
417+
}
418+
419+
if req.GroupId == 0 {
420+
return nil, errors.Errorf("Group ID is Zero in %+v", req)
421+
}
422+
423+
tablets := make([]*pb.Tablet, 0)
424+
unknownTablets := make([]*pb.Tablet, 0)
425+
for _, t := range req.Tablets {
426+
tab := s.ServingTablet(t.Predicate)
427+
span.Annotatef(nil, "Tablet for %s: %+v", t.Predicate, tab)
428+
switch {
429+
case tab != nil && !t.Force:
430+
tablets = append(tablets, t)
431+
case t.ReadOnly:
432+
tablets = append(tablets, &pb.Tablet{})
433+
default:
434+
unknownTablets = append(unknownTablets, t)
435+
}
436+
}
437+
438+
if len(unknownTablets) == 0 {
439+
return &pb.TabletResponse{
440+
Tablets: tablets,
441+
}, nil
442+
}
443+
444+
// Set the tablet to be served by this server's group.
445+
var proposal pb.ZeroProposal
446+
proposal.Tablets = make([]*pb.Tablet, 0)
447+
for _, t := range unknownTablets {
448+
if x.IsReservedPredicate(t.Predicate) {
449+
// Force all the reserved predicates to be allocated to group 1.
450+
// This is to make it easier to stream ACL updates to all alpha servers
451+
// since they only need to open one pipeline to receive updates for all
452+
// ACL predicates.
453+
// This will also make it easier to restore the reserved predicates after
454+
// a DropAll operation.
455+
t.GroupId = 1
456+
}
457+
proposal.Tablets = append(proposal.Tablets, t)
458+
}
459+
460+
if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed {
461+
span.Annotatef(nil, "While proposing tablet: %v", err)
462+
return nil, err
463+
}
464+
465+
for _, t := range unknownTablets {
466+
tab := s.ServingTablet(t.Predicate)
467+
x.AssertTrue(tab != nil)
468+
span.Annotatef(nil, "Now serving tablet for %s: %+v", t.Predicate, tab)
469+
tablets = append(tablets, tab)
470+
}
471+
472+
return &pb.TabletResponse{
473+
Tablets: tablets,
474+
}, nil
475+
}
476+
413477
// RemoveNode removes the given node from the given group.
414478
// It's the user's responsibility to ensure that node doesn't come back again
415479
// before calling the api.

protos/pb.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ message ZeroProposal {
176176
ZeroSnapshot snapshot = 11; // Used to make Zeros take a snapshot.
177177
// 12 has already been used.
178178
DeleteNsRequest delete_ns = 13; // Used to delete namespace.
179+
repeated Tablet tablets = 14;
179180
}
180181

181182
// MembershipState is used to pack together the current membership state of all
@@ -556,6 +557,7 @@ service Zero {
556557

557558
rpc Oracle(api.Payload) returns (stream OracleDelta) {}
558559
rpc ShouldServe(Tablet) returns (Tablet) {}
560+
rpc Inform(TabletRequest) returns (TabletResponse) {}
559561
rpc AssignIds(Num) returns (AssignedIds) {}
560562
rpc Timestamps(Num) returns (AssignedIds) {}
561563
rpc CommitOrAbort(api.TxnContext) returns (api.TxnContext) {}
@@ -585,6 +587,14 @@ service Worker {
585587
rpc TaskStatus(TaskStatusRequest) returns (TaskStatusResponse) {}
586588
}
587589

590+
// TabletResponse is the response type of the Zero.Inform RPC. It carries the
// list of tablets returned for a TabletRequest.
message TabletResponse {
591+
repeated Tablet tablets = 1;
592+
}
593+
// TabletRequest is the request type of the Zero.Inform RPC. It bundles several
// tablets into a single call, together with the id of the group they relate to.
message TabletRequest {
594+
repeated Tablet tablets = 1;
595+
uint32 group_id = 2 [(gogoproto.jsontag) = "groupId,omitempty"]; // Served by which group.
596+
}
597+
588598
message SubscriptionRequest {
589599
repeated bytes prefixes = 1;
590600
repeated badgerpb3.Match matches = 2;

0 commit comments

Comments
 (0)