Skip to content

Support Long Timers ⏳📈🚀🚀🚀 #340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1cc2435
fix timestamp conversion bug
hossam-nasr Apr 8, 2022
9b507f5
initial long timer task
hossam-nasr Mar 23, 2022
dde7405
extra changes
hossam-nasr Mar 23, 2022
bbada16
remove create long timer action
hossam-nasr Mar 23, 2022
92af8df
finalize LongTimerTask
hossam-nasr Mar 23, 2022
a7ed2d2
add long timer task to durable orchestration context
hossam-nasr Mar 23, 2022
4759a12
add binding info
hossam-nasr Mar 23, 2022
299a898
fix logic
hossam-nasr Mar 23, 2022
72cd064
update logic
hossam-nasr Mar 23, 2022
f836164
switch to dynamic
hossam-nasr Mar 24, 2022
382bcb2
track open task
hossam-nasr Mar 25, 2022
1a511e8
Enforce replay schema for long timers
hossam-nasr Apr 9, 2022
225b47f
replay schema latest
hossam-nasr Apr 9, 2022
121c613
comments and clean up
hossam-nasr Apr 9, 2022
5673d61
simplify create timer
hossam-nasr Apr 9, 2022
0d8004e
make properties private
hossam-nasr Apr 11, 2022
25e0a27
remove abbreviations
hossam-nasr Apr 11, 2022
c1cc962
add error message
hossam-nasr Apr 11, 2022
e2f32c0
Update error message
hossam-nasr Apr 11, 2022
6ccb4cb
log property values
hossam-nasr Apr 11, 2022
008d8de
merge dev
hossam-nasr Apr 11, 2022
ac1ea82
add initial tests for long timers
hossam-nasr Apr 12, 2022
bf056a0
don't use moment.add
hossam-nasr Apr 12, 2022
66a9ae9
don't use moment.add
hossam-nasr Apr 13, 2022
529a33c
fix tests for long timers
hossam-nasr Apr 13, 2022
ddc0667
fix recursive comment
hossam-nasr Apr 13, 2022
2f2aaca
use same duration format as extension in tests
hossam-nasr Apr 13, 2022
98bf97b
cancelled -> canceled
hossam-nasr Apr 13, 2022
c9dd546
Simplify code
hossam-nasr Apr 13, 2022
db05956
canceled
hossam-nasr Apr 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/durableorchestrationbindinginfo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ export class DurableOrchestrationBindingInfo {
public readonly instanceId: string = "",
public readonly isReplaying: boolean = false,
public readonly parentInstanceId?: string,
public readonly maximumShortTimerDuration?: string,
public readonly longRunningTimerIntervalDuration?: string,
upperSchemaVersion = 0 // TODO: Implement entity locking // public readonly contextLocks?: EntityId[],
) {
// It is assumed that the extension supports all schemas in range [0, upperSchemaVersion].
Expand Down
66 changes: 63 additions & 3 deletions src/durableorchestrationcontext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ import {
Task,
TimerTask,
DFTask,
LongTimerTask,
} from "./task";
import moment = require("moment");
import { ReplaySchema } from "./replaySchema";

/**
* Parameter data for orchestration bindings that can be used to schedule
Expand All @@ -41,6 +44,9 @@ export class DurableOrchestrationContext {
currentUtcDateTime: Date,
isReplaying: boolean,
parentInstanceId: string | undefined,
longRunningTimerIntervalDuration: string | undefined,
maximumShortTimerDuration: string | undefined,
schemaVersion: ReplaySchema,
input: unknown,
private taskOrchestratorExecutor: TaskOrchestrationExecutor
) {
Expand All @@ -49,6 +55,13 @@ export class DurableOrchestrationContext {
this.isReplaying = isReplaying;
this.currentUtcDateTime = currentUtcDateTime;
this.parentInstanceId = parentInstanceId;
this.longRunningTimerIntervalDuration = longRunningTimerIntervalDuration
? moment.duration(longRunningTimerIntervalDuration)
: undefined;
this.maximumShortTimerDuration = maximumShortTimerDuration
? moment.duration(maximumShortTimerDuration)
: undefined;
this.schemaVersion = schemaVersion;
this.input = input;
this.newGuidCounter = 0;
}
Expand Down Expand Up @@ -101,6 +114,34 @@ export class DurableOrchestrationContext {
*/
public currentUtcDateTime: Date;

/**
* Gets the maximum duration for timers allowed by the
* underlying storage infrastructure
*
* This duration property is determined by the underlying storage
* solution and passed to the SDK from the extension.
*/
private readonly maximumShortTimerDuration: moment.Duration | undefined;

/**
* A duration property which defines the duration of smaller
* timers to break long timers into, in case they are longer
* than the maximum supported duration
*
* This duration property is determined by the underlying
* storage solution and passed to the SDK from the extension.
*/
private readonly longRunningTimerIntervalDuration: moment.Duration | undefined;

/**
* Gets the current schema version that this execution is
* utilizing, based on negotiation with the extension.
*
* Different schema versions can allow different behavior.
* For example, long timers are only supported in schema version >=3
*/
private readonly schemaVersion: ReplaySchema;

/**
* @hidden
* This method informs the type-checker that an ITask[] can be treated as DFTask[].
Expand Down Expand Up @@ -293,9 +334,28 @@ export class DurableOrchestrationContext {
* @returns A TimerTask that completes when the durable timer expires.
*/
public createTimer(fireAt: Date): TimerTask {
const newAction = new CreateTimerAction(fireAt);
const task = new DFTimerTask(false, newAction);
return task;
const timerAction = new CreateTimerAction(fireAt);
const durationUntilFire = moment.duration(moment(fireAt).diff(this.currentUtcDateTime));
if (this.schemaVersion >= ReplaySchema.V3) {
if (!this.maximumShortTimerDuration || !this.longRunningTimerIntervalDuration) {
throw Error(
"Replay schema version >= V3 is being used, but one or more of the properties `maximumShortTimerDuration` and `longRunningTimerIntervalDuration` are not defined. This is likely an issue with the Extension."
);
}

if (durationUntilFire > this.maximumShortTimerDuration) {
return new LongTimerTask(
false,
timerAction,
this,
this.taskOrchestratorExecutor,
this.maximumShortTimerDuration,
this.longRunningTimerIntervalDuration
);
}
}

return new DFTimerTask(false, timerAction);
}

/**
Expand Down
3 changes: 3 additions & 0 deletions src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ export class Orchestrator {
this.currentUtcDateTime,
orchestrationBinding.isReplaying,
orchestrationBinding.parentInstanceId,
orchestrationBinding.longRunningTimerIntervalDuration,
orchestrationBinding.maximumShortTimerDuration,
upperSchemaVersion,
input,
this.taskOrchestrationExecutor
);
Expand Down
3 changes: 2 additions & 1 deletion src/replaySchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
export enum ReplaySchema {
V1 = 0,
V2 = 1,
V3 = 2,
}

export const LatestReplaySchema: ReplaySchema = ReplaySchema.V2;
export const LatestReplaySchema: ReplaySchema = ReplaySchema.V3;
105 changes: 104 additions & 1 deletion src/task.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { RetryOptions } from ".";
import { IAction, CreateTimerAction } from "./classes";
import { TaskOrchestrationExecutor } from "./taskorchestrationexecutor";
import moment = require("moment");
import { DurableOrchestrationContext } from "./durableorchestrationcontext";

/**
* @hidden
Expand All @@ -16,7 +18,7 @@ export enum TaskState {
* @hidden
* A taskID, either a `string` for external events,
* or either `false` or a `number` for un-awaited
* an awaited tasks respectively.
* and awaited tasks respectively.
*/
export type TaskID = number | string | false;

Expand Down Expand Up @@ -377,6 +379,107 @@ export class WhenAnyTask extends CompoundTask {
}
}

