Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 4 additions & 17 deletions cfa/cloudops/_cloudclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,22 +837,6 @@ def add_task(
container_name = container_image_name
logger.debug(f"Using provided container name: {container_name}.")

if self.save_logs_to_blob:
logger.debug("Configuring log saving to blob storage.")
rel_mnt_path = batch_helpers.get_rel_mnt_path(
blob_name=self.save_logs_to_blob,
pool_name=pool_name,
resource_group_name=self.cred.azure_resource_group_name,
account_name=self.cred.azure_batch_account,
batch_mgmt_client=self.batch_mgmt_client,
)
if rel_mnt_path != "ERROR!":
rel_mnt_path = "/" + helpers.format_rel_path(rel_path=rel_mnt_path)
logger.debug(f"Relative mount path for logs set to: {rel_mnt_path}")
else:
rel_mnt_path = None
logger.debug("No log saving to blob storage configured.")

# get all mounts from pool info
if mount_pairs is None:
self.mounts = batch_helpers.get_pool_mounts(
Expand All @@ -871,13 +855,16 @@ def add_task(
]

logger.debug("Adding tasks to job.")

tid = batch_helpers.add_task(
job_name=job_name,
task_id_base=job_name,
command_line=command_line,
save_logs_rel_path=rel_mnt_path,
save_logs_rel_path=self.save_logs_to_blob,
logs_folder=self.logs_folder,
name_suffix=name_suffix,
blob_container=self.save_logs_to_blob,
blob_storage_account=self.cred.azure_blob_storage_account,
mounts=self.mounts,
depends_on=depends_on,
depends_on_range=depends_on_range,
Expand Down
121 changes: 76 additions & 45 deletions cfa/cloudops/batch_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
TaskConstraints,
)

from cfa.cloudops.task import (
get_container_settings,
get_task_config,
output_task_files_to_blob,
)

logger = logging.getLogger(__name__)

AZ_MOUNT_DIR = "$AZ_BATCH_NODE_MOUNTS_DIR"
Expand Down Expand Up @@ -63,7 +69,6 @@ def _generate_command_for_saving_logs(
command_line: str,
job_name: str,
task_id: str,
save_logs_rel_path: str,
logs_folder: str = "logs",
) -> str:
"""Generate a command line that saves stdout and stderr to log files.
Expand All @@ -72,19 +77,15 @@ def _generate_command_for_saving_logs(
command_line (str): Original command line to execute.
job_name (str): Name/ID of the job.
task_id (str): ID of the task.
save_logs_rel_path (str): Relative path where logs should be saved.
logs_folder (str): Subfolder name within the save_logs_rel_path to store logs. Defaults to "logs".
logs_folder (str): Subfolder name to store logs. Defaults to "logs".
"""
t = datetime.datetime.now(zi("America/New_York"))
s_time = t.strftime("%Y%m%d_%H%M%S")
if not save_logs_rel_path.startswith("/"):
save_logs_rel_path = "/" + save_logs_rel_path
_folder = f"{save_logs_rel_path}/{logs_folder}/"
stdout_file = f"{_folder}/stdout_{job_name}_{task_id}_{s_time}.txt"
stderr_file = f"{_folder}/stderr_{job_name}_{task_id}_{s_time}.txt"
stdout_file = f"/{logs_folder}/stdout_{job_name}_{task_id}_{s_time}.txt"
stderr_file = f"/{logs_folder}/stderr_{job_name}_{task_id}_{s_time}.txt"
logger.debug(f"Stdout will be saved to: '{stdout_file}'")
logger.debug(f"Stderr will be saved to: '{stderr_file}'")
return f"""/bin/bash -c "mkdir -p {_folder}; {command_line} > {stdout_file} 2> {stderr_file}" """
return f"""/bin/bash -c "mkdir -p /{logs_folder}; {command_line} > >(tee {stdout_file}) 2> >(tee {stderr_file})" """


def _generate_mount_string(mounts):
Expand Down Expand Up @@ -1192,6 +1193,8 @@ def add_task(
save_logs_rel_path: str | None = None,
logs_folder: str = "stdout_stderr",
name_suffix: str = "",
blob_container: str | None = None,
blob_storage_account: str | None = None,
mounts: list[dict] | None = None,
depends_on: str | list[str] | None = None,
depends_on_range: tuple | None = None,
Expand Down Expand Up @@ -1219,6 +1222,10 @@ def add_task(
logs_folder (str): Name of the folder to create for saving logs. Defaults to
"stdout_stderr".
name_suffix (str): Suffix to append to the task ID for uniqueness. Defaults to "".
blob_container (str, optional): Name of Azure blob storage container where the logs
should be uploaded.
blob_storage_account (str, optional): Name of Azure blob storage account where the logs
should be uploaded. If blob_container is specified, it should exist in the storage account.
mounts (list[dict], optional): List of mount configurations as dicts
of {"source": <container_name>, "target": <relative_mount_path>).
depends_on (str | list[str], optional): Task ID(s) that this task depends on.
Expand Down Expand Up @@ -1309,24 +1316,16 @@ def add_task(
logger.debug(f"Generated string-based task ID: '{task_id}'")

if save_logs_rel_path is not None:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this whole check be removed?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need this check so I can modify the command sent to Azure task.

if save_logs_rel_path == "ERROR!":
logger.warning("could not find rel path")
print(
"could not find rel path. Stdout and stderr will not be saved to blob storage."
)
full_cmd = cmd_str
else:
logger.debug(
f"Configuring log saving to path: '{save_logs_rel_path}' in folder: '{logs_folder}'"
)
full_cmd = _generate_command_for_saving_logs(
command_line=cmd_str,
job_name=job_name,
task_id=task_id,
save_logs_rel_path=save_logs_rel_path,
logs_folder=logs_folder,
)
logger.debug(f"Modified command for log capture: '{full_cmd}'")
logger.debug(
f"Configuring log saving to blob container: '{save_logs_rel_path}' in folder: '{logs_folder}'"
)
full_cmd = _generate_command_for_saving_logs(
command_line=cmd_str,
job_name=job_name,
task_id=task_id,
logs_folder=logs_folder,
)
logger.debug(f"Modified command for log capture: '{full_cmd}'")
else:
logger.debug("No log saving configured - using command as-is")
full_cmd = cmd_str
Expand All @@ -1352,14 +1351,26 @@ def add_task(

# Create the task parameter
logger.debug("Creating task parameter object")
task_param = batch_models.TaskAddParameter(
id=task_id,
command_line=command_line,
container_settings=batch_models.TaskContainerSettings(
image_name=full_container_name,
container_run_options=container_run_options,
working_directory="containerImageDefault",
),
output_files = None
if blob_container and blob_storage_account:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of blob_container we check for save_logs_blob_container or whatever the parameter name is that you choose (from the comment above)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CloudClient will pass the self.save_logs_blob_container as value of blob_container parameter to batch_helpers.add_task

output_file = output_task_files_to_blob(
file_pattern="../std*.txt",
blob_container=blob_container,
blob_account=blob_storage_account,
path=f"{logs_folder}/{job_name}",
upload_condition="taskCompletion",
)
output_files = [output_file]
container_settings = get_container_settings(
container_image_name=full_container_name,
additional_options=container_run_options,
working_directory="containerImageDefault",
)
new_task = get_task_config(
task_id=task_id,
base_call=command_line,
output_files=output_files,
container_settings=container_settings,
user_identity=user_identity,
constraints=task_constraints,
depends_on=task_deps,
Expand All @@ -1369,7 +1380,7 @@ def add_task(

# Add the task to the job
logger.debug(f"Submitting task '{task_id}' to Azure Batch service")
batch_client.task.add(job_id=job_name, task=task_param)
batch_client.task.add(job_id=job_name, task=new_task)
logger.debug(f"Task '{task_id}' successfully added to job '{job_name}'")
return task_id

Expand All @@ -1379,6 +1390,8 @@ def add_task_collection(
task_id_base: str,
tasks: list[dict],
name_suffix: str = "",
blob_container: str | None = None,
blob_storage_account: str | None = None,
batch_client: object | None = None,
task_id_max: int = 0,
task_id_ints: bool = False,
Expand Down Expand Up @@ -1412,6 +1425,10 @@ def add_task_collection(
no timeout is set.
- full_container_name (str, optional): Full container image name to use for the task.
name_suffix (str): Suffix to append to the task ID for uniqueness. Defaults to "".
blob_container (str, optional): Name of Azure blob storage container where the logs
should be uploaded.
blob_storage_account (str, optional): Name of Azure blob storage account where the logs
should be uploaded. If blob_container is specified, it should exist in the storage account.
batch_client (object): Azure Batch service client instance for API calls.
task_id_max (int): Current maximum task ID number for generating unique task IDs.
Defaults to 0.
Expand Down Expand Up @@ -1481,6 +1498,18 @@ def add_task_collection(
)

logger.debug("Creating task collection")

output_files = None
if blob_container and blob_storage_account:
output_file = output_task_files_to_blob(
file_pattern="../std*.txt",
blob_container=blob_container,
blob_account=blob_storage_account,
path=f"stdout_stderr/{job_name}",
upload_condition="taskCompletion",
)
output_files = [output_file]

tasks_to_add = []
for n, task in enumerate(tasks):
if task_id_ints:
Expand All @@ -1499,7 +1528,6 @@ def add_task_collection(
command_line=command_line,
job_name=job_name,
task_id=task_id,
save_logs_rel_path=save_logs_rel_path,
logs_folder=logs_folder,
)
else:
Expand All @@ -1523,14 +1551,16 @@ def add_task_collection(
container_name = f"{job_name}_{str(task_id_max + 1)}"
container_run_options = f"--name={container_name} --rm " + mount_str

new_task = batch_models.TaskAddParameter(
id=task_id,
command_line=full_command,
container_settings=batch_models.TaskContainerSettings(
image_name=task["full_container_name"],
container_run_options=container_run_options,
working_directory="containerImageDefault",
),
container_settings = get_container_settings(
container_image_name=task["full_container_name"],
additional_options=container_run_options,
working_directory="containerImageDefault",
)
new_task = get_task_config(
task_id=task_id,
base_call=full_command,
output_files=output_files,
container_settings=container_settings,
user_identity=user_identity,
constraints=task_constraints,
depends_on=task_deps,
Expand All @@ -1543,6 +1573,7 @@ def add_task_collection(
logger.debug(
f"Adding '{len(tasks_to_add)}' to job '{job_name}' i Azure Batch service"
)

result = batch_client.task.add_collection(job_id=job_name, value=tasks_to_add)
logger.debug(f"Successfully added {len(tasks_to_add)}' tasks job '{job_name}'")
return result
Expand Down
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/).
The versioning pattern is `major.minor.patch`.

---
## v0.4.0
- Modified `cfa.cloudops.batch_helpers._generate_command_for_saving_logs` to use `tee` so that stdout and stderr are streamed to the console while also being written to log files

## v0.3.22
- Added/updated node profiling option to `create_pool` function in `_cloudclient.py`

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "cfa.cloudops"
version = "0.3.22"
version = "0.4.0"
description = "Cloud storage, batch, functions, MLOps assistance"
authors = [
{name = "Ryan Raasch", email = "xng3@cdc.gov"}
Expand Down
Loading