
Add the ability to use Python files with inline-defined task metadata #163

Draft · wants to merge 1 commit into base: main
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -23,3 +23,5 @@
https://supertask.readthedocs.io/
- Refactored data model and CLI
- Removed HTTP service
- Added the ability to use Python files including inline-defined task metadata, e.g.
`supertask run my_task.py`
89 changes: 89 additions & 0 deletions examples/contrib/cratedb_cleanup.py
@@ -0,0 +1,89 @@
#!/usr/bin/env python3
import logging
import os
import sys
import typing as t

# /// script
# requires-python = ">=3.9"
# dependencies = [
# "cratedb-toolkit",
# "sqlalchemy-cratedb",
# "tqdm",
# ]
# ///
# /// task
# cron = "*/5 * * * * * *"
# [env]
# DATABASE_URL = "crate://crate@localhost:4200/"
# [options]
# schemas = ["foo", "bar"]
# table_prefixes = ["tmp_", "temp_"]
# ///
import sqlalchemy as sa
from cratedb_toolkit.model import TableAddress

logger = logging.getLogger(__name__)


class DatabaseCleanupTask:
    """
    A task definition to clean up temporary tables in a database.
    """

    def __init__(self, schemas: t.List[str] = None, table_prefixes: t.List[str] = None):
        self.schemas = schemas
        self.table_prefixes = table_prefixes
        database_url = os.getenv("DATABASE_URL")
        if database_url is None:
            raise ValueError("Database URL environment variable is not set: DATABASE_URL")
        self.engine = sa.create_engine(os.getenv("DATABASE_URL"), echo=True)
Comment on lines +34 to +40

🛠️ Refactor suggestion

Avoid redundant environment variable lookup

You're calling os.getenv("DATABASE_URL") twice: once to check whether it is None, and again to create the engine. This is inefficient and could lead to inconsistency if the environment changes between calls.

def __init__(self, schemas: t.List[str] = None, table_prefixes: t.List[str] = None):
    self.schemas = schemas
    self.table_prefixes = table_prefixes
    database_url = os.getenv("DATABASE_URL")
    if database_url is None:
        raise ValueError("Database URL environment variable is not set: DATABASE_URL")
-    self.engine = sa.create_engine(os.getenv("DATABASE_URL"), echo=True)
+    self.engine = sa.create_engine(database_url, echo=True)


    def run(self) -> None:
        """
        Inquire relevant table addresses and clean up temporary tables.
        """
        with self.engine.connect() as conn:
            for table in self.table_addresses:
                sql = f"DROP TABLE IF EXISTS {table.fullname}"
                logger.info(f"Dropping table {table.fullname}: {sql}")
                conn.execute(sa.text(sql))

Comment on lines +42 to +51

🛠️ Refactor suggestion

Add error handling for database operations

The run method doesn't handle potential database connection or query execution errors. Since this is a cleanup task, it should be resilient to failures.

def run(self) -> None:
    """
    Inquire relevant table addresses and clean up temporary tables.
    """
-    with self.engine.connect() as conn:
-        for table in self.table_addresses:
-            sql = f"DROP TABLE IF EXISTS {table.fullname}"
-            logger.info(f"Dropping table {table.fullname}: {sql}")
-            conn.execute(sa.text(sql))
+    try:
+        with self.engine.connect() as conn:
+            for table in self.table_addresses:
+                sql = f"DROP TABLE IF EXISTS {table.fullname}"
+                logger.info(f"Dropping table {table.fullname}: {sql}")
+                try:
+                    conn.execute(sa.text(sql))
+                    logger.info(f"Successfully dropped table {table.fullname}")
+                except Exception as e:
+                    logger.error(f"Failed to drop table {table.fullname}: {e}")
+    except Exception as e:
+        logger.error(f"Database connection error: {e}")

    @property
    def table_addresses(self) -> t.List[TableAddress]:
        """
        Table addresses selected by filter.

        TODO: Elaborate with `include` vs. `exclude` selectors?
        TODO: Q: How to make the current prefix match (`table_prefixes`) more advanced?
                 A: Just use regexes, or provide other wildcard schemes?
        TODO: Possibly refactor to stdlib or CrateDB Toolkit.
        """
        inspector = sa.inspect(self.engine)
        bucket: t.List[TableAddress] = []
        for schema in inspector.get_schema_names():
            if schema in self.schemas:
                tables = inspector.get_table_names(schema=schema)
                for table in tables:
                    for prefix in self.table_prefixes:
                        if table.startswith(prefix):
                            bucket.append(TableAddress(schema=schema, table=table))
        return bucket


def run(**kwargs):
    logging.basicConfig(level=logging.INFO, handlers=[sys.stderr])
    task = DatabaseCleanupTask(**kwargs)
    task.run()
Comment on lines +74 to +77

🛠️ Refactor suggestion

Consider adding a dry run mode for safety

For a cleanup task that drops tables, having a confirmation or dry run mode would be a good safety feature, especially when used in production environments.

def run(**kwargs):
    logging.basicConfig(level=logging.INFO, handlers=[sys.stderr])
+   dry_run = kwargs.pop("dry_run", False)
+   if dry_run:
+       logger.info("Running in dry run mode - no tables will be dropped")
    task = DatabaseCleanupTask(**kwargs)
+   if dry_run:
+       logger.info("Tables that would be dropped:")
+       for table in task.table_addresses:
+           logger.info(f"  - {table.fullname}")
+       return
    task.run()

Committable suggestion skipped: line range outside the PR's diff.



if __name__ == "__main__":
    """
    crash -c "create table testdrive.tmp_foo (id int)"
    export DATABASE_URL=crate://crate@localhost:4200/
    python examples/contrib/cratedb_cleanup.py
    """
    run(
        schemas=["testdrive"],
        table_prefixes=["tmp_", "temp_"],
    )
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -90,7 +90,7 @@ dependencies = [
"icecream<3",
"jinja2<4",
"markupsafe<4",
"pueblo[fileio]",
"pueblo[fileio,sfa-full]",
"pydantic>=2,<3",
"python-dotenv[cli]<2",
"python-multipart==0.0.20",
10 changes: 10 additions & 0 deletions supertask/core.py
@@ -12,6 +12,7 @@
from apscheduler.util import ref_to_obj
from halo import Halo
from icecream import ic
from pueblo.sfa.core import ApplicationAddress, SingleFileApplication

from supertask.model import JobStore, Settings, Task
from supertask.store.cratedb import CrateDBSQLAlchemyJobStore
@@ -149,6 +150,15 @@
                func = ref_to_obj(step.run)
                retval = func(*step.args, **step.kwargs)
                logger.info(f"Result: {retval}")
            elif step.uses == "python-file":
                # TODO: Refactor into single-line invocation when possible.
                address = ApplicationAddress.from_spec(step.run)
                app = SingleFileApplication(address=address)
                app.load_any()
                app.import_module()
                app._entrypoint = getattr(app._module, "run", None)

Codecov warning: added line #L153 in supertask/core.py was not covered by tests.

🛠️ Refactor suggestion

Add explicit check for missing run function.

Currently, if the Python file doesn't have a run function, app._entrypoint will be set to None. When executed, this will likely cause an error without a clear explanation.

-                app._entrypoint = getattr(app._module, "run", None)
+                app._entrypoint = getattr(app._module, "run", None)
+                if app._entrypoint is None:
+                    raise RuntimeError(f"Python file {step.run} does not define a 'run' function")

                retval = app.run(*step.args, **step.kwargs)
                logger.info(f"Result: {retval}")

Codecov warning: added lines #L155-L161 in supertask/core.py were not covered by tests.
Comment on lines +153 to +161

⚠️ Potential issue

Missing test coverage for critical execution path.

This new execution path for handling Python files with inline metadata has no test coverage according to static analysis. Since this is a core feature, comprehensive tests should be added to verify its behavior.

#!/bin/bash
# Check if there are any tests for the Python file execution functionality
fd "test_.*\.py" | xargs grep -l "python-file" || echo "No tests found for python-file step type"

Consider adding tests that (see the sketch after this list):

  1. Verify successful execution of a Python file with a run function
  2. Test behavior when the run function is missing
  3. Test error handling during module loading or execution
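
A minimal pytest sketch for the first two cases could look as follows. Since the new branch in supertask/core.py is not exposed as a standalone function, the helper below simply mirrors it; the test file name is hypothetical, and the sketch assumes that `app.run()` returns the entrypoint's return value, as the `Result: {retval}` logging suggests.

```python
# tests/test_python_file_step.py (hypothetical) -- mirrors the `python-file` branch in supertask/core.py
import textwrap

import pytest
from pueblo.sfa.core import ApplicationAddress, SingleFileApplication


def invoke_python_file(spec: str, **kwargs):
    """Replicate the `python-file` step execution path, including the suggested guard."""
    address = ApplicationAddress.from_spec(spec)
    app = SingleFileApplication(address=address)
    app.load_any()
    app.import_module()
    app._entrypoint = getattr(app._module, "run", None)
    if app._entrypoint is None:
        raise RuntimeError(f"Python file {spec} does not define a 'run' function")
    return app.run(**kwargs)


def test_python_file_with_run_function(tmp_path):
    taskfile = tmp_path / "task_ok.py"
    taskfile.write_text(textwrap.dedent("""
        def run(**kwargs):
            return {"status": "ok", **kwargs}
    """))
    assert invoke_python_file(f"{taskfile}:run", foo="bar") == {"status": "ok", "foo": "bar"}


def test_python_file_without_run_function(tmp_path):
    taskfile = tmp_path / "task_broken.py"
    taskfile.write_text("VALUE = 42\n")
    # The exact exception depends on how pueblo resolves the declared entrypoint;
    # the reviewer's point is that it should surface as a clear error.
    with pytest.raises(Exception):
        invoke_python_file(f"{taskfile}:run")
```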
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 153-153: supertask/core.py#L153
Added line #L153 was not covered by tests


[warning] 155-161: supertask/core.py#L155-L161
Added lines #L155 - L161 were not covered by tests

            else:
                raise RuntimeError(f"Unknown step type: {step.uses}")

40 changes: 34 additions & 6 deletions supertask/model.py
@@ -4,6 +4,7 @@
import hashlib
import json
import logging
import os
import re
import socket
import typing as t
@@ -14,6 +15,8 @@
from pueblo.io import to_io
from pydantic import BaseModel, Field

from supertask.util import read_inline_script_metadata

logger = logging.getLogger(__name__)


@@ -118,7 +121,7 @@
    uses: str
    run: str
    args: t.List[ScalarType] = Field(default_factory=list)
-    kwargs: t.Dict[str, ScalarType] = Field(default_factory=dict)
+    kwargs: t.Dict[str, t.Any] = Field(default_factory=dict)
    if_: bool = Field(alias="if", default=True)

@@ -135,15 +138,15 @@
    Manage information about a whole timetable, including multiple task definitions.
    """

-    meta: t.Dict[str, t.Any]
-    tasks: t.List[Task]
+    meta: t.Dict[str, t.Any] = Field(default_factory=dict)
+    tasks: t.List[Task] = Field(default_factory=list)

    NAMESPACE_ATTRIBUTE: t.ClassVar = "namespace"
    SOURCE_ATTRIBUTE: t.ClassVar = "taskfile"

    def model_post_init(self, __context: t.Any) -> None:
        """
-        Adjust model after initialization.
+        Adjust the model after initialization.
        """
        # If the timetable file or resource does not provide a namespace identifier, provide an ephemeral one.
        if self.NAMESPACE_ATTRIBUTE not in self.meta or not self.meta[self.NAMESPACE_ATTRIBUTE]:
@@ -174,7 +177,7 @@
    @classmethod
    def load(cls, taskfile: str):
        """
-        Load task definitions from file or resource.
+        Load task definitions from a file or resource.
        """
        logger.info(f"Loading task(s) from file. Source: {taskfile}")

@@ -184,12 +187,37 @@
            elif taskfile.endswith(".yaml") or taskfile.endswith(".yml"):
                # Use YAML 1.2 compliant loading, otherwise "on" will be translated to `True`, for example.
                data = yaml.load(f, Loader=yamlcore.CoreLoader) # noqa: S506
+            elif taskfile.endswith(".py"):
+                return cls.from_python(taskfile)
            else:
                raise NotImplementedError(f"Task or timetable file type not supported: {taskfile}")
        data.setdefault("meta", {})
        data["meta"][cls.SOURCE_ATTRIBUTE] = taskfile
        return cls(**data)

Codecov warning: added lines #L190-L191 in supertask/model.py were not covered by tests.

    @classmethod
    def from_python(cls, pythonfile: str):
        tt = cls()
        pythonfile_path = Path(pythonfile)
        tt.meta[cls.SOURCE_ATTRIBUTE] = pythonfile
        task_data = read_inline_script_metadata("task", pythonfile_path.read_text())
        os.environ.update(task_data.get("env", {}))

🛠️ Refactor suggestion

Update environment variables with caution

Directly updating os.environ with values from the task metadata could have unintended side effects on the execution environment, especially if multiple tasks are loaded with conflicting environment variables.

Consider these alternatives:

  1. Add a configuration option to control whether environment variables should be updated
  2. Maintain task-specific environment variables instead of modifying the global environment (see the sketch below)
  3. Log a warning when environment variables are being modified
-os.environ.update(task_data.get("env", {}))
+# Option 1: Add configuration control
+env_vars = task_data.get("env", {})
+if env_vars:
+    logger.warning(f"Updating environment variables from {pythonfile}: {list(env_vars.keys())}")
+    if cls.ALLOW_ENV_UPDATES:  # Add this as a class variable
+        os.environ.update(env_vars)
+    else:
+        logger.warning("Environment variable updates are disabled. Enable with ALLOW_ENV_UPDATES=True")

Committable suggestion skipped: line range outside the PR's diff.
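
For the second alternative, task-specific environment variables, a small sketch could scope the `[env]` entries to the step invocation instead of mutating the process environment at load time; the `scoped_environment` helper below is hypothetical and not part of supertask.

```python
# Hypothetical helper: apply a task's [env] entries only for the duration of a step run.
import contextlib
import os
import typing as t


@contextlib.contextmanager
def scoped_environment(env: t.Dict[str, str]):
    """Temporarily overlay `env` onto os.environ and restore the previous values afterwards."""
    previous = {key: os.environ.get(key) for key in env}
    os.environ.update(env)
    try:
        yield
    finally:
        for key, value in previous.items():
            if value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = value


# Usage sketch inside the runner, instead of os.environ.update() at load time:
#
#     with scoped_environment(task_data.get("env", {})):
#         retval = app.run(*step.args, **step.kwargs)
```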

        tt.tasks.append(
            Task(
                meta=TaskMetadata(id="python", name=pythonfile_path.stem, description="TODO", enabled=True),
                on=Event(schedule=[ScheduleItem(cron=task_data["cron"])]),
                steps=[
                    Step(
                        name=pythonfile_path.stem,
                        uses="python-file",
                        run=f"{pythonfile_path}:run",
                        args=[],
                        kwargs=task_data.get("options", {}),
                    ),
                ],
            )
        )
        return tt

Codecov warnings: added lines #L199-L204 and #L219 in supertask/model.py were not covered by tests.
Comment on lines +199 to +219

💡 Verification agent

🧩 Analysis chain

Add unit tests for the new Python file task loading functionality

Static analysis indicates that the new Python file loading functionality isn't covered by tests; coverage here is essential for ensuring reliability.

Run the following script to check current test coverage for Python file loading:

Would you like me to create a sample test file for this functionality?


🏁 Script executed:

#!/bin/bash
# Check current test coverage for Python file tasks

# Look for existing tests for Python file tasks
echo "Checking for existing tests for Python file tasks..."
rg -i "python-file|from_python" --type py "tests/"

# Check if there are any test fixtures for Python file tasks
echo "Checking for test fixtures for Python file tasks..."
rg -i "\.py'" --type py "tests/" | rg "fixture|mock"

Length of output: 334


Add unit tests for Python file task loading

I didn’t find any existing tests in tests/ that cover the new Python‑file loader in supertask/model.py (lines 199–219). Please add tests that:

  • Invoke the classmethod which reads a .py file's inline metadata and returns the task container.
  • Verify that os.environ is updated with the env entries from your inline metadata.
  • Assert the created Task has the correct meta (id/name), on.schedule (cron), and steps (uses="python-file", proper run path, args, kwargs).

You can introduce a new test file, e.g. tests/test_python_file_task_loader.py, to cover these scenarios.
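
A sketch of such a test might look as follows. It assumes the container class that carries `from_python` is named `Timetable` in `supertask.model` (the class name is not visible in this diff), and it calls `from_python()` directly; a real test would also isolate `os.environ`, e.g. with pytest's `monkeypatch`.

```python
# tests/test_python_file_task_loader.py (hypothetical)
import os
import textwrap

from supertask.model import Timetable  # assumption: the timetable container class

TASK_FILE = textwrap.dedent("""
    # /// task
    # cron = "*/5 * * * * * *"
    # [env]
    # EXAMPLE_SETTING = "example-value"
    # [options]
    # schemas = ["testdrive"]
    # ///
    def run(**kwargs):
        return kwargs
""")


def test_load_python_file_task(tmp_path):
    taskfile = tmp_path / "my_task.py"
    taskfile.write_text(TASK_FILE)

    timetable = Timetable.from_python(str(taskfile))

    # The [env] table is applied to the process environment at load time.
    assert os.environ["EXAMPLE_SETTING"] == "example-value"

    # A single task with one `python-file` step is created from the inline metadata.
    task = timetable.tasks[0]
    assert task.meta.name == "my_task"
    assert task.on.schedule[0].cron == "*/5 * * * * * *"

    step = task.steps[0]
    assert step.uses == "python-file"
    assert step.run == f"{taskfile}:run"
    assert step.kwargs == {"schemas": ["testdrive"]}
```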

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 199-204: supertask/model.py#L199-L204
Added lines #L199 - L204 were not covered by tests


[warning] 219-219: supertask/model.py#L219
Added line #L219 was not covered by tests

Comment on lines +197 to +219

💡 Verification agent

🧩 Analysis chain

Improve task metadata extraction and error handling

The from_python method needs improvements in error handling, task identification, and description generation.

@classmethod
def from_python(cls, pythonfile: str):
    tt = cls()
    pythonfile_path = Path(pythonfile)
    tt.meta[cls.SOURCE_ATTRIBUTE] = pythonfile
-   task_data = read_inline_script_metadata("task", pythonfile_path.read_text())
+   try:
+       content = pythonfile_path.read_text()
+       task_data = read_inline_script_metadata("task", content)
+       if not task_data:
+           raise ValueError(f"No task metadata found in {pythonfile}")
+       if "cron" not in task_data:
+           raise ValueError(f"Missing required 'cron' field in task metadata in {pythonfile}")
+   except Exception as e:
+       logger.error(f"Failed to read or parse task metadata from {pythonfile}: {e}")
+       raise
+   
    os.environ.update(task_data.get("env", {}))
+   
+   # Extract docstring as description if available
+   description = "Python task"
+   docstring_pattern = r'^"""(.*?)"""'
+   docstring_match = re.search(docstring_pattern, content, re.DOTALL)
+   if docstring_match:
+       docstring = docstring_match.group(1).strip()
+       description = docstring.split("\n")[0] if docstring else description
+   
+   # Generate a unique task ID based on file path
+   task_id = hashlib.md5(str(pythonfile_path.absolute()).encode()).hexdigest()[:8]
+   
    tt.tasks.append(
        Task(
-           meta=TaskMetadata(id="python", name=pythonfile_path.stem, description="TODO", enabled=True),
+           meta=TaskMetadata(id=f"python-{task_id}", name=pythonfile_path.stem, description=description, enabled=True),
            on=Event(schedule=[ScheduleItem(cron=task_data["cron"])]),
            steps=[
                Step(
                    name=pythonfile_path.stem,
                    uses="python-file",
                    run=f"{pythonfile_path}:run",
                    args=[],
                    kwargs=task_data.get("options", {}),
                ),
            ],
        )
    )
    return tt

Let's verify our implementation improvements with a script:


🏁 Script executed:

#!/bin/bash
# Check if the extraction of task metadata is working as expected

# Find all Python files with task metadata
echo "Checking Python files for task metadata..."
rg -l "# /// task" --type py

# Check if any Python files have multiple task blocks (which would be invalid)
echo "Checking for invalid multiple task blocks..."
rg -l "# /// task.*# /// task" --type py

# Check if any Python files are missing the cron field in task metadata
echo "Checking for missing cron field..."
files_with_task=$(rg -l "# /// task" --type py)
for file in $files_with_task; do
    if ! rg -q "cron\s*=" "$file"; then
        echo "WARNING: $file has task metadata but appears to be missing a cron field"
    fi
done

Length of output: 554


Enhance from_python in supertask/model.py with robust metadata parsing and task identification

The current implementation reads inline metadata without validation and uses a fixed task ID and a placeholder description. Let's improve resilience and generate meaningful task identifiers and descriptions.

Key changes to apply in supertask/model.py (lines ~197–219):

  • Wrap read_inline_script_metadata call in try/except, logging and re‑raising on missing or malformed metadata.
  • Explicitly check for the required "cron" field and error out if absent.
  • Read the file’s top‑level docstring and use its first line as the task description (fallback to a default).
  • Compute a unique task ID (e.g. first 8 chars of md5 of the absolute file path) instead of a constant "python".
  • Update TaskMetadata(id=…, description=…) accordingly.

Proposed diff:

@@ classmethod def from_python(cls, pythonfile: str):
-   task_data = read_inline_script_metadata("task", pythonfile_path.read_text())
+   try:
+       content = pythonfile_path.read_text()
+       task_data = read_inline_script_metadata("task", content)
+       if not task_data:
+           raise ValueError(f"No task metadata found in {pythonfile}")
+       if "cron" not in task_data:
+           raise ValueError(f"Missing required 'cron' field in task metadata in {pythonfile}")
+   except Exception as e:
+       logger.error(f"Failed to read or parse task metadata from {pythonfile}: {e}")
+       raise
+
+   # Extract first line of top‑level docstring as description
+   description = "Python task"
+   match = re.search(r'^"""(.*?)"""', content, re.DOTALL)
+   if match:
+       desc = match.group(1).strip().split("\n", 1)[0]
+       if desc:
+           description = desc
+
+   # Generate unique task ID from file path
+   task_id = hashlib.md5(str(pythonfile_path.absolute()).encode()).hexdigest()[:8]
@@
-   tt.tasks.append(
-       Task(
-           meta=TaskMetadata(id="python", name=pythonfile_path.stem, description="TODO", enabled=True),
+   tt.tasks.append(Task(
+       meta=TaskMetadata(
+           id=f"python-{task_id}",
+           name=pythonfile_path.stem,
+           description=description,
+           enabled=True
+       ),
        on=Event(schedule=[ScheduleItem(cron=task_data["cron"])]),
        steps=[ ... ],
    ))
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 199-204: supertask/model.py#L199-L204
Added lines #L199 - L204 were not covered by tests


[warning] 219-219: supertask/model.py#L219
Added line #L219 was not covered by tests



class CronJob(BaseModel):
"""
28 changes: 28 additions & 0 deletions supertask/util.py
@@ -1,7 +1,10 @@
import logging
import re
import typing as t

import colorlog
from colorlog.escape_codes import escape_codes
from pueblo.sfa.pep723 import PEP_723_REGEX


def setup_logging(level=logging.INFO, debug: bool = False, width: int = 30):
@@ -20,3 +23,28 @@
    logging.getLogger("crate.client").setLevel(level)
    logging.getLogger("sqlalchemy_cratedb").setLevel(level)
    logging.getLogger("urllib3.connectionpool").setLevel(level)


def read_inline_script_metadata(type_: str, script: str) -> t.Dict[str, t.Any]:
    """
    Reference implementation to read inline script metadata (PEP 723).

    https://packaging.python.org/en/latest/specifications/inline-script-metadata/
    https://peps.python.org/pep-0723/

    TODO: Synchronize with `pueblo.sfa.pep723`.
    """

    name = type_ or "script"
    matches = list(filter(lambda m: m.group("type") == name, re.finditer(PEP_723_REGEX, script)))
    if len(matches) > 1:
        raise ValueError(f"Multiple {name} blocks found")
    if len(matches) == 1:
        import tomli

        content = "".join(
            line[2:] if line.startswith("# ") else line[1:]
            for line in matches[0].group("content").splitlines(keepends=True)
        )
        return tomli.loads(content)
    return {}

Codecov warnings: added lines #L38-L43, #L45, and #L49-L50 in supertask/util.py were not covered by tests.
Comment on lines +28 to +50

💡 Verification agent

🧩 Analysis chain

Add tests for the new metadata parsing function.

The function looks well-implemented, but static analysis indicates it has no test coverage. Since this is a core component of the new feature, it should be thoroughly tested to ensure reliability.

Also, consider handling edge cases:

  1. Empty script input
  2. Malformed TOML content
  3. Multiple blocks with the same type

🏁 Script executed:

#!/bin/bash
# Check if there are any tests for the new inline script metadata function
fd "test_.*\.py" | xargs grep -l "read_inline_script_metadata" || echo "No tests found"

Length of output: 101


Add tests for read_inline_script_metadata in supertask/util.py

No existing tests cover this function. Please add comprehensive test cases to ensure reliability:

  • Empty script input → returns an empty dict.
  • Single valid metadata block → correct parsing of TOML into a dict.
  • Malformed TOML content → raises tomli.TOMLDecodeError.
  • Multiple blocks of the same type → raises ValueError.
  • Stripping logic for comment prefixes (`# ` with a trailing space vs. bare `#`) → yields the expected TOML content.

Place these tests in a new or existing test file (e.g., tests/test_util.py) to achieve full coverage of this core utility.
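
A compact pytest sketch covering these cases might look as follows; the test file name and the metadata snippets are illustrative.

```python
# tests/test_util.py (hypothetical)
import pytest
import tomli

from supertask.util import read_inline_script_metadata

VALID = (
    "# /// task\n"
    '# cron = "* * * * *"\n'
    "# [options]\n"
    '# schemas = ["foo"]\n'
    "# ///\n"
)


def test_empty_script_returns_empty_dict():
    assert read_inline_script_metadata("task", "") == {}


def test_single_block_is_parsed():
    data = read_inline_script_metadata("task", VALID)
    assert data["cron"] == "* * * * *"
    assert data["options"]["schemas"] == ["foo"]


def test_malformed_toml_raises():
    broken = "# /// task\n# cron = not-valid-toml\n# ///\n"
    with pytest.raises(tomli.TOMLDecodeError):
        read_inline_script_metadata("task", broken)


def test_multiple_blocks_raise_value_error():
    with pytest.raises(ValueError, match="Multiple task blocks found"):
        read_inline_script_metadata("task", VALID + "\n" + VALID)
```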

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 38-43: supertask/util.py#L38-L43
Added lines #L38 - L43 were not covered by tests


[warning] 45-45: supertask/util.py#L45
Added line #L45 was not covered by tests


[warning] 49-50: supertask/util.py#L49-L50
Added lines #L49 - L50 were not covered by tests