/**
* @hidden
*
* A long Timer Task.
*
* This Task is created when a timer is created with a duration
* longer than the maximum timer duration supported by storage infrastructure.
*
* It extends `WhenAllTask` because it decomposes into
* several smaller sub-`TimerTask`s
*/
export class LongTimerTask extends WhenAllTask implements TimerTask {
public id: TaskID;
public action: CreateTimerAction;
private readonly executor: TaskOrchestrationExecutor;
private readonly maximumTimerLength: moment.Duration;
private readonly orchestrationContext: DurableOrchestrationContext;
private readonly longRunningTimerIntervalDuration: moment.Duration;

public constructor(
id: TaskID,
action: CreateTimerAction,
orchestrationContext: DurableOrchestrationContext,
executor: TaskOrchestrationExecutor,
maximumTimerLength: moment.Duration,
longRunningTimerIntervalDuration: moment.Duration
) {
const currentTime = moment(orchestrationContext.currentUtcDateTime);
const finalFireTime = moment(action.fireAt);
const durationUntilFire = moment.duration(finalFireTime.diff(currentTime));

let nextFireTime: Date;
if (durationUntilFire > maximumTimerLength) {
nextFireTime = currentTime.add(longRunningTimerIntervalDuration).toDate();
} else {
nextFireTime = finalFireTime.toDate();
}

const nextTimerAction = new CreateTimerAction(nextFireTime);
const nextTimerTask = new DFTimerTask(false, nextTimerAction);
super([nextTimerTask], action);

this.id = id;
this.action = action;
this.orchestrationContext = orchestrationContext;
this.executor = executor;
this.maximumTimerLength = maximumTimerLength;
this.longRunningTimerIntervalDuration = longRunningTimerIntervalDuration;
}

get isCancelled(): boolean {
return this.action.isCancelled;
}

/**
* @hidden
* Cancel this timer task.
* It errors out if the task has already completed.
*/
public cancel(): void {
if (this.hasResult) {
throw Error("Cannot cancel a completed task.");
}
this.action.isCancelled = true; // TODO: fix typo
}

/**
* @hidden
* Attempts to set a value to this timer, given a completed sub-timer
*
* @param child
* The sub-timer that just completed
*/
public trySetValue(child: DFTimerTask): void {
const currentTime = this.orchestrationContext.currentUtcDateTime;
const finalFireTime = this.action.fireAt;
if (finalFireTime > currentTime) {
const nextTimer: DFTimerTask = this.getNextTimerTask(finalFireTime, currentTime);
this.addNewChild(nextTimer);
}
super.trySetValue(child);
}

private getNextTimerTask(finalFireTime: Date, currentTime: Date): DFTimerTask {
const durationUntilFire = moment.duration(moment(finalFireTime).diff(currentTime));
let nextFireTime: Date;
if (durationUntilFire > this.maximumTimerLength) {
nextFireTime = moment(currentTime).add(this.longRunningTimerIntervalDuration).toDate();
} else {
nextFireTime = finalFireTime;
}
return new DFTimerTask(false, new CreateTimerAction(nextFireTime));
}

private addNewChild(childTimer: DFTimerTask): void {
childTimer.parent = this;
this.children.push(childTimer);
this.executor.trackOpenTask(childTimer);
}
}

