Skip to content

Commit f8e1109

Browse files
authored
CLIENT-3891 TaskId not being properly assigned during Background Query (#545)
* CLIENT-3891 TaskId not being properly assigned during Background Query * Added deprecation warnings and additional comments
1 parent eb2ee82 commit f8e1109

File tree

4 files changed

+95
-7
lines changed

4 files changed

+95
-7
lines changed

client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1385,7 +1385,7 @@ func (clnt *Client) QueryExecute(policy *QueryPolicy,
13851385

13861386
var errs Error
13871387
for i := range nodes {
1388-
command := newServerCommand(nodes[i], policy, writePolicy, statement, ops)
1388+
command := newServerCommand(nodes[i], policy, writePolicy, statement, taskId, ops)
13891389
if err := command.Execute(); err != nil {
13901390
errs = chainErrors(err, errs)
13911391
}
@@ -1420,7 +1420,7 @@ func (clnt *Client) ExecuteUDF(policy *QueryPolicy,
14201420

14211421
var errs Error
14221422
for i := range nodes {
1423-
command := newServerCommand(nodes[i], policy, nil, statement, nil)
1423+
command := newServerCommand(nodes[i], policy, nil, statement, taskId, nil)
14241424
if err := command.Execute(); err != nil {
14251425
errs = chainErrors(err, errs)
14261426
}
@@ -1453,7 +1453,7 @@ func (clnt *Client) ExecuteUDFNode(policy *QueryPolicy,
14531453

14541454
statement.SetAggregateFunction(packageName, functionName, functionArgs, false)
14551455

1456-
command := newServerCommand(node, policy, nil, statement, nil)
1456+
command := newServerCommand(node, policy, nil, statement, taskId, nil)
14571457
err := command.Execute()
14581458

14591459
return NewExecuteTask(clnt.cluster, statement, taskId), err

query_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"errors"
2020
"math"
2121
"math/rand"
22+
"strconv"
2223

2324
as "github.com/aerospike/aerospike-client-go/v8"
2425
ast "github.com/aerospike/aerospike-client-go/v8/types"
@@ -684,4 +685,84 @@ var _ = gg.Describe("Query operations", func() {
684685
gm.Expect(count2).To(gm.Equal(count3))
685686
gm.Expect(count1).To(gm.BeNumerically(">", 0))
686687
})
688+
689+
gg.It("must validate taskId for background query jobs using query-show info command", func() {
690+
stm := as.NewStatement(ns, set)
691+
stm.SetFilter(as.NewRangeFilter(bin3.Name, 0, math.MaxInt16))
692+
693+
updateBin := as.NewBin("Aerospike10", 888)
694+
695+
et, err := client.QueryExecute(queryPolicy, nil, stm, as.PutOp(updateBin))
696+
gm.Expect(err).ToNot(gm.HaveOccurred())
697+
gm.Expect(et).ToNot(gm.BeNil())
698+
699+
// Get and validate task ID
700+
taskId := et.TaskId()
701+
gm.Expect(taskId).To(gm.BeNumerically(">", 0))
702+
703+
// Query all nodes to verify the task ID is actually valid and tracked by the server
704+
nodes := client.GetNodes()
705+
gm.Expect(len(nodes)).To(gm.BeNumerically(">", 0))
706+
707+
taskIdStr := strconv.FormatUint(taskId, 10)
708+
taskFoundAndValidated := false
709+
710+
// Check that at least one node has the task tracked with the correct task ID
711+
for _, node := range nodes {
712+
ip := as.NewInfoPolicy()
713+
714+
infoCmd := "query-show:id=" + taskIdStr
715+
716+
res, err := node.RequestInfo(ip, infoCmd)
717+
if err == nil {
718+
response := res[infoCmd]
719+
// If we don't get ERROR:2, the task exists or existed
720+
if !bytes.Contains([]byte(response), []byte("ERROR:2")) {
721+
// Verify response contains expected job information
722+
gm.Expect(response).To(gm.ContainSubstring("status="))
723+
724+
// Verify the response contains the task ID we're looking for
725+
gm.Expect(response).To(gm.Or(
726+
gm.ContainSubstring("id="+taskIdStr),
727+
gm.ContainSubstring("trid="+taskIdStr),
728+
))
729+
730+
taskFoundAndValidated = true
731+
}
732+
}
733+
}
734+
735+
gm.Expect(taskFoundAndValidated).To(gm.BeTrue(),
736+
"Task ID %d should be tracked by at least one server node", taskId)
737+
738+
gm.Expect(<-et.OnComplete()).To(gm.BeNil())
739+
740+
// Verify IsDone returns true after completion
741+
done, err := et.IsDone()
742+
gm.Expect(err).ToNot(gm.HaveOccurred())
743+
gm.Expect(done).To(gm.BeTrue())
744+
745+
// After completion, verify that the task ID is still consistent
746+
taskIdAfterCompletion := et.TaskId()
747+
gm.Expect(taskIdAfterCompletion).To(gm.Equal(taskId),
748+
"Task ID should remain consistent before and after completion")
749+
750+
// After completion, verify that records were actually updated
751+
verifyStm := as.NewStatement(ns, set)
752+
verifyStm.SetFilter(as.NewRangeFilter(bin3.Name, 0, math.MaxInt16))
753+
recordset, err := client.Query(queryPolicy, verifyStm)
754+
gm.Expect(err).ToNot(gm.HaveOccurred())
755+
756+
updatedCount := 0
757+
for res := range recordset.Results() {
758+
gm.Expect(res.Err).ToNot(gm.HaveOccurred())
759+
rec := res.Record
760+
if rec.Bins["Aerospike10"] == 888 {
761+
updatedCount++
762+
}
763+
}
764+
765+
// All matching records should have been updated
766+
gm.Expect(updatedCount).To(gm.BeNumerically(">", 0))
767+
})
687768
})

server_command.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,23 @@ import (
2222
)
2323

2424
type serverCommand struct {
25+
// Deprecated: should not be used and will be removed in future major release.
26+
taskId uint64
2527
queryCommand
2628
}
2729

28-
func newServerCommand(node *Node, policy *QueryPolicy, writePolicy *WritePolicy, statement *Statement, operations []*Operation) *serverCommand {
30+
func newServerCommand(node *Node, policy *QueryPolicy, writePolicy *WritePolicy, statement *Statement, taskId uint64, operations []*Operation) *serverCommand {
31+
// Statement does contain a taskId however we cannot rely on it since the statement might be reused.
32+
// If TaskId is 0, set it to a new random value and return the same statement. Cannot modify the original
33+
// statement since user might want to reuse it.
2934
return &serverCommand{
3035
queryCommand: *newQueryCommand(node, policy, writePolicy, statement, operations, nil),
36+
taskId: taskId,
3137
}
3238
}
3339

3440
func (cmd *serverCommand) writeBuffer(ifc command) (err Error) {
35-
return cmd.setQuery(cmd.policy, cmd.writePolicy, cmd.statement, cmd.statement.TaskId, cmd.operations, cmd.writePolicy != nil, nil)
41+
return cmd.setQuery(cmd.policy, cmd.writePolicy, cmd.statement, cmd.taskId, cmd.operations, cmd.writePolicy != nil, nil)
3642
}
3743

3844
func (cmd *serverCommand) parseRecordResults(ifc command, receiveSize int) (bool, Error) {

statement.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type Statement struct {
4747

4848
// TaskId determines query task id. (Optional)
4949
// This value is not used anymore and will be removed later.
50+
// Deprecated: Statement instances should not set TaskId directly since instances are meant to be immutable.
5051
TaskId uint64
5152

5253
// determines if the query should return data
@@ -118,12 +119,12 @@ func (stmt *Statement) prepareTaskId() uint64 {
118119
// If TaskId is 0, set it to a new random value and return the same statement.
119120
// This also means that that the taskId was never set by the user.
120121
// If TaskId is non-zero, it means that the user set it manually.
121-
// In that case, we make a copy of the statement and set a new random taskId on the copy.
122-
// This way we don't modify the original statement that the user provided.
123122
// This is important because the user might want to reuse the same statement for multiple queries.
124123
// However the control of the taskId is now with the client library and not the user.
125124
// Important to remember is that the server will reject queries that have already been executed and the result is not
126125
// available yet.
126+
//
127+
// The statement.TaskId is deprecated and will be removed/revamped in future major release.
127128
if stmt.TaskId == 0 {
128129
taskId := rand.Uint64()
129130

0 commit comments

Comments
 (0)