Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 19 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,34 +40,32 @@ See more in https://github.com/indeedeng/iwf#what-is-iwf
- [x] AnyCommandCompleted and AllCommandCompleted waitingType
- [x] InternalChannel command
- [x] DataAttribute
- [x] Stop workflow API
- [x] Improve workflow uncompleted error return(canceled, failed, timeout, terminated)
- [ ] Support execute API failure policy
- [ ] Improve workflow uncompleted error return(canceled, failed, timeout, terminated)
- [ ] Support workflow RPC
- [ ] Stop workflow API

## Future

- [ ] Support wait_until API failure policy
- [ ] More workflow start options: IdReusePolicy, cron schedule, retry
- [ ] Support caching on persistence
- [ ] Support atomic conditional complete workflow by checking signal/internal channel emptiness
- [ ] Support dynamic data/search attributes and internal/signal channel definition
- [ ] Support state options overridden dynamically
- [ ] Support describe workflow API
- [ ] Support RPC persistence locking policy
- [ ] Signal command
- [ ] Signal workflow API
- [ ] SearchAttribute
- [ ] Get workflow DataAttributes/SearchAttributes API
- [ ] StateExecutionLocal
- [ ] Search workflow API
- [ ] Reset workflow API
- [ ] Reset workflow by stateId/StateExecutionId

## Future
- [ ] Atomic conditional complete workflow by checking signal/internal channel emptiness
- [ ] Dynamic data/search attributes and internal/signal channel definition
- [ ] State options overridden dynamically
- [ ] Describe workflow API
- [ ] TryGetWorkflowResults API
- [ ] Consume N messages in a single command
- [ ] SearchAttribute: keyword
- [ ] New search attribute types: Double, Bool, Datetime, Keyword array, Text
- [ ] Workflow start options: initial search attributes
- [ ] Search workflow API
- [ ] Reset workflow API
- [ ] Skip timer API for testing/operation
- [ ] Decider trigger type: any command combination
- [ ] Support failing workflow with results
- [ ] Failing workflow with results
- [ ] Wait_until API failure policy
- [ ] Caching on persistence
- [ ] Get workflow DataAttributes/SearchAttributes API
- [ ] StateExecutionLocal

### Running iwf-server locally