/**
* @hidden
*
Expand Down
10 changes: 10 additions & 0 deletions src/testingUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
OrchestratorStartedEvent,
} from "./classes";
import { IOrchestrationFunctionContext } from "./iorchestrationfunctioncontext";
import { ReplaySchema } from "./replaySchema";
import { TaskOrchestrationExecutor } from "./taskorchestrationexecutor";

/**
Expand All @@ -33,13 +34,19 @@ export class DummyOrchestrationContext implements IOrchestrationFunctionContext
* @param input The input to the orchestration
* @param currentUtcDateTime The deterministic date at the beginning of orchestration replay
* @param isReplaying Whether the orchestration is to be marked as isReplaying the its first event
* @param longRunningTimerIntervalDuration The duration to break smaller timers into if a long timer exceeds the maximum allowed duration
* @param maximumShortTimerDuration The maximum duration for a timer allowed by the underlying storage infrastructure
* @param schemaVersion The schema version currently used after being negotiated with the extension
* @param parentInstanceId The instanceId of the orchestration's parent, if this is a sub-orchestration
*/
constructor(
instanceId = "",
history: HistoryEvent[] | undefined = undefined,
input: any = undefined,
currentUtcDateTime: Date = new Date(),
longRunningTimerIntervalDuration: string,
maximumShortTimerDuration: string,
schemaVersion: ReplaySchema,
isReplaying = false,
parentInstanceId = ""
) {
Expand All @@ -54,6 +61,9 @@ export class DummyOrchestrationContext implements IOrchestrationFunctionContext
currentUtcDateTime,
isReplaying,
parentInstanceId,
longRunningTimerIntervalDuration,
maximumShortTimerDuration,
schemaVersion,
input,
new TaskOrchestrationExecutor()
);
Expand Down
30 changes: 16 additions & 14 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ export class Utils {
collection: { [index: string]: unknown },
typeInstance: T
): T[] {
return collection && typeInstance
? (Object.keys(collection)
.filter((key: string) => this.hasAllPropertiesOf(collection[key], typeInstance))
.map((key: string) => this.parseTimestampsAsDates(collection[key])) as T[])
: [];
if (collection && typeInstance) {
const candidateObjects = Object.values(collection).filter((value) =>
this.hasAllPropertiesOf(value, typeInstance)
);
this.parseTimestampsAsDates(candidateObjects);
return candidateObjects as T[];
}
return [];
}

public static getHrMilliseconds(times: number[]): number {
Expand Down Expand Up @@ -46,16 +49,15 @@ export class Utils {
return obj.hasOwnProperty(prop);
}

public static parseTimestampsAsDates(obj: unknown): unknown {
if (
typeof obj === "object" &&
obj !== null &&
this.hasOwnProperty(obj, "Timestamp") &&
typeof obj.Timestamp === "string"
) {
obj.Timestamp = new Date(obj.Timestamp);
public static parseTimestampsAsDates(obj: unknown): void {
if (typeof obj === "object" && obj != null) {
if (this.hasOwnProperty(obj, "Timestamp") && typeof obj.Timestamp === "string") {
obj.Timestamp = new Date(obj.Timestamp);
}
Object.values(obj).map((value) => {
this.parseTimestampsAsDates(value);
});
}
return obj;
}

public static hasAllPropertiesOf<T>(obj: unknown, refInstance: T): boolean {
Expand Down