Conversation

sh-rp (Collaborator) commented Sep 4, 2025

Description

This PR

  • Allows the attach command to sync a pipeline from the destination if it is not found locally (see the sketch below).
  • Allows setting the port and host when launching the dashboard programmatically.
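A minimal usage sketch of both features; the pipeline name and the host/port parameter names are assumptions (only sync_if_missing and run_dashboard appear in the diff below):

    import dlt

    # attach to a pipeline; if no local working dir is found, fall back to
    # syncing the state from the destination (the new sync_if_missing flag)
    p = dlt.attach(
        pipeline_name="my_pipeline",  # hypothetical name
        destination="duckdb",         # the fallback sync needs to know where to look
        sync_if_missing=True,
    )

    # launching the dashboard programmatically on a custom host/port
    # (host/port parameter names are assumed from the PR description)
    # run_dashboard(pipelines_dir=None, edit=False, host="127.0.0.1", port=8080)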

netlify bot commented Sep 4, 2025

Deploy Preview for dlt-hub-docs canceled.

🔨 Latest commit: 1a6724f
🔍 Latest deploy log: https://app.netlify.com/projects/dlt-hub-docs/deploys/68c82a9ae603fd00085edc69

        dataset_name: str = None,
        sync_if_missing: bool = False,
        **injection_kwargs: Any,
    ) -> Pipeline:

sh-rp (Collaborator, Author) commented:

I'm not sure about the changes to def attach. Maybe we should have something like attach_remote with a reduced arg set that does this, and leave attach unchanged. A rough sketch of that alternative follows.
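A hypothetical sketch of that attach_remote alternative; the name, signature, and docstring are assumptions, not part of this PR:

    import dlt

    def attach_remote(
        pipeline_name: str,
        destination=None,
        dataset_name: str = None,
        pipelines_dir: str = None,
    ) -> dlt.Pipeline:
        """Attach to pipeline_name; if no local working dir exists, sync state from the destination."""
        ...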

    if extended_info:
        d_t_node = call_args.arguments.get("destination")
        if d_t_node:
            destination = evaluate_node_literal(d_t_node)

sh-rp (Collaborator, Author) commented:

This does not work for any destination that is not a string literal.
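A hypothetical script illustrating the limitation; AST literal evaluation can resolve the first call but not the second:

    import dlt

    # resolvable: destination is a plain string literal
    p1 = dlt.pipeline("my_pipeline", destination="duckdb")

    # not resolvable from the AST: destination is a factory call, not a literal
    p2 = dlt.pipeline("my_pipeline", destination=dlt.destinations.duckdb("data.db"))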

Collaborator commented:

Here we could also parse the destination factory, but IMO we should not invest too much in AST parsing right now.

    pipeline_name = pipeline_info["pipeline_name"]
    pipelines_dir = pipeline_info["pipelines_dir"]

    dlt.attach(

sh-rp (Collaborator, Author) commented:

What can happen here is that a user wants to open the pipeline as defined in the script, but state from some other pipeline with the same name already exists on the local machine, and that one is opened instead.


    @utils.track_command("dashboard", True)
    def dashboard_command_wrapper(pipelines_dir: Optional[str], edit: bool) -> None:
    def dashboard_command_wrapper(

sh-rp (Collaborator, Author) commented Sep 4, 2025:

Alternatively to changing this top-level dashboard command, we could also allow the pipeline command to take a script file instead of a pipeline name. But that is probably quite confusing.

    if d_t_node:
        destination = evaluate_node_literal(d_t_node)
        if destination is None:
            raise CliCommandInnerException(

Collaborator commented:

Just warn here, to stay backward compatible. For example:
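A minimal sketch of the suggested backward-compatible branch (the import path and warning text are assumptions):

    from dlt.cli import echo as fmt  # assumed import path for dlt's CLI echo helpers

    destination = None  # stand-in for the evaluate_node_literal result
    if destination is None:
        # warn instead of raising CliCommandInnerException, so existing scripts keep working
        fmt.warning("Could not evaluate the destination argument: it is not a string literal")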


    run_dashboard(pipelines_dir=pipelines_dir, edit=edit)
    # if a pipeline script path is provided, we need to parse out pipeline info from the script and sync it
    pipeline_name: str = None

Collaborator commented:

I get what you do here, but this code should be executed by the deploy command (or not at all, because the deploy command has access to pipeline state and trace). I was possibly not specific when we discussed this:

  1. The workspace dashboard and the report notebook should attach in the same way; dlt.attach(pipeline_name) should be enough in both cases.
  2. It is the task of the deployment script to generate the additional information (from AST/state/trace) and add it to the job package. Here it may just emit env variables:

     PIPELINES__<pipeline_name>__DESTINATION_TYPE=...
     PIPELINES__<pipeline_name>__DESTINATION_NAME=...
     PIPELINES__<pipeline_name>__DATASET_NAME=...
     ...

attach will see them automatically, even without those parameters being passed; look at the code. A sketch of this mechanism follows below.

Now the big question is how we gather these parameters. If you are against runtime information, i.e. using state or trace, then we'll invest in AST parsing, but IMO it will never be as good.
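A minimal sketch of the suggested mechanism, assuming a hypothetical pipeline named my_pipeline (the variable layout is quoted from the comment above):

    import os
    import dlt

    # a deployment script would emit these into the job package environment
    os.environ["PIPELINES__MY_PIPELINE__DESTINATION_TYPE"] = "duckdb"
    os.environ["PIPELINES__MY_PIPELINE__DATASET_NAME"] = "my_dataset"

    # attach then resolves destination and dataset from config,
    # without those parameters being passed explicitly
    p = dlt.attach(pipeline_name="my_pipeline")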

        destination_name=injection_kwargs.get("staging_name", None),
    )

    pipeline_kwargs = {

Collaborator commented:

I think there's a cleaner way; I will leave a comment separately.

sh-rp force-pushed the feat/improved_synching branch from f1d5a8f to 46edfa0 on September 9, 2025 15:28
sh-rp force-pushed the feat/improved_synching branch 2 times, most recently from c9ddb93 to 3e8a486 on September 11, 2025 14:29
sh-rp force-pushed the feat/improved_synching branch from 3e8a486 to 2c8ed76 on September 11, 2025 14:29

rudolfix (Collaborator) left a comment:

To me it looks pretty OK. See my comments:

  1. We need to test the edge cases mentioned in my comments.
  2. Look at where attach is used. AFAIK we use it in the CLI to restore a pipeline:
    try:
        if verbosity > 0:
            fmt.echo("Attaching to pipeline %s" % fmt.bold(pipeline_name))
        p = dlt.attach(pipeline_name=pipeline_name, pipelines_dir=pipelines_dir)
    except CannotRestorePipelineException as e:
        if operation not in {"sync", "drop"}:
            raise
        fmt.warning(str(e))
        if not fmt.confirm(
            "Do you want to attempt to restore the pipeline state from destination?",
            default=False,
        ):
            return
        destination = destination or fmt.text_input(
            f"Enter destination name for pipeline {fmt.bold(pipeline_name)}"
        )
        dataset_name = dataset_name or fmt.text_input(
            f"Enter dataset name for pipeline {fmt.bold(pipeline_name)}"
        )
        p = dlt.pipeline(
            pipeline_name,
            pipelines_dir,
            destination=destination,
            dataset_name=dataset_name,
        )
        p.sync_destination()
        if p.first_run:
            # remote state was not found
            p._wipe_working_folder()
            fmt.error(
                f"Pipeline {pipeline_name} was not found in dataset {dataset_name} in {destination}"
            )
            return
        if operation == "sync":
            return  # No need to sync again
which looks like what you already implemented :)

    # set it as current pipeline
    p.activate()
    return p
    try:

Collaborator commented:

Please allow for an explicit dataset name in the args.

        return p
    except CannotRestorePipelineException:
        # we can try to sync a pipeline with the given name
        p = pipeline(pipeline_name, pipelines_dir, destination=destination, staging=staging)

Collaborator commented:

We can attempt a destination sync only if destination is set; otherwise re-raise the exception.

    except CannotRestorePipelineException:
        # we can try to sync a pipeline with the given name
        p = pipeline(pipeline_name, pipelines_dir, destination=destination, staging=staging)
        p.sync_destination()

Collaborator commented:

Note: this can raise PipelineStepFailed if the destination state is broken for some reason. That is OK.

Collaborator commented:

If p.first_run is True, it means there is no remote state. In that case you should wipe the pipeline working dir that was created by dlt.pipeline and re-raise the original exception. A sketch combining the last few comments follows below.
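Putting the last few review comments together, a hedged sketch of the fallback path (not the final implementation; _wipe_working_folder is the private helper used in the pipeline_command snippet above):

    import dlt
    from dlt.pipeline.exceptions import CannotRestorePipelineException

    pipeline_name, pipelines_dir = "my_pipeline", None  # hypothetical values
    destination, staging = "duckdb", None

    try:
        p = dlt.attach(pipeline_name=pipeline_name, pipelines_dir=pipelines_dir)
    except CannotRestorePipelineException:
        # attempt the remote sync only when a destination is known; otherwise re-raise
        if destination is None:
            raise
        p = dlt.pipeline(pipeline_name, pipelines_dir, destination=destination, staging=staging)
        p.sync_destination()  # may raise PipelineStepFailed if the remote state is broken
        if p.first_run:
            # no remote state was found: wipe the working dir dlt.pipeline just created
            # and surface the original exception
            p._wipe_working_folder()
            raise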

Collaborator commented:

See my review message; it looks like this is already implemented in pipeline_command.

sh-rp changed the title from "PoC: Improved dashboard launching and database synching" to "Improved pipeline attach command and Dashboard launcher extensions" on Sep 15, 2025
sh-rp requested a review from rudolfix on September 15, 2025 11:09
sh-rp self-assigned this on Sep 15, 2025

rudolfix (Collaborator) left a comment:

LGTM!

rudolfix marked this pull request as ready for review on September 15, 2025 17:50
sh-rp merged commit e0c6d20 into devel on Sep 16, 2025 (67 checks passed)
sh-rp deleted the feat/improved_synching branch on September 16, 2025 11:28