Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 22 additions & 15 deletions dagster_meltano/meltano_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@
from pathlib import Path
from typing import Dict, List, Optional, Union

from dagster import DagsterLogManager, resource, Field
from dagster import DagsterLogManager, resource, Field, OpExecutionContext, PipesSubprocessClient
from dagster_meltano.exceptions import MeltanoCommandError

from dagster_meltano.job import Job
from dagster_meltano.schedule import Schedule
from dagster_meltano.utils import Singleton
from dagster_shell import execute_shell_command

STDOUT = 1

Expand Down Expand Up @@ -46,32 +45,40 @@ def execute_command(
self,
command: str,
env: Dict[str, str],
logger: Union[logging.Logger, DagsterLogManager] = logging.Logger,
context: OpExecutionContext,
) -> str:
"""Execute a Meltano command.

Args:
context (OpExecutionContext): The Dagster execution context.
command (str): The Meltano command to execute.
env (Dict[str, str]): The environment variables to inject when executing the command.
context (OpExecutionContext): The Dagster execution context.

Returns:
str: The output of the command.
"""
output, exit_code = execute_shell_command(
f"{self.meltano_bin} {command}",
env={**self.default_env, **env},
output_logging="STREAM",
log=logger,
full_command = f"{self.meltano_bin} {command}"
merged_env = {**self.default_env, **env}

context.log.info(f"Executing command with PipesSubprocessClient: {full_command}")

client: PipesSubprocessClient = PipesSubprocessClient(
env=merged_env,
cwd=self.project_dir,
context_injector=None,
)
_command: list[str] = full_command.split(" ")
client.run(
command=_command,
context=context,
)
Comment on lines +60 to +74
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using PipesSubprocessClient adds the ability to inject context and get structured messages back from the subprocess. This is useful when the subprocess are python scripts or similar that could be updated to take advantage.

In this case it looks like we are explicitly disabling context injection always invoking a meltano command that I assume can not benefit from these capabilities.

I think the move here might be to just move to a stock subprocess.run call

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the move here might be to just move to a stock subprocess.run call

@alangenfeld thanks for that idea - I've opened #58 which does this (it reimplements what dagster-shell had been doing with subprocess).


# if exit_code != 0:
# raise MeltanoCommandError(
# f"Command '{command}' failed with exit code {exit_code}"
# )

if exit_code != 0:
raise MeltanoCommandError(
f"Command '{command}' failed with exit code {exit_code}"
)

return output
return ""

async def load_json_from_cli(self, command: List[str]) -> dict:
"""Use the Meltano CLI to load JSON data.
Expand Down
2 changes: 1 addition & 1 deletion dagster_meltano/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def dagster_op(
env = {**env, **config_env}

# Run the Meltano command
output = meltano_resource.execute_command(f"{command}", env, context.log)
output = meltano_resource.execute_command(f"{command}", env, context)

# Return the logs
return output
Expand Down
Loading