@@ -763,7 +763,7 @@ func (m *sqlExecutionManager) RangeCompleteTransferTask(
763763}
764764
765765func (m * sqlExecutionManager ) GetReplicationTask (request * persistence.GetReplicationTaskRequest ) (* persistence.GetReplicationTaskResponse , error ) {
766- rows , err := m .db .SelectFromReplicationTasks (& sqlplugin.ReplicationTasksFilter {ShardID : request .ShardID , TaskID : request .TaskID })
766+ rows , err := m .db .SelectFromReplicationTasks (sqlplugin.ReplicationTasksFilter {ShardID : request .ShardID , TaskID : request .TaskID })
767767 if err != nil {
768768 if err == sql .ErrNoRows {
769769 return nil , serviceerror .NewNotFound (fmt .Sprintf ("GetReplicationTask operation failed. Task with ID %v not found. Error: %v" , request .TaskID , err ))
@@ -795,8 +795,8 @@ func (m *sqlExecutionManager) GetReplicationTasks(
795795 return nil , err
796796 }
797797
798- rows , err := m .db .SelectFromReplicationTasks (
799- & sqlplugin.ReplicationTasksFilter {
798+ rows , err := m .db .RangeSelectFromReplicationTasks (
799+ sqlplugin.ReplicationTasksRangeFilter {
800800 ShardID : m .shardID ,
801801 MinTaskID : readLevel ,
802802 MaxTaskID : maxReadLevelInclusive ,
@@ -854,11 +854,39 @@ func (m *sqlExecutionManager) populateGetReplicationTasksResponse(
854854 }, nil
855855}
856856
857+ func (m * sqlExecutionManager ) populateGetReplicationDLQTasksResponse (
858+ rows []sqlplugin.ReplicationDLQTasksRow ,
859+ requestMaxReadLevel int64 ,
860+ ) (* p.GetReplicationTasksResponse , error ) {
861+ if len (rows ) == 0 {
862+ return & p.GetReplicationTasksResponse {}, nil
863+ }
864+
865+ var tasks = make ([]* persistenceblobs.ReplicationTaskInfo , len (rows ))
866+ for i , row := range rows {
867+ info , err := serialization .ReplicationTaskInfoFromBlob (row .Data , row .DataEncoding )
868+ if err != nil {
869+ return nil , err
870+ }
871+
872+ tasks [i ] = info
873+ }
874+ var nextPageToken []byte
875+ lastTaskID := rows [len (rows )- 1 ].TaskID
876+ if lastTaskID < requestMaxReadLevel {
877+ nextPageToken = serializePageToken (lastTaskID )
878+ }
879+ return & p.GetReplicationTasksResponse {
880+ Tasks : tasks ,
881+ NextPageToken : nextPageToken ,
882+ }, nil
883+ }
884+
857885func (m * sqlExecutionManager ) CompleteReplicationTask (
858886 request * p.CompleteReplicationTaskRequest ,
859887) error {
860888
861- if _ , err := m .db .DeleteFromReplicationTasks (& sqlplugin.ReplicationTasksFilter {
889+ if _ , err := m .db .DeleteFromReplicationTasks (sqlplugin.ReplicationTasksFilter {
862890 ShardID : m .shardID ,
863891 TaskID : request .TaskID ,
864892 }); err != nil {
@@ -871,9 +899,10 @@ func (m *sqlExecutionManager) RangeCompleteReplicationTask(
871899 request * p.RangeCompleteReplicationTaskRequest ,
872900) error {
873901
874- if _ , err := m .db .RangeDeleteFromReplicationTasks (& sqlplugin.ReplicationTasksFilter {
875- ShardID : m .shardID ,
876- TaskID : request .InclusiveEndTaskID ,
902+ if _ , err := m .db .RangeDeleteFromReplicationTasks (sqlplugin.ReplicationTasksRangeFilter {
903+ ShardID : m .shardID ,
904+ MinTaskID : 0 ,
905+ MaxTaskID : request .InclusiveEndTaskID ,
877906 }); err != nil {
878907 return serviceerror .NewInternal (fmt .Sprintf ("RangeCompleteReplicationTask operation failed. Error: %v" , err ))
879908 }
@@ -889,20 +918,17 @@ func (m *sqlExecutionManager) GetReplicationTasksFromDLQ(
889918 return nil , err
890919 }
891920
892- filter := sqlplugin.ReplicationTasksFilter {
893- ShardID : m .shardID ,
894- MinTaskID : readLevel ,
895- MaxTaskID : maxReadLevelInclusive ,
896- PageSize : request .BatchSize ,
897- }
898- rows , err := m .db .SelectFromReplicationTasksDLQ (& sqlplugin.ReplicationTasksDLQFilter {
899- ReplicationTasksFilter : filter ,
900- SourceClusterName : request .SourceClusterName ,
921+ rows , err := m .db .RangeSelectFromReplicationDLQTasks (sqlplugin.ReplicationDLQTasksRangeFilter {
922+ ShardID : m .shardID ,
923+ MinTaskID : readLevel ,
924+ MaxTaskID : maxReadLevelInclusive ,
925+ PageSize : request .BatchSize ,
926+ SourceClusterName : request .SourceClusterName ,
901927 })
902928
903929 switch err {
904930 case nil :
905- return m .populateGetReplicationTasksResponse (rows , request .MaxReadLevel )
931+ return m .populateGetReplicationDLQTasksResponse (rows , request .MaxReadLevel )
906932 case sql .ErrNoRows :
907933 return & p.GetReplicationTasksResponse {}, nil
908934 default :
@@ -914,14 +940,10 @@ func (m *sqlExecutionManager) DeleteReplicationTaskFromDLQ(
914940 request * p.DeleteReplicationTaskFromDLQRequest ,
915941) error {
916942
917- filter := sqlplugin.ReplicationTasksFilter {
918- ShardID : m .shardID ,
919- TaskID : request .TaskID ,
920- }
921-
922- if _ , err := m .db .DeleteMessageFromReplicationTasksDLQ (& sqlplugin.ReplicationTasksDLQFilter {
923- ReplicationTasksFilter : filter ,
924- SourceClusterName : request .SourceClusterName ,
943+ if _ , err := m .db .DeleteFromReplicationDLQTasks (sqlplugin.ReplicationDLQTasksFilter {
944+ ShardID : m .shardID ,
945+ TaskID : request .TaskID ,
946+ SourceClusterName : request .SourceClusterName ,
925947 }); err != nil {
926948 return err
927949 }
@@ -932,15 +954,11 @@ func (m *sqlExecutionManager) RangeDeleteReplicationTaskFromDLQ(
932954 request * p.RangeDeleteReplicationTaskFromDLQRequest ,
933955) error {
934956
935- filter := sqlplugin.ReplicationTasksFilter {
936- ShardID : m .shardID ,
937- TaskID : request .ExclusiveBeginTaskID ,
938- InclusiveEndTaskID : request .InclusiveEndTaskID ,
939- }
940-
941- if _ , err := m .db .RangeDeleteMessageFromReplicationTasksDLQ (& sqlplugin.ReplicationTasksDLQFilter {
942- ReplicationTasksFilter : filter ,
943- SourceClusterName : request .SourceClusterName ,
957+ if _ , err := m .db .RangeDeleteFromReplicationDLQTasks (sqlplugin.ReplicationDLQTasksRangeFilter {
958+ ShardID : m .shardID ,
959+ SourceClusterName : request .SourceClusterName ,
960+ MinTaskID : request .ExclusiveBeginTaskID ,
961+ MaxTaskID : request .InclusiveEndTaskID ,
944962 }); err != nil {
945963 return err
946964 }
@@ -1075,15 +1093,13 @@ func (m *sqlExecutionManager) PutReplicationTaskToDLQ(request *p.PutReplicationT
10751093 return err
10761094 }
10771095
1078- row := & sqlplugin.ReplicationTaskDLQRow {
1096+ _ , err = m . db . InsertIntoReplicationDLQTasks ([] sqlplugin.ReplicationDLQTasksRow { {
10791097 SourceClusterName : request .SourceClusterName ,
10801098 ShardID : m .shardID ,
10811099 TaskID : replicationTask .GetTaskId (),
10821100 Data : blob .Data ,
10831101 DataEncoding : blob .Encoding .String (),
1084- }
1085-
1086- _ , err = m .db .InsertIntoReplicationTasksDLQ (row )
1102+ }})
10871103
10881104 // Tasks are immutable. So it's fine if we already persisted it before.
10891105 // This can happen when tasks are retried (ack and cleanup can have lag on source side).
0 commit comments