Skip to content

Commit f493b32

Browse files
author
Dmitriy Astapkovich
committed
fix: added run_multiprocess
1 parent 5e37061 commit f493b32

File tree

8 files changed

+149
-51
lines changed

8 files changed

+149
-51
lines changed

examples/example_app.py

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -89,20 +89,15 @@ def run(self) -> None:
8989
asyncio.run(self._test_async_runtime())
9090

9191

92-
if __name__ == "__main__":
93-
app.add_worker(
94-
"worker_1",
95-
"task_q_1",
96-
activities=[test_boost_activity_1, test_boost_activity_3],
97-
)
98-
app.add_worker("worker_2", "task_q_2", activities=[test_boost_activity_2])
99-
100-
boost_worker = app.add_worker("worker_3", "task_q_3", workflows=[MyWorkflow])
101-
boost_worker.configur_temporal_client(use_pydantic_data_converter=True)
102-
boost_worker.configur_temporal_runtime(prometheus_bind_address="0.0.0.0:8801")
103-
104-
app.add_asgi_worker("asgi_worker", fastapi_app, "0.0.0.0", 8001)
105-
106-
app.add_exec_method_sync("migrate_db", fake_db_migration)
92+
app.add_worker(
93+
"worker_1",
94+
"task_q_1",
95+
activities=[test_boost_activity_1, test_boost_activity_3],
96+
)
97+
app.add_worker("worker_2", "task_q_2", activities=[test_boost_activity_2])
98+
boost_worker = app.add_worker("worker_3", "task_q_3", workflows=[MyWorkflow])
99+
boost_worker.configure_temporal_client(use_pydantic_data_converter=True)
100+
boost_worker.configur_temporal_runtime(prometheus_bind_address="0.0.0.0:8801")
107101

108-
app.run()
102+
app.add_asgi_worker("asgi_worker", fastapi_app, "0.0.0.0", 8001)
103+
app.add_exec_method_sync("migrate_db", fake_db_migration)

pyproject.toml

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,36 @@
11
[tool.poetry]
2-
name = "temporal-boost"
3-
version = "1.2.0"
4-
description = "Small framework for Temporal development"
52
authors = [
6-
"northpowered <[email protected]>",
7-
"dmastapkovich <[email protected]>",
3+
"northpowered <[email protected]>",
4+
"dmastapkovich <[email protected]>",
85
]
6+
description = "Small framework for Temporal development"
7+
keywords = ["temporal", "framework", "development", "python"]
98
license = "MIT"
10-
readme = "README.md"
9+
name = "temporal-boost"
1110
packages = [{include = "temporal_boost"}]
11+
readme = "README.md"
1212
repository = "https://github.com/northpowered/temporal-boost"
13-
keywords = ["temporal", "framework", "development", "python"]
13+
version = "1.2.0"
1414

1515
classifiers = [
16-
"Programming Language :: Python :: 3",
17-
"Programming Language :: Python :: 3.10",
18-
"Programming Language :: Python :: 3.11",
19-
"Programming Language :: Python :: 3.12",
20-
"Programming Language :: Python :: 3.13",
21-
"License :: OSI Approved :: MIT License",
22-
"Operating System :: OS Independent",
16+
"Programming Language :: Python :: 3",
17+
"Programming Language :: Python :: 3.10",
18+
"Programming Language :: Python :: 3.11",
19+
"Programming Language :: Python :: 3.12",
20+
"Programming Language :: Python :: 3.13",
21+
"License :: OSI Approved :: MIT License",
22+
"Operating System :: OS Independent",
2323
]
2424

2525
[tool.poetry.dependencies]
26-
python = "^3.10.0"
27-
typer = "^0.15.0"
28-
temporalio = "^1.10.0"
26+
granian = {version = "^2.0.0", optional = true}
27+
hypercorn = {version = "^0.17.0", optional = true}
2928
pydantic = "^2.10.0"
29+
python = "^3.10.0"
3030
pyyaml = "^6.0.0"
31+
temporalio = "^1.10.0"
32+
typer = "^0.15.0"
3133
uvicorn = {version = "^0.30.0", optional = true}
32-
granian = {version = "^2.0.0", optional = true}
33-
hypercorn = {version = "^0.17.0", optional = true}
3434
uvloop = {version = "^0.21.0", optional = true}
3535

