Core SDK: sync/async duplication in Process, unsafe nested event-loop dispatch, and module-level mutable globals #1449

@MervinPraison

Description

Summary

In-depth analysis of src/praisonai-agents/praisonaiagents identified three architectural gaps that violate the project's stated engineering principles (DRY, multi-agent + async safe by default, no global singletons). Each is validated with exact file paths, line numbers, and reproduction reasoning.


1. Massive sync/async code duplication in process/process.py — violates DRY principle

What's wrong

Process.workflow() (lines 885–1439, ~555 lines) and Process.aworkflow() (lines 393–767, ~375 lines) are near-identical copy-paste implementations of the same workflow orchestration logic. The same pattern repeats for hierarchical() (lines 1446–1549) vs ahierarchical() (lines 774–884).

The sync version includes additional loop-task file-reading logic (CSV/text parsing, subtask creation) that was added after the initial duplication but never back-ported to the async version, meaning the two code paths have silently diverged.

Evidence

Duplicated workflow-finished checks — identical logic in both methods:

# async version — process.py:442-448
if self.workflow_finished:
    logging.info("Workflow finished early as all tasks are completed.")
    break
if self.workflow_cancelled:
    logging.warning("Workflow has been cancelled, stopping task execution.")
    break

# sync version — process.py:1045-1052 (exact copy)
if self.workflow_finished:
    logging.info("Workflow finished early as all tasks are completed.")
    break
if self.workflow_cancelled:
    logging.warning("Workflow has been cancelled, stopping task execution.")
    break

Duplicated subtask status checking — identical logic at lines 504-548 (async) and 1182-1216 (sync).

Duplicated task context building and loop routing — identical patterns at lines 483-489 (async) and corresponding sync section.

Divergence bug: The sync workflow() (lines 936-1032) includes full CSV/text file reading with csv.reader, Q&A pair handling, and subtask creation logic for loop tasks at runtime. The async aworkflow() has a simpler version of this at lines 415-425 that lacks the same runtime file-reading capabilities — meaning loop tasks with input files behave differently in sync vs async execution.

How to fix

Apply the same "unified async-first with sync bridge" pattern already used successfully in agent/unified_execution_mixin.py:

class Process:
    async def _workflow_impl(self):
        """Single source of truth for all workflow orchestration logic."""
        current_iter = 0
        workflow_start = time.monotonic()
        
        # Build workflow relationships
        for task in self.tasks.values():
            if task.next_tasks:
                for next_task_name in task.next_tasks:
                    next_task = next((t for t in self.tasks.values() if t.name == next_task_name), None)
                    if next_task:
                        next_task.previous_tasks.append(task.name)

        start_task = self._find_start_task()
        self._prepare_loop_tasks(start_task)
        current_task = start_task

        while current_task:
            current_iter += 1
            if current_iter > self.max_iter:
                break
            if self._check_timeout(workflow_start):
                break
            if self.workflow_finished or self.workflow_cancelled:
                break
            if self._check_all_tasks_completed():
                break

            # ... single implementation of task execution, routing, loop handling
            yield current_task.id
            # ... single implementation of next-task resolution

    async def aworkflow(self) -> AsyncGenerator[str, None]:
        async for task_id in self._workflow_impl():
            yield task_id

    def workflow(self):
        # Sync bridge — reuse the same implementation
        gen = self._workflow_impl()
        # Use helper to drive async generator from sync context
        for task_id in _sync_drive_async_gen(gen):
            yield task_id

This eliminates ~550 lines of duplicated code and ensures loop-task file-reading, subtask creation, and routing logic can never diverge between sync and async paths again.


2. Unsafe async callback dispatch — asyncio.run() inside running event loops

What's wrong

In agents/agents.py, async task callbacks are dispatched using a pattern that will crash with RuntimeError: asyncio.run() cannot be called from a running event loop when the code is already inside an async context:

# agents/agents.py:935-941 (first occurrence)
if asyncio.iscoroutinefunction(task.callback):
    try:
        loop = asyncio.get_running_loop()
        loop.create_task(task.callback(task_output))
    except RuntimeError:
        asyncio.run(task.callback(task_output))

# agents/agents.py:1166-1172 (exact duplicate — second occurrence)
if asyncio.iscoroutinefunction(task.callback):
    try:
        loop = asyncio.get_running_loop()
        loop.create_task(task.callback(task_output))
    except RuntimeError:
        asyncio.run(task.callback(task_output))

Problem 1 — Fire-and-forget with no await: When a running loop IS found (line 938), loop.create_task() creates the callback as a fire-and-forget task. The result is never awaited, exceptions are silently swallowed, and the callback may not complete before the next task starts — violating the sequential execution guarantee.

Problem 2 — Crash in sync context called from async: The except RuntimeError branch (line 940-941) calls asyncio.run() which creates a new event loop. But this code path is inside the sync _run_task_agent() method which can itself be called from an async context via akickoff(). In that case asyncio.get_running_loop() succeeds but loop.create_task() may fail for other reasons, and the except branch would then try asyncio.run() inside a running loop — crash.
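The crash mode is easy to reproduce in isolation; this standalone snippet (names are illustrative, not from the codebase) triggers exactly the RuntimeError described above:

```python
import asyncio

async def cb():
    return "done"

err = None

async def main():
    global err
    coro = cb()
    try:
        # The bug's failure mode: asyncio.run() inside a running loop
        asyncio.run(coro)
    except RuntimeError as e:
        err = str(e)
    finally:
        coro.close()  # suppress the "never awaited" warning

asyncio.run(main())
print(err)  # asyncio.run() cannot be called from a running event loop
```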

Problem 3 — Duplicated twice: This exact pattern appears at two locations (lines 935-941 and 1166-1172), meaning any fix must be applied in two places — further evidence of the DRY violation in Issue #1's pattern.

A related issue exists in process/process.py:612-616 where asyncio.Lock() is lazily created via double-checked locking:

# process/process.py:612-616
if self._state_lock is None:
    with self._state_lock_init:          # threading.Lock
        if self._state_lock is None:
            self._state_lock = asyncio.Lock()  # Requires running event loop
async with self._state_lock:

In Python <3.10, asyncio.Lock() required a running event loop. Even in 3.10+, creating it in a sync context and using it in a different async context can bind it to the wrong loop.

How to fix

The project already has a correct pattern in approval/utils.py:run_coroutine_safely() that uses ThreadPoolExecutor to bridge sync→async safely. Apply the same approach for callbacks:

# New utility: agents/agents.py or a shared utils module
def _dispatch_async_callback(callback, *args):
    """Safely dispatch an async callback from any context (sync or async)."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop and loop.is_running():
        # Already in async context — schedule and return a future we can track
        return loop.create_task(callback(*args))
    else:
        # Sync context — safe to use asyncio.run()
        return asyncio.run(callback(*args))

Then in both callback sites (lines 935 and 1166), replace with:

if task.callback:
    try:
        if asyncio.iscoroutinefunction(task.callback):
            _dispatch_async_callback(task.callback, task_output)
        else:
            task.callback(task_output)
    except Exception as e:
        logger.error(f"Error executing task callback for task {task_id}: {e}")
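A quick standalone check of both dispatch paths (the helper is duplicated here so the snippet runs on its own; `record` and `results` are illustrative names):

```python
import asyncio

# Duplicated from the proposed fix so this snippet is self-contained
def _dispatch_async_callback(callback, *args):
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None
    if loop and loop.is_running():
        return loop.create_task(callback(*args))
    return asyncio.run(callback(*args))

results = []

async def record(x):
    results.append(x)

# Sync context: no loop running, asyncio.run() blocks until done
_dispatch_async_callback(record, "from-sync")

# Async context: scheduled on the running loop; awaiting the returned
# task surfaces exceptions and preserves ordering
async def main():
    await _dispatch_async_callback(record, "from-async")

asyncio.run(main())
print(results)  # ['from-sync', 'from-async']
```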

For the asyncio.Lock() issue in process.py, defer creation to within the async method body:

# process/process.py — fix lazy lock creation
async def _get_state_lock(self):
    """Get or create the async state lock (must be called from async context)."""
    # No threading lock needed: there is no await between the check and the
    # assignment, so coroutines on the same loop cannot interleave here
    if self._state_lock is None:
        self._state_lock = asyncio.Lock()
    return self._state_lock

# Usage:
lock = await self._get_state_lock()  # Guaranteed to have running loop
async with lock:
    ...

3. Module-level mutable global dicts in agents/agents.py — violates "no global singletons" principle

What's wrong

Four module-level mutable globals manage shared HTTP server state across all AgentTeam instances:

# agents/agents.py:34-38
_agents_server_lock = threading.Lock()
_agents_server_started = {}          # Dict of port -> bool
_agents_registered_endpoints = {}    # Dict of port -> Dict of path -> endpoint_id
_agents_shared_apps = {}             # Dict of port -> FastAPI app

These violate the project's "no global singletons" principle and create three concrete problems:

Problem 1 — Incomplete lock coverage (race condition): Route registration on the shared FastAPI app happens outside the lock:

# agents/agents.py:1725-1728
_agents_registered_endpoints[port][path] = endpoint_id  # line 1725 — still inside the lock
# lock released here

@_agents_shared_apps[port].post(path)  # line 1728 — OUTSIDE the lock
async def handle_query(request: Request, ...):
    ...

# agents/agents.py:1807 — OUTSIDE lock
@_agents_shared_apps[port].get(f"{path}/list")

# agents/agents.py:1854 — OUTSIDE lock
_agents_shared_apps[port].post(agent_path)(create_agent_handler(agent_instance))

If two AgentTeam instances call launch() concurrently on the same port, routes can be registered on the FastAPI app simultaneously without synchronization, potentially corrupting the app's internal route table.

Problem 2 — TOCTOU race in server startup:

# agents/agents.py:1861-1884
with _agents_server_lock:
    if not _agents_server_started.get(port, False):
        _agents_server_started[port] = True      # Mark started
        should_start_server = True
    else:
        should_start_server = False

if should_start_server:                            # OUTSIDE lock — gap exists
    server_thread = threading.Thread(target=run_server, daemon=True)
    server_thread.start()
    time.sleep(0.5)                                # Hope-based synchronization

The flag is set inside the lock but the actual server start happens outside it. The time.sleep(0.5) is a "hope the server started" pattern — there's no actual readiness check.
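A `threading.Event` replaces the sleep with a bounded, deterministic wait; minimal standalone pattern (the `run_server` body is a stand-in for the real startup work):

```python
import threading
import time

ready = threading.Event()

def run_server():
    time.sleep(0.05)   # stand-in for socket bind / startup work
    ready.set()        # signal only after startup has actually completed
    # ... serve forever ...

threading.Thread(target=run_server, daemon=True).start()

# Bounded, deterministic wait instead of time.sleep(0.5)
if not ready.wait(timeout=5.0):
    raise RuntimeError("server failed to start within 5s")
print("server ready")
```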

Problem 3 — Cross-instance state leakage: Since these are module-level globals, state persists across AgentTeam instances. If one team is garbage-collected but another is created on the same port, the new team inherits stale endpoint registrations and a potentially dead server thread with no way to detect or recover.

How to fix

The codebase already has a correct pattern: agent/agent.py lines 173-236 implement a ServerRegistry class that encapsulates server state with proper locking. Apply the same pattern here:

import threading
import time
from typing import Any, Dict, Tuple

class _AgentServerRegistry:
    """Encapsulates all shared HTTP server state with proper synchronization."""

    def __init__(self):
        self._lock = threading.Lock()
        self._started: Dict[int, bool] = {}
        self._endpoints: Dict[int, Dict[str, str]] = {}
        self._apps: Dict[int, Any] = {}  # FastAPI apps
        self._ready_events: Dict[int, threading.Event] = {}
    
    def get_or_create_app(self, port: int, title: str) -> Tuple[Any, bool]:
        """Thread-safe app creation. Returns (app, is_new)."""
        with self._lock:
            if port not in self._apps:
                from fastapi import FastAPI
                self._apps[port] = FastAPI(title=title)
                self._endpoints[port] = {}
                self._ready_events[port] = threading.Event()
                return self._apps[port], True
            return self._apps[port], False
    
    def register_route(self, port: int, path: str, handler, method: str = "post"):
        """Thread-safe route registration."""
        with self._lock:
            app = self._apps[port]
            getattr(app, method)(path)(handler)
            self._endpoints[port][path] = method
    
    def start_server_if_needed(self, port: int, host: str, **kwargs):
        """Start the server once per port, with real readiness signaling."""
        with self._lock:
            if self._started.get(port, False):
                return
            self._started[port] = True

        ready_event = self._ready_events[port]

        def run_server():
            import uvicorn
            config = uvicorn.Config(self._apps[port], host=host, port=port, **kwargs)
            server = uvicorn.Server(config)

            def watch_ready():
                # uvicorn flips server.started to True once the socket is
                # bound, so the event fires only when the server is usable
                while not server.started:
                    time.sleep(0.01)
                ready_event.set()

            threading.Thread(target=watch_ready, daemon=True).start()
            server.run()

        thread = threading.Thread(target=run_server, daemon=True)
        thread.start()
        # Deterministic, bounded wait instead of time.sleep(0.5)
        if not ready_event.wait(timeout=5.0):
            raise RuntimeError(f"Server on port {port} did not start within 5s")


# Module level: a single registry instance. Still module-level state, but one
# encapsulated, lock-guarded object instead of four bare mutable dicts.
_server_registry = _AgentServerRegistry()

This consolidates all four globals into a single encapsulated registry with:

  • All route registrations under the lock
  • Deterministic server readiness via threading.Event instead of time.sleep(0.5)
  • Clean separation that prevents cross-instance state leakage
  • Follows the existing ServerRegistry pattern from agent/agent.py

Impact Matrix

| Issue | Principle violated | Severity | Affected files |
| --- | --- | --- | --- |
| Sync/async duplication | DRY, single source of truth | High — silent feature divergence between sync and async paths | process/process.py (~1000 duplicated lines) |
| Unsafe async callback dispatch | Multi-agent + async safe by default | Critical — RuntimeError crash in production async workflows | agents/agents.py (lines 935-941, 1166-1172), process/process.py (lines 612-616) |
| Module-level mutable globals | No global singletons, multi-agent safe | High — race conditions in concurrent launch() calls | agents/agents.py (lines 34-38, 1678-1891) |

Labels: bug (Something isn't working), claude (Auto-trigger Claude analysis), enhancement (New feature or request)