-
Notifications
You must be signed in to change notification settings - Fork 1
Fix for standard error and output stream #170
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
fcb6485
7e13f84
746818a
9491fd2
3281955
cfd0f18
3f6e788
24943d8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,12 @@ | |
| TaskConstraints, | ||
| ) | ||
|
|
||
| from cfa.cloudops.task import ( | ||
| get_container_settings, | ||
| get_task_config, | ||
| output_task_files_to_blob, | ||
| ) | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| AZ_MOUNT_DIR = "$AZ_BATCH_NODE_MOUNTS_DIR" | ||
|
|
@@ -63,7 +69,6 @@ def _generate_command_for_saving_logs( | |
| command_line: str, | ||
| job_name: str, | ||
| task_id: str, | ||
| save_logs_rel_path: str, | ||
| logs_folder: str = "logs", | ||
| ) -> str: | ||
| """Generate a command line that saves stdout and stderr to log files. | ||
|
|
@@ -72,19 +77,15 @@ def _generate_command_for_saving_logs( | |
| command_line (str): Original command line to execute. | ||
| job_name (str): Name/ID of the job. | ||
| task_id (str): ID of the task. | ||
| save_logs_rel_path (str): Relative path where logs should be saved. | ||
| logs_folder (str): Subfolder name within the save_logs_rel_path to store logs. Defaults to "logs". | ||
| logs_folder (str): Subfolder name to store logs. Defaults to "logs". | ||
| """ | ||
| t = datetime.datetime.now(zi("America/New_York")) | ||
| s_time = t.strftime("%Y%m%d_%H%M%S") | ||
| if not save_logs_rel_path.startswith("/"): | ||
| save_logs_rel_path = "/" + save_logs_rel_path | ||
| _folder = f"{save_logs_rel_path}/{logs_folder}/" | ||
| stdout_file = f"{_folder}/stdout_{job_name}_{task_id}_{s_time}.txt" | ||
| stderr_file = f"{_folder}/stderr_{job_name}_{task_id}_{s_time}.txt" | ||
| stdout_file = f"/{logs_folder}/stdout_{job_name}_{task_id}_{s_time}.txt" | ||
| stderr_file = f"/{logs_folder}/stderr_{job_name}_{task_id}_{s_time}.txt" | ||
| logger.debug(f"Stdout will be saved to: '{stdout_file}'") | ||
| logger.debug(f"Stderr will be saved to: '{stderr_file}'") | ||
| return f"""/bin/bash -c "mkdir -p {_folder}; {command_line} > {stdout_file} 2> {stderr_file}" """ | ||
| return f"""/bin/bash -c "mkdir -p /{logs_folder}; {command_line} > >(tee {stdout_file}) 2> >(tee {stderr_file})" """ | ||
|
|
||
|
|
||
| def _generate_mount_string(mounts): | ||
|
|
@@ -1192,6 +1193,8 @@ def add_task( | |
| save_logs_rel_path: str | None = None, | ||
| logs_folder: str = "stdout_stderr", | ||
| name_suffix: str = "", | ||
| blob_container: str | None = None, | ||
| blob_storage_account: str | None = None, | ||
| mounts: list[dict] | None = None, | ||
| depends_on: str | list[str] | None = None, | ||
| depends_on_range: tuple | None = None, | ||
|
|
@@ -1219,6 +1222,10 @@ def add_task( | |
| logs_folder (str): Name of the folder to create for saving logs. Defaults to | ||
| "stdout_stderr". | ||
| name_suffix (str): Suffix to append to the task ID for uniqueness. Defaults to "". | ||
| blob_container (str, optional): Name of Azure blob storage container where the logs | ||
| should be uploaded. | ||
| blob_storage_account (str, optional): Name of Azure blob storage account where the logs | ||
| should be uploaded. If blob_container is specified, it should exist in the storage account. | ||
| mounts (list[dict], optional): List of mount configurations as dicts | ||
| of {"source": <container_name>, "target": <relative_mount_path>}. | ||
| depends_on (str | list[str], optional): Task ID(s) that this task depends on. | ||
|
|
@@ -1309,24 +1316,16 @@ def add_task( | |
| logger.debug(f"Generated string-based task ID: '{task_id}'") | ||
|
|
||
| if save_logs_rel_path is not None: | ||
| if save_logs_rel_path == "ERROR!": | ||
| logger.warning("could not find rel path") | ||
| print( | ||
| "could not find rel path. Stdout and stderr will not be saved to blob storage." | ||
| ) | ||
| full_cmd = cmd_str | ||
| else: | ||
| logger.debug( | ||
| f"Configuring log saving to path: '{save_logs_rel_path}' in folder: '{logs_folder}'" | ||
| ) | ||
| full_cmd = _generate_command_for_saving_logs( | ||
| command_line=cmd_str, | ||
| job_name=job_name, | ||
| task_id=task_id, | ||
| save_logs_rel_path=save_logs_rel_path, | ||
| logs_folder=logs_folder, | ||
| ) | ||
| logger.debug(f"Modified command for log capture: '{full_cmd}'") | ||
| logger.debug( | ||
| f"Configuring log saving to blob container: '{save_logs_rel_path}' in folder: '{logs_folder}'" | ||
| ) | ||
| full_cmd = _generate_command_for_saving_logs( | ||
| command_line=cmd_str, | ||
| job_name=job_name, | ||
| task_id=task_id, | ||
| logs_folder=logs_folder, | ||
| ) | ||
| logger.debug(f"Modified command for log capture: '{full_cmd}'") | ||
| else: | ||
| logger.debug("No log saving configured - using command as-is") | ||
| full_cmd = cmd_str | ||
|
|
@@ -1352,14 +1351,26 @@ def add_task( | |
|
|
||
| # Create the task parameter | ||
| logger.debug("Creating task parameter object") | ||
| task_param = batch_models.TaskAddParameter( | ||
| id=task_id, | ||
| command_line=command_line, | ||
| container_settings=batch_models.TaskContainerSettings( | ||
| image_name=full_container_name, | ||
| container_run_options=container_run_options, | ||
| working_directory="containerImageDefault", | ||
| ), | ||
| output_files = None | ||
| if blob_container and blob_storage_account: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Instead of blob_container we check for save_logs_blob_container or whatever the parameter name is that you choose (from the comment above)
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. CloudClient will pass the self.save_logs_blob_container as value of blob_container parameter to batch_helpers.add_task |
||
| output_file = output_task_files_to_blob( | ||
| file_pattern="../std*.txt", | ||
| blob_container=blob_container, | ||
| blob_account=blob_storage_account, | ||
| path=f"{logs_folder}/{job_name}", | ||
| upload_condition="taskCompletion", | ||
| ) | ||
| output_files = [output_file] | ||
| container_settings = get_container_settings( | ||
| container_image_name=full_container_name, | ||
| additional_options=container_run_options, | ||
| working_directory="containerImageDefault", | ||
| ) | ||
| new_task = get_task_config( | ||
| task_id=task_id, | ||
| base_call=command_line, | ||
| output_files=output_files, | ||
| container_settings=container_settings, | ||
| user_identity=user_identity, | ||
| constraints=task_constraints, | ||
| depends_on=task_deps, | ||
|
|
@@ -1369,7 +1380,7 @@ def add_task( | |
|
|
||
| # Add the task to the job | ||
| logger.debug(f"Submitting task '{task_id}' to Azure Batch service") | ||
| batch_client.task.add(job_id=job_name, task=task_param) | ||
| batch_client.task.add(job_id=job_name, task=new_task) | ||
| logger.debug(f"Task '{task_id}' successfully added to job '{job_name}'") | ||
| return task_id | ||
|
|
||
|
|
@@ -1379,6 +1390,8 @@ def add_task_collection( | |
| task_id_base: str, | ||
| tasks: list[dict], | ||
| name_suffix: str = "", | ||
| blob_container: str | None = None, | ||
| blob_storage_account: str | None = None, | ||
| batch_client: object | None = None, | ||
| task_id_max: int = 0, | ||
| task_id_ints: bool = False, | ||
|
|
@@ -1412,6 +1425,10 @@ def add_task_collection( | |
| no timeout is set. | ||
| - full_container_name (str, optional): Full container image name to use for the task. | ||
| name_suffix (str): Suffix to append to the task ID for uniqueness. Defaults to "". | ||
| blob_container (str, optional): Name of Azure blob storage container where the logs | ||
| should be uploaded. | ||
| blob_storage_account (str, optional): Name of Azure blob storage account where the logs | ||
| should be uploaded. If blob_container is specified, it should exist in the storage account. | ||
| batch_client (object): Azure Batch service client instance for API calls. | ||
| task_id_max (int): Current maximum task ID number for generating unique task IDs. | ||
| Defaults to 0. | ||
|
|
@@ -1481,6 +1498,18 @@ def add_task_collection( | |
| ) | ||
|
|
||
| logger.debug("Creating task collection") | ||
|
|
||
| output_files = None | ||
| if blob_container and blob_storage_account: | ||
| output_file = output_task_files_to_blob( | ||
| file_pattern="../std*.txt", | ||
| blob_container=blob_container, | ||
| blob_account=blob_storage_account, | ||
| path=f"stdout_stderr/{job_name}", | ||
| upload_condition="taskCompletion", | ||
| ) | ||
| output_files = [output_file] | ||
|
|
||
| tasks_to_add = [] | ||
| for n, task in enumerate(tasks): | ||
| if task_id_ints: | ||
|
|
@@ -1499,7 +1528,6 @@ def add_task_collection( | |
| command_line=command_line, | ||
| job_name=job_name, | ||
| task_id=task_id, | ||
| save_logs_rel_path=save_logs_rel_path, | ||
| logs_folder=logs_folder, | ||
| ) | ||
| else: | ||
|
|
@@ -1523,14 +1551,16 @@ def add_task_collection( | |
| container_name = f"{job_name}_{str(task_id_max + 1)}" | ||
| container_run_options = f"--name={container_name} --rm " + mount_str | ||
|
|
||
| new_task = batch_models.TaskAddParameter( | ||
| id=task_id, | ||
| command_line=full_command, | ||
| container_settings=batch_models.TaskContainerSettings( | ||
| image_name=task["full_container_name"], | ||
| container_run_options=container_run_options, | ||
| working_directory="containerImageDefault", | ||
| ), | ||
| container_settings = get_container_settings( | ||
| container_image_name=task["full_container_name"], | ||
| additional_options=container_run_options, | ||
| working_directory="containerImageDefault", | ||
| ) | ||
| new_task = get_task_config( | ||
| task_id=task_id, | ||
| base_call=full_command, | ||
| output_files=output_files, | ||
| container_settings=container_settings, | ||
| user_identity=user_identity, | ||
| constraints=task_constraints, | ||
| depends_on=task_deps, | ||
|
|
@@ -1543,6 +1573,7 @@ def add_task_collection( | |
| logger.debug( | ||
| f"Adding '{len(tasks_to_add)}' tasks to job '{job_name}' in Azure Batch service" | ||
| ) | ||
|
|
||
| result = batch_client.task.add_collection(job_id=job_name, value=tasks_to_add) | ||
| logger.debug(f"Successfully added '{len(tasks_to_add)}' tasks to job '{job_name}'") | ||
| return result | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could this whole check be removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We still need this check so I can modify the command sent to Azure task.