3636
[tool.poetry.extras]
@@ -40,6 +40,9 @@ hypercorn = ["hypercorn"]
4040
uvicorn = ["uvicorn"]
4141
uvloop = ["uvloop"]
4242

43+
[tool.poetry.scripts]
44+
temporal-boost = "temporal_boost.__main__:cli"
45+
4346
[tool.poetry.group.dev.dependencies]
4447
fastapi = "^0.110.0"
4548
pre-commit = "^3.8.0"

temporal_boost/__main__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from temporal_boost.cli.runner import cli
2+
3+
4+
if __name__ == "__main__":
5+
cli()

temporal_boost/boost_app.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def __init__( # noqa: PLR0913
3939
self._name: str = name or "temporal_generic_service"
4040
self._loop_impl = loop_impl
4141

42-
self._global_temporal_endpointe = temporal_endpoint
42+
self._global_temporal_endpoint = temporal_endpoint
4343
self._global_temporal_namespace = temporal_namespace
4444
self._global_use_pydantic = use_pydantic
4545

@@ -88,13 +88,13 @@ def add_worker( # noqa: PLR0913
8888
debug_mode=self._debug_mode,
8989
**worker_kwargs,
9090
)
91-
worker.configur_temporal_client(
92-
target_host=self._global_temporal_endpointe,
91+
worker.configure_temporal_client(
92+
target_host=self._global_temporal_endpoint,
9393
namespace=self._global_temporal_namespace,
9494
)
9595

9696
if self._global_use_pydantic is not None:
97-
worker.configur_temporal_client(use_pydantic_data_converter=self._global_use_pydantic)
97+
worker.configure_temporal_client(use_pydantic_data_converter=self._global_use_pydantic)
9898

9999
self._run_typer.command(name=worker_name)(worker.run)
100100

@@ -147,9 +147,9 @@ def add_async_runtime(self, worker_name: str, boost_worker: TemporalBoostWorker)
147147
self._registered_workers.append(boost_worker)
148148
logger.info(f"Async runtime {worker_name} was registered in CLI")
149149

150-
def run(self) -> None:
150+
def run(self, *args: Any, **kwargs: Any) -> None:
151151
self._loop = loops.get(self._loop_impl)
152-
self._loop.run_until_complete(self._root_typer())
152+
self._loop.run_until_complete(self._root_typer(*args, **kwargs))
153153

154154
@staticmethod
155155
def _configure_logging(log_config: dict[str, Any] | Path | None) -> None:

temporal_boost/cli/__init__.py

Whitespace-only changes.

temporal_boost/cli/runner.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import importlib
2+
import logging
3+
from multiprocessing import Process
4+
from typing import TYPE_CHECKING, Any
5+
6+
import typer
7+
8+
9+
if TYPE_CHECKING:
10+
from types import ModuleType
11+
12+
13+
logger = logging.getLogger(__name__)
14+
cli = typer.Typer(help="CLI runner for BoostApp services")
15+
16+
17+
def _import_app_object(app_path: str) -> Any:
18+
if ":" not in app_path:
19+
raise typer.BadParameter("App path must be in format 'module.submodule:app'")
20+
module_path, app_attr = app_path.split(":")
21+
module: ModuleType = importlib.import_module(module_path)
22+
app = getattr(module, app_attr, None)
23+
if app is None:
24+
raise typer.BadParameter(f"No '{app_attr}' found in module '{module_path}'")
25+
return app
26+
27+
28+
def _run_single(app_path: str, run_args: list[str]) -> None:
29+
app = _import_app_object(app_path)
30+
app.run(run_args)
31+
32+
33+
def _run_multiprocess(app_path: str, processes: int, run_args: list[str]) -> None:
34+
process_list: list[Process] = []
35+
for index in range(processes):
36+
process = Process(target=_run_single, args=(app_path, run_args), name=f"worker-{index}")
37+
process_list.append(process)
38+
process.start()
39+
for process in process_list:
40+
process.join()
41+
42+
43+
@cli.command("run")
44+
def run(
45+
app: str = typer.Argument(..., help="Path to BoostApp, e.g. 'my_app.app:app'"),
46+
workers: int = typer.Option(1, "--workers", "-w", help="Number of processes to run"),
47+
run_args: list[str] = typer.Argument(None, help="Additional arguments to pass to app.run()", show_default=False), # noqa: B008
48+
) -> None:
49+
additional_args = run_args or []
50+
if workers > 1:
51+
logger.info(f"Starting {workers} processes for app '{app}' with arguments: {additional_args}")
52+
_run_multiprocess(app, workers, additional_args)
53+
else:
54+
_run_single(app, additional_args)
55+
56+
57+
if __name__ == "__main__":
58+
cli()

temporal_boost/temporal/runtime.py

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import logging
2+
import socket
13
from collections.abc import Mapping
24

35
from temporalio.runtime import (
@@ -12,6 +14,16 @@
1214
from temporal_boost.temporal import config
1315

1416

17+
logger = logging.getLogger(__name__)
18+
19+
20+
def is_port_in_use(host: str, port: int) -> bool:
21+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
22+
sock.settimeout(1.0)
23+
result = sock.connect_ex((host, port))
24+
return result == 0
25+
26+
1527
def create_runtime( # noqa: PLR0913
1628
*,
1729
logging: LoggingConfig | None = None,
@@ -30,17 +42,42 @@ def create_runtime( # noqa: PLR0913
3042
global_tags = {}
3143

3244
if metrics is None and prometheus_bind_address is not None:
33-
metrics = PrometheusConfig(
34-
bind_address=prometheus_bind_address,
35-
counters_total_suffix=prometheus_counters_total_suffix or False,
36-
unit_suffix=prometheus_unit_suffix or False,
37-
durations_as_seconds=prometheus_durations_as_seconds or False,
38-
)
45+
try:
46+
host, port_str = prometheus_bind_address.split(":")
47+
port = int(port_str)
48+
except Exception as exc:
49+
raise ValueError("Invalid prometheus_bind_address format, expected 'host:port'") from exc
50+
51+
if is_port_in_use(host, port):
52+
logger.warning(f"Port {port} on {host} is already in use. Disabling Prometheus metrics.")
53+
metrics = None
54+
else:
55+
metrics = PrometheusConfig(
56+
bind_address=prometheus_bind_address,
57+
counters_total_suffix=prometheus_counters_total_suffix or False,
58+
unit_suffix=prometheus_unit_suffix or False,
59+
durations_as_seconds=prometheus_durations_as_seconds or False,
60+
)
61+
3962
telemetry_config = TelemetryConfig(
4063
logging=logging,
4164
metrics=metrics,
4265
global_tags=global_tags,
4366
attach_service_name=attach_service_name,
4467
metric_prefix=metric_prefix,
4568
)
46-
return Runtime(telemetry=telemetry_config)
69+
try:
70+
return Runtime(telemetry=telemetry_config)
71+
except ValueError as err:
72+
err_msg = str(err)
73+
if "Address already in use" in err_msg:
74+
logger.warning("Prometheus exporter port is already in use. Disabling metrics for this process.")
75+
telemetry_config_no_metrics = TelemetryConfig(
76+
logging=logging,
77+
metrics=None,
78+
global_tags=global_tags,
79+
attach_service_name=attach_service_name,
80+
metric_prefix=metric_prefix,
81+
)
82+
return Runtime(telemetry=telemetry_config_no_metrics)
83+
raise

temporal_boost/workers/temporal.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def __init__( # noqa: PLR0913
7070
def temporal_client(self) -> Client:
7171
if not self._client:
7272
raise RuntimeError(
73-
"Temporal client has not been initialized. Ensure 'configur_temporal_client()' has been called"
73+
"Temporal client has not been initialized. Ensure 'configure_temporal_client()' has been called"
7474
)
7575
return self._client
7676

@@ -92,7 +92,7 @@ def temporal_client_runtime(self) -> Runtime:
9292
self.configur_temporal_runtime()
9393
return cast("Runtime", self._runtime)
9494

95-
def configur_temporal_client( # noqa: PLR0913
95+
def configure_temporal_client( # noqa: PLR0913
9696
self,
9797
*,
9898
target_host: str | None = None,
@@ -161,7 +161,7 @@ def configur_temporal_runtime( # noqa: PLR0913
161161

162162
async def _build_worker(self) -> None:
163163
if not self._client_builder:
164-
self.configur_temporal_client()
164+
self.configure_temporal_client()
165165
self._client_builder = cast("TemporalClientBuilder", self._client_builder)
166166

167167
self._client = await self._client_builder.build()

0 commit comments

Comments
 (0)