diff --git a/cfa/cloudops/_cloudclient.py b/cfa/cloudops/_cloudclient.py
index 583e125..f2c280c 100644
--- a/cfa/cloudops/_cloudclient.py
+++ b/cfa/cloudops/_cloudclient.py
@@ -837,22 +837,6 @@ def add_task(
             container_name = container_image_name
             logger.debug(f"Using provided container name: {container_name}.")
 
-        if self.save_logs_to_blob:
-            logger.debug("Configuring log saving to blob storage.")
-            rel_mnt_path = batch_helpers.get_rel_mnt_path(
-                blob_name=self.save_logs_to_blob,
-                pool_name=pool_name,
-                resource_group_name=self.cred.azure_resource_group_name,
-                account_name=self.cred.azure_batch_account,
-                batch_mgmt_client=self.batch_mgmt_client,
-            )
-            if rel_mnt_path != "ERROR!":
-                rel_mnt_path = "/" + helpers.format_rel_path(rel_path=rel_mnt_path)
-                logger.debug(f"Relative mount path for logs set to: {rel_mnt_path}")
-        else:
-            rel_mnt_path = None
-            logger.debug("No log saving to blob storage configured.")
-
         # get all mounts from pool info
         if mount_pairs is None:
             self.mounts = batch_helpers.get_pool_mounts(
@@ -871,13 +855,16 @@ def add_task(
         ]
 
         logger.debug("Adding tasks to job.")
+
         tid = batch_helpers.add_task(
             job_name=job_name,
             task_id_base=job_name,
             command_line=command_line,
-            save_logs_rel_path=rel_mnt_path,
+            save_logs_rel_path=self.save_logs_to_blob,
             logs_folder=self.logs_folder,
             name_suffix=name_suffix,
+            blob_container=self.save_logs_to_blob,
+            blob_storage_account=self.cred.azure_blob_storage_account,
             mounts=self.mounts,
             depends_on=depends_on,
             depends_on_range=depends_on_range,
diff --git a/cfa/cloudops/batch_helpers.py b/cfa/cloudops/batch_helpers.py
index 1cbddb9..e2d7483 100644
--- a/cfa/cloudops/batch_helpers.py
+++ b/cfa/cloudops/batch_helpers.py
@@ -20,6 +20,12 @@
     TaskConstraints,
 )
 
+from cfa.cloudops.task import (
+    get_container_settings,
+    get_task_config,
+    output_task_files_to_blob,
+)
+
 logger = logging.getLogger(__name__)
 
 AZ_MOUNT_DIR = "$AZ_BATCH_NODE_MOUNTS_DIR"
@@ -63,7 +69,6 @@ def _generate_command_for_saving_logs(
     command_line: str,
     job_name: str,
     task_id: str,
-    save_logs_rel_path: str,
     logs_folder: str = "logs",
 ) -> str:
     """Generate a command line that saves stdout and stderr to log files.
 
     Args:
         command_line (str): Original command line to execute.
         job_name (str): Name/ID of the job.
         task_id (str): ID of the task.
-        save_logs_rel_path (str): Relative path where logs should be saved.
-        logs_folder (str): Subfolder name within the save_logs_rel_path to store logs. Defaults to "logs".
+        logs_folder (str): Subfolder name to store logs. Defaults to "logs".
""" t = datetime.datetime.now(zi("America/New_York")) s_time = t.strftime("%Y%m%d_%H%M%S") - if not save_logs_rel_path.startswith("/"): - save_logs_rel_path = "/" + save_logs_rel_path - _folder = f"{save_logs_rel_path}/{logs_folder}/" - stdout_file = f"{_folder}/stdout_{job_name}_{task_id}_{s_time}.txt" - stderr_file = f"{_folder}/stderr_{job_name}_{task_id}_{s_time}.txt" + stdout_file = f"/{logs_folder}/stdout_{job_name}_{task_id}_{s_time}.txt" + stderr_file = f"/{logs_folder}/stderr_{job_name}_{task_id}_{s_time}.txt" logger.debug(f"Stdout will be saved to: '{stdout_file}'") logger.debug(f"Stderr will be saved to: '{stderr_file}'") - return f"""/bin/bash -c "mkdir -p {_folder}; {command_line} > {stdout_file} 2> {stderr_file}" """ + return f"""/bin/bash -c "mkdir -p /{logs_folder}; {command_line} > >(tee {stdout_file}) 2> >(tee {stderr_file})" """ def _generate_mount_string(mounts): @@ -1192,6 +1193,8 @@ def add_task( save_logs_rel_path: str | None = None, logs_folder: str = "stdout_stderr", name_suffix: str = "", + blob_container: str | None = None, + blob_storage_account: str | None = None, mounts: list[dict] | None = None, depends_on: str | list[str] | None = None, depends_on_range: tuple | None = None, @@ -1219,6 +1222,10 @@ def add_task( logs_folder (str): Name of the folder to create for saving logs. Defaults to "stdout_stderr". name_suffix (str): Suffix to append to the task ID for uniqueness. Defaults to "". + blob_container (str, optional): Name of Azure blob storage container where the logs + should be uploaded. + blob_storage_account (str, optional): Name of Azure blob storage account where the logs + should be uploaded. If blob_container is specified, it should exist in the storage account. mounts (list[dict], optional): List of mount configurations as dicts of {"source": , "target": ). depends_on (str | list[str], optional): Task ID(s) that this task depends on. @@ -1309,24 +1316,16 @@ def add_task( logger.debug(f"Generated string-based task ID: '{task_id}'") if save_logs_rel_path is not None: - if save_logs_rel_path == "ERROR!": - logger.warning("could not find rel path") - print( - "could not find rel path. Stdout and stderr will not be saved to blob storage." 
-            )
-            full_cmd = cmd_str
-        else:
-            logger.debug(
-                f"Configuring log saving to path: '{save_logs_rel_path}' in folder: '{logs_folder}'"
-            )
-            full_cmd = _generate_command_for_saving_logs(
-                command_line=cmd_str,
-                job_name=job_name,
-                task_id=task_id,
-                save_logs_rel_path=save_logs_rel_path,
-                logs_folder=logs_folder,
-            )
-            logger.debug(f"Modified command for log capture: '{full_cmd}'")
+        logger.debug(
+            f"Configuring log saving to blob container: '{save_logs_rel_path}' in folder: '{logs_folder}'"
+        )
+        full_cmd = _generate_command_for_saving_logs(
+            command_line=cmd_str,
+            job_name=job_name,
+            task_id=task_id,
+            logs_folder=logs_folder,
+        )
+        logger.debug(f"Modified command for log capture: '{full_cmd}'")
     else:
         logger.debug("No log saving configured - using command as-is")
         full_cmd = cmd_str
@@ -1352,14 +1351,26 @@ def add_task(
 
     # Create the task parameter
     logger.debug("Creating task parameter object")
-    task_param = batch_models.TaskAddParameter(
-        id=task_id,
-        command_line=command_line,
-        container_settings=batch_models.TaskContainerSettings(
-            image_name=full_container_name,
-            container_run_options=container_run_options,
-            working_directory="containerImageDefault",
-        ),
+    output_files = None
+    if blob_container and blob_storage_account:
+        output_file = output_task_files_to_blob(
+            file_pattern="../std*.txt",
+            blob_container=blob_container,
+            blob_account=blob_storage_account,
+            path=f"{logs_folder}/{job_name}",
+            upload_condition="taskCompletion",
+        )
+        output_files = [output_file]
+    container_settings = get_container_settings(
+        container_image_name=full_container_name,
+        additional_options=container_run_options,
+        working_directory="containerImageDefault",
+    )
+    new_task = get_task_config(
+        task_id=task_id,
+        base_call=command_line,
+        output_files=output_files,
+        container_settings=container_settings,
         user_identity=user_identity,
         constraints=task_constraints,
         depends_on=task_deps,
@@ -1369,7 +1380,7 @@ def add_task(
 
     # Add the task to the job
     logger.debug(f"Submitting task '{task_id}' to Azure Batch service")
-    batch_client.task.add(job_id=job_name, task=task_param)
+    batch_client.task.add(job_id=job_name, task=new_task)
     logger.debug(f"Task '{task_id}' successfully added to job '{job_name}'")
 
     return task_id
@@ -1379,6 +1390,8 @@ def add_task_collection(
     task_id_base: str,
     tasks: list[dict],
     name_suffix: str = "",
+    blob_container: str | None = None,
+    blob_storage_account: str | None = None,
     batch_client: object | None = None,
     task_id_max: int = 0,
     task_id_ints: bool = False,
@@ -1412,6 +1425,10 @@ def add_task_collection(
             no timeout is set.
             - full_container_name (str, optional): Full container image name to use for the task.
         name_suffix (str): Suffix to append to the task ID for uniqueness. Defaults to "".
+        blob_container (str, optional): Name of Azure blob storage container where the logs
+            should be uploaded.
+        blob_storage_account (str, optional): Name of Azure blob storage account where the logs
+            should be uploaded. If blob_container is specified, it should exist in the storage account.
         batch_client (object): Azure Batch service client instance for API calls.
         task_id_max (int): Current maximum task ID number for generating unique task IDs.
             Defaults to 0.
@@ -1481,6 +1498,18 @@ def add_task_collection(
     )
 
     logger.debug("Creating task collection")
+
+    output_files = None
+    if blob_container and blob_storage_account:
+        output_file = output_task_files_to_blob(
+            file_pattern="../std*.txt",
+            blob_container=blob_container,
+            blob_account=blob_storage_account,
+            path=f"stdout_stderr/{job_name}",
+            upload_condition="taskCompletion",
+        )
+        output_files = [output_file]
+
     tasks_to_add = []
     for n, task in enumerate(tasks):
         if task_id_ints:
@@ -1499,7 +1528,6 @@ def add_task_collection(
                 command_line=command_line,
                 job_name=job_name,
                 task_id=task_id,
-                save_logs_rel_path=save_logs_rel_path,
                 logs_folder=logs_folder,
             )
         else:
@@ -1523,14 +1551,16 @@ def add_task_collection(
             container_name = f"{job_name}_{str(task_id_max + 1)}"
         container_run_options = f"--name={container_name} --rm " + mount_str
 
-        new_task = batch_models.TaskAddParameter(
-            id=task_id,
-            command_line=full_command,
-            container_settings=batch_models.TaskContainerSettings(
-                image_name=task["full_container_name"],
-                container_run_options=container_run_options,
-                working_directory="containerImageDefault",
-            ),
+        container_settings = get_container_settings(
+            container_image_name=task["full_container_name"],
+            additional_options=container_run_options,
+            working_directory="containerImageDefault",
+        )
+        new_task = get_task_config(
+            task_id=task_id,
+            base_call=full_command,
+            output_files=output_files,
+            container_settings=container_settings,
             user_identity=user_identity,
             constraints=task_constraints,
             depends_on=task_deps,
@@ -1543,6 +1573,7 @@ def add_task_collection(
     logger.debug(
         f"Adding '{len(tasks_to_add)}' tasks to job '{job_name}' in Azure Batch service"
     )
+
    result = batch_client.task.add_collection(job_id=job_name, value=tasks_to_add)
     logger.debug(f"Successfully added '{len(tasks_to_add)}' tasks to job '{job_name}'")
     return result
diff --git a/changelog.md b/changelog.md
index 1360053..aae034e 100644
--- a/changelog.md
+++ b/changelog.md
@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/).
 The versioning pattern is `major.minor.patch`.
 
 ---
+## v0.4.0
+- Modified `cfa.cloudops.batch_helpers._generate_command_for_saving_logs` to tee stdout and stderr to log files, so task output is captured for upload while still streaming to the console
+
 
 ## v0.3.22
 - Added/updated node profiling option to `create_pool` function in `_cloudclient.py`
diff --git a/pyproject.toml b/pyproject.toml
index 5eaffcb..aad130c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "cfa.cloudops"
-version = "0.3.22"
+version = "0.4.0"
 description = "Cloud storage, batch, functions, MLOps assistance"
 authors = [
     {name = "Ryan Raasch", email = "xng3@cdc.gov"}
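
Note on the reworked log command (illustrative, not part of the patch): `_generate_command_for_saving_logs` now writes logs under a root-level `/{logs_folder}` directory and pipes stdout/stderr through `tee` via bash process substitution, so output lands in the log files while still streaming to the task console — and therefore into the Batch-generated `stdout.txt`/`stderr.txt` that the new output-file upload matches with `../std*.txt`. A sketch with hypothetical job and task names:

```python
from cfa.cloudops.batch_helpers import _generate_command_for_saving_logs

# Hypothetical inputs; the timestamp suffix is generated from America/New_York time.
cmd = _generate_command_for_saving_logs(
    command_line="python run_model.py",
    job_name="my-job",
    task_id="my-job-0",
    logs_folder="stdout_stderr",
)
# cmd is roughly:
# /bin/bash -c "mkdir -p /stdout_stderr; python run_model.py
#     > >(tee /stdout_stderr/stdout_my-job_my-job-0_<YYYYmmdd_HHMMSS>.txt)
#     2> >(tee /stdout_stderr/stderr_my-job_my-job-0_<YYYYmmdd_HHMMSS>.txt)"
```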
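
Usage sketch for the new blob-upload parameters (hypothetical names throughout; `add_task` takes further arguments, e.g. container image settings, which are elided here): when both `blob_container` and `blob_storage_account` are set, the task's `std*.txt` files are uploaded to `<logs_folder>/<job_name>/` in the container on task completion.

```python
from cfa.cloudops import batch_helpers

# Hypothetical names; assumes the job, pool, and batch_client are already set up.
tid = batch_helpers.add_task(
    job_name="my-job",
    task_id_base="my-job",
    command_line="python run_model.py",
    save_logs_rel_path="task-logs",    # any non-None value wraps the command for log capture
    logs_folder="stdout_stderr",
    blob_container="task-logs",        # logs upload to stdout_stderr/my-job/ in this container
    blob_storage_account="mystorageaccount",
    batch_client=batch_client,         # assumed parameter, mirroring add_task_collection
)
```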