diff --git a/src/actions/createtimeraction.ts b/src/actions/createtimeraction.ts index e1aa000..026b6f9 100644 --- a/src/actions/createtimeraction.ts +++ b/src/actions/createtimeraction.ts @@ -5,7 +5,7 @@ import { ActionType, IAction } from "../classes"; export class CreateTimerAction implements IAction { public readonly actionType: ActionType = ActionType.CreateTimer; - constructor(public readonly fireAt: Date, public isCancelled: boolean = false) { + constructor(public readonly fireAt: Date, public isCanceled: boolean = false) { if (!isDate(fireAt)) { throw new TypeError(`fireAt: Expected valid Date object but got ${fireAt}`); } diff --git a/src/durableorchestrationbindinginfo.ts b/src/durableorchestrationbindinginfo.ts index 0fc23e4..2e77f30 100644 --- a/src/durableorchestrationbindinginfo.ts +++ b/src/durableorchestrationbindinginfo.ts @@ -11,6 +11,8 @@ export class DurableOrchestrationBindingInfo { public readonly instanceId: string = "", public readonly isReplaying: boolean = false, public readonly parentInstanceId?: string, + public readonly maximumShortTimerDuration?: string, + public readonly longRunningTimerIntervalDuration?: string, upperSchemaVersion = 0 // TODO: Implement entity locking // public readonly contextLocks?: EntityId[], ) { // It is assumed that the extension supports all schemas in range [0, upperSchemaVersion]. diff --git a/src/durableorchestrationcontext.ts b/src/durableorchestrationcontext.ts index b2da3be..833e7b4 100644 --- a/src/durableorchestrationcontext.ts +++ b/src/durableorchestrationcontext.ts @@ -28,7 +28,10 @@ import { Task, TimerTask, DFTask, + LongTimerTask, } from "./task"; +import moment = require("moment"); +import { ReplaySchema } from "./replaySchema"; /** * Parameter data for orchestration bindings that can be used to schedule @@ -41,6 +44,9 @@ export class DurableOrchestrationContext { currentUtcDateTime: Date, isReplaying: boolean, parentInstanceId: string | undefined, + longRunningTimerIntervalDuration: string | undefined, + maximumShortTimerDuration: string | undefined, + schemaVersion: ReplaySchema, input: unknown, private taskOrchestratorExecutor: TaskOrchestrationExecutor ) { @@ -49,6 +55,13 @@ export class DurableOrchestrationContext { this.isReplaying = isReplaying; this.currentUtcDateTime = currentUtcDateTime; this.parentInstanceId = parentInstanceId; + this.longRunningTimerIntervalDuration = longRunningTimerIntervalDuration + ? moment.duration(longRunningTimerIntervalDuration) + : undefined; + this.maximumShortTimerDuration = maximumShortTimerDuration + ? moment.duration(maximumShortTimerDuration) + : undefined; + this.schemaVersion = schemaVersion; this.input = input; this.newGuidCounter = 0; } @@ -101,6 +114,34 @@ export class DurableOrchestrationContext { */ public currentUtcDateTime: Date; + /** + * Gets the maximum duration for timers allowed by the + * underlying storage infrastructure + * + * This duration property is determined by the underlying storage + * solution and passed to the SDK from the extension. + */ + private readonly maximumShortTimerDuration: moment.Duration | undefined; + + /** + * A duration property which defines the duration of smaller + * timers to break long timers into, in case they are longer + * than the maximum supported duration + * + * This duration property is determined by the underlying + * storage solution and passed to the SDK from the extension. + */ + private readonly longRunningTimerIntervalDuration: moment.Duration | undefined; + + /** + * Gets the current schema version that this execution is + * utilizing, based on negotiation with the extension. + * + * Different schema versions can allow different behavior. + * For example, long timers are only supported in schema version >=3 + */ + private readonly schemaVersion: ReplaySchema; + /** * @hidden * This method informs the type-checker that an ITask[] can be treated as DFTask[]. @@ -293,9 +334,33 @@ export class DurableOrchestrationContext { * @returns A TimerTask that completes when the durable timer expires. */ public createTimer(fireAt: Date): TimerTask { - const newAction = new CreateTimerAction(fireAt); - const task = new DFTimerTask(false, newAction); - return task; + const timerAction = new CreateTimerAction(fireAt); + const durationUntilFire = moment.duration(moment(fireAt).diff(this.currentUtcDateTime)); + if (this.schemaVersion >= ReplaySchema.V3) { + if (!this.maximumShortTimerDuration || !this.longRunningTimerIntervalDuration) { + throw Error( + "A framework-internal error was detected: replay schema version >= V3 is being used, " + + "but one or more of the properties `maximumShortTimerDuration` and `longRunningTimerIntervalDuration` are not defined. " + + "This is likely an issue with the Durable Functions Extension. " + + "Please report this bug here: https://github.com/Azure/azure-functions-durable-js/issues\n" + + `maximumShortTimerDuration: ${this.maximumShortTimerDuration}\n` + + `longRunningTimerIntervalDuration: ${this.longRunningTimerIntervalDuration}` + ); + } + + if (durationUntilFire > this.maximumShortTimerDuration) { + return new LongTimerTask( + false, + timerAction, + this, + this.taskOrchestratorExecutor, + this.maximumShortTimerDuration, + this.longRunningTimerIntervalDuration + ); + } + } + + return new DFTimerTask(false, timerAction); } /** diff --git a/src/orchestrator.ts b/src/orchestrator.ts index 09fb1c1..9f11421 100644 --- a/src/orchestrator.ts +++ b/src/orchestrator.ts @@ -61,6 +61,9 @@ export class Orchestrator { this.currentUtcDateTime, orchestrationBinding.isReplaying, orchestrationBinding.parentInstanceId, + orchestrationBinding.longRunningTimerIntervalDuration, + orchestrationBinding.maximumShortTimerDuration, + upperSchemaVersion, input, this.taskOrchestrationExecutor ); diff --git a/src/replaySchema.ts b/src/replaySchema.ts index 637d8fb..ae07a5c 100644 --- a/src/replaySchema.ts +++ b/src/replaySchema.ts @@ -5,6 +5,7 @@ export enum ReplaySchema { V1 = 0, V2 = 1, + V3 = 2, } -export const LatestReplaySchema: ReplaySchema = ReplaySchema.V2; +export const LatestReplaySchema: ReplaySchema = ReplaySchema.V3; diff --git a/src/task.ts b/src/task.ts index 9f11d17..9dd99b1 100644 --- a/src/task.ts +++ b/src/task.ts @@ -1,6 +1,8 @@ import { RetryOptions } from "."; import { IAction, CreateTimerAction } from "./classes"; import { TaskOrchestrationExecutor } from "./taskorchestrationexecutor"; +import moment = require("moment"); +import { DurableOrchestrationContext } from "./durableorchestrationcontext"; /** * @hidden @@ -16,7 +18,7 @@ export enum TaskState { * @hidden * A taskID, either a `string` for external events, * or either `false` or a `number` for un-awaited - * an awaited tasks respectively. + * and awaited tasks respectively. */ export type TaskID = number | string | false; @@ -94,7 +96,7 @@ export interface TimerTask extends Task { /** * @returns Whether or not the timer has been canceled. */ - isCancelled: boolean; + isCanceled: boolean; /** * Indicates the timer should be canceled. This request will execute on the * next `yield` or `return` statement. @@ -290,9 +292,9 @@ export class DFTimerTask extends AtomicTask implements TimerTask { super(id, action); } - /** Whether this timer task is cancelled */ - get isCancelled(): boolean { - return this.action.isCancelled; + /** Whether this timer task is canceled */ + get isCanceled(): boolean { + return this.action.isCanceled; } /** @@ -304,7 +306,7 @@ export class DFTimerTask extends AtomicTask implements TimerTask { if (this.hasResult) { throw Error("Cannot cancel a completed task."); } - this.action.isCancelled = true; // TODO: fix typo + this.action.isCanceled = true; } } @@ -377,6 +379,103 @@ export class WhenAnyTask extends CompoundTask { } } +/** + * @hidden + * + * A long Timer Task. + * + * This Task is created when a timer is created with a duration + * longer than the maximum timer duration supported by storage infrastructure. + * + * It extends `WhenAllTask` because it decomposes into + * several smaller sub-`TimerTask`s + */ +export class LongTimerTask extends WhenAllTask implements TimerTask { + public id: TaskID; + public action: CreateTimerAction; + private readonly executor: TaskOrchestrationExecutor; + private readonly maximumTimerLength: moment.Duration; + private readonly orchestrationContext: DurableOrchestrationContext; + private readonly longRunningTimerIntervalDuration: moment.Duration; + + public constructor( + id: TaskID, + action: CreateTimerAction, + orchestrationContext: DurableOrchestrationContext, + executor: TaskOrchestrationExecutor, + maximumTimerLength: moment.Duration, + longRunningTimerIntervalDuration: moment.Duration + ) { + const currentTime = orchestrationContext.currentUtcDateTime; + const finalFireTime = action.fireAt; + const durationUntilFire = moment.duration(moment(finalFireTime).diff(currentTime)); + + const nextFireTime: Date = + durationUntilFire > maximumTimerLength + ? moment(currentTime).add(longRunningTimerIntervalDuration).toDate() + : finalFireTime; + + const nextTimerAction = new CreateTimerAction(nextFireTime); + const nextTimerTask = new DFTimerTask(false, nextTimerAction); + super([nextTimerTask], action); + + this.id = id; + this.action = action; + this.orchestrationContext = orchestrationContext; + this.executor = executor; + this.maximumTimerLength = maximumTimerLength; + this.longRunningTimerIntervalDuration = longRunningTimerIntervalDuration; + } + + get isCanceled(): boolean { + return this.action.isCanceled; + } + + /** + * @hidden + * Cancel this timer task. + * It errors out if the task has already completed. + */ + public cancel(): void { + if (this.hasResult) { + throw Error("Cannot cancel a completed task."); + } + this.action.isCanceled = true; + } + + /** + * @hidden + * Attempts to set a value to this timer, given a completed sub-timer + * + * @param child + * The sub-timer that just completed + */ + public trySetValue(child: DFTimerTask): void { + const currentTime = this.orchestrationContext.currentUtcDateTime; + const finalFireTime = this.action.fireAt; + if (finalFireTime > currentTime) { + const nextTimer: DFTimerTask = this.getNextTimerTask(finalFireTime, currentTime); + this.addNewChild(nextTimer); + } + super.trySetValue(child); + } + + private getNextTimerTask(finalFireTime: Date, currentTime: Date): DFTimerTask { + const durationUntilFire = moment.duration(moment(finalFireTime).diff(currentTime)); + const nextFireTime: Date = + durationUntilFire > this.maximumTimerLength + ? moment(currentTime).add(this.longRunningTimerIntervalDuration).toDate() + : finalFireTime; + return new DFTimerTask(false, new CreateTimerAction(nextFireTime)); + } + + private addNewChild(childTimer: DFTimerTask): void { + childTimer.parent = this; + this.children.push(childTimer); + this.executor.trackOpenTask(childTimer); + } +} + /** * @hidden * diff --git a/src/testingUtils.ts b/src/testingUtils.ts index 58d5cd9..fa17c53 100644 --- a/src/testingUtils.ts +++ b/src/testingUtils.ts @@ -16,6 +16,7 @@ import { OrchestratorStartedEvent, } from "./classes"; import { IOrchestrationFunctionContext } from "./iorchestrationfunctioncontext"; +import { ReplaySchema } from "./replaySchema"; import { TaskOrchestrationExecutor } from "./taskorchestrationexecutor"; /** @@ -33,6 +34,9 @@ export class DummyOrchestrationContext implements IOrchestrationFunctionContext * @param input The input to the orchestration * @param currentUtcDateTime The deterministic date at the beginning of orchestration replay * @param isReplaying Whether the orchestration is to be marked as isReplaying the its first event + * @param longRunningTimerIntervalDuration The duration to break smaller timers into if a long timer exceeds the maximum allowed duration + * @param maximumShortTimerDuration The maximum duration for a timer allowed by the underlying storage infrastructure + * @param schemaVersion The schema version currently used after being negotiated with the extension * @param parentInstanceId The instanceId of the orchestration's parent, if this is a sub-orchestration */ constructor( @@ -40,6 +44,9 @@ export class DummyOrchestrationContext implements IOrchestrationFunctionContext history: HistoryEvent[] | undefined = undefined, input: any = undefined, currentUtcDateTime: Date = new Date(), + longRunningTimerIntervalDuration: string, + maximumShortTimerDuration: string, + schemaVersion: ReplaySchema, isReplaying = false, parentInstanceId = "" ) { @@ -54,6 +61,9 @@ export class DummyOrchestrationContext implements IOrchestrationFunctionContext currentUtcDateTime, isReplaying, parentInstanceId, + longRunningTimerIntervalDuration, + maximumShortTimerDuration, + schemaVersion, input, new TaskOrchestrationExecutor() ); diff --git a/src/utils.ts b/src/utils.ts index 96dfe87..99ff97f 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -19,9 +19,6 @@ export class Utils { this.hasAllPropertiesOf(value, typeInstance) ); - // This recursive step ensures _all_ Timestamp properties are converted properly - // For example, a payload can contain the history as a property, so if we want to - // parse each HistoryEvent's Timestamp, we need to traverse the payload recursively this.parseTimestampsAsDates(candidateObjects); return candidateObjects as T[]; @@ -60,6 +57,9 @@ export class Utils { obj.Timestamp = new Date(obj.Timestamp); } Object.values(obj).map((value) => { + // This recursive step ensures _all_ Timestamp properties are converted properly + // For example, a payload can contain the history as a property, so if we want to + // parse each HistoryEvent's Timestamp, we need to traverse the payload recursively this.parseTimestampsAsDates(value); }); } diff --git a/test/integration/orchestrator-spec.ts b/test/integration/orchestrator-spec.ts index 8d36fd6..6e46808 100644 --- a/test/integration/orchestrator-spec.ts +++ b/test/integration/orchestrator-spec.ts @@ -3,6 +3,7 @@ import { TraceContext } from "@azure/functions"; import { expect } from "chai"; import "mocha"; import * as moment from "moment"; +import { start } from "repl"; import * as uuidv1 from "uuid/v1"; import { ManagedIdentityTokenSource } from "../../src"; import { @@ -84,32 +85,6 @@ describe("Orchestrator", () => { ); }); - /* - it("handles a simple orchestration function (no activity functions), with yield of non-Task object", async () => { - const orchestrator = TestOrchestrations.SayHelloInlineInproperYield; - const name = "World"; - const mockContext = new MockContext({ - context: new DurableOrchestrationBindingInfo( - TestHistories.GetOrchestratorStart( - "SayHelloInlineInproperYield", - moment.utc().toDate(), - name - ), - name - ), - }); - orchestrator(mockContext); - - expect(mockContext.doneValue).to.be.deep.equal( - new OrchestratorState({ - isDone: true, - actions: [], - output: `Hello, ${name}!`, - }) - ); - }); - */ - describe("handle falsy values", () => { for (const falsyValue of falsyValues) { it(`handles an orchestration function that returns ${ @@ -1587,6 +1562,107 @@ describe("Orchestrator", () => { ) ); }); + + describe("long timers", () => { + it("schedules long timers", () => { + const orchestrator = TestOrchestrations.WaitOnTimer; + const startTime = moment.utc().toDate(); + const fireAt = moment(startTime).add(10, "d").toDate(); + + const mockContext = new MockContext({ + context: new DurableOrchestrationBindingInfo( + TestHistories.GetOrchestratorStart("WaitOnTimer", startTime), + fireAt, + "", + false, + undefined, + "6.00:00:00", + "3.00:00:00", + ReplaySchema.V3 + ), + }); + + orchestrator(mockContext); + + expect(mockContext.doneValue).to.be.deep.equal( + new OrchestratorState( + { + isDone: false, + output: undefined, + actions: [[new CreateTimerAction(fireAt)]], + schemaVersion: ReplaySchema.V3, + }, + true + ) + ); + }); + + it("waits for sub-timers of long timer", () => { + const orchestrator = TestOrchestrations.WaitOnTimer; + const startTime = moment.utc().toDate(); + const fireAt = moment(startTime).add(10, "days").toDate(); + + const mockContext = new MockContext({ + context: new DurableOrchestrationBindingInfo( + TestHistories.GetWaitOnLongTimerHalfway(startTime, fireAt), + fireAt, + "", + false, + undefined, + "6.00:00:00", + "3.00:00:00", + ReplaySchema.V3 + ), + }); + + orchestrator(mockContext); + + expect(mockContext.doneValue).to.be.deep.equal( + new OrchestratorState( + { + isDone: false, + output: undefined, + actions: [[new CreateTimerAction(fireAt)]], + schemaVersion: ReplaySchema.V3, + }, + true + ) + ); + }); + + it("proceeds after long timer fires", () => { + const orchestrator = TestOrchestrations.WaitOnTimer; + const startTimestamp = moment.utc().toDate(); + const fireAt = moment(startTimestamp).add(10, "d").toDate(); + + const mockContext = new MockContext({ + context: new DurableOrchestrationBindingInfo( + TestHistories.GetWaitOnTimerFired(startTimestamp, fireAt), + fireAt, + "", + false, + undefined, + "6.00:00:00", + "3.00:00:00", + ReplaySchema.V3 + ), + }); + + orchestrator(mockContext); + + expect(mockContext.doneValue).to.deep.equal( + new OrchestratorState( + { + isDone: true, + actions: [[new CreateTimerAction(fireAt)]], + output: "Timer fired!", + schemaVersion: ReplaySchema.V3, + }, + true + ) + ); + }); + }); }); describe("newGuid()", () => { diff --git a/test/testobjects/testhistories.ts b/test/testobjects/testhistories.ts index 916a792..419902b 100644 --- a/test/testobjects/testhistories.ts +++ b/test/testobjects/testhistories.ts @@ -2303,31 +2303,29 @@ export class TestHistories { ]; } - public static GetWaitOnTimerFired(firstTimestamp: Date, fireAt: Date): HistoryEvent[] { + private static TimerCreated(timestamp: Date, fireAt: Date, timerId: number): HistoryEvent[] { return [ new OrchestratorStartedEvent({ eventId: -1, - timestamp: firstTimestamp, + timestamp, isPlayed: false, }), - new ExecutionStartedEvent({ - eventId: -1, - timestamp: firstTimestamp, - isPlayed: true, - name: "WaitOnTimer", - input: JSON.stringify(fireAt), - }), new TimerCreatedEvent({ - eventId: 0, - timestamp: firstTimestamp, + eventId: timerId, + timestamp, isPlayed: false, fireAt, }), new OrchestratorCompletedEvent({ eventId: -1, - timestamp: firstTimestamp, + timestamp, isPlayed: false, }), + ]; + } + + private static TimerFired(fireAt: Date, timerId: number): HistoryEvent[] { + return [ new OrchestratorStartedEvent({ eventId: -1, timestamp: fireAt, @@ -2338,8 +2336,76 @@ export class TestHistories { timestamp: fireAt, isPlayed: false, fireAt, - timerId: 0, + timerId, + }), + new OrchestratorCompletedEvent({ + eventId: -1, + timestamp: fireAt, + isPlayed: false, + }), + ]; + } + + public static GetWaitOnTimerFired( + firstTimestamp: Date, + fireAt: Date, + maximumShortTimerDuration: moment.Duration = moment.duration(6, "d"), + longRunningTimerIntervalDuration: moment.Duration = moment.duration(3, "d") + ): HistoryEvent[] { + const history: HistoryEvent[] = [ + new ExecutionStartedEvent({ + eventId: -1, + timestamp: firstTimestamp, + isPlayed: true, + name: "WaitOnTimer", + input: JSON.stringify(fireAt), }), ]; + + let previousTimestamp = firstTimestamp; + let nextFireAt; + let timerId = 0; + while ( + moment.duration(moment(fireAt).diff(previousTimestamp)) > maximumShortTimerDuration + ) { + nextFireAt = moment(previousTimestamp).add(longRunningTimerIntervalDuration).toDate(); + history.push( + ...this.TimerCreated(previousTimestamp, nextFireAt, timerId), + ...this.TimerFired(nextFireAt, timerId) + ); + previousTimestamp = nextFireAt; + timerId++; + } + history.push( + ...this.TimerCreated(previousTimestamp, fireAt, timerId), + ...this.TimerFired(fireAt, timerId) + ); + return history; + } + + public static GetWaitOnLongTimerHalfway( + firstTimestamp: Date, + fireAt: Date, + maximumShortTimerDuration: moment.Duration = moment.duration(6, "d"), + longRunningTimerIntervalDuration: moment.Duration = moment.duration(3, "d") + ): HistoryEvent[] { + if (moment.duration(moment(fireAt).diff(firstTimestamp)) < maximumShortTimerDuration) { + throw new Error("Not a long timer"); + } + + const history: HistoryEvent[] = [ + new ExecutionStartedEvent({ + eventId: -1, + timestamp: firstTimestamp, + isPlayed: true, + name: "WaitOnTimer", + input: JSON.stringify(fireAt), + }), + ]; + + const nextFireAt = moment(firstTimestamp).add(longRunningTimerIntervalDuration).toDate(); + history.push(...this.TimerCreated(firstTimestamp, nextFireAt, 0)); + history.push(...this.TimerFired(nextFireAt, 0)); + return history; } } diff --git a/test/unit/timertask-spec.ts b/test/unit/timertask-spec.ts index f74d608..e38510b 100644 --- a/test/unit/timertask-spec.ts +++ b/test/unit/timertask-spec.ts @@ -5,9 +5,9 @@ import { DFTimerTask } from "../../src/task"; describe("TimerTask", () => { it("throws cannot cancel a completed task", async () => { - const isCancelled = false; + const isCanceled = false; const date = new Date(); - const action = new CreateTimerAction(date, isCancelled); + const action = new CreateTimerAction(date, isCanceled); const task = new DFTimerTask(0, action); task.setValue(false, undefined); // set value to complete task @@ -17,21 +17,21 @@ describe("TimerTask", () => { }); it("cancels an incomplete task", async () => { - const isCancelled = false; + const isCanceled = false; const date = new Date(); - const action = new CreateTimerAction(date, isCancelled); + const action = new CreateTimerAction(date, isCanceled); const task = new DFTimerTask(0, action); task.cancel(); - expect(task.isCancelled).to.equal(true); + expect(task.isCanceled).to.equal(true); }); it("is canceled when its action is canceled", async () => { - const isCancelled = true; + const isCanceled = true; const date = new Date(); - const action = new CreateTimerAction(date, isCancelled); + const action = new CreateTimerAction(date, isCanceled); const task = new DFTimerTask(0, action); - expect(task.isCancelled).to.equal(true); + expect(task.isCanceled).to.equal(true); }); });