Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 22 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,40 +30,44 @@ See more in https://github.com/indeedeng/iwf#what-is-iwf

# Development Plan

## 1.0
- [x] Start workflow API
- [x] Executing `wait_until`/`execute` APIs and completing workflow
- [x] Parallel execution of multiple states
- [ ] Timer command
- [x] StateOption: WaitUntil(optional)/Execute API timeout and retry policy
- [x] Get workflow with wait API
- [x] Timer command
- [x] AnyCommandCompleted and AllCommandCompleted waitingType
- [ ] InternalChannel command
- [ ] DataAttribute
- [ ] StateExecutionLocal
- [ ] Search workflow API
- [ ] Cancel workflow API
- [ ] Reset workflow API
- [ ] AnyCommandCompleted waitingType
- [ ] More workflow start options: IdReusePolicy, cron schedule, retry
- [ ] StateOption: WaitUntil/Execute API timeout and retry policy
- [ ] Reset workflow by stateId/StateExecutionId
- [ ] New search attribute types: Double, Bool, Datetime, Keyword array, Text
- [ ] Workflow start options: initial search attributes
- [ ] Skip timer API for testing/operation
- [ ] Decider trigger type: any command combination
- [ ] Support failing workflow with results
- [ ] Support execute API failure policy
- [ ] Improve workflow uncompleted error return(canceled, failed, timeout, terminated)
- [ ] Support PROCEED_ON_FAILURE
- [ ] Support workflow RPC
- [ ] Stop workflow API

## Future

- [ ] Support wait_until API failure policy
- [ ] More workflow start options: IdReusePolicy, cron schedule, retry
- [ ] Support caching on persistence
- [ ] Support atomic conditional complete workflow by checking signal/internal channel emptiness
- [ ] Support dynamic data/search attributes and internal/signal channel definition
- [ ] Support state options overridden dynamically
- [ ] Support describe workflow API
- [ ] Support execute API failure policy
- [ ] Support RPC persistence locking policy
- [ ] Support RPC persistence locking policy
- [ ] Signal command
- [ ] Signal workflow API
- [ ] Get workflow API
- [ ] SearchAttribute
- [ ] Get workflow DataAttributes/SearchAttributes API
- [ ] StateExecutionLocal
- [ ] Search workflow API
- [ ] Reset workflow API
- [ ] Reset workflow by stateId/StateExecutionId
- [ ] New search attribute types: Double, Bool, Datetime, Keyword array, Text
- [ ] Workflow start options: initial search attributes
- [ ] Skip timer API for testing/operation
- [ ] Decider trigger type: any command combination
- [ ] Support failing workflow with results

### Running iwf-server locally

Expand Down
66 changes: 60 additions & 6 deletions iwf/command_request.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,85 @@
import time
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional
from typing import Optional, Union

from iwf_api.models import CommandWaitingType
from iwf_api.models.command_request import CommandRequest as IdlCommandRequest
from iwf_api.models.inter_state_channel_command import (
InterStateChannelCommand as IdlInternalChannelCommand,
)
from iwf_api.models.timer_command import TimerCommand as IdlTimerCommand


@dataclass
class TimerCommand:
command_id: Optional[str]
command_id: str
firing_unix_timestamp_seconds: int

@classmethod
def timer_command_by_duration(
cls, duration: timedelta, command_id: Optional[str] = None
):
return TimerCommand(
command_id, int(time.time()) + int(duration.total_seconds())
command_id if command_id is not None else "",
int(time.time()) + int(duration.total_seconds()),
)


@dataclass
class InternalChannelCommand:
command_id: str
channel_name: str

@classmethod
def by_name(cls, channel_name: str, command_id: Optional[str] = None):
return InternalChannelCommand(
command_id if command_id is not None else "", channel_name
)


BaseCommand = Union[TimerCommand, InternalChannelCommand]


@dataclass
class CommandRequest:
pass
commands: list[BaseCommand]
command_waiting_type: CommandWaitingType

@classmethod
def for_any_command_completed(cls, *commands: BaseCommand):
bc = [c for c in commands]
return CommandRequest(bc, CommandWaitingType.ANY_COMPLETED)

@classmethod
def for_all_command_completed(cls, *commands: BaseCommand):
bc = [c for c in commands]
return CommandRequest(bc, CommandWaitingType.ALL_COMPLETED)

