Skip to content

Commit c204d0a

Browse files
authored
feat: adding bulk call for alpha to inform zero about the tablets (#8089)
* adding bulk call for alpha to inform zero about the tablets (#8088)
1 parent 461b3be commit c204d0a

File tree

5 files changed

+1025
-389
lines changed

5 files changed

+1025
-389
lines changed

dgraph/cmd/zero/raft.go

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -298,12 +298,25 @@ func (n *node) regenerateChecksum() {
298298
}
299299
}
300300

301-
func (n *node) handleTabletProposal(tablet *pb.Tablet) error {
301+
func (n *node) handleBulkTabletProposal(tablets []*pb.Tablet) error {
302302
n.server.AssertLock()
303-
state := n.server.state
304-
305303
defer n.regenerateChecksum()
304+
for _, tablet := range tablets {
305+
if err := n.handleTablet(tablet); err != nil {
306+
glog.Warningf("not able to handle tablet %s. Got err: %+v", tablet.GetPredicate(), err)
307+
}
308+
}
306309

310+
return nil
311+
}
312+
313+
// handleTablet will check if the given tablet is served by any group.
314+
// If not the tablet will be added to the current group predicate list
315+
//
316+
// This function doesn't take any locks.
317+
// It is the calling functions responsibility to manage the concurrency.
318+
func (n *node) handleTablet(tablet *pb.Tablet) error {
319+
state := n.server.state
307320
if tablet.GroupId == 0 {
308321
return errors.Errorf("Tablet group id is zero: %+v", tablet)
309322
}
@@ -339,6 +352,12 @@ func (n *node) handleTabletProposal(tablet *pb.Tablet) error {
339352
return nil
340353
}
341354

355+
func (n *node) handleTabletProposal(tablet *pb.Tablet) error {
356+
n.server.AssertLock()
357+
defer n.regenerateChecksum()
358+
return n.handleTablet(tablet)
359+
}
360+
342361
func (n *node) deleteNamespace(delNs uint64) error {
343362
n.server.AssertLock()
344363
state := n.server.state
@@ -431,6 +450,15 @@ func (n *node) applyProposal(e raftpb.Entry) (uint64, error) {
431450
return key, err
432451
}
433452
}
453+
454+
if p.Tablets != nil && len(p.Tablets) > 0 {
455+
if err := n.handleBulkTabletProposal(p.Tablets); err != nil {
456+
span.Annotatef(nil, "While applying bulk tablet proposal: %v", err)
457+
glog.Errorf("While applying bulk tablet proposal: %v", err)
458+
return key, err
459+
}
460+
}
461+
434462
if p.License != nil {
435463
// Check that the number of nodes in the cluster should be less than MaxNodes, otherwise
436464
// reject the proposal.

dgraph/cmd/zero/zero.go

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -314,10 +314,8 @@ func (s *Server) ServingTablet(tablet string) *pb.Tablet {
314314
defer s.RUnlock()
315315

316316
for _, group := range s.state.Groups {
317-
for key, tab := range group.Tablets {
318-
if key == tablet {
319-
return tab
320-
}
317+
if tab, ok := group.Tablets[tablet]; ok {
318+
return tab
321319
}
322320
}
323321
return nil
@@ -339,10 +337,8 @@ func (s *Server) servingTablet(tablet string) *pb.Tablet {
339337
s.AssertRLock()
340338

341339
for _, group := range s.state.Groups {
342-
for key, tab := range group.Tablets {
343-
if key == tablet {
344-
return tab
345-
}
340+
if tab, ok := group.Tablets[tablet]; ok {
341+
return tab
346342
}
347343
}
348344
return nil
@@ -408,6 +404,71 @@ func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
408404
return res, nil
409405
}
410406

407+
func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletResponse, error) {
408+
ctx, span := otrace.StartSpan(ctx, "Zero.Inform")
409+
defer span.End()
410+
if req == nil || len(req.Tablets) == 0 {
411+
return nil, errors.Errorf("Tablets are empty in %+v", req)
412+
}
413+
414+
if req.GroupId == 0 {
415+
return nil, errors.Errorf("Group ID is Zero in %+v", req)
416+
}
417+
418+
tablets := make([]*pb.Tablet, 0)
419+
unknownTablets := make([]*pb.Tablet, 0)
420+
for _, t := range req.Tablets {
421+
tab := s.ServingTablet(t.Predicate)
422+
span.Annotatef(nil, "Tablet for %s: %+v", t.Predicate, tab)
423+
switch {
424+
case tab != nil && !t.Force:
425+
tablets = append(tablets, t)
426+
case t.ReadOnly:
427+
tablets = append(tablets, &pb.Tablet{})
428+
default:
429+
unknownTablets = append(unknownTablets, t)
430+
}
431+
}
432+
433+
if len(unknownTablets) == 0 {
434+
return &pb.TabletResponse{
435+
Tablets: tablets,
436+
}, nil
437+
}
438+
439+
// Set the tablet to be served by this server's group.
440+
var proposal pb.ZeroProposal
441+
proposal.Tablets = make([]*pb.Tablet, 0)
442+
for _, t := range unknownTablets {
443+
if x.IsReservedPredicate(t.Predicate) {
444+
// Force all the reserved predicates to be allocated to group 1.
445+
// This is to make it easier to stream ACL updates to all alpha servers
446+
// since they only need to open one pipeline to receive updates for all
447+
// ACL predicates.
448+
// This will also make it easier to restore the reserved predicates after
449+
// a DropAll operation.
450+
t.GroupId = 1
451+
}
452+
proposal.Tablets = append(proposal.Tablets, t)
453+
}
454+
455+
if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed {
456+
span.Annotatef(nil, "While proposing tablet: %v", err)
457+
return nil, err
458+
}
459+
460+
for _, t := range unknownTablets {
461+
tab := s.ServingTablet(t.Predicate)
462+
x.AssertTrue(tab != nil)
463+
span.Annotatef(nil, "Now serving tablet for %s: %+v", t.Predicate, tab)
464+
tablets = append(tablets, tab)
465+
}
466+
467+
return &pb.TabletResponse{
468+
Tablets: tablets,
469+
}, nil
470+
}
471+
411472
// RemoveNode removes the given node from the given group.
412473
// It's the user's responsibility to ensure that node doesn't come back again
413474
// before calling the api.

protos/pb.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ message ZeroProposal {
168168
ZeroSnapshot snapshot = 11; // Used to make Zeros take a snapshot.
169169
// 12 has already been used.
170170
DeleteNsRequest delete_ns = 13; // Used to delete namespace.
171+
repeated Tablet tablets = 14;
171172
}
172173

173174
// MembershipState is used to pack together the current membership state of all
@@ -567,6 +568,7 @@ service Zero {
567568

568569
rpc Oracle(api.Payload) returns (stream OracleDelta) {}
569570
rpc ShouldServe(Tablet) returns (Tablet) {}
571+
rpc Inform(TabletRequest) returns (TabletResponse) {}
570572
rpc AssignIds(Num) returns (AssignedIds) {}
571573
rpc Timestamps(Num) returns (AssignedIds) {}
572574
rpc CommitOrAbort(api.TxnContext) returns (api.TxnContext) {}
@@ -596,6 +598,14 @@ service Worker {
596598
rpc TaskStatus(TaskStatusRequest) returns (TaskStatusResponse) {}
597599
}
598600

601+
message TabletResponse {
602+
repeated Tablet tablets = 1;
603+
}
604+
message TabletRequest {
605+
repeated Tablet tablets = 1;
606+
uint32 group_id = 2 [(gogoproto.jsontag) = "groupId,omitempty"]; // Served by which group.
607+
}
608+
599609
message SubscriptionRequest {
600610
repeated bytes prefixes = 1;
601611
repeated badgerpb3.Match matches = 2;

0 commit comments

Comments
 (0)