Add the ability to use Python files with inline-defined task metadata #163
base: main
Conversation
Walkthrough

The changes introduce support for running Python files containing inline-defined task metadata.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant SupertaskCLI
    participant Timetable
    participant TaskRunner
    participant PythonScript
    User->>SupertaskCLI: Run `supertask run my_task.py`
    SupertaskCLI->>Timetable: load("my_task.py")
    Timetable->>SupertaskCLI: Timetable instance (from inline metadata)
    SupertaskCLI->>TaskRunner: run(timetable)
    TaskRunner->>PythonScript: Import and execute `run` with kwargs
    PythonScript-->>TaskRunner: Result
    TaskRunner-->>SupertaskCLI: Log result
```
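As an illustration of the flow above, a hypothetical task file might combine a PEP 723-style `# /// task` comment block (using the `cron` and `options` keys that `Timetable.from_python` consumes) with a `run` entry point. The file contents, the `label` option, and the cron expression here are invented for the sketch:

```python
# /// task
# cron = "*/10 * * * * *"
#
# [options]
# label = "demo"
# ///
"""Hypothetical example task with inline-defined metadata."""


def run(**kwargs):
    # Invoked by the scheduler; `kwargs` carries the metadata `options`.
    return f"Hello from {kwargs.get('label', 'task')}"


if __name__ == "__main__":
    print(run(label="demo"))
```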
Codecov Report
Attention: Patch coverage is

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main     #163      +/-   ##
==========================================
- Coverage   81.29%   76.61%   -4.69%
==========================================
  Files           8        8
  Lines         385      419      +34
==========================================
+ Hits          313      321       +8
- Misses         72       98      +26
```

Flags with carried forward coverage won't be shown.
Actionable comments posted: 9
🧹 Nitpick comments (5)
supertask/util.py (2)

Line 35: Document the synchronization plan with the external module. The TODO comment indicates a need to synchronize with `pueblo.sfa.pep723`. Please clarify when and how this synchronization will occur, to prevent future maintenance issues.

Lines 43-49: Consider error handling for TOML parsing. Currently, if `tomli.loads()` fails due to invalid TOML syntax in the metadata block, it raises an exception that is not caught. Consider adding try/except handling to provide more helpful error messages:

```diff
-        content = "".join(
-            line[2:] if line.startswith("# ") else line[1:]
-            for line in matches[0].group("content").splitlines(keepends=True)
-        )
-        return tomli.loads(content)
+        content = "".join(
+            line[2:] if line.startswith("# ") else line[1:]
+            for line in matches[0].group("content").splitlines(keepends=True)
+        )
+        try:
+            return tomli.loads(content)
+        except tomli.TOMLDecodeError as e:
+            raise ValueError(f"Invalid TOML in {name} metadata block: {e}") from e
```

supertask/core.py (1)

Lines 154-155: Clarify the TODO comment about refactoring. The TODO comment about refactoring into a single-line invocation isn't clear. Please specify which single-line invocation you are referring to, or provide more context, to make this comment actionable for future developers.
examples/contrib/cratedb_cleanup.py (2)

Line 40: Consider making SQLAlchemy logging configurable. Setting `echo=True` on the SQLAlchemy engine enables verbose logging of all SQL statements, which is useful for debugging but might be too noisy in production environments.

```diff
-        self.engine = sa.create_engine(database_url, echo=True)
+        self.engine = sa.create_engine(database_url, echo=os.getenv("SQL_DEBUG", "").lower() in ("true", "1", "yes"))
```

Lines 52-71: Optimize the table filtering logic. The current implementation loops through all tables and checks whether each one starts with any of the given prefixes, which could be inefficient for large databases with many tables. Consider a more efficient approach by compiling a pattern once and using it for matching:

```diff
 @property
 def table_addresses(self) -> t.List[TableAddress]:
     """
     Table addresses selected by filter.

     TODO: Elaborate with `include` vs. `exclude` selectors?
     TODO: Q: How to make the current prefix match (`table_prefixes`) more advanced?
              A: Just use regexes, or provide other wildcard schemes?
     TODO: Possibly refactor to stdlib or CrateDB Toolkit.
     """
     inspector = sa.inspect(self.engine)
     bucket: t.List[TableAddress] = []
+    # Only check tables from schemas we're interested in
+    schemas_to_check = [schema for schema in inspector.get_schema_names() if schema in self.schemas]
+    # Pre-compile a regex pattern for efficiency if many tables need to be checked
+    prefix_pattern = "^(" + "|".join(re.escape(prefix) for prefix in self.table_prefixes) + ")"
+    pattern = re.compile(prefix_pattern)
+
+    for schema in schemas_to_check:
+        tables = inspector.get_table_names(schema=schema)
+        for table in tables:
+            if pattern.match(table):
+                bucket.append(TableAddress(schema=schema, table=table))
-    for schema in inspector.get_schema_names():
-        if schema in self.schemas:
-            tables = inspector.get_table_names(schema=schema)
-            for table in tables:
-                for prefix in self.table_prefixes:
-                    if table.startswith(prefix):
-                        bucket.append(TableAddress(schema=schema, table=table))
     return bucket
```
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (6)
- CHANGES.md (1 hunks)
- examples/contrib/cratedb_cleanup.py (1 hunks)
- pyproject.toml (1 hunks)
- supertask/core.py (2 hunks)
- supertask/model.py (6 hunks)
- supertask/util.py (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)

supertask/model.py (3)
- supertask/util.py (1): read_inline_script_metadata (28-50)
- supertask/cli.py (1): run (62-74)
- supertask/core.py (1): run (143-163)
🪛 GitHub Check: codecov/patch

supertask/util.py
- [warning] Added lines #L38-L43 were not covered by tests
- [warning] Added line #L45 was not covered by tests
- [warning] Added lines #L49-L50 were not covered by tests

supertask/core.py
- [warning] Added line #L153 was not covered by tests
- [warning] Added lines #L155-L161 were not covered by tests

supertask/model.py
- [warning] Added lines #L190-L191 were not covered by tests
- [warning] Added lines #L199-L204 were not covered by tests
- [warning] Added line #L219 was not covered by tests
🔇 Additional comments (8)

pyproject.toml (1)

Line 93: Added support for PEP 723 metadata parsing. The dependency update to include the `sfa-full` extra of the pueblo package is necessary for the new functionality that parses inline metadata from Python scripts using PEP 723.

supertask/util.py (1)

Lines 2-3: LGTM: necessary imports for PEP 723 support. These imports are required to support the new metadata parsing functionality. Also applies to line 7.

CHANGES.md (1)

Lines 26-27: LGTM: clear documentation of the new feature. The changelog entry clearly describes the new functionality and provides a useful example of how to use it.

supertask/core.py (1)

Line 15: LGTM: required import for Python file execution. This import provides the necessary components to load and execute Python files with inline metadata.

examples/contrib/cratedb_cleanup.py (1)

Lines 7-22: Well-structured inline metadata using the PEP 723 format. This implementation correctly follows the PEP 723 specification for inline script metadata, using both "script" and "task" blocks to separate dependencies from task scheduling configuration. This aligns well with the PR objective to support Python files with inline task metadata.

supertask/model.py (3)

Line 124: Good type change to support more flexible argument types. Changing the `kwargs` type from `Dict[str, ScalarType]` to `Dict[str, t.Any]` is a necessary improvement to handle the dynamic nature of Python function arguments in tasks loaded from Python files.

Lines 141-142: Appropriate default factories for meta and tasks. Adding default factories ensures these fields initialize properly even when not explicitly provided, which is important for the new `from_python` method that builds a Timetable incrementally.

Lines 190-191: Proper file extension handling. The addition of Python file handling in the `load` method is implemented correctly, delegating to the specialized `from_python` method when a `.py` file is detected.
```python
def read_inline_script_metadata(type_: str, script: str) -> t.Dict[str, t.Any]:
    """
    Reference implementation to read inline script metadata (PEP 723).

    https://packaging.python.org/en/latest/specifications/inline-script-metadata/
    https://peps.python.org/pep-0723/

    TODO: Synchronize with `pueblo.sfa.pep723`.
    """

    name = type_ or "script"
    matches = list(filter(lambda m: m.group("type") == name, re.finditer(PEP_723_REGEX, script)))
    if len(matches) > 1:
        raise ValueError(f"Multiple {name} blocks found")
    if len(matches) == 1:
        import tomli

        content = "".join(
            line[2:] if line.startswith("# ") else line[1:]
            for line in matches[0].group("content").splitlines(keepends=True)
        )
        return tomli.loads(content)
    return {}
```
💡 Verification agent
🧩 Analysis chain
Add tests for the new metadata parsing function.
The function looks well-implemented, but static analysis indicates it has no test coverage. Since this is a core component of the new feature, it should be thoroughly tested to ensure reliability.
Also, consider handling edge cases:
- Empty script input
- Malformed TOML content
- Multiple blocks with the same type
🏁 Script executed:
```shell
#!/bin/bash
# Check if there are any tests for the new inline script metadata function
fd "test_.*\.py" | xargs grep -l "read_inline_script_metadata" || echo "No tests found"
```
Length of output: 101
Add tests for read_inline_script_metadata in supertask/util.py

No existing tests cover this function. Please add comprehensive test cases to ensure reliability:
- Empty script input → returns an empty dict.
- Single valid metadata block → correct parsing of TOML into a dict.
- Malformed TOML content → raises `tomli.TOMLDecodeError`.
- Multiple blocks of the same type → raises `ValueError`.
- Stripping logic for comment prefixes (`# ` vs. `#`) → yields the expected TOML content.

Place these tests in a new or existing test file (e.g. `tests/test_util.py`) to achieve full coverage of this core utility.
```python
elif step.uses == "python-file":
    # TODO: Refactor into single-line invocation when possible.
    address = ApplicationAddress.from_spec(step.run)
    app = SingleFileApplication(address=address)
    app.load_any()
    app.import_module()
    app._entrypoint = getattr(app._module, "run", None)
    retval = app.run(*step.args, **step.kwargs)
    logger.info(f"Result: {retval}")
```
Missing test coverage for critical execution path.

This new execution path for handling Python files with inline metadata has no test coverage according to static analysis. Since this is a core feature, comprehensive tests should be added to verify its behavior.

```shell
#!/bin/bash
# Check if there are any tests for the Python file execution functionality
fd "test_.*\.py" | xargs grep -l "python-file" || echo "No tests found for python-file step type"
```

Consider adding tests that:
- Verify successful execution of a Python file with a run function
- Test behavior when the run function is missing
- Test error handling during module loading or execution
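To make this path testable without the full scheduler, a test could exercise the load-and-invoke behavior directly. The helper below is a hypothetical stand-in for what `SingleFileApplication` does here (assumed behavior, not its actual API), built on stdlib `importlib`:

```python
import importlib.util
import sys
import tempfile
from pathlib import Path


def load_and_run(pythonfile: Path, **kwargs):
    """Import a Python file and invoke its `run` entry point."""
    spec = importlib.util.spec_from_file_location(pythonfile.stem, pythonfile)
    module = importlib.util.module_from_spec(spec)
    sys.modules[pythonfile.stem] = module
    spec.loader.exec_module(module)
    entrypoint = getattr(module, "run", None)
    if entrypoint is None:
        raise RuntimeError(f"Python file {pythonfile} does not define a 'run' function")
    return entrypoint(**kwargs)


def test_run_function_is_executed():
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "my_task.py"
        path.write_text("def run(**kwargs):\n    return kwargs.get('x', 0) + 1\n")
        assert load_and_run(path, x=41) == 42


def test_missing_run_function_raises():
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "no_entrypoint.py"
        path.write_text("VALUE = 1\n")
        try:
            load_and_run(path)
        except RuntimeError:
            pass
        else:
            raise AssertionError("expected RuntimeError")
```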
```python
app = SingleFileApplication(address=address)
app.load_any()
app.import_module()
app._entrypoint = getattr(app._module, "run", None)
```
🛠️ Refactor suggestion

Add an explicit check for a missing run function.

Currently, if the Python file doesn't define a `run` function, `app._entrypoint` will be set to `None`. When executed, this will likely cause an error without a clear explanation.
```diff
- app._entrypoint = getattr(app._module, "run", None)
+ app._entrypoint = getattr(app._module, "run", None)
+ if app._entrypoint is None:
+     raise RuntimeError(f"Python file {step.run} does not define a 'run' function")
```
```python
def __init__(self, schemas: t.List[str] = None, table_prefixes: t.List[str] = None):
    self.schemas = schemas
    self.table_prefixes = table_prefixes
    database_url = os.getenv("DATABASE_URL")
    if database_url is None:
        raise ValueError("Database URL environment variable is not set: DATABASE_URL")
    self.engine = sa.create_engine(os.getenv("DATABASE_URL"), echo=True)
```
🛠️ Refactor suggestion
Avoid redundant environment variable lookup

You're calling `os.getenv("DATABASE_URL")` twice: once to check whether it is None, and again to create the engine. This is inefficient and could lead to inconsistency if the environment changes between calls.
```diff
 def __init__(self, schemas: t.List[str] = None, table_prefixes: t.List[str] = None):
     self.schemas = schemas
     self.table_prefixes = table_prefixes
     database_url = os.getenv("DATABASE_URL")
     if database_url is None:
         raise ValueError("Database URL environment variable is not set: DATABASE_URL")
-    self.engine = sa.create_engine(os.getenv("DATABASE_URL"), echo=True)
+    self.engine = sa.create_engine(database_url, echo=True)
```
```python
def run(**kwargs):
    logging.basicConfig(level=logging.INFO, handlers=[sys.stderr])
    task = DatabaseCleanupTask(**kwargs)
    task.run()
```
🛠️ Refactor suggestion
Consider adding a dry run mode for safety
For a cleanup task that drops tables, having a confirmation or dry run mode would be a good safety feature, especially when used in production environments.
```diff
 def run(**kwargs):
     logging.basicConfig(level=logging.INFO, handlers=[sys.stderr])
+    dry_run = kwargs.pop("dry_run", False)
+    if dry_run:
+        logger.info("Running in dry run mode - no tables will be dropped")
     task = DatabaseCleanupTask(**kwargs)
+    if dry_run:
+        logger.info("Tables that would be dropped:")
+        for table in task.table_addresses:
+            logger.info(f"  - {table.fullname}")
+        return
     task.run()
```
Committable suggestion skipped: line range outside the PR's diff.
```python
def run(self) -> None:
    """
    Inquire relevant table addresses and clean up temporary tables.
    """
    with self.engine.connect() as conn:
        for table in self.table_addresses:
            sql = f"DROP TABLE IF EXISTS {table.fullname}"
            logger.info(f"Dropping table {table.fullname}: {sql}")
            conn.execute(sa.text(sql))
```
🛠️ Refactor suggestion

Add error handling for database operations

The `run` method doesn't handle potential database connection or query execution errors. Since this is a cleanup task, it should be resilient to failures.
```diff
 def run(self) -> None:
     """
     Inquire relevant table addresses and clean up temporary tables.
     """
-    with self.engine.connect() as conn:
-        for table in self.table_addresses:
-            sql = f"DROP TABLE IF EXISTS {table.fullname}"
-            logger.info(f"Dropping table {table.fullname}: {sql}")
-            conn.execute(sa.text(sql))
+    try:
+        with self.engine.connect() as conn:
+            for table in self.table_addresses:
+                sql = f"DROP TABLE IF EXISTS {table.fullname}"
+                logger.info(f"Dropping table {table.fullname}: {sql}")
+                try:
+                    conn.execute(sa.text(sql))
+                    logger.info(f"Successfully dropped table {table.fullname}")
+                except Exception as e:
+                    logger.error(f"Failed to drop table {table.fullname}: {e}")
+    except Exception as e:
+        logger.error(f"Database connection error: {e}")
```
```python
pythonfile_path = Path(pythonfile)
tt.meta[cls.SOURCE_ATTRIBUTE] = pythonfile
task_data = read_inline_script_metadata("task", pythonfile_path.read_text())
os.environ.update(task_data.get("env", {}))
```
🛠️ Refactor suggestion

Update environment variables with caution

Directly updating `os.environ` with values from the task metadata could have unintended side effects on the execution environment, especially if multiple tasks are loaded with conflicting environment variables. Consider these alternatives:
- Add a configuration option to control whether environment variables should be updated
- Maintain task-specific environment variables instead of modifying the global environment
- Log a warning when environment variables are being modified
```diff
-os.environ.update(task_data.get("env", {}))
+# Option 1: Add configuration control
+env_vars = task_data.get("env", {})
+if env_vars:
+    logger.warning(f"Updating environment variables from {pythonfile}: {list(env_vars.keys())}")
+    if cls.ALLOW_ENV_UPDATES:  # Add this as a class variable
+        os.environ.update(env_vars)
+    else:
+        logger.warning("Environment variable updates are disabled. Enable with ALLOW_ENV_UPDATES=True")
```
Committable suggestion skipped: line range outside the PR's diff.
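One hedged sketch of the second alternative, keeping environment updates task-scoped rather than mutating the process environment permanently (the `scoped_env` helper name is invented, not part of the codebase):

```python
import os
from contextlib import contextmanager


@contextmanager
def scoped_env(env: dict):
    """Temporarily apply `env` to os.environ, restoring prior values on exit."""
    saved = {key: os.environ.get(key) for key in env}
    os.environ.update(env)
    try:
        yield
    finally:
        for key, value in saved.items():
            if value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = value
```

A `python-file` step could then wrap its invocation in `with scoped_env(task_data.get("env", {})): ...`, so that conflicting variables from other loaded tasks do not leak between runs.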
```python
tt = cls()
pythonfile_path = Path(pythonfile)
tt.meta[cls.SOURCE_ATTRIBUTE] = pythonfile
task_data = read_inline_script_metadata("task", pythonfile_path.read_text())
os.environ.update(task_data.get("env", {}))
tt.tasks.append(
    Task(
        meta=TaskMetadata(id="python", name=pythonfile_path.stem, description="TODO", enabled=True),
        on=Event(schedule=[ScheduleItem(cron=task_data["cron"])]),
        steps=[
            Step(
                name=pythonfile_path.stem,
                uses="python-file",
                run=f"{pythonfile_path}:run",
                args=[],
                kwargs=task_data.get("options", {}),
            ),
        ],
    )
)
return tt
```
💡 Verification agent
🧩 Analysis chain
Add unit tests for the new Python file task loading functionality

Static analysis indicates that the new Python file loading functionality isn't covered by tests; coverage here is essential for ensuring reliability.
Run the following script to check current test coverage for Python file loading:
Would you like me to create a sample test file for this functionality?
🏁 Script executed:
```shell
#!/bin/bash
# Check current test coverage for Python file tasks

# Look for existing tests for Python file tasks
echo "Checking for existing tests for Python file tasks..."
rg -i "python-file|from_python" --type py "tests/"

# Check if there are any test fixtures for Python file tasks
echo "Checking for test fixtures for Python file tasks..."
rg -i "\.py'" --type py "tests/" | rg "fixture|mock"
```
Length of output: 334
Add unit tests for Python file task loading

I didn't find any existing tests in tests/ that cover the new Python-file loader in supertask/model.py (lines 199-219). Please add tests that:
- Invoke the class method which reads a `.py` file's inline metadata and returns the task container.
- Verify that `os.environ` is updated with the `env` entries from the inline metadata.
- Assert the created `Task` has the correct `meta` (id/name), `on.schedule` (cron), and `steps` (uses="python-file", proper `run` path, args, kwargs).

You can introduce a new test file, e.g. tests/test_python_file_task_loader.py, to cover these scenarios.
```python
@classmethod
def from_python(cls, pythonfile: str):
    tt = cls()
    pythonfile_path = Path(pythonfile)
    tt.meta[cls.SOURCE_ATTRIBUTE] = pythonfile
    task_data = read_inline_script_metadata("task", pythonfile_path.read_text())
    os.environ.update(task_data.get("env", {}))
    tt.tasks.append(
        Task(
            meta=TaskMetadata(id="python", name=pythonfile_path.stem, description="TODO", enabled=True),
            on=Event(schedule=[ScheduleItem(cron=task_data["cron"])]),
            steps=[
                Step(
                    name=pythonfile_path.stem,
                    uses="python-file",
                    run=f"{pythonfile_path}:run",
                    args=[],
                    kwargs=task_data.get("options", {}),
                ),
            ],
        )
    )
    return tt
```
💡 Verification agent
🧩 Analysis chain
Improve task metadata extraction and error handling

The `from_python` method needs improvements in error handling, task identification, and description generation.
```diff
 @classmethod
 def from_python(cls, pythonfile: str):
     tt = cls()
     pythonfile_path = Path(pythonfile)
     tt.meta[cls.SOURCE_ATTRIBUTE] = pythonfile
-    task_data = read_inline_script_metadata("task", pythonfile_path.read_text())
+    try:
+        content = pythonfile_path.read_text()
+        task_data = read_inline_script_metadata("task", content)
+        if not task_data:
+            raise ValueError(f"No task metadata found in {pythonfile}")
+        if "cron" not in task_data:
+            raise ValueError(f"Missing required 'cron' field in task metadata in {pythonfile}")
+    except Exception as e:
+        logger.error(f"Failed to read or parse task metadata from {pythonfile}: {e}")
+        raise
+
     os.environ.update(task_data.get("env", {}))
+
+    # Extract docstring as description if available
+    description = "Python task"
+    docstring_pattern = r'^"""(.*?)"""'
+    docstring_match = re.search(docstring_pattern, content, re.DOTALL)
+    if docstring_match:
+        docstring = docstring_match.group(1).strip()
+        description = docstring.split("\n")[0] if docstring else description
+
+    # Generate a unique task ID based on file path
+    task_id = hashlib.md5(str(pythonfile_path.absolute()).encode()).hexdigest()[:8]
+
     tt.tasks.append(
         Task(
-            meta=TaskMetadata(id="python", name=pythonfile_path.stem, description="TODO", enabled=True),
+            meta=TaskMetadata(id=f"python-{task_id}", name=pythonfile_path.stem, description=description, enabled=True),
             on=Event(schedule=[ScheduleItem(cron=task_data["cron"])]),
             steps=[
                 Step(
                     name=pythonfile_path.stem,
                     uses="python-file",
                     run=f"{pythonfile_path}:run",
                     args=[],
                     kwargs=task_data.get("options", {}),
                 ),
             ],
         )
     )
     return tt
```
Let's verify our implementation improvements with a script:
🏁 Script executed:
```shell
#!/bin/bash
# Check if the extraction of task metadata is working as expected

# Find all Python files with task metadata
echo "Checking Python files for task metadata..."
rg -l "# /// task" --type py

# Check if any Python files have multiple task blocks (which would be invalid)
echo "Checking for invalid multiple task blocks..."
rg -l "# /// task.*# /// task" --type py

# Check if any Python files are missing the cron field in task metadata
echo "Checking for missing cron field..."
files_with_task=$(rg -l "# /// task" --type py)
for file in $files_with_task; do
  if ! rg -q "cron\s*=" "$file"; then
    echo "WARNING: $file has task metadata but appears to be missing a cron field"
  fi
done
```
Length of output: 554
Enhance from_python in supertask/model.py with robust metadata parsing and task identification

The current implementation reads inline metadata without validation, and uses a fixed task ID and a placeholder description. Let's improve resilience and generate meaningful task identifiers and descriptions. Key changes to apply in supertask/model.py (lines ~197-219):
- Wrap the `read_inline_script_metadata` call in try/except, logging and re-raising on missing or malformed metadata.
- Explicitly check for the required `"cron"` field and error out if absent.
- Read the file's top-level docstring and use its first line as the task description (falling back to a default).
- Compute a unique task ID (e.g. the first 8 characters of the `md5` of the absolute file path) instead of the constant `"python"`.
- Update `TaskMetadata(id=…, description=…)` accordingly.
Proposed diff:

```diff
 @@ classmethod def from_python(cls, pythonfile: str):
-    task_data = read_inline_script_metadata("task", pythonfile_path.read_text())
+    try:
+        content = pythonfile_path.read_text()
+        task_data = read_inline_script_metadata("task", content)
+        if not task_data:
+            raise ValueError(f"No task metadata found in {pythonfile}")
+        if "cron" not in task_data:
+            raise ValueError(f"Missing required 'cron' field in task metadata in {pythonfile}")
+    except Exception as e:
+        logger.error(f"Failed to read or parse task metadata from {pythonfile}: {e}")
+        raise
+
+    # Extract first line of top-level docstring as description
+    description = "Python task"
+    match = re.search(r'^"""(.*?)"""', content, re.DOTALL)
+    if match:
+        desc = match.group(1).strip().split("\n", 1)[0]
+        if desc:
+            description = desc
+
+    # Generate unique task ID from file path
+    task_id = hashlib.md5(str(pythonfile_path.absolute()).encode()).hexdigest()[:8]
 @@
-    tt.tasks.append(
-        Task(
-            meta=TaskMetadata(id="python", name=pythonfile_path.stem, description="TODO", enabled=True),
+    tt.tasks.append(Task(
+        meta=TaskMetadata(
+            id=f"python-{task_id}",
+            name=pythonfile_path.stem,
+            description=description,
+            enabled=True
+        ),
         on=Event(schedule=[ScheduleItem(cron=task_data["cron"])]),
         steps=[ ... ],
     ))
```
What the title says.