Expand Down Expand Up @@ -120,7 +118,7 @@ This project uses [openapi-python-client](https://github.com/openapi-generators/
```bash
poetry run openapi-python-client update --path iwf-idl/iwf-sdk.yaml --config .openapi-python-client-config.yaml
```

Then update the version in `iwf-api/pyproject.toml` so that poetry can know that the local package is updated.
#### Linting

To run linting for this project:
Expand Down
2 changes: 1 addition & 1 deletion iwf-api/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "iwf-api"
version = "1.0.0"
version = "1.0.1"
description = "A client library for accessing Workflow APIs"

authors = []
Expand Down
8 changes: 8 additions & 0 deletions iwf/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from iwf.client_options import ClientOptions
from iwf.registry import Registry
from iwf.stop_workflow_options import StopWorkflowOptions
from iwf.unregistered_client import UnregisteredClient, UnregisteredWorkflowOptions
from iwf.workflow import ObjectWorkflow, get_workflow_type_by_class
from iwf.workflow_options import WorkflowOptions
Expand Down Expand Up @@ -93,3 +94,10 @@ def get_simple_workflow_result_with_wait(
return self._unregistered_client.get_simple_workflow_result_with_wait(
workflow_id, "", type_hint
)

def stop_workflow(
self,
workflow_id: str,
options: Optional[StopWorkflowOptions] = None,
):
return self._unregistered_client.stop_workflow(workflow_id, "", options)
1 change: 1 addition & 0 deletions iwf/client_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class ClientOptions:
server_url: str
worker_url: str
object_encoder: ObjectEncoder
api_timeout: int = 60

@classmethod
def local_default(cls):
Expand Down
70 changes: 61 additions & 9 deletions iwf/errors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
from iwf_api.models import ErrorResponse, WorkflowGetResponse
import json as jsonlib

from httpx._utils import guess_json_utf
from iwf_api.models import (
ErrorResponse,
ErrorSubStatus,
WorkflowGetResponse,
WorkflowStatus,
)


class WorkflowDefinitionError(Exception):
Expand Down Expand Up @@ -29,18 +37,22 @@ class WorkflowStillRunningError(ClientSideError):
pass


def process_http_error_get_api(status: int, err_resp: ErrorResponse) -> HttpError:
"""
special handling for 420 for get API
"""
if status == 420:
return WorkflowStillRunningError(status, err_resp)
return process_http_error(status, err_resp)
class WorkflowAlreadyStartedError(ClientSideError):
pass


class WorkflowNotExistsError(ClientSideError):
pass


def process_http_error(status: int, err_resp: ErrorResponse) -> HttpError:
if 400 <= status < 500:
return ClientSideError(status, err_resp)
if err_resp.sub_status == ErrorSubStatus.WORKFLOW_ALREADY_STARTED_SUB_STATUS:
return WorkflowAlreadyStartedError(status, err_resp)
elif err_resp.sub_status == ErrorSubStatus.WORKFLOW_NOT_EXISTS_SUB_STATUS:
return WorkflowNotExistsError(status, err_resp)
else:
return ClientSideError(status, err_resp)
else:
return ServerSideError(status, err_resp)

Expand All @@ -53,3 +65,43 @@ def __init__(self, get_response: WorkflowGetResponse):
self.error_message = get_response.error_message
# TODO add methods to decode the state results into objects
self._state_results = get_response.results


class WorkflowFailed(WorkflowAbnormalExitError):
pass


class WorkflowTimeout(WorkflowAbnormalExitError):
pass


class WorkflowTerminated(WorkflowAbnormalExitError):
pass


class WorkflowCanceled(WorkflowAbnormalExitError):
pass


def process_workflow_abnormal_exit_error(
get_response: WorkflowGetResponse,
) -> WorkflowAbnormalExitError:
status = get_response.workflow_status
if status == WorkflowStatus.CANCELED:
return WorkflowCanceled(get_response)
elif status == WorkflowStatus.FAILED:
return WorkflowFailed(get_response)
elif status == WorkflowStatus.TERMINATED:
return WorkflowTerminated(get_response)
elif status == WorkflowStatus.TIMEOUT:
return WorkflowTimeout(get_response)
return WorkflowAbnormalExitError(get_response)


def parse_unexpected_error(err) -> ErrorResponse:
encoding = guess_json_utf(err.content)
if encoding is not None:
err_dict = jsonlib.loads(err.content.decode(encoding))
else:
err_dict = jsonlib.loads(err.content)
return ErrorResponse.from_dict(err_dict)
116 changes: 116 additions & 0 deletions iwf/tests/test_workflow_errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import inspect
import time
import unittest

from iwf_api.models import WorkflowStopType

from iwf.client import Client
from iwf.command_request import CommandRequest, InternalChannelCommand
from iwf.command_results import CommandResults
from iwf.communication import Communication
from iwf.errors import (
WorkflowAlreadyStartedError,
WorkflowCanceled,
WorkflowFailed,
WorkflowNotExistsError,
WorkflowStillRunningError,
WorkflowTerminated,
WorkflowTimeout,
)
from iwf.persistence import Persistence
from iwf.state_decision import StateDecision
from iwf.state_schema import StateSchema
from iwf.stop_workflow_options import StopWorkflowOptions
from iwf.tests.worker_server import registry
from iwf.workflow import ObjectWorkflow
from iwf.workflow_context import WorkflowContext
from iwf.workflow_state import T, WorkflowState

test_channel_name = "test-name"


class WaitState(WorkflowState[None]):
def wait_until(
self,
ctx: WorkflowContext,
input: T,
persistence: Persistence,
communication: Communication,
) -> CommandRequest:
return CommandRequest.for_all_command_completed(
InternalChannelCommand.by_name(test_channel_name)
)

def execute(
self,
ctx: WorkflowContext,
input: T,
command_results: CommandResults,
persistence: Persistence,
communication: Communication,
) -> StateDecision:
return StateDecision.graceful_complete_workflow()


class WaitWorkflow(ObjectWorkflow):
def get_workflow_states(self) -> StateSchema:
return StateSchema.with_starting_state(WaitState())


wf = WaitWorkflow()
registry.add_workflow(wf)
client = Client(registry)


class TestWorkflowErrors(unittest.TestCase):
def test_workflow_timeout(self):
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
client.start_workflow(WaitWorkflow, wf_id, 1)
with self.assertRaises(WorkflowTimeout):
client.get_simple_workflow_result_with_wait(wf_id, str)
with self.assertRaises(WorkflowNotExistsError):
client.get_simple_workflow_result_with_wait("invalid_id", str)

def test_workflow_still_running_when_wait(self):
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
# client_options = ClientOptions.local_default()
# client_options.api_timeout = 5
# TODO using a shorter api timeout will throw a different timeout eror, it's better to unify it
client.start_workflow(WaitWorkflow, wf_id, 61)

with self.assertRaises(WorkflowAlreadyStartedError):
client.start_workflow(WaitWorkflow, wf_id, 61)

with self.assertRaises(WorkflowStillRunningError):
client.get_simple_workflow_result_with_wait(wf_id, str)

def test_workflow_canceled(self):
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
client.start_workflow(WaitWorkflow, wf_id, 10)
client.stop_workflow(wf_id)
with self.assertRaises(WorkflowCanceled):
client.get_simple_workflow_result_with_wait(wf_id, str)

def test_workflow_terminated(self):
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
client.start_workflow(WaitWorkflow, wf_id, 10)
client.stop_workflow(
wf_id,
StopWorkflowOptions(
workflow_stop_type=WorkflowStopType.TERMINATE, reason="test"
),
)
with self.assertRaises(WorkflowTerminated):
client.get_simple_workflow_result_with_wait(wf_id, str)

def test_workflow_failed(self):
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
client.start_workflow(WaitWorkflow, wf_id, 10)
client.stop_workflow(
wf_id,
StopWorkflowOptions(
workflow_stop_type=WorkflowStopType.FAIL, reason="test"
),
)
with self.assertRaises(WorkflowFailed):
client.get_simple_workflow_result_with_wait(wf_id, str)
57 changes: 47 additions & 10 deletions iwf/unregistered_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import http
from dataclasses import dataclass
from http import HTTPStatus
from typing import Any, List, Optional, Type, TypeVar

from iwf_api import Client
from iwf_api import Client, errors
from iwf_api.api.default import (
post_api_v1_workflow_dataobjects_get,
post_api_v1_workflow_reset,
Expand Down Expand Up @@ -42,10 +43,11 @@

from iwf.client_options import ClientOptions
from iwf.errors import (
WorkflowAbnormalExitError,
WorkflowDefinitionError,
WorkflowStillRunningError,
parse_unexpected_error,
process_http_error,
process_http_error_get_api,
process_workflow_abnormal_exit_error,
)
from iwf.reset_workflow_type_and_options import ResetWorkflowTypeAndOptions
from iwf.stop_workflow_options import StopWorkflowOptions
Expand All @@ -64,10 +66,36 @@ class UnregisteredWorkflowOptions:
T = TypeVar("T")


# from https://stackoverflow.com/questions/45028991/best-way-to-extend-httpstatus-with-custom-value
# HERE BE DRAGONS!
# DO NOT do this unless you absolutely have to.
def add_http_status(name, value, phrase, description=""):
# call our new member factory, it's essentially the `HTTPStatus.__new__` method
new_status = HTTPStatus.__new_member__(HTTPStatus, value, phrase, description)
new_status._name_ = name # store the enum's member internal name
new_status.__objclass__ = (
HTTPStatus.__class__
) # store the enum's member parent class
setattr(HTTPStatus, name, new_status) # add it to the global HTTPStatus namespace
HTTPStatus._member_map_[name] = new_status # add it to the name=>member map
HTTPStatus._member_names_.append(
name
) # append the names so it appears in __members__
HTTPStatus._value2member_map_[value] = new_status # add it to the value=>member map


add_http_status("IWF_CUSTOM_ERROR_1", 420, "IWF_CUSTOM_ERROR_1")
add_http_status("IWF_CUSTOM_ERROR_2", 450, "IWF_CUSTOM_ERROR_2")


class UnregisteredClient:
def __init__(self, client_options: ClientOptions):
self.client_options = client_options
self.api_client = Client(base_url=client_options.server_url, timeout=60)
self.api_client = Client(
base_url=client_options.server_url,
timeout=client_options.api_timeout,
raise_on_unexpected_status=True,
)

def start_workflow(
self,
Expand Down Expand Up @@ -125,18 +153,27 @@ def get_simple_workflow_result_with_wait(
workflow_run_id=workflow_run_id,
needs_results=True,
)
response = post_api_v_1_workflow_get_with_wait.sync_detailed(
client=self.api_client,
json_body=request,
)

try:
response = post_api_v_1_workflow_get_with_wait.sync_detailed(
client=self.api_client,
json_body=request,
)
except errors.UnexpectedStatus as err:
err_resp = parse_unexpected_error(err)
if err.status_code == 420:
raise WorkflowStillRunningError(err.status_code, err_resp)
else:
raise RuntimeError(f"unknown error code {err.status_code}")

if response.status_code != http.HTTPStatus.OK:
assert isinstance(response.parsed, ErrorResponse)
raise process_http_error_get_api(response.status_code, response.parsed)
raise process_http_error(response.status_code, response.parsed)

parsed = response.parsed
assert isinstance(parsed, WorkflowGetResponse)
if parsed.workflow_status != WorkflowStatus.COMPLETED:
raise WorkflowAbnormalExitError(parsed)
raise process_workflow_abnormal_exit_error(parsed)

if not parsed.results or len(parsed.results) == 0:
return None
Expand Down
Loading