diff --git a/common/persistence/cassandra/mutable_state_task_store.go b/common/persistence/cassandra/mutable_state_task_store.go index 6ed2b82068b..61187c73e24 100644 --- a/common/persistence/cassandra/mutable_state_task_store.go +++ b/common/persistence/cassandra/mutable_state_task_store.go @@ -76,8 +76,8 @@ const ( `and workflow_id = ? ` + `and run_id = ? ` + `and visibility_ts = ? ` + - `and task_id > ? ` + - `and task_id <= ?` + `and task_id >= ? ` + + `and task_id < ?` templateGetHistoryScheduledTasksQuery = `SELECT task_data, task_encoding ` + `FROM executions ` + diff --git a/common/persistence/sql/execution_tasks.go b/common/persistence/sql/execution_tasks.go index 3ed30809eef..5b79930672a 100644 --- a/common/persistence/sql/execution_tasks.go +++ b/common/persistence/sql/execution_tasks.go @@ -62,72 +62,363 @@ func (m *sqlExecutionStore) GetHistoryTask( ctx context.Context, request *p.GetHistoryTaskRequest, ) (*p.InternalGetHistoryTaskResponse, error) { - switch request.TaskCategory.ID() { + switch request.TaskCategory.Type() { + case tasks.CategoryTypeImmediate: + return m.getHistoryImmediateTask(ctx, request) + case tasks.CategoryTypeScheduled: + return m.getHistoryScheduledTask(ctx, request) + default: + return nil, serviceerror.NewInternal(fmt.Sprintf("Unknown task category type: %v", request.TaskCategory)) + } +} + +func (m *sqlExecutionStore) GetHistoryTasks( + ctx context.Context, + request *p.GetHistoryTasksRequest, +) (*p.InternalGetHistoryTasksResponse, error) { + switch request.TaskCategory.Type() { + case tasks.CategoryTypeImmediate: + return m.getHistoryImmediateTasks(ctx, request) + case tasks.CategoryTypeScheduled: + return m.getHistoryScheduledTasks(ctx, request) + default: + return nil, serviceerror.NewInternal(fmt.Sprintf("Unknown task category type: %v", request.TaskCategory)) + } +} + +func (m *sqlExecutionStore) CompleteHistoryTask( + ctx context.Context, + request *p.CompleteHistoryTaskRequest, +) error { + switch request.TaskCategory.Type() { + case tasks.CategoryTypeImmediate: + return m.completeHistoryImmediateTask(ctx, request) + case tasks.CategoryTypeScheduled: + return m.completeHistoryScheduledTask(ctx, request) + default: + return serviceerror.NewInternal(fmt.Sprintf("Unknown task category type: %v", request.TaskCategory)) + } +} + +func (m *sqlExecutionStore) RangeCompleteHistoryTasks( + ctx context.Context, + request *p.RangeCompleteHistoryTasksRequest, +) error { + switch request.TaskCategory.Type() { + case tasks.CategoryTypeImmediate: + return m.rangeCompleteHistoryImmediateTasks(ctx, request) + case tasks.CategoryTypeScheduled: + return m.rangeCompleteHistoryScheduledTasks(ctx, request) + default: + return serviceerror.NewInternal(fmt.Sprintf("Unknown task category type: %v", request.TaskCategory)) + } +} + +func (m *sqlExecutionStore) getHistoryImmediateTask( + ctx context.Context, + request *p.GetHistoryTaskRequest, +) (*p.InternalGetHistoryTaskResponse, error) { + // This is for backward compatiblity. + // These task categories exist before the general history_immediate_tasks table is created, + // so they have their own tables. + categoryID := request.TaskCategory.ID() + switch categoryID { case tasks.CategoryIDTransfer: return m.getTransferTask(ctx, request) - case tasks.CategoryIDTimer: - return m.getTimerTask(ctx, request) case tasks.CategoryIDVisibility: return m.getVisibilityTask(ctx, request) case tasks.CategoryIDReplication: return m.getReplicationTask(ctx, request) - default: - return nil, serviceerror.NewInternal(fmt.Sprintf("unknown task category: %v", request.TaskCategory)) } + + rows, err := m.Db.SelectFromHistoryImmediateTasks(ctx, sqlplugin.HistoryImmediateTasksFilter{ + ShardID: request.ShardID, + CategoryID: categoryID, + TaskID: request.TaskKey.TaskID, + }) + if err != nil { + if err == sql.ErrNoRows { + return nil, serviceerror.NewNotFound( + fmt.Sprintf("GetHistoryTask operation failed. CategoryID: %v. Task with ID %v not found. Error: %v", categoryID, request.TaskKey.TaskID, err), + ) + } + return nil, serviceerror.NewUnavailable( + fmt.Sprintf("GetHistoryTask operation failed. Failed to get record. CategoryID: %v. TaskId: %v. Error: %v", categoryID, request.TaskKey.TaskID, err), + ) + } + + if len(rows) == 0 { + return nil, serviceerror.NewNotFound( + fmt.Sprintf("GetHistoryTask operation failed. Failed to get record. CategoryID: %v. TaskId: %v", categoryID, request.TaskKey.TaskID), + ) + } + + immedidateTaskRow := rows[0] + resp := &p.InternalGetHistoryTaskResponse{ + Task: *p.NewDataBlob(immedidateTaskRow.Data, immedidateTaskRow.DataEncoding), + } + return resp, nil } -func (m *sqlExecutionStore) GetHistoryTasks( +func (m *sqlExecutionStore) getHistoryImmediateTasks( ctx context.Context, request *p.GetHistoryTasksRequest, ) (*p.InternalGetHistoryTasksResponse, error) { - switch request.TaskCategory.ID() { + // This is for backward compatiblity. + // These task categories exist before the general history_immediate_tasks table is created, + // so they have their own tables. + categoryID := request.TaskCategory.ID() + switch categoryID { case tasks.CategoryIDTransfer: return m.getTransferTasks(ctx, request) - case tasks.CategoryIDTimer: - return m.getTimerTasks(ctx, request) case tasks.CategoryIDVisibility: return m.getVisibilityTasks(ctx, request) case tasks.CategoryIDReplication: return m.getReplicationTasks(ctx, request) - default: - return nil, serviceerror.NewInternal(fmt.Sprintf("unknown task category: %v", request.TaskCategory)) } + + inclusiveMinTaskID, exclusiveMaxTaskID, err := getImmediateTaskReadRange(request) + if err != nil { + return nil, err + } + + rows, err := m.Db.RangeSelectFromHistoryImmediateTasks(ctx, sqlplugin.HistoryImmediateTasksRangeFilter{ + ShardID: request.ShardID, + CategoryID: categoryID, + InclusiveMinTaskID: inclusiveMinTaskID, + ExclusiveMaxTaskID: exclusiveMaxTaskID, + PageSize: request.BatchSize, + }) + if err != nil { + if err != sql.ErrNoRows { + return nil, serviceerror.NewUnavailable( + fmt.Sprintf("GetHistoryTasks operation failed. Select failed. CategoryID: %v. Error: %v", categoryID, err), + ) + } + } + resp := &p.InternalGetHistoryTasksResponse{ + Tasks: make([]commonpb.DataBlob, len(rows)), + } + if len(rows) == 0 { + return resp, nil + } + + for i, row := range rows { + resp.Tasks[i] = *p.NewDataBlob(row.Data, row.DataEncoding) + } + if len(rows) == request.BatchSize { + resp.NextPageToken = getImmediateTaskNextPageToken( + rows[len(rows)-1].TaskID, + exclusiveMaxTaskID, + ) + } + + return resp, nil } -func (m *sqlExecutionStore) CompleteHistoryTask( +func (m *sqlExecutionStore) completeHistoryImmediateTask( ctx context.Context, request *p.CompleteHistoryTaskRequest, ) error { - switch request.TaskCategory.ID() { + // This is for backward compatiblity. + // These task categories exist before the general history_immediate_tasks table is created, + // so they have their own tables. + categoryID := request.TaskCategory.ID() + switch categoryID { case tasks.CategoryIDTransfer: return m.completeTransferTask(ctx, request) - case tasks.CategoryIDTimer: - return m.completeTimerTask(ctx, request) case tasks.CategoryIDVisibility: return m.completeVisibilityTask(ctx, request) case tasks.CategoryIDReplication: return m.completeReplicationTask(ctx, request) - default: - return serviceerror.NewInternal(fmt.Sprintf("unknown task category: %v", request.TaskCategory)) } + + if _, err := m.Db.DeleteFromHistoryImmediateTasks(ctx, sqlplugin.HistoryImmediateTasksFilter{ + ShardID: request.ShardID, + CategoryID: categoryID, + TaskID: request.TaskKey.TaskID, + }); err != nil { + return serviceerror.NewUnavailable( + fmt.Sprintf("CompleteHistoryTask operation failed. CategoryID: %v. Error: %v", categoryID, err), + ) + } + return nil } -func (m *sqlExecutionStore) RangeCompleteHistoryTasks( +func (m *sqlExecutionStore) rangeCompleteHistoryImmediateTasks( ctx context.Context, request *p.RangeCompleteHistoryTasksRequest, ) error { - switch request.TaskCategory.ID() { + // This is for backward compatiblity. + // These task categories exist before the general history_immediate_tasks table is created, + // so they have their own tables. + categoryID := request.TaskCategory.ID() + switch categoryID { case tasks.CategoryIDTransfer: return m.rangeCompleteTransferTasks(ctx, request) - case tasks.CategoryIDTimer: - return m.rangeCompleteTimerTasks(ctx, request) case tasks.CategoryIDVisibility: return m.rangeCompleteVisibilityTasks(ctx, request) case tasks.CategoryIDReplication: return m.rangeCompleteReplicationTasks(ctx, request) - default: - return serviceerror.NewInternal(fmt.Sprintf("unknown task category: %v", request.TaskCategory)) } + + if _, err := m.Db.RangeDeleteFromHistoryImmediateTasks(ctx, sqlplugin.HistoryImmediateTasksRangeFilter{ + ShardID: request.ShardID, + CategoryID: categoryID, + InclusiveMinTaskID: request.InclusiveMinTaskKey.TaskID, + ExclusiveMaxTaskID: request.ExclusiveMaxTaskKey.TaskID, + }); err != nil { + return serviceerror.NewUnavailable( + fmt.Sprintf("RangeCompleteTransferTask operation failed. CategoryID: %v. Error: %v", categoryID, err), + ) + } + return nil +} + +func (m *sqlExecutionStore) getHistoryScheduledTask( + ctx context.Context, + request *p.GetHistoryTaskRequest, +) (*p.InternalGetHistoryTaskResponse, error) { + // This is for backward compatiblity. + // These task categories exist before the general history_scheduled_tasks table is created, + // so they have their own tables. + categoryID := request.TaskCategory.ID() + if categoryID == tasks.CategoryIDTimer { + return m.getTimerTask(ctx, request) + } + + rows, err := m.Db.SelectFromHistoryScheduledTasks(ctx, sqlplugin.HistoryScheduledTasksFilter{ + ShardID: request.ShardID, + CategoryID: categoryID, + TaskID: request.TaskKey.TaskID, + VisibilityTimestamp: request.TaskKey.FireTime, + }) + if err != nil { + if err == sql.ErrNoRows { + return nil, serviceerror.NewNotFound( + fmt.Sprintf("GetHistoryTask operation failed. CategoryID: %v. Task with ID %v not found. Error: %v", categoryID, request.TaskKey.TaskID, err), + ) + } + return nil, serviceerror.NewUnavailable( + fmt.Sprintf("GetHistoryTask operation failed. Failed to get record. CategoryID: %v. TaskId: %v. Error: %v", categoryID, request.TaskKey.TaskID, err), + ) + } + + if len(rows) == 0 { + return nil, serviceerror.NewNotFound( + fmt.Sprintf("GetHistoryTask operation failed. Failed to get record. CategoryID: %v. TaskId: %v", categoryID, request.TaskKey.TaskID), + ) + } + + scheduledTaskRow := rows[0] + resp := &p.InternalGetHistoryTaskResponse{ + Task: *p.NewDataBlob(scheduledTaskRow.Data, scheduledTaskRow.DataEncoding), + } + return resp, nil +} + +func (m *sqlExecutionStore) getHistoryScheduledTasks( + ctx context.Context, + request *p.GetHistoryTasksRequest, +) (*p.InternalGetHistoryTasksResponse, error) { + // This is for backward compatiblity. + // These task categories exist before the general history_scheduled_tasks table is created, + // so they have their own tables. + categoryID := request.TaskCategory.ID() + if categoryID == tasks.CategoryIDTimer { + return m.getTimerTasks(ctx, request) + } + + pageToken := &scheduledTaskPageToken{TaskID: math.MinInt64, Timestamp: request.InclusiveMinTaskKey.FireTime} + if len(request.NextPageToken) > 0 { + if err := pageToken.deserialize(request.NextPageToken); err != nil { + return nil, serviceerror.NewInternal( + fmt.Sprintf("categoryID: %v. error deserializing scheduledTaskPageToken: %v", categoryID, err), + ) + } + } + + rows, err := m.Db.RangeSelectFromHistoryScheduledTasks(ctx, sqlplugin.HistoryScheduledTasksRangeFilter{ + ShardID: request.ShardID, + CategoryID: categoryID, + InclusiveMinVisibilityTimestamp: pageToken.Timestamp, + InclusiveMinTaskID: pageToken.TaskID, + ExclusiveMaxVisibilityTimestamp: request.ExclusiveMaxTaskKey.FireTime, + PageSize: request.BatchSize, + }) + + if err != nil && err != sql.ErrNoRows { + return nil, serviceerror.NewUnavailable( + fmt.Sprintf("GetHistoryTasks operation failed. Select failed. CategoryID: %v. Error: %v", categoryID, err), + ) + } + + resp := &p.InternalGetHistoryTasksResponse{Tasks: make([]commonpb.DataBlob, len(rows))} + for i, row := range rows { + resp.Tasks[i] = *p.NewDataBlob(row.Data, row.DataEncoding) + } + + if len(resp.Tasks) == request.BatchSize { + pageToken = &scheduledTaskPageToken{ + TaskID: rows[request.BatchSize-1].TaskID + 1, + Timestamp: rows[request.BatchSize-1].VisibilityTimestamp, + } + nextToken, err := pageToken.serialize() + if err != nil { + return nil, serviceerror.NewInternal(fmt.Sprintf("GetHistoryTasks: error serializing page token: %v", err)) + } + resp.NextPageToken = nextToken + } + + return resp, nil +} + +func (m *sqlExecutionStore) completeHistoryScheduledTask( + ctx context.Context, + request *p.CompleteHistoryTaskRequest, +) error { + // This is for backward compatiblity. + // These task categories exist before the general history_scheduled_tasks table is created, + // so they have their own tables. + categoryID := request.TaskCategory.ID() + if categoryID == tasks.CategoryIDTimer { + return m.completeTimerTask(ctx, request) + } + + if _, err := m.Db.DeleteFromHistoryScheduledTasks(ctx, sqlplugin.HistoryScheduledTasksFilter{ + ShardID: request.ShardID, + CategoryID: categoryID, + VisibilityTimestamp: request.TaskKey.FireTime, + TaskID: request.TaskKey.TaskID, + }); err != nil { + return serviceerror.NewUnavailable(fmt.Sprintf("CompleteHistoryTask operation failed. CategoryID: %v. Error: %v", categoryID, err)) + } + return nil +} + +func (m *sqlExecutionStore) rangeCompleteHistoryScheduledTasks( + ctx context.Context, + request *p.RangeCompleteHistoryTasksRequest, +) error { + // This is for backward compatiblity. + // These task categories exist before the general history_scheduled_tasks table is created, + // so they have their own tables. + categoryID := request.TaskCategory.ID() + if categoryID == tasks.CategoryIDTimer { + return m.rangeCompleteTimerTasks(ctx, request) + } + + start := request.InclusiveMinTaskKey.FireTime + end := request.ExclusiveMaxTaskKey.FireTime + if _, err := m.Db.RangeDeleteFromHistoryScheduledTasks(ctx, sqlplugin.HistoryScheduledTasksRangeFilter{ + ShardID: request.ShardID, + CategoryID: categoryID, + InclusiveMinVisibilityTimestamp: start, + ExclusiveMaxVisibilityTimestamp: end, + }); err != nil { + return serviceerror.NewUnavailable(fmt.Sprintf("RangeCompleteHistoryTask operation failed. CategoryID: %v. Error: %v", categoryID, err)) + } + return nil } func (m *sqlExecutionStore) getTransferTask( @@ -254,7 +545,7 @@ func (m *sqlExecutionStore) getTimerTasks( ctx context.Context, request *p.GetHistoryTasksRequest, ) (*p.InternalGetHistoryTasksResponse, error) { - pageToken := &timerTaskPageToken{TaskID: math.MinInt64, Timestamp: request.InclusiveMinTaskKey.FireTime} + pageToken := &scheduledTaskPageToken{TaskID: math.MinInt64, Timestamp: request.InclusiveMinTaskKey.FireTime} if len(request.NextPageToken) > 0 { if err := pageToken.deserialize(request.NextPageToken); err != nil { return nil, serviceerror.NewInternal(fmt.Sprintf("error deserializing timerTaskPageToken: %v", err)) @@ -279,7 +570,7 @@ func (m *sqlExecutionStore) getTimerTasks( } if len(resp.Tasks) == request.BatchSize { - pageToken = &timerTaskPageToken{ + pageToken = &scheduledTaskPageToken{ TaskID: rows[request.BatchSize-1].TaskID + 1, Timestamp: rows[request.BatchSize-1].VisibilityTimestamp, } @@ -318,7 +609,7 @@ func (m *sqlExecutionStore) rangeCompleteTimerTasks( InclusiveMinVisibilityTimestamp: start, ExclusiveMaxVisibilityTimestamp: end, }); err != nil { - return serviceerror.NewUnavailable(fmt.Sprintf("CompleteTimerTask operation failed. Error: %v", err)) + return serviceerror.NewUnavailable(fmt.Sprintf("RangeCompleteTimerTask operation failed. Error: %v", err)) } return nil } @@ -652,15 +943,15 @@ func (m *sqlExecutionStore) rangeCompleteVisibilityTasks( return nil } -type timerTaskPageToken struct { +type scheduledTaskPageToken struct { TaskID int64 Timestamp time.Time } -func (t *timerTaskPageToken) serialize() ([]byte, error) { +func (t *scheduledTaskPageToken) serialize() ([]byte, error) { return json.Marshal(t) } -func (t *timerTaskPageToken) deserialize(payload []byte) error { +func (t *scheduledTaskPageToken) deserialize(payload []byte) error { return json.Unmarshal(payload, t) } diff --git a/common/persistence/sql/execution_util.go b/common/persistence/sql/execution_util.go index 326f00e2e9f..a5f20685c54 100644 --- a/common/persistence/sql/execution_util.go +++ b/common/persistence/sql/execution_util.go @@ -529,17 +529,13 @@ func applyTasks( var err error for category, tasksByCategory := range insertTasks { - switch category.ID() { - case tasks.CategoryIDTransfer: - err = createTransferTasks(ctx, tx, shardID, tasksByCategory) - case tasks.CategoryIDTimer: - err = createTimerTasks(ctx, tx, shardID, tasksByCategory) - case tasks.CategoryIDVisibility: - err = createVisibilityTasks(ctx, tx, shardID, tasksByCategory) - case tasks.CategoryIDReplication: - err = createReplicationTasks(ctx, tx, shardID, tasksByCategory) + switch category.Type() { + case tasks.CategoryTypeImmediate: + err = createImmediateTasks(ctx, tx, shardID, category.ID(), tasksByCategory) + case tasks.CategoryTypeScheduled: + err = createScheduledTasks(ctx, tx, shardID, category.ID(), tasksByCategory) default: - err = serviceerror.NewInternal(fmt.Sprintf("Unknown task category: %v", category)) + err = serviceerror.NewInternal(fmt.Sprintf("Unknown task category type: %v", category)) } if err != nil { @@ -698,6 +694,101 @@ func lockExecution( return dbRecordVersion, nextEventID, nil } +func createImmediateTasks( + ctx context.Context, + tx sqlplugin.Tx, + shardID int32, + categoryID int32, + immedidateTasks []p.InternalHistoryTask, +) error { + // This is for backward compatiblity. + // These task categories exist before the general history_immediate_tasks table is created, + // so they have their own tables. + switch categoryID { + case tasks.CategoryIDTransfer: + return createTransferTasks(ctx, tx, shardID, immedidateTasks) + case tasks.CategoryIDVisibility: + return createVisibilityTasks(ctx, tx, shardID, immedidateTasks) + case tasks.CategoryIDReplication: + return createReplicationTasks(ctx, tx, shardID, immedidateTasks) + } + + if len(immedidateTasks) == 0 { + return nil + } + + immediateTasksRows := make([]sqlplugin.HistoryImmediateTasksRow, 0, len(immedidateTasks)) + for _, task := range immedidateTasks { + immediateTasksRows = append(immediateTasksRows, sqlplugin.HistoryImmediateTasksRow{ + ShardID: shardID, + CategoryID: categoryID, + TaskID: task.Key.TaskID, + Data: task.Blob.Data, + DataEncoding: task.Blob.EncodingType.String(), + }) + } + + result, err := tx.InsertIntoHistoryImmediateTasks(ctx, immediateTasksRows) + if err != nil { + return serviceerror.NewUnavailable(fmt.Sprintf("createImmediateTasks failed. Error: %v", err)) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return serviceerror.NewUnavailable(fmt.Sprintf("createImmediateTasks failed. Could not verify number of rows inserted. Error: %v", err)) + } + + if int(rowsAffected) != len(immediateTasksRows) { + return serviceerror.NewUnavailable(fmt.Sprintf("createImmediateTasks failed. Inserted %v instead of %v rows into history_immediate_tasks. Error: %v", rowsAffected, len(immediateTasksRows), err)) + } + return nil +} + +func createScheduledTasks( + ctx context.Context, + tx sqlplugin.Tx, + shardID int32, + categoryID int32, + scheduledTasks []p.InternalHistoryTask, +) error { + // This is for backward compatiblity. + // These task categories exists before the general history_scheduled_tasks table is created, + // so they have their own tables. + if categoryID == tasks.CategoryIDTimer { + return createTimerTasks(ctx, tx, shardID, scheduledTasks) + } + + if len(scheduledTasks) == 0 { + return nil + } + + scheduledTasksRows := make([]sqlplugin.HistoryScheduledTasksRow, 0, len(scheduledTasks)) + for _, task := range scheduledTasks { + scheduledTasksRows = append(scheduledTasksRows, sqlplugin.HistoryScheduledTasksRow{ + ShardID: shardID, + CategoryID: categoryID, + VisibilityTimestamp: task.Key.FireTime, + TaskID: task.Key.TaskID, + Data: task.Blob.Data, + DataEncoding: task.Blob.EncodingType.String(), + }) + } + + result, err := tx.InsertIntoHistoryScheduledTasks(ctx, scheduledTasksRows) + if err != nil { + return serviceerror.NewUnavailable(fmt.Sprintf("createScheduledTasks failed. Error: %v", err)) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return serviceerror.NewUnavailable(fmt.Sprintf("createScheduledTasks failed. Could not verify number of rows inserted. Error: %v", err)) + } + + if int(rowsAffected) != len(scheduledTasks) { + return serviceerror.NewUnavailable(fmt.Sprintf("createScheduledTasks failed. Inserted %v instead of %v rows into history_scheduled_tasks. Error: %v", rowsAffected, len(scheduledTasks), err)) + } + return nil +} + func createTransferTasks( ctx context.Context, tx sqlplugin.Tx, diff --git a/common/persistence/sql/sqlplugin/history_immediate_tasks.go b/common/persistence/sql/sqlplugin/history_immediate_tasks.go new file mode 100644 index 00000000000..6d6c938eeda --- /dev/null +++ b/common/persistence/sql/sqlplugin/history_immediate_tasks.go @@ -0,0 +1,74 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package sqlplugin + +import ( + "context" + "database/sql" +) + +type ( + // HistoryImmediateTasksRow represents a row in history_immediate_tasks table + HistoryImmediateTasksRow struct { + ShardID int32 + CategoryID int32 + TaskID int64 + Data []byte + DataEncoding string + } + + // HistoryImmediateTasksFilter contains the column names within history_immediate_tasks table that + // can be used to filter results through a WHERE clause + HistoryImmediateTasksFilter struct { + ShardID int32 + CategoryID int32 + TaskID int64 + } + + // HistoryImmediateTasksRangeFilter contains the column names within history_immediate_tasks table that + // can be used to filter results through a WHERE clause + HistoryImmediateTasksRangeFilter struct { + ShardID int32 + CategoryID int32 + InclusiveMinTaskID int64 + ExclusiveMaxTaskID int64 + PageSize int + } + + // HistoryImmediateTask is the SQL persistence interface for history immediate tasks + HistoryImmediateTask interface { + // InsertIntoHistoryImmediateTasks inserts rows that into history_immediate_tasks table. + InsertIntoHistoryImmediateTasks(ctx context.Context, rows []HistoryImmediateTasksRow) (sql.Result, error) + // SelectFromHistoryImmediateTasks returns rows that match filter criteria from history_immediate_tasks table. + SelectFromHistoryImmediateTasks(ctx context.Context, filter HistoryImmediateTasksFilter) ([]HistoryImmediateTasksRow, error) + // RangeSelectFromHistoryImmediateTasks returns rows that match filter criteria from history_immediate_tasks table. + RangeSelectFromHistoryImmediateTasks(ctx context.Context, filter HistoryImmediateTasksRangeFilter) ([]HistoryImmediateTasksRow, error) + // DeleteFromHistoryImmediateTasks deletes one rows from history_immediate_tasks table. + DeleteFromHistoryImmediateTasks(ctx context.Context, filter HistoryImmediateTasksFilter) (sql.Result, error) + // RangeDeleteFromHistoryImmediateTasks deletes one or more rows from history_immediate_tasks table. + // HistoryImmediateTasksRangeFilter - {PageSize} will be ignored + RangeDeleteFromHistoryImmediateTasks(ctx context.Context, filter HistoryImmediateTasksRangeFilter) (sql.Result, error) + } +) diff --git a/common/persistence/sql/sqlplugin/history_replication_tasks.go b/common/persistence/sql/sqlplugin/history_replication_tasks.go index d787bc1de31..06616b2afdd 100644 --- a/common/persistence/sql/sqlplugin/history_replication_tasks.go +++ b/common/persistence/sql/sqlplugin/history_replication_tasks.go @@ -56,6 +56,7 @@ type ( // HistoryReplicationTask is the SQL persistence interface for history replication tasks HistoryReplicationTask interface { + // InsertIntoReplicationTasks inserts rows that into replication_tasks table. InsertIntoReplicationTasks(ctx context.Context, rows []ReplicationTasksRow) (sql.Result, error) // SelectFromReplicationTasks returns one or more rows from replication_tasks table SelectFromReplicationTasks(ctx context.Context, filter ReplicationTasksFilter) ([]ReplicationTasksRow, error) diff --git a/common/persistence/sql/sqlplugin/history_scheduled_tasks.go b/common/persistence/sql/sqlplugin/history_scheduled_tasks.go new file mode 100644 index 00000000000..519d53f047b --- /dev/null +++ b/common/persistence/sql/sqlplugin/history_scheduled_tasks.go @@ -0,0 +1,78 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package sqlplugin + +import ( + "context" + "database/sql" + "time" +) + +type ( + // HistoryScheduledTasksRow represents a row in history_scheduled_tasks table + HistoryScheduledTasksRow struct { + ShardID int32 + CategoryID int32 + VisibilityTimestamp time.Time + TaskID int64 + Data []byte + DataEncoding string + } + + // HistoryScheduledTasksFilter contains the column names within history_scheduled_tasks table that + // can be used to filter results through a WHERE clause + HistoryScheduledTasksFilter struct { + ShardID int32 + CategoryID int32 + TaskID int64 + VisibilityTimestamp time.Time + } + + // HistoryScheduledTasksFilter contains the column names within history_scheduled_tasks table that + // can be used to filter results through a WHERE clause + HistoryScheduledTasksRangeFilter struct { + ShardID int32 + CategoryID int32 + InclusiveMinTaskID int64 + InclusiveMinVisibilityTimestamp time.Time + ExclusiveMaxVisibilityTimestamp time.Time + PageSize int + } + + // HistoryScheduledTask is the SQL persistence interface for history scheduled tasks + HistoryScheduledTask interface { + // InsertIntoHistoryScheduledTasks inserts rows that into history_scheduled_tasks table. + InsertIntoHistoryScheduledTasks(ctx context.Context, rows []HistoryScheduledTasksRow) (sql.Result, error) + // SelectFromScheduledTasks returns one or more rows from history_scheduled_tasks table + SelectFromHistoryScheduledTasks(ctx context.Context, filter HistoryScheduledTasksFilter) ([]HistoryScheduledTasksRow, error) + // RangeSelectFromScheduledTasks returns one or more rows from history_scheduled_tasks table + RangeSelectFromHistoryScheduledTasks(ctx context.Context, filter HistoryScheduledTasksRangeFilter) ([]HistoryScheduledTasksRow, error) + // DeleteFromScheduledTasks deletes one or more rows from history_scheduled_tasks table + DeleteFromHistoryScheduledTasks(ctx context.Context, filter HistoryScheduledTasksFilter) (sql.Result, error) + // RangeDeleteFromScheduledTasks deletes one or more rows from history_scheduled_tasks table + // ScheduledTasksRangeFilter - {TaskID, PageSize} will be ignored + RangeDeleteFromHistoryScheduledTasks(ctx context.Context, filter HistoryScheduledTasksRangeFilter) (sql.Result, error) + } +) diff --git a/common/persistence/sql/sqlplugin/history_timer_tasks.go b/common/persistence/sql/sqlplugin/history_timer_tasks.go index 0121a2b1dbb..42bd4c3c105 100644 --- a/common/persistence/sql/sqlplugin/history_timer_tasks.go +++ b/common/persistence/sql/sqlplugin/history_timer_tasks.go @@ -60,6 +60,7 @@ type ( // HistoryTimerTask is the SQL persistence interface for history timer tasks HistoryTimerTask interface { + // InsertIntoTimerTasks inserts rows that into timer_tasks table. InsertIntoTimerTasks(ctx context.Context, rows []TimerTasksRow) (sql.Result, error) // SelectFromTimerTasks returns one or more rows from timer_tasks table SelectFromTimerTasks(ctx context.Context, filter TimerTasksFilter) ([]TimerTasksRow, error) diff --git a/common/persistence/sql/sqlplugin/history_transfer_tasks.go b/common/persistence/sql/sqlplugin/history_transfer_tasks.go index 70100769f72..55aafb69734 100644 --- a/common/persistence/sql/sqlplugin/history_transfer_tasks.go +++ b/common/persistence/sql/sqlplugin/history_transfer_tasks.go @@ -56,6 +56,7 @@ type ( // HistoryTransferTask is the SQL persistence interface for history transfer tasks HistoryTransferTask interface { + // InsertIntoTransferTasks inserts rows that into transfer_tasks table. InsertIntoTransferTasks(ctx context.Context, rows []TransferTasksRow) (sql.Result, error) // SelectFromTransferTasks returns rows that match filter criteria from transfer_tasks table. SelectFromTransferTasks(ctx context.Context, filter TransferTasksFilter) ([]TransferTasksRow, error) diff --git a/common/persistence/sql/sqlplugin/history_visibility_tasks.go b/common/persistence/sql/sqlplugin/history_visibility_tasks.go index be2e86974a6..211bd15a1b8 100644 --- a/common/persistence/sql/sqlplugin/history_visibility_tasks.go +++ b/common/persistence/sql/sqlplugin/history_visibility_tasks.go @@ -56,6 +56,7 @@ type ( // HistoryVisibilityTask is the SQL persistence interface for history visibility tasks HistoryVisibilityTask interface { + // InsertIntoVisibilityTasks inserts rows that into visibility_tasks table. InsertIntoVisibilityTasks(ctx context.Context, rows []VisibilityTasksRow) (sql.Result, error) // SelectFromVisibilityTasks returns rows that match filter criteria from visibility_tasks table. SelectFromVisibilityTasks(ctx context.Context, filter VisibilityTasksFilter) ([]VisibilityTasksRow, error) diff --git a/common/persistence/sql/sqlplugin/interfaces.go b/common/persistence/sql/sqlplugin/interfaces.go index 540a1abaa4f..14f202ca1a6 100644 --- a/common/persistence/sql/sqlplugin/interfaces.go +++ b/common/persistence/sql/sqlplugin/interfaces.go @@ -74,6 +74,8 @@ type ( HistoryExecutionSignal HistoryExecutionSignalRequest + HistoryImmediateTask + HistoryScheduledTask HistoryTransferTask HistoryTimerTask HistoryReplicationTask diff --git a/common/persistence/sql/sqlplugin/mysql/execution.go b/common/persistence/sql/sqlplugin/mysql/execution.go index d1eed5217c8..411f5e9dc15 100644 --- a/common/persistence/sql/sqlplugin/mysql/execution.go +++ b/common/persistence/sql/sqlplugin/mysql/execution.go @@ -83,6 +83,32 @@ namespace_id = :namespace_id AND workflow_id = :workflow_id ` + createHistoryImmediateTasksQuery = `INSERT INTO history_immediate_tasks(shard_id, category_id, task_id, data, data_encoding) + VALUES(:shard_id, :category_id, :task_id, :data, :data_encoding)` + + getHistoryImmediateTaskQuery = `SELECT task_id, data, data_encoding + FROM history_immediate_tasks WHERE shard_id = ? AND category_id = ? AND task_id = ?` + getHistoryImmediateTasksQuery = `SELECT task_id, data, data_encoding + FROM history_immediate_tasks WHERE shard_id = ? AND category_id = ? AND task_id >= ? AND task_id < ? ORDER BY task_id LIMIT ?` + + deleteHistoryImmediateTaskQuery = `DELETE FROM history_immediate_tasks WHERE shard_id = ? AND category_id = ? AND task_id = ?` + rangeDeleteHistoryImmediateTasksQuery = `DELETE FROM history_immediate_tasks WHERE shard_id = ? AND category_id = ? AND task_id >= ? AND task_id < ?` + + createHistoryScheduledTasksQuery = `INSERT INTO history_scheduled_tasks (shard_id, category_id, visibility_timestamp, task_id, data, data_encoding) + VALUES (:shard_id, :category_id, :visibility_timestamp, :task_id, :data, :data_encoding)` + + getHistoryScheduledTaskQuery = `SELECT visibility_timestamp, task_id, data, data_encoding FROM history_scheduled_tasks + WHERE shard_id = ? AND category_id = ? AND visibility_timestamp = ? AND task_id = ?` + getHistoryScheduledTasksQuery = `SELECT visibility_timestamp, task_id, data, data_encoding FROM history_scheduled_tasks + WHERE shard_id = ? + AND category_id = ? + AND ((visibility_timestamp >= ? AND task_id >= ?) OR visibility_timestamp > ?) + AND visibility_timestamp < ? + ORDER BY visibility_timestamp,task_id LIMIT ?` + + deleteHistoryScheduledTaskQuery = `DELETE FROM history_scheduled_tasks WHERE shard_id = ? AND category_id = ? AND visibility_timestamp = ? AND task_id = ?` + rangeDeleteHistoryScheduledTasksQuery = `DELETE FROM history_scheduled_tasks WHERE shard_id = ? AND category_id = ? AND visibility_timestamp >= ? AND visibility_timestamp < ?` + createTransferTasksQuery = `INSERT INTO transfer_tasks(shard_id, task_id, data, data_encoding) VALUES(:shard_id, :task_id, :data, :data_encoding)` @@ -350,6 +376,179 @@ func (mdb *db) LockCurrentExecutionsJoinExecutions( return rows, err } +// InsertIntoHistoryImmediateTasks inserts one or more rows into history_immediate_tasks table +func (mdb *db) InsertIntoHistoryImmediateTasks( + ctx context.Context, + rows []sqlplugin.HistoryImmediateTasksRow, +) (sql.Result, error) { + return mdb.conn.NamedExecContext(ctx, + createHistoryImmediateTasksQuery, + rows, + ) +} + +// SelectFromHistoryImmediateTasks reads one or more rows from transfer_tasks table +func (mdb *db) SelectFromHistoryImmediateTasks( + ctx context.Context, + filter sqlplugin.HistoryImmediateTasksFilter, +) ([]sqlplugin.HistoryImmediateTasksRow, error) { + var rows []sqlplugin.HistoryImmediateTasksRow + if err := mdb.conn.SelectContext(ctx, + &rows, + getHistoryImmediateTaskQuery, + filter.ShardID, + filter.CategoryID, + filter.TaskID, + ); err != nil { + return nil, err + } + return rows, nil +} + +// RangeSelectFromHistoryImmediateTasks reads one or more rows from transfer_tasks table +func (mdb *db) RangeSelectFromHistoryImmediateTasks( + ctx context.Context, + filter sqlplugin.HistoryImmediateTasksRangeFilter, +) ([]sqlplugin.HistoryImmediateTasksRow, error) { + var rows []sqlplugin.HistoryImmediateTasksRow + if err := mdb.conn.SelectContext(ctx, + &rows, + getHistoryImmediateTasksQuery, + filter.ShardID, + filter.CategoryID, + filter.InclusiveMinTaskID, + filter.ExclusiveMaxTaskID, + filter.PageSize, + ); err != nil { + return nil, err + } + return rows, nil +} + +// DeleteFromHistoryImmediateTasks deletes one or more rows from transfer_tasks table +func (mdb *db) DeleteFromHistoryImmediateTasks( + ctx context.Context, + filter sqlplugin.HistoryImmediateTasksFilter, +) (sql.Result, error) { + return mdb.conn.ExecContext(ctx, + deleteHistoryImmediateTaskQuery, + filter.ShardID, + filter.CategoryID, + filter.TaskID, + ) +} + +// RangeDeleteFromHistoryImmediateTasks deletes one or more rows from transfer_tasks table +func (mdb *db) RangeDeleteFromHistoryImmediateTasks( + ctx context.Context, + filter sqlplugin.HistoryImmediateTasksRangeFilter, +) (sql.Result, error) { + return mdb.conn.ExecContext(ctx, + rangeDeleteHistoryImmediateTasksQuery, + filter.ShardID, + filter.CategoryID, + filter.InclusiveMinTaskID, + filter.ExclusiveMaxTaskID, + ) +} + +// InsertIntoHistoryScheduledTasks inserts one or more rows into timer_tasks table +func (mdb *db) InsertIntoHistoryScheduledTasks( + ctx context.Context, + rows []sqlplugin.HistoryScheduledTasksRow, +) (sql.Result, error) { + for i := range rows { + rows[i].VisibilityTimestamp = mdb.converter.ToMySQLDateTime(rows[i].VisibilityTimestamp) + } + return mdb.conn.NamedExecContext( + ctx, + createHistoryScheduledTasksQuery, + rows, + ) +} + +// SelectFromHistoryScheduledTasks reads one or more rows from timer_tasks table +func (mdb *db) SelectFromHistoryScheduledTasks( + ctx context.Context, + filter sqlplugin.HistoryScheduledTasksFilter, +) ([]sqlplugin.HistoryScheduledTasksRow, error) { + var rows []sqlplugin.HistoryScheduledTasksRow + filter.VisibilityTimestamp = mdb.converter.ToMySQLDateTime(filter.VisibilityTimestamp) + err := mdb.conn.SelectContext(ctx, + &rows, + getHistoryScheduledTaskQuery, + filter.ShardID, + filter.CategoryID, + filter.VisibilityTimestamp, + filter.TaskID, + ) + if err != nil { + return nil, err + } + for i := range rows { + rows[i].VisibilityTimestamp = mdb.converter.FromMySQLDateTime(rows[i].VisibilityTimestamp) + } + return rows, nil +} + +// RangeSelectFromHistoryScheduledTasks reads one or more rows from timer_tasks table +func (mdb *db) RangeSelectFromHistoryScheduledTasks( + ctx context.Context, + filter sqlplugin.HistoryScheduledTasksRangeFilter, +) ([]sqlplugin.HistoryScheduledTasksRow, error) { + var rows []sqlplugin.HistoryScheduledTasksRow + filter.InclusiveMinVisibilityTimestamp = mdb.converter.ToMySQLDateTime(filter.InclusiveMinVisibilityTimestamp) + filter.ExclusiveMaxVisibilityTimestamp = mdb.converter.ToMySQLDateTime(filter.ExclusiveMaxVisibilityTimestamp) + if err := mdb.conn.SelectContext(ctx, + &rows, + getHistoryScheduledTasksQuery, + filter.ShardID, + filter.CategoryID, + filter.InclusiveMinVisibilityTimestamp, + filter.InclusiveMinTaskID, + filter.InclusiveMinVisibilityTimestamp, + filter.ExclusiveMaxVisibilityTimestamp, + filter.PageSize, + ); err != nil { + return nil, err + } + for i := range rows { + rows[i].VisibilityTimestamp = mdb.converter.FromMySQLDateTime(rows[i].VisibilityTimestamp) + } + return rows, nil +} + +// DeleteFromHistoryScheduledTasks deletes one or more rows from timer_tasks table +func (mdb *db) DeleteFromHistoryScheduledTasks( + ctx context.Context, + filter sqlplugin.HistoryScheduledTasksFilter, +) (sql.Result, error) { + filter.VisibilityTimestamp = mdb.converter.ToMySQLDateTime(filter.VisibilityTimestamp) + return mdb.conn.ExecContext(ctx, + deleteHistoryScheduledTaskQuery, + filter.ShardID, + filter.CategoryID, + filter.VisibilityTimestamp, + filter.TaskID, + ) +} + +// RangeDeleteFromHistoryScheduledTasks deletes one or more rows from timer_tasks table +func (mdb *db) RangeDeleteFromHistoryScheduledTasks( + ctx context.Context, + filter sqlplugin.HistoryScheduledTasksRangeFilter, +) (sql.Result, error) { + filter.InclusiveMinVisibilityTimestamp = mdb.converter.ToMySQLDateTime(filter.InclusiveMinVisibilityTimestamp) + filter.ExclusiveMaxVisibilityTimestamp = mdb.converter.ToMySQLDateTime(filter.ExclusiveMaxVisibilityTimestamp) + return mdb.conn.ExecContext(ctx, + rangeDeleteHistoryScheduledTasksQuery, + filter.ShardID, + filter.CategoryID, + filter.InclusiveMinVisibilityTimestamp, + filter.ExclusiveMaxVisibilityTimestamp, + ) +} + // InsertIntoTransferTasks inserts one or more rows into transfer_tasks table func (mdb *db) InsertIntoTransferTasks( ctx context.Context, diff --git a/common/persistence/sql/sqlplugin/postgresql/execution.go b/common/persistence/sql/sqlplugin/postgresql/execution.go index b6ca3924b0a..707dc143b28 100644 --- a/common/persistence/sql/sqlplugin/postgresql/execution.go +++ b/common/persistence/sql/sqlplugin/postgresql/execution.go @@ -83,6 +83,32 @@ namespace_id = :namespace_id AND workflow_id = :workflow_id ` + createHistoryImmediateTasksQuery = `INSERT INTO history_immediate_tasks(shard_id, category_id, task_id, data, data_encoding) + VALUES(:shard_id, :category_id, :task_id, :data, :data_encoding)` + + getHistoryImmediateTaskQuery = `SELECT task_id, data, data_encoding + FROM history_immediate_tasks WHERE shard_id = $1 AND category_id = $2 AND task_id = $3` + getHistoryImmediateTasksQuery = `SELECT task_id, data, data_encoding + FROM history_immediate_tasks WHERE shard_id = $1 AND category_id = $2 AND task_id >= $3 AND task_id < $4 ORDER BY task_id LIMIT $5` + + deleteHistoryImmediateTaskQuery = `DELETE FROM history_immediate_tasks WHERE shard_id = $1 AND category_id = $2 AND task_id = $3` + rangeDeleteHistoryImmediateTasksQuery = `DELETE FROM history_immediate_tasks WHERE shard_id = $1 AND category_id = $2 AND task_id >= $3 AND task_id < $4` + + createHistoryScheduledTasksQuery = `INSERT INTO history_scheduled_tasks (shard_id, category_id, visibility_timestamp, task_id, data, data_encoding) + VALUES (:shard_id, :category_id, :visibility_timestamp, :task_id, :data, :data_encoding)` + + getHistoryScheduledTaskQuery = `SELECT visibility_timestamp, task_id, data, data_encoding FROM history_scheduled_tasks + WHERE shard_id = $1 AND category_id = $2 AND visibility_timestamp = $3 AND task_id = $4` + getHistoryScheduledTasksQuery = `SELECT visibility_timestamp, task_id, data, data_encoding FROM history_scheduled_tasks + WHERE shard_id = $1 + AND category_id = $2 + AND ((visibility_timestamp >= $3 AND task_id >= $4) OR visibility_timestamp > $5) + AND visibility_timestamp < $6 + ORDER BY visibility_timestamp,task_id LIMIT $7` + + deleteHistoryScheduledTaskQuery = `DELETE FROM history_scheduled_tasks WHERE shard_id = $1 AND category_id = $2 AND visibility_timestamp = $3 AND task_id = $4` + rangeDeleteHistoryScheduledTasksQuery = `DELETE FROM history_scheduled_tasks WHERE shard_id = $1 AND category_id = $2 AND visibility_timestamp >= $3 AND visibility_timestamp < $4` + createTransferTasksQuery = `INSERT INTO transfer_tasks(shard_id, task_id, data, data_encoding) VALUES(:shard_id, :task_id, :data, :data_encoding)` @@ -350,6 +376,179 @@ func (pdb *db) LockCurrentExecutionsJoinExecutions( return rows, err } +// InsertIntoHistoryImmediateTasks inserts one or more rows into history_immediate_tasks table +func (pdb *db) InsertIntoHistoryImmediateTasks( + ctx context.Context, + rows []sqlplugin.HistoryImmediateTasksRow, +) (sql.Result, error) { + return pdb.conn.NamedExecContext(ctx, + createHistoryImmediateTasksQuery, + rows, + ) +} + +// SelectFromHistoryImmediateTasks reads one or more rows from transfer_tasks table +func (pdb *db) SelectFromHistoryImmediateTasks( + ctx context.Context, + filter sqlplugin.HistoryImmediateTasksFilter, +) ([]sqlplugin.HistoryImmediateTasksRow, error) { + var rows []sqlplugin.HistoryImmediateTasksRow + if err := pdb.conn.SelectContext(ctx, + &rows, + getHistoryImmediateTaskQuery, + filter.ShardID, + filter.CategoryID, + filter.TaskID, + ); err != nil { + return nil, err + } + return rows, nil +} + +// RangeSelectFromHistoryImmediateTasks reads one or more rows from transfer_tasks table +func (pdb *db) RangeSelectFromHistoryImmediateTasks( + ctx context.Context, + filter sqlplugin.HistoryImmediateTasksRangeFilter, +) ([]sqlplugin.HistoryImmediateTasksRow, error) { + var rows []sqlplugin.HistoryImmediateTasksRow + if err := pdb.conn.SelectContext(ctx, + &rows, + getHistoryImmediateTasksQuery, + filter.ShardID, + filter.CategoryID, + filter.InclusiveMinTaskID, + filter.ExclusiveMaxTaskID, + filter.PageSize, + ); err != nil { + return nil, err + } + return rows, nil +} + +// DeleteFromHistoryImmediateTasks deletes one or more rows from transfer_tasks table +func (pdb *db) DeleteFromHistoryImmediateTasks( + ctx context.Context, + filter sqlplugin.HistoryImmediateTasksFilter, +) (sql.Result, error) { + return pdb.conn.ExecContext(ctx, + deleteHistoryImmediateTaskQuery, + filter.ShardID, + filter.CategoryID, + filter.TaskID, + ) +} + +// RangeDeleteFromHistoryImmediateTasks deletes one or more rows from transfer_tasks table +func (pdb *db) RangeDeleteFromHistoryImmediateTasks( + ctx context.Context, + filter sqlplugin.HistoryImmediateTasksRangeFilter, +) (sql.Result, error) { + return pdb.conn.ExecContext(ctx, + rangeDeleteHistoryImmediateTasksQuery, + filter.ShardID, + filter.CategoryID, + filter.InclusiveMinTaskID, + filter.ExclusiveMaxTaskID, + ) +} + +// InsertIntoHistoryScheduledTasks inserts one or more rows into timer_tasks table +func (pdb *db) InsertIntoHistoryScheduledTasks( + ctx context.Context, + rows []sqlplugin.HistoryScheduledTasksRow, +) (sql.Result, error) { + for i := range rows { + rows[i].VisibilityTimestamp = pdb.converter.ToPostgreSQLDateTime(rows[i].VisibilityTimestamp) + } + return pdb.conn.NamedExecContext( + ctx, + createHistoryScheduledTasksQuery, + rows, + ) +} + +// SelectFromHistoryScheduledTasks reads one or more rows from timer_tasks table +func (pdb *db) SelectFromHistoryScheduledTasks( + ctx context.Context, + filter sqlplugin.HistoryScheduledTasksFilter, +) ([]sqlplugin.HistoryScheduledTasksRow, error) { + var rows []sqlplugin.HistoryScheduledTasksRow + filter.VisibilityTimestamp = pdb.converter.ToPostgreSQLDateTime(filter.VisibilityTimestamp) + err := pdb.conn.SelectContext(ctx, + &rows, + getHistoryScheduledTaskQuery, + filter.ShardID, + filter.CategoryID, + filter.VisibilityTimestamp, + filter.TaskID, + ) + if err != nil { + return nil, err + } + for i := range rows { + rows[i].VisibilityTimestamp = pdb.converter.ToPostgreSQLDateTime(rows[i].VisibilityTimestamp) + } + return rows, nil +} + +// RangeSelectFromHistoryScheduledTasks reads one or more rows from timer_tasks table +func (pdb *db) RangeSelectFromHistoryScheduledTasks( + ctx context.Context, + filter sqlplugin.HistoryScheduledTasksRangeFilter, +) ([]sqlplugin.HistoryScheduledTasksRow, error) { + var rows []sqlplugin.HistoryScheduledTasksRow + filter.InclusiveMinVisibilityTimestamp = pdb.converter.ToPostgreSQLDateTime(filter.InclusiveMinVisibilityTimestamp) + filter.ExclusiveMaxVisibilityTimestamp = pdb.converter.ToPostgreSQLDateTime(filter.ExclusiveMaxVisibilityTimestamp) + if err := pdb.conn.SelectContext(ctx, + &rows, + getHistoryScheduledTasksQuery, + filter.ShardID, + filter.CategoryID, + filter.InclusiveMinVisibilityTimestamp, + filter.InclusiveMinTaskID, + filter.InclusiveMinVisibilityTimestamp, + filter.ExclusiveMaxVisibilityTimestamp, + filter.PageSize, + ); err != nil { + return nil, err + } + for i := range rows { + rows[i].VisibilityTimestamp = pdb.converter.ToPostgreSQLDateTime(rows[i].VisibilityTimestamp) + } + return rows, nil +} + +// DeleteFromHistoryScheduledTasks deletes one or more rows from timer_tasks table +func (pdb *db) DeleteFromHistoryScheduledTasks( + ctx context.Context, + filter sqlplugin.HistoryScheduledTasksFilter, +) (sql.Result, error) { + filter.VisibilityTimestamp = pdb.converter.ToPostgreSQLDateTime(filter.VisibilityTimestamp) + return pdb.conn.ExecContext(ctx, + deleteHistoryScheduledTaskQuery, + filter.ShardID, + filter.CategoryID, + filter.VisibilityTimestamp, + filter.TaskID, + ) +} + +// RangeDeleteFromHistoryScheduledTasks deletes one or more rows from timer_tasks table +func (pdb *db) RangeDeleteFromHistoryScheduledTasks( + ctx context.Context, + filter sqlplugin.HistoryScheduledTasksRangeFilter, +) (sql.Result, error) { + filter.InclusiveMinVisibilityTimestamp = pdb.converter.ToPostgreSQLDateTime(filter.InclusiveMinVisibilityTimestamp) + filter.ExclusiveMaxVisibilityTimestamp = pdb.converter.ToPostgreSQLDateTime(filter.ExclusiveMaxVisibilityTimestamp) + return pdb.conn.ExecContext(ctx, + rangeDeleteHistoryScheduledTasksQuery, + filter.ShardID, + filter.CategoryID, + filter.InclusiveMinVisibilityTimestamp, + filter.ExclusiveMaxVisibilityTimestamp, + ) +} + // InsertIntoTransferTasks inserts one or more rows into transfer_tasks table func (pdb *db) InsertIntoTransferTasks( ctx context.Context, diff --git a/common/persistence/sql/sqlplugin/sqlite/execution.go b/common/persistence/sql/sqlplugin/sqlite/execution.go index a10beecc98e..6c1af9cfc16 100644 --- a/common/persistence/sql/sqlplugin/sqlite/execution.go +++ b/common/persistence/sql/sqlplugin/sqlite/execution.go @@ -85,6 +85,32 @@ namespace_id = :namespace_id AND workflow_id = :workflow_id ` + createHistoryImmediateTasksQuery = `INSERT INTO history_immediate_tasks(shard_id, category_id, task_id, data, data_encoding) + VALUES(:shard_id, :category_id, :task_id, :data, :data_encoding)` + + getHistoryImmediateTaskQuery = `SELECT task_id, data, data_encoding + FROM history_immediate_tasks WHERE shard_id = ? AND category_id = ? AND task_id = ?` + getHistoryImmediateTasksQuery = `SELECT task_id, data, data_encoding + FROM history_immediate_tasks WHERE shard_id = ? AND category_id = ? AND task_id >= ? AND task_id < ? ORDER BY task_id LIMIT ?` + + deleteHistoryImmediateTaskQuery = `DELETE FROM history_immediate_tasks WHERE shard_id = ? AND category_id = ? AND task_id = ?` + rangeDeleteHistoryImmediateTasksQuery = `DELETE FROM history_immediate_tasks WHERE shard_id = ? AND category_id = ? AND task_id >= ? AND task_id < ?` + + createHistoryScheduledTasksQuery = `INSERT INTO history_scheduled_tasks (shard_id, category_id, visibility_timestamp, task_id, data, data_encoding) + VALUES (:shard_id, :category_id, :visibility_timestamp, :task_id, :data, :data_encoding)` + + getHistoryScheduledTaskQuery = `SELECT visibility_timestamp, task_id, data, data_encoding FROM history_scheduled_tasks + WHERE shard_id = ? AND category_id = ? AND visibility_timestamp = ? AND task_id = ?` + getHistoryScheduledTasksQuery = `SELECT visibility_timestamp, task_id, data, data_encoding FROM history_scheduled_tasks + WHERE shard_id = ? + AND category_id = ? + AND ((visibility_timestamp >= ? AND task_id >= ?) OR visibility_timestamp > ?) + AND visibility_timestamp < ? + ORDER BY visibility_timestamp,task_id LIMIT ?` + + deleteHistoryScheduledTaskQuery = `DELETE FROM history_scheduled_tasks WHERE shard_id = ? AND category_id = ? AND visibility_timestamp = ? AND task_id = ?` + rangeDeleteHistoryScheduledTasksQuery = `DELETE FROM history_scheduled_tasks WHERE shard_id = ? AND category_id = ? AND visibility_timestamp >= ? AND visibility_timestamp < ?` + createTransferTasksQuery = `INSERT INTO transfer_tasks(shard_id, task_id, data, data_encoding) VALUES(:shard_id, :task_id, :data, :data_encoding)` @@ -352,6 +378,179 @@ func (mdb *db) LockCurrentExecutionsJoinExecutions( return rows, err } +// InsertIntoHistoryImmediateTasks inserts one or more rows into history_immediate_tasks table +func (mdb *db) InsertIntoHistoryImmediateTasks( + ctx context.Context, + rows []sqlplugin.HistoryImmediateTasksRow, +) (sql.Result, error) { + return mdb.conn.NamedExecContext(ctx, + createHistoryImmediateTasksQuery, + rows, + ) +} + +// SelectFromHistoryImmediateTasks reads one or more rows from transfer_tasks table +func (mdb *db) SelectFromHistoryImmediateTasks( + ctx context.Context, + filter sqlplugin.HistoryImmediateTasksFilter, +) ([]sqlplugin.HistoryImmediateTasksRow, error) { + var rows []sqlplugin.HistoryImmediateTasksRow + if err := mdb.conn.SelectContext(ctx, + &rows, + getHistoryImmediateTaskQuery, + filter.ShardID, + filter.CategoryID, + filter.TaskID, + ); err != nil { + return nil, err + } + return rows, nil +} + +// RangeSelectFromHistoryImmediateTasks reads one or more rows from transfer_tasks table +func (mdb *db) RangeSelectFromHistoryImmediateTasks( + ctx context.Context, + filter sqlplugin.HistoryImmediateTasksRangeFilter, +) ([]sqlplugin.HistoryImmediateTasksRow, error) { + var rows []sqlplugin.HistoryImmediateTasksRow + if err := mdb.conn.SelectContext(ctx, + &rows, + getHistoryImmediateTasksQuery, + filter.ShardID, + filter.CategoryID, + filter.InclusiveMinTaskID, + filter.ExclusiveMaxTaskID, + filter.PageSize, + ); err != nil { + return nil, err + } + return rows, nil +} + +// DeleteFromHistoryImmediateTasks deletes one or more rows from transfer_tasks table +func (mdb *db) DeleteFromHistoryImmediateTasks( + ctx context.Context, + filter sqlplugin.HistoryImmediateTasksFilter, +) (sql.Result, error) { + return mdb.conn.ExecContext(ctx, + deleteHistoryImmediateTaskQuery, + filter.ShardID, + filter.CategoryID, + filter.TaskID, + ) +} + +// RangeDeleteFromHistoryImmediateTasks deletes one or more rows from transfer_tasks table +func (mdb *db) RangeDeleteFromHistoryImmediateTasks( + ctx context.Context, + filter sqlplugin.HistoryImmediateTasksRangeFilter, +) (sql.Result, error) { + return mdb.conn.ExecContext(ctx, + rangeDeleteHistoryImmediateTasksQuery, + filter.ShardID, + filter.CategoryID, + filter.InclusiveMinTaskID, + filter.ExclusiveMaxTaskID, + ) +} + +// InsertIntoHistoryScheduledTasks inserts one or more rows into timer_tasks table +func (mdb *db) InsertIntoHistoryScheduledTasks( + ctx context.Context, + rows []sqlplugin.HistoryScheduledTasksRow, +) (sql.Result, error) { + for i := range rows { + rows[i].VisibilityTimestamp = mdb.converter.ToSQLiteDateTime(rows[i].VisibilityTimestamp) + } + return mdb.conn.NamedExecContext( + ctx, + createHistoryScheduledTasksQuery, + rows, + ) +} + +// SelectFromHistoryScheduledTasks reads one or more rows from timer_tasks table +func (mdb *db) SelectFromHistoryScheduledTasks( + ctx context.Context, + filter sqlplugin.HistoryScheduledTasksFilter, +) ([]sqlplugin.HistoryScheduledTasksRow, error) { + var rows []sqlplugin.HistoryScheduledTasksRow + filter.VisibilityTimestamp = mdb.converter.ToSQLiteDateTime(filter.VisibilityTimestamp) + err := mdb.conn.SelectContext(ctx, + &rows, + getHistoryScheduledTaskQuery, + filter.ShardID, + filter.CategoryID, + filter.VisibilityTimestamp, + filter.TaskID, + ) + if err != nil { + return nil, err + } + for i := range rows { + rows[i].VisibilityTimestamp = mdb.converter.ToSQLiteDateTime(rows[i].VisibilityTimestamp) + } + return rows, nil +} + +// RangeSelectFromHistoryScheduledTasks reads one or more rows from timer_tasks table +func (mdb *db) RangeSelectFromHistoryScheduledTasks( + ctx context.Context, + filter sqlplugin.HistoryScheduledTasksRangeFilter, +) ([]sqlplugin.HistoryScheduledTasksRow, error) { + var rows []sqlplugin.HistoryScheduledTasksRow + filter.InclusiveMinVisibilityTimestamp = mdb.converter.ToSQLiteDateTime(filter.InclusiveMinVisibilityTimestamp) + filter.ExclusiveMaxVisibilityTimestamp = mdb.converter.ToSQLiteDateTime(filter.ExclusiveMaxVisibilityTimestamp) + if err := mdb.conn.SelectContext(ctx, + &rows, + getHistoryScheduledTasksQuery, + filter.ShardID, + filter.CategoryID, + filter.InclusiveMinVisibilityTimestamp, + filter.InclusiveMinTaskID, + filter.InclusiveMinVisibilityTimestamp, + filter.ExclusiveMaxVisibilityTimestamp, + filter.PageSize, + ); err != nil { + return nil, err + } + for i := range rows { + rows[i].VisibilityTimestamp = mdb.converter.ToSQLiteDateTime(rows[i].VisibilityTimestamp) + } + return rows, nil +} + +// DeleteFromHistoryScheduledTasks deletes one or more rows from timer_tasks table +func (mdb *db) DeleteFromHistoryScheduledTasks( + ctx context.Context, + filter sqlplugin.HistoryScheduledTasksFilter, +) (sql.Result, error) { + filter.VisibilityTimestamp = mdb.converter.ToSQLiteDateTime(filter.VisibilityTimestamp) + return mdb.conn.ExecContext(ctx, + deleteHistoryScheduledTaskQuery, + filter.ShardID, + filter.CategoryID, + filter.VisibilityTimestamp, + filter.TaskID, + ) +} + +// RangeDeleteFromHistoryScheduledTasks deletes one or more rows from timer_tasks table +func (mdb *db) RangeDeleteFromHistoryScheduledTasks( + ctx context.Context, + filter sqlplugin.HistoryScheduledTasksRangeFilter, +) (sql.Result, error) { + filter.InclusiveMinVisibilityTimestamp = mdb.converter.ToSQLiteDateTime(filter.InclusiveMinVisibilityTimestamp) + filter.ExclusiveMaxVisibilityTimestamp = mdb.converter.ToSQLiteDateTime(filter.ExclusiveMaxVisibilityTimestamp) + return mdb.conn.ExecContext(ctx, + rangeDeleteHistoryScheduledTasksQuery, + filter.ShardID, + filter.CategoryID, + filter.InclusiveMinVisibilityTimestamp, + filter.ExclusiveMaxVisibilityTimestamp, + ) +} + // InsertIntoTransferTasks inserts one or more rows into transfer_tasks table func (mdb *db) InsertIntoTransferTasks( ctx context.Context, diff --git a/common/persistence/tests/execution_mutable_state_task.go b/common/persistence/tests/execution_mutable_state_task.go index df795e1d6fb..be1da2dd2b5 100644 --- a/common/persistence/tests/execution_mutable_state_task.go +++ b/common/persistence/tests/execution_mutable_state_task.go @@ -31,9 +31,12 @@ import ( "testing" "time" + "github.com/gogo/protobuf/proto" "github.com/google/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/definition" @@ -41,6 +44,7 @@ import ( "go.temporal.io/server/common/log" p "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/serialization" + "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/service/history/tasks" ) @@ -60,6 +64,15 @@ type ( Ctx context.Context Cancel context.CancelFunc } + + testSerializer struct { + serialization.Serializer + } +) + +var ( + fakeImmediateTaskCategory = tasks.NewCategory(1234, tasks.CategoryTypeImmediate, "fake-immediate") + fakeScheduledTaskCategory = tasks.NewCategory(2345, tasks.CategoryTypeScheduled, "fake-scheduled") ) func NewExecutionMutableStateTaskSuite( @@ -69,6 +82,7 @@ func NewExecutionMutableStateTaskSuite( serializer serialization.Serializer, logger log.Logger, ) *ExecutionMutableStateTaskSuite { + serializer = newTestSerializer(serializer) return &ExecutionMutableStateTaskSuite{ Assertions: require.New(t), ShardManager: p.NewShardManager( @@ -137,6 +151,90 @@ func (s *ExecutionMutableStateTaskSuite) TearDownTest() { s.Cancel() } +func (s *ExecutionMutableStateTaskSuite) TestAddGetRangeCompleteImmediateTasks_Multiple() { + numTasks := 20 + immediateTasks := s.AddRandomTasks( + fakeImmediateTaskCategory, + numTasks, + func(workflowKey definition.WorkflowKey, taskID int64, visibilityTimestamp time.Time) tasks.Task { + fakeTask := tasks.NewFakeTask( + workflowKey, + fakeImmediateTaskCategory, + visibilityTimestamp, + ) + fakeTask.SetTaskID(taskID) + return fakeTask + }, + ) + + immediateTasks, inclusiveMinTaskKey, exclusiveMaxTaskKey := s.RandomPaginateRange(immediateTasks) + loadedTasks := s.PaginateTasks( + fakeImmediateTaskCategory, + inclusiveMinTaskKey, + exclusiveMaxTaskKey, + rand.Intn(len(immediateTasks)*2)+1, + ) + s.Equal(immediateTasks, loadedTasks) + + err := s.ExecutionManager.RangeCompleteHistoryTasks(s.Ctx, &p.RangeCompleteHistoryTasksRequest{ + ShardID: s.ShardID, + TaskCategory: fakeImmediateTaskCategory, + InclusiveMinTaskKey: tasks.NewImmediateKey(0), + ExclusiveMaxTaskKey: tasks.NewImmediateKey(math.MaxInt64), + }) + s.NoError(err) + + loadedTasks = s.PaginateTasks( + fakeImmediateTaskCategory, + inclusiveMinTaskKey, + exclusiveMaxTaskKey, + 1, + ) + s.Empty(loadedTasks) +} + +func (s *ExecutionMutableStateTaskSuite) TestAddGetRangeCompleteScheduledTasks_Multiple() { + numTasks := 20 + scheduledTasks := s.AddRandomTasks( + fakeScheduledTaskCategory, + numTasks, + func(workflowKey definition.WorkflowKey, taskID int64, visibilityTimestamp time.Time) tasks.Task { + fakeTask := tasks.NewFakeTask( + workflowKey, + fakeScheduledTaskCategory, + visibilityTimestamp, + ) + fakeTask.SetTaskID(taskID) + return fakeTask + }, + ) + + scheduledTasks, inclusiveMinTaskKey, exclusiveMaxTaskKey := s.RandomPaginateRange(scheduledTasks) + loadedTasks := s.PaginateTasks( + fakeScheduledTaskCategory, + inclusiveMinTaskKey, + exclusiveMaxTaskKey, + rand.Intn(len(scheduledTasks)*2)+1, + ) + s.Equal(scheduledTasks, loadedTasks) + + err := s.ExecutionManager.RangeCompleteHistoryTasks(s.Ctx, &p.RangeCompleteHistoryTasksRequest{ + ShardID: s.ShardID, + TaskCategory: fakeScheduledTaskCategory, + InclusiveMinTaskKey: tasks.NewKey(time.Unix(0, 0), 0), + ExclusiveMaxTaskKey: tasks.NewKey(time.Unix(0, math.MaxInt64), 0), + }) + s.NoError(err) + + loadedTasks = s.PaginateTasks( + fakeScheduledTaskCategory, + inclusiveMinTaskKey, + exclusiveMaxTaskKey, + 1, + ) + s.Empty(loadedTasks) +} + func (s *ExecutionMutableStateTaskSuite) TestAddGetTransferTasks_Multiple() { numTasks := 20 transferTasks := s.AddRandomTasks( @@ -319,3 +417,67 @@ func (s *ExecutionMutableStateTaskSuite) RandomPaginateRange( return createdTasks[firstTaskIdx:nextTaskIdx], inclusiveMinTaskKey, exclusiveMaxTaskKey } + +func newTestSerializer( + serializer serialization.Serializer, +) serialization.Serializer { + return &testSerializer{ + Serializer: serializer, + } +} + +func (s *testSerializer) SerializeTask( + task tasks.Task, +) (commonpb.DataBlob, error) { + if fakeTask, ok := task.(*tasks.FakeTask); ok { + data, err := proto.Marshal(&persistencespb.TransferTaskInfo{ + NamespaceId: fakeTask.WorkflowKey.NamespaceID, + WorkflowId: fakeTask.WorkflowKey.WorkflowID, + RunId: fakeTask.WorkflowKey.RunID, + TaskType: fakeTask.GetType(), + Version: fakeTask.Version, + TaskId: fakeTask.TaskID, + VisibilityTime: timestamp.TimePtr(fakeTask.VisibilityTimestamp), + }) + if err != nil { + return commonpb.DataBlob{}, err + } + return commonpb.DataBlob{ + Data: data, + EncodingType: enumspb.ENCODING_TYPE_PROTO3, + }, nil + } + + return s.Serializer.SerializeTask(task) +} + +func (s *testSerializer) DeserializeTask( + category tasks.Category, + blob commonpb.DataBlob, +) (tasks.Task, error) { + switch category.ID() { + case tasks.CategoryIDTransfer, + tasks.CategoryIDTimer, + tasks.CategoryIDVisibility, + tasks.CategoryIDReplication: + return s.Serializer.DeserializeTask(category, blob) + } + + taskInfo := &persistencespb.TransferTaskInfo{} + if err := proto.Unmarshal(blob.Data, taskInfo); err != nil { + return nil, err + } + + fakeTask := tasks.NewFakeTask( + definition.NewWorkflowKey( + taskInfo.NamespaceId, + taskInfo.WorkflowId, + taskInfo.RunId, + ), + category, + *taskInfo.VisibilityTime, + ) + fakeTask.SetTaskID(taskInfo.TaskId) + + return fakeTask, nil +} diff --git a/schema/mysql/v57/temporal/schema.sql b/schema/mysql/v57/temporal/schema.sql index 2a40fd30728..297423311eb 100644 --- a/schema/mysql/v57/temporal/schema.sql +++ b/schema/mysql/v57/temporal/schema.sql @@ -89,6 +89,27 @@ CREATE TABLE task_queues ( PRIMARY KEY (range_hash, task_queue_id) ); +CREATE TABLE history_immediate_tasks( + shard_id INT NOT NULL, + category_id INT NOT NULL, + task_id BIGINT NOT NULL, + -- + data MEDIUMBLOB NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, category_id, task_id) +); + +CREATE TABLE history_scheduled_tasks ( + shard_id INT NOT NULL, + category_id INT NOT NULL, + visibility_timestamp DATETIME(6) NOT NULL, + task_id BIGINT NOT NULL, + -- + data MEDIUMBLOB NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, category_id, visibility_timestamp, task_id) +); + CREATE TABLE transfer_tasks( shard_id INT NOT NULL, task_id BIGINT NOT NULL, @@ -264,9 +285,9 @@ CREATE TABLE cluster_membership rpc_address VARCHAR(128) NOT NULL, rpc_port SMALLINT NOT NULL, role TINYINT NOT NULL, - session_start TIMESTAMP DEFAULT '1970-01-01 00:00:01+00:00', - last_heartbeat TIMESTAMP DEFAULT '1970-01-01 00:00:01+00:00', - record_expiry TIMESTAMP DEFAULT '1970-01-01 00:00:01+00:00', + session_start TIMESTAMP DEFAULT '1970-01-02 00:00:01', + last_heartbeat TIMESTAMP DEFAULT '1970-01-02 00:00:01', + record_expiry TIMESTAMP DEFAULT '1970-01-02 00:00:01', INDEX (role, host_id), INDEX (role, last_heartbeat), INDEX (rpc_address, role), @@ -274,4 +295,3 @@ CREATE TABLE cluster_membership INDEX (record_expiry), PRIMARY KEY (membership_partition, host_id) ); - diff --git a/schema/mysql/v57/temporal/versioned/v1.9/history_tasks_table.sql b/schema/mysql/v57/temporal/versioned/v1.9/history_tasks_table.sql new file mode 100644 index 00000000000..6c7cd15480c --- /dev/null +++ b/schema/mysql/v57/temporal/versioned/v1.9/history_tasks_table.sql @@ -0,0 +1,20 @@ +CREATE TABLE history_immediate_tasks( + shard_id INT NOT NULL, + category_id INT NOT NULL, + task_id BIGINT NOT NULL, + -- + data MEDIUMBLOB NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, category_id, task_id) +); + +CREATE TABLE history_scheduled_tasks ( + shard_id INT NOT NULL, + category_id INT NOT NULL, + visibility_timestamp DATETIME(6) NOT NULL, + task_id BIGINT NOT NULL, + -- + data MEDIUMBLOB NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, category_id, visibility_timestamp, task_id) +); \ No newline at end of file diff --git a/schema/mysql/v57/temporal/versioned/v1.9/manifest.json b/schema/mysql/v57/temporal/versioned/v1.9/manifest.json new file mode 100644 index 00000000000..9df45be3080 --- /dev/null +++ b/schema/mysql/v57/temporal/versioned/v1.9/manifest.json @@ -0,0 +1,8 @@ +{ + "CurrVersion": "1.9", + "MinCompatibleVersion": "1.0", + "Description": "add history tasks table", + "SchemaUpdateCqlFiles": [ + "history_tasks_table.sql" + ] +} diff --git a/schema/mysql/version.go b/schema/mysql/version.go index 546ae6bf031..202d98ab91d 100644 --- a/schema/mysql/version.go +++ b/schema/mysql/version.go @@ -27,7 +27,7 @@ package mysql // NOTE: whenever there is a new database schema update, plz update the following versions // Version is the MySQL database release version -const Version = "1.8" +const Version = "1.9" // VisibilityVersion is the MySQL visibility database release version const VisibilityVersion = "1.1" diff --git a/schema/postgresql/v96/temporal/schema.sql b/schema/postgresql/v96/temporal/schema.sql index da1833560e2..765e9e32e21 100644 --- a/schema/postgresql/v96/temporal/schema.sql +++ b/schema/postgresql/v96/temporal/schema.sql @@ -89,6 +89,27 @@ CREATE TABLE task_queues ( PRIMARY KEY (range_hash, task_queue_id) ); +CREATE TABLE history_immediate_tasks( + shard_id INTEGER NOT NULL, + category_id INTEGER NOT NULL, + task_id BIGINT NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, category_id, task_id) +); + +CREATE TABLE history_scheduled_tasks ( + shard_id INTEGER NOT NULL, + category_id INTEGER NOT NULL, + visibility_timestamp TIMESTAMP NOT NULL, + task_id BIGINT NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, category_id, visibility_timestamp, task_id) +); + CREATE TABLE transfer_tasks( shard_id INTEGER NOT NULL, task_id BIGINT NOT NULL, diff --git a/schema/postgresql/v96/temporal/versioned/v1.9/history_tasks_table.sql b/schema/postgresql/v96/temporal/versioned/v1.9/history_tasks_table.sql new file mode 100644 index 00000000000..b84d230a951 --- /dev/null +++ b/schema/postgresql/v96/temporal/versioned/v1.9/history_tasks_table.sql @@ -0,0 +1,20 @@ +CREATE TABLE history_immediate_tasks( + shard_id INTEGER NOT NULL, + category_id INTEGER NOT NULL, + task_id BIGINT NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, category_id, task_id) +); + +CREATE TABLE history_scheduled_tasks ( + shard_id INTEGER NOT NULL, + category_id INTEGER NOT NULL, + visibility_timestamp TIMESTAMP NOT NULL, + task_id BIGINT NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, category_id, visibility_timestamp, task_id) +); \ No newline at end of file diff --git a/schema/postgresql/v96/temporal/versioned/v1.9/manifest.json b/schema/postgresql/v96/temporal/versioned/v1.9/manifest.json new file mode 100644 index 00000000000..9df45be3080 --- /dev/null +++ b/schema/postgresql/v96/temporal/versioned/v1.9/manifest.json @@ -0,0 +1,8 @@ +{ + "CurrVersion": "1.9", + "MinCompatibleVersion": "1.0", + "Description": "add history tasks table", + "SchemaUpdateCqlFiles": [ + "history_tasks_table.sql" + ] +} diff --git a/schema/postgresql/version.go b/schema/postgresql/version.go index 11ebbc791c7..80ab9c91587 100644 --- a/schema/postgresql/version.go +++ b/schema/postgresql/version.go @@ -28,7 +28,7 @@ package postgresql // Version is the Postgres database release version // Temporal supports both MySQL and Postgres officially, so upgrade should be performed for both MySQL and Postgres -const Version = "1.8" +const Version = "1.9" // VisibilityVersion is the Postgres visibility database release version // Temporal supports both MySQL and Postgres officially, so upgrade should be performed for both MySQL and Postgres diff --git a/schema/sqlite/v3/temporal/schema.sql b/schema/sqlite/v3/temporal/schema.sql index 66c6a631aba..bd0344cdfab 100644 --- a/schema/sqlite/v3/temporal/schema.sql +++ b/schema/sqlite/v3/temporal/schema.sql @@ -88,6 +88,27 @@ CREATE TABLE task_queues ( PRIMARY KEY (range_hash, task_queue_id) ); +CREATE TABLE history_immediate_tasks( + shard_id INT NOT NULL, + category_id INT NOT NULL, + task_id BIGINT NOT NULL, + -- + data MEDIUMBLOB NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, category_id, task_id) +); + +CREATE TABLE history_scheduled_tasks ( + shard_id INT NOT NULL, + category_id INT NOT NULL, + visibility_timestamp TIMESTAMP NOT NULL, + task_id BIGINT NOT NULL, + -- + data MEDIUMBLOB NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, category_id, visibility_timestamp, task_id) +); + CREATE TABLE transfer_tasks( shard_id INT NOT NULL, task_id BIGINT NOT NULL, diff --git a/service/history/tasks/fake_task.go b/service/history/tasks/fake_task.go index 5ecc2d13f1b..e301a546bca 100644 --- a/service/history/tasks/fake_task.go +++ b/service/history/tasks/fake_task.go @@ -33,7 +33,7 @@ import ( ) type ( - fakeTask struct { + FakeTask struct { definition.WorkflowKey VisibilityTimestamp time.Time TaskID int64 @@ -47,7 +47,7 @@ func NewFakeTask( category Category, visibilityTimestamp time.Time, ) Task { - return &fakeTask{ + return &FakeTask{ WorkflowKey: workflowKey, TaskID: common.EmptyEventTaskID, Version: common.EmptyVersion, @@ -56,38 +56,38 @@ func NewFakeTask( } } -func (f *fakeTask) GetKey() Key { +func (f *FakeTask) GetKey() Key { return NewKey(f.VisibilityTimestamp, f.TaskID) } -func (f *fakeTask) GetVersion() int64 { +func (f *FakeTask) GetVersion() int64 { return f.Version } -func (f *fakeTask) SetVersion(version int64) { +func (f *FakeTask) SetVersion(version int64) { f.Version = version } -func (f *fakeTask) GetTaskID() int64 { +func (f *FakeTask) GetTaskID() int64 { return f.TaskID } -func (f *fakeTask) SetTaskID(id int64) { +func (f *FakeTask) SetTaskID(id int64) { f.TaskID = id } -func (f *fakeTask) GetVisibilityTime() time.Time { +func (f *FakeTask) GetVisibilityTime() time.Time { return f.VisibilityTimestamp } -func (f *fakeTask) SetVisibilityTime(t time.Time) { +func (f *FakeTask) SetVisibilityTime(t time.Time) { f.VisibilityTimestamp = t } -func (f *fakeTask) GetCategory() Category { +func (f *FakeTask) GetCategory() Category { return f.Category } -func (f *fakeTask) GetType() enumsspb.TaskType { +func (f *FakeTask) GetType() enumsspb.TaskType { return enumsspb.TASK_TYPE_UNSPECIFIED } diff --git a/service/history/tasks/utils.go b/service/history/tasks/utils.go index a1e8f4672b1..69d87802a87 100644 --- a/service/history/tasks/utils.go +++ b/service/history/tasks/utils.go @@ -92,7 +92,7 @@ func GetTransferTaskEventID( eventID = task.InitiatedEventID case *ResetWorkflowTask: eventID = common.FirstEventID - case *fakeTask: + case *FakeTask: // no-op default: panic(serviceerror.NewInternal("unknown transfer task")) @@ -120,7 +120,7 @@ func GetTimerTaskEventID( eventID = common.FirstEventID case *DeleteHistoryEventTask: eventID = common.FirstEventID - case *fakeTask: + case *FakeTask: // no-op default: panic(serviceerror.NewInternal("unknown timer task"))