Skip to content

Commit 1e7c280

Browse files
authored
Create CLI admin command to describe Transfer Task (#412)
1 parent 4e0c4e0 commit 1e7c280

File tree

10 files changed

+137
-1
lines changed

10 files changed

+137
-1
lines changed

common/metrics/defs.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ const (
154154
PersistenceGetCurrentExecutionScope
155155
// PersistenceListConcreteExecutionsScope tracks ListConcreteExecutions calls made by service to persistence layer
156156
PersistenceListConcreteExecutionsScope
157+
// PersistenceGetTransferTaskScope tracks GetTransferTask calls made by service to persistence layer
158+
PersistenceGetTransferTaskScope
157159
// PersistenceGetTransferTasksScope tracks GetTransferTasks calls made by service to persistence layer
158160
PersistenceGetTransferTasksScope
159161
// PersistenceCompleteTransferTaskScope tracks CompleteTransferTasks calls made by service to persistence layer
@@ -1073,6 +1075,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
10731075
PersistenceDeleteCurrentWorkflowExecutionScope: {operation: "DeleteCurrentWorkflowExecution"},
10741076
PersistenceGetCurrentExecutionScope: {operation: "GetCurrentExecution"},
10751077
PersistenceListConcreteExecutionsScope: {operation: "ListConcreteExecutions"},
1078+
PersistenceGetTransferTaskScope: {operation: "GetTransferTask"},
10761079
PersistenceGetTransferTasksScope: {operation: "GetTransferTasks"},
10771080
PersistenceCompleteTransferTaskScope: {operation: "CompleteTransferTask"},
10781081
PersistenceRangeCompleteTransferTaskScope: {operation: "RangeCompleteTransferTask"},

common/mocks/ExecutionManager.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,29 @@ func (_m *ExecutionManager) ListConcreteExecutions(request *persistence.ListConc
234234
return r0, r1
235235
}
236236

237+
// GetTransferTask provides a mock function with given fields: request
238+
func (_m *ExecutionManager) GetTransferTask(request *persistence.GetTransferTaskRequest) (*persistence.GetTransferTaskResponse, error) {
239+
ret := _m.Called(request)
240+
241+
var r0 *persistence.GetTransferTaskResponse
242+
if rf, ok := ret.Get(0).(func(*persistence.GetTransferTaskRequest) *persistence.GetTransferTaskResponse); ok {
243+
r0 = rf(request)
244+
} else {
245+
if ret.Get(0) != nil {
246+
r0 = ret.Get(0).(*persistence.GetTransferTaskResponse)
247+
}
248+
}
249+
250+
var r1 error
251+
if rf, ok := ret.Get(1).(func(*persistence.GetTransferTaskRequest) error); ok {
252+
r1 = rf(request)
253+
} else {
254+
r1 = ret.Error(1)
255+
}
256+
257+
return r0, r1
258+
}
259+
237260
// GetTransferTasks provides a mock function with given fields: request
238261
func (_m *ExecutionManager) GetTransferTasks(request *persistence.GetTransferTasksRequest) (*persistence.GetTransferTasksResponse, error) {
239262
ret := _m.Called(request)

common/persistence/cassandra/cassandraPersistence.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,16 @@ workflow_state = ? ` +
506506
`and visibility_ts = ? ` +
507507
`and task_id = ? `
508508

509+
templateGetTransferTaskQuery = `SELECT transfer, transfer_encoding ` +
510+
`FROM executions ` +
511+
`WHERE shard_id = ? ` +
512+
`and type = ? ` +
513+
`and namespace_id = ? ` +
514+
`and workflow_id = ? ` +
515+
`and run_id = ? ` +
516+
`and visibility_ts = ? ` +
517+
`and task_id = ? `
518+
509519
templateGetTransferTasksQuery = `SELECT transfer, transfer_encoding ` +
510520
`FROM executions ` +
511521
`WHERE shard_id = ? ` +
@@ -1932,6 +1942,33 @@ func (d *cassandraPersistence) ListConcreteExecutions(
19321942
return response, nil
19331943
}
19341944

1945+
func (d *cassandraPersistence) GetTransferTask(request *p.GetTransferTaskRequest) (*p.GetTransferTaskResponse, error) {
1946+
shardID := d.shardID
1947+
taskID := request.TaskID
1948+
query := d.session.Query(templateGetTransferTaskQuery,
1949+
shardID,
1950+
rowTypeTransferTask,
1951+
rowTypeTransferNamespaceID,
1952+
rowTypeTransferWorkflowID,
1953+
rowTypeTransferRunID,
1954+
defaultVisibilityTimestamp,
1955+
taskID)
1956+
1957+
var data []byte
1958+
var encoding string
1959+
if err := query.Scan(&data, &encoding); err != nil {
1960+
return nil, convertCommonErrors("GetTransferTask", err)
1961+
}
1962+
1963+
info, err := serialization.TransferTaskInfoFromBlob(data, encoding)
1964+
1965+
if err != nil {
1966+
return nil, convertCommonErrors("GetTransferTask", err)
1967+
}
1968+
1969+
return &p.GetTransferTaskResponse{TransferTaskInfo: info}, nil
1970+
}
1971+
19351972
func (d *cassandraPersistence) GetTransferTasks(request *p.GetTransferTasksRequest) (*p.GetTransferTasksResponse, error) {
19361973

19371974
// Reading transfer tasks need to be quorum level consistent, otherwise we could loose task

common/persistence/dataInterfaces.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -776,6 +776,17 @@ type (
776776
RunID string
777777
}
778778

779+
// GetTransferTaskRequest is the request for GetTransferTask
780+
GetTransferTaskRequest struct {
781+
ShardID int32
782+
TaskID int64
783+
}
784+
785+
// GetTransferTaskResponse is the response to GetTransferTask
786+
GetTransferTaskResponse struct {
787+
TransferTaskInfo *persistenceblobs.TransferTaskInfo
788+
}
789+
779790
// GetTransferTasksRequest is used to read tasks from the transfer task queue
780791
GetTransferTasksRequest struct {
781792
ReadLevel int64
@@ -1351,6 +1362,7 @@ type (
13511362
GetCurrentExecution(request *GetCurrentExecutionRequest) (*GetCurrentExecutionResponse, error)
13521363

13531364
// Transfer task related methods
1365+
GetTransferTask(request *GetTransferTaskRequest) (*GetTransferTaskResponse, error)
13541366
GetTransferTasks(request *GetTransferTasksRequest) (*GetTransferTasksResponse, error)
13551367
CompleteTransferTask(request *CompleteTransferTaskRequest) error
13561368
RangeCompleteTransferTask(request *RangeCompleteTransferTaskRequest) error

common/persistence/executionStore.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -802,6 +802,12 @@ func (m *executionManagerImpl) ListConcreteExecutions(
802802
}
803803

804804
// Transfer task related methods
805+
func (m *executionManagerImpl) GetTransferTask(
806+
request *GetTransferTaskRequest,
807+
) (*GetTransferTaskResponse, error) {
808+
return m.persistence.GetTransferTask(request)
809+
}
810+
805811
func (m *executionManagerImpl) GetTransferTasks(
806812
request *GetTransferTasksRequest,
807813
) (*GetTransferTasksResponse, error) {

common/persistence/persistenceInterface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ type (
9898
GetCurrentExecution(request *GetCurrentExecutionRequest) (*GetCurrentExecutionResponse, error)
9999

100100
// Transfer task related methods
101+
GetTransferTask(request *GetTransferTaskRequest) (*GetTransferTaskResponse, error)
101102
GetTransferTasks(request *GetTransferTasksRequest) (*GetTransferTasksResponse, error)
102103
CompleteTransferTask(request *CompleteTransferTaskRequest) error
103104
RangeCompleteTransferTask(request *RangeCompleteTransferTaskRequest) error

common/persistence/persistenceMetricClients.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,20 @@ func (p *workflowExecutionPersistenceClient) ListConcreteExecutions(request *Lis
365365
return response, err
366366
}
367367

368+
func (p *workflowExecutionPersistenceClient) GetTransferTask(request *GetTransferTaskRequest) (*GetTransferTaskResponse, error) {
369+
p.metricClient.IncCounter(metrics.PersistenceGetTransferTaskScope, metrics.PersistenceRequests)
370+
371+
sw := p.metricClient.StartTimer(metrics.PersistenceGetTransferTaskScope, metrics.PersistenceLatency)
372+
response, err := p.persistence.GetTransferTask(request)
373+
sw.Stop()
374+
375+
if err != nil {
376+
p.updateErrorMetric(metrics.PersistenceGetTransferTaskScope, err)
377+
}
378+
379+
return response, err
380+
}
381+
368382
func (p *workflowExecutionPersistenceClient) GetTransferTasks(request *GetTransferTasksRequest) (*GetTransferTasksResponse, error) {
369383
p.metricClient.IncCounter(metrics.PersistenceGetTransferTasksScope, metrics.PersistenceRequests)
370384

common/persistence/persistenceRateLimitedClients.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,15 @@ func (p *workflowExecutionRateLimitedPersistenceClient) ListConcreteExecutions(r
293293
return response, err
294294
}
295295

296+
func (p *workflowExecutionRateLimitedPersistenceClient) GetTransferTask(request *GetTransferTaskRequest) (*GetTransferTaskResponse, error) {
297+
if ok := p.rateLimiter.Allow(); !ok {
298+
return nil, ErrPersistenceLimitExceeded
299+
}
300+
301+
response, err := p.persistence.GetTransferTask(request)
302+
return response, err
303+
}
304+
296305
func (p *workflowExecutionRateLimitedPersistenceClient) GetTransferTasks(request *GetTransferTasksRequest) (*GetTransferTasksResponse, error) {
297306
if ok := p.rateLimiter.Allow(); !ok {
298307
return nil, ErrPersistenceLimitExceeded

common/persistence/sql/sqlExecutionManager.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,30 @@ func (m *sqlExecutionManager) ListConcreteExecutions(
728728
return nil, serviceerror.NewUnimplemented("ListConcreteExecutions is not implemented")
729729
}
730730

731+
func (m *sqlExecutionManager) GetTransferTask(request *persistence.GetTransferTaskRequest) (*persistence.GetTransferTaskResponse, error) {
732+
rows, err := m.db.SelectFromTransferTasks(&sqlplugin.TransferTasksFilter{ShardID: int(request.ShardID), TaskID: &request.TaskID})
733+
if err != nil {
734+
if err == sql.ErrNoRows {
735+
return nil, serviceerror.NewNotFound(fmt.Sprintf("GetTransferTask operation failed. Task with ID %v not found. Error: %v", request.TaskID, err))
736+
}
737+
return nil, serviceerror.NewInternal(fmt.Sprintf("GetTransferTask operation failed. Failed to get record. TaskId: %v. Error: %v", request.TaskID, err))
738+
}
739+
740+
if len(rows) == 0 {
741+
return nil, serviceerror.NewInternal(fmt.Sprintf("GetTransferTask operation failed. Failed to get record. TaskId: %v", request.TaskID))
742+
}
743+
744+
transferRow := rows[0]
745+
transferInfo, err := serialization.TransferTaskInfoFromBlob(transferRow.Data, transferRow.DataEncoding)
746+
if err != nil {
747+
return nil, err
748+
}
749+
750+
resp := &persistence.GetTransferTaskResponse{TransferTaskInfo: transferInfo}
751+
752+
return resp, nil
753+
}
754+
731755
func (m *sqlExecutionManager) GetTransferTasks(
732756
request *p.GetTransferTasksRequest,
733757
) (*p.GetTransferTasksResponse, error) {

tools/cli/adminCommands.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,6 @@ func AdminGetShardID(c *cli.Context) {
373373
func AdminDescribeTask(c *cli.Context) {
374374
sid := getRequiredIntOption(c, FlagShardID)
375375
tid := getRequiredIntOption(c, FlagTaskID)
376-
vis := getRequiredInt64Option(c, FlagTaskVisibilityTimestamp)
377376
category := commongenpb.TaskCategory(c.Int(FlagTaskType))
378377

379378
pFactory := CreatePersistenceFactory(c)
@@ -383,6 +382,7 @@ func AdminDescribeTask(c *cli.Context) {
383382
}
384383

385384
if category == commongenpb.TaskCategory_TaskCategory_Timer {
385+
vis := getRequiredInt64Option(c, FlagTaskVisibilityTimestamp)
386386
req := &persistence.GetTimerTaskRequest{ShardID: int32(sid), TaskID: int64(tid), VisibilityTimestamp: time.Unix(0, vis)}
387387
task, err := executionManager.GetTimerTask(req)
388388
if err != nil {
@@ -396,6 +396,13 @@ func AdminDescribeTask(c *cli.Context) {
396396
ErrorAndExit("Failed to get Replication Task", err)
397397
}
398398
prettyPrintJSONObject(task)
399+
} else if category == commongenpb.TaskCategory_TaskCategory_Transfer {
400+
req := &persistence.GetTransferTaskRequest{ShardID: int32(sid), TaskID: int64(tid)}
401+
task, err := executionManager.GetTransferTask(req)
402+
if err != nil {
403+
ErrorAndExit("Failed to get Transfer Task", err)
404+
}
405+
prettyPrintJSONObject(task)
399406
} else {
400407
ErrorAndExit("Failed to describe task", fmt.Errorf("Unrecognized task type, task_type=%v", category))
401408
}

0 commit comments

Comments
 (0)