@classmethod
def empty(cls):
return CommandRequest(list(), CommandWaitingType.ALL_COMPLETED)


def _to_idl_command_request(request: CommandRequest) -> IdlCommandRequest:
# TODO
return IdlCommandRequest(command_waiting_type=CommandWaitingType.ALL_COMPLETED)
req = IdlCommandRequest(
command_waiting_type=request.command_waiting_type,
)

timer_commands = [
IdlTimerCommand(t.command_id, t.firing_unix_timestamp_seconds)
for t in request.commands
if isinstance(t, TimerCommand)
]

internal_channel_command = [
IdlInternalChannelCommand(i.command_id, i.channel_name)
for i in request.commands
if isinstance(i, InternalChannelCommand)
]

if len(timer_commands) > 0:
req.timer_commands = timer_commands
if len(internal_channel_command) > 0:
req.inter_state_channel_commands = internal_channel_command
return req
2 changes: 1 addition & 1 deletion iwf/tests/test_basic_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def wait_until(
) -> CommandRequest:
if input != "input":
raise RuntimeError("input is incorrect")
return CommandRequest()
return CommandRequest.empty()

def execute(
self,
Expand Down
58 changes: 58 additions & 0 deletions iwf/tests/test_timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import inspect
import time
from datetime import timedelta

from iwf.client import Client
from iwf.command_request import CommandRequest, TimerCommand
from iwf.command_results import CommandResults
from iwf.communication import Communication
from iwf.persistence import Persistence
from iwf.state_decision import StateDecision
from iwf.state_schema import StateSchema
from iwf.tests.worker_server import registry
from iwf.workflow import ObjectWorkflow
from iwf.workflow_context import WorkflowContext
from iwf.workflow_state import T, WorkflowState


class WaitState(WorkflowState[int]):
def wait_until(
self,
ctx: WorkflowContext,
input: int,
persistence: Persistence,
communication: Communication,
) -> CommandRequest:
return CommandRequest.for_all_command_completed(
TimerCommand.timer_command_by_duration(timedelta(seconds=input))
)

def execute(
self,
ctx: WorkflowContext,
input: T,
command_results: CommandResults,
persistence: Persistence,
communication: Communication,
) -> StateDecision:
return StateDecision.graceful_complete_workflow()


class TimerWorkflow(ObjectWorkflow):
def get_workflow_states(self) -> StateSchema:
return StateSchema.with_starting_state(WaitState())


wf = TimerWorkflow()
registry.add_workflow(wf)
client = Client(registry)


def test_timer_workflow():
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"

client.start_workflow(TimerWorkflow, wf_id, 100, 5)
start_ms = time.time_ns() / 1000000
client.get_simple_workflow_result_with_wait(wf_id, None)
elapsed_ms = time.time_ns() / 1000000 - start_ms
assert 4000 <= elapsed_ms <= 7000, f"expected 5000 ms timer, actual is {elapsed_ms}"
12 changes: 6 additions & 6 deletions iwf/unregistered_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
post_api_v_1_workflow_get_with_wait,
)
from iwf_api.models import (
EncodedObject,
ErrorResponse,
IDReusePolicy,
SearchAttribute,
SearchAttributeKeyAndType,
WorkflowConfig,
WorkflowGetDataObjectsRequest,
WorkflowGetRequest,
WorkflowGetResponse,
WorkflowGetSearchAttributesRequest,
WorkflowGetSearchAttributesResponse,
WorkflowResetRequest,
Expand All @@ -34,18 +37,15 @@
WorkflowStateOptions,
WorkflowStatus,
WorkflowStopRequest,
WorkflowGetResponse,
ErrorResponse,
EncodedObject,
)
from iwf_api.types import Response

from iwf.client_options import ClientOptions
from iwf.errors import (
process_http_error,
process_http_error_get_api,
WorkflowAbnormalExitError,
WorkflowDefinitionError,
process_http_error,
process_http_error_get_api,
)
from iwf.reset_workflow_type_and_options import ResetWorkflowTypeAndOptions
from iwf.stop_workflow_options import StopWorkflowOptions
Expand All @@ -67,7 +67,7 @@ class UnregisteredWorkflowOptions:
class UnregisteredClient:
def __init__(self, client_options: ClientOptions):
self.client_options = client_options
self.api_client = Client(base_url=client_options.server_url)
self.api_client = Client(base_url=client_options.server_url, timeout=60)

def start_workflow(
self,
Expand Down