Greetings,
I am writing to see if there's a solution out there for handling creation of partition-specific configs for assets.
In the general case when using op jobs, we can use the partitioned config decorators (e.g. @daily_partitioned_config) to dynamically generate config based on the partition key/range. This works great, and the UI supports loading this config into the Launchpad so we can view and modify the values before running a job.
However, I have not been able to figure out the equivalent for assets or asset jobs. You can use the decorator for asset jobs, but you cannot use it if you want to do ad hoc materializations (i.e. outside of the asset job), because asset job configuration doesn't apply in that scenario.
This makes sense, as assets may belong to multiple asset jobs. However, there should be a way to pass partitioned config to assets directly.
In my own implementations, to get around this issue, I have had to create two Config objects: one that stores any static config (e.g. the name of the table or file the asset represents) and one that stores the partitioned config. Then, within the asset code, I call a function which generates the partitioned config at runtime.
Here is an example:
import os
from datetime import datetime, timezone
from typing import Union

from dagster import Config, OpExecutionContext, op

# env_switch_db_name, FORMAT_DATE_STANDARD and FORMAT_TIMESTAMP_STANDARD
# are project helpers/constants (not shown)

# define Config objects
class GradeChangeLogStaticConfig(Config):
    sf_schema: str = '<schema name>'
    target_table_name: str = '<table name>'


class GradeChangeLogRunConfig(Config):
    env: str
    db: str
    partition_key: Union[str, None]
    partition_start_datetime: datetime
    partition_end_datetime: datetime
    partition_start_str: str
    partition_end_str: str
    partition_folder_name: str
    partition_folder_path: str
    target_table_name: str
    target_table_path: str
    sf_last_updated: str


# define function to generate runtime configs based on the current partition in context
def run_config_for_partition_fn(context: OpExecutionContext, config: GradeChangeLogStaticConfig) -> GradeChangeLogRunConfig:
    env = os.environ.get('ENVIRONMENT', 'QA')
    db = env_switch_db_name(env, 'RAW_DATA')
    partition_key = context.partition_key if context.has_partition_key else None
    partition_start_datetime = context.partition_time_window.start
    partition_end_datetime = context.partition_time_window.end
    partition_start_str = partition_start_datetime.strftime(FORMAT_DATE_STANDARD)
    partition_end_str = partition_end_datetime.strftime(FORMAT_DATE_STANDARD)
    # fall back to the window bounds when there is no single partition key
    partition_folder_name = partition_key if partition_key else f"{partition_start_str}_to_{partition_end_str}"
    partition_folder_path = f'{config.sf_schema}/{config.target_table_name}/{partition_folder_name}'
    target_table_path = f'{db}.{config.sf_schema}.{config.target_table_name}'
    sf_last_updated = datetime.now(timezone.utc).strftime(FORMAT_TIMESTAMP_STANDARD)
    return GradeChangeLogRunConfig(
        env=env,
        db=db,
        partition_key=partition_key,
        partition_start_datetime=partition_start_datetime,
        partition_end_datetime=partition_end_datetime,
        partition_start_str=partition_start_str,
        partition_end_str=partition_end_str,
        partition_folder_name=partition_folder_name,
        partition_folder_path=partition_folder_path,
        target_table_name=config.target_table_name,
        target_table_path=target_table_path,
        sf_last_updated=sf_last_updated,
    )

...

# within the asset/op, generate the run config from the context/static config
@op(description='Load Grade Change Log data to Snowflake')
def op_load_grade_change_log(context: OpExecutionContext, config: GradeChangeLogStaticConfig, upstream: list) -> None:
    run_config = run_config_for_partition_fn(context, config)
(In my example I am using a graph asset, but this still applies when using a normal asset.)
This works fine, as the asset is able to get its partitioned config regardless of whether it is triggered via an asset job or an ad hoc materialization (which is required if I want to do a single-run backfill).
The issue is that we can only change the static config via the UI. We cannot view or override the partitioned config like we can for a standard op job.
It would be really nice if these two features could play together, i.e. if an asset could accept a PartitionedConfig instead of only a ConfigMapping.
From what I can see, when doing ad hoc materializations, there isn't a way to handle partitioned config except at runtime. Has anyone else run into this issue?