diff --git a/app.py b/app.py index dc6789d..32404a6 100644 --- a/app.py +++ b/app.py @@ -8,6 +8,8 @@ # agreement to the Shotgun Pipeline Toolkit Source Code License. All rights # not expressly granted therein are reserved by Autodesk, Inc. +from types import ModuleType + import sgtk @@ -21,6 +23,7 @@ def init_app(self): """Called as the application is being initialized.""" tk_multi_breakdown2 = self.import_module("tk_multi_breakdown2") + self._flowam = tk_multi_breakdown2.flowam # Store a reference to manager class to expose its functionality at the application level. self._manager_class = tk_multi_breakdown2.BreakdownManager @@ -141,3 +144,15 @@ def _on_dialog_close(self, dialog): elif dialog == self._current_panel: self.log_debug("Current panel has been closed, clearing reference.") self._current_panel = None + + @property + def flowam(self) -> ModuleType: + """ + Access to the FlowAM integration module for this app. This module provides + drop-in replacements for the standard Shotgun-based Scene Breakdown models and actions, + backed by Flow Asset Management (FlowAM). + + :returns: The FlowAM integration module for this app + :rtype: :mod:`tk_multi_breakdown2.flowam` + """ + return self._flowam diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 0602b72..0b4e8cd 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -43,3 +43,4 @@ jobs: additional_repositories: - name: tk-framework-qtwidgets - name: tk-framework-shotgunutils + tk_core_ref: ticket/sg-43461/migrate-host-base diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 0000000..61ca777 --- /dev/null +++ b/codecov.yml @@ -0,0 +1,3 @@ +ignore: + # flowam and other files not covered by unit tests + - "**python/tk_multi_breakdown2/flowam/*" diff --git a/hooks/flowam_scene_operations.py b/hooks/flowam_scene_operations.py deleted file mode 100644 index 01ebf9d..0000000 --- a/hooks/flowam_scene_operations.py +++ /dev/null @@ -1,426 +0,0 @@ -# Copyright (c) 2025 Autodesk, Inc. -# -# CONFIDENTIAL AND PROPRIETARY -# -# This work is provided "AS IS" and subject to the Shotgun Pipeline Toolkit -# Source Code License included in this distribution package. See LICENSE. -# By accessing, using, copying or modifying this work you indicate your -# agreement to the Shotgun Pipeline Toolkit Source Code License. All rights -# not expressly granted therein are reserved by Autodesk, Inc. - -from __future__ import annotations - -from types import ModuleType -from typing import TYPE_CHECKING, Any, Optional - -if TYPE_CHECKING: - from tk_multi_breakdown2.api.item import FileItem - -import sgtk - -HookBaseClass = sgtk.get_hook_baseclass() - - -class FlowBreakdownSceneOperations(HookBaseClass): - """ - Breakdown operations for Flow Asset Manager integration. - - This implementation handles detection of scene dependencies - across different DCC applications. - """ - - def _get_published_file_type( - self, asset: Any, flow_module: ModuleType - ) -> Optional[dict[str, Any]]: - """Return a PublishedFileType-shaped stub derived from the asset's type_ids. - - Uses the first type_id in asset.type_ids and resolves its display name via - flow_module.schema.get_schema_display_name. Returns None if no type is available. - - Args: - asset: A Flow Asset object exposing a type_ids attribute (list of str). - flow_module: The imported flow framework module. - Returns: - A dict shaped like a SG PublishedFileType entity, or None. - """ - if not asset or not getattr(asset, "type_ids", None): - return None - type_id = asset.type_ids[0] - try: - display_name = flow_module.schema.get_schema_display_name(type_id) - except flow_module.FlowError: - display_name = type_id - return {"type": "PublishedFileType", "id": None, "code": display_name} - - def get_scene_objects_and_publishes( - self, - manager: Any, - published_file_fields: list[str], - bg_task_manager: Optional[Any], - ) -> tuple[list[dict[str, Any]], Any]: - """ - Retrieve the scene objects and their associated published files - using Flow Asset Manager integration. - - Each dependency is mapped to a stub entry shaped like a SG PublishedFile - but populated with MEDM data (asset_id, revision_id, version_number, etc.) - so the breakdown app can build its file item model and determine statuses. - - Args: - manager: The breakdown manager instance, used to get published file fields. - published_file_fields: List of PublishedFile field names to query from Shotgun. - bg_task_manager: Background task manager for async operations. If None, execute synchronously. - Returns: - A tuple of (scene_objects, published_file_data) where: - - scene_objects is a list of dictionaries with keys: node_name, node_type, path - - published_file_data is either a background task id or a dictionary mapping - file paths to a stub PublishedFile dict populated with MEDM data. - """ - - self.logger.debug( - "=== FlowAM Scene Breakdown: get_scene_objects_and_publishes ===" - ) - - flow_am_fw = self.load_framework("tk-framework-flowam_v1.x.x") - flow_module = flow_am_fw.import_module("flow") - - flow_dependencies = flow_module.asset_management.get_dependencies() - - scene_objects = [ - { - "node_name": dep_info.node_handle, - "node_type": dep_info.node_type, - "path": dep_info.file_path, - } - for dep_info in flow_dependencies - ] - - project = self.parent.context.project - - def build_published_file_stubs(): - result = {} - for dep_info in flow_dependencies: - version_number = None - created_at = None # dep_info doesn't have it. Skip for now. - if dep_info.version_id: - version_number = dep_info.version_num - - thumbnail_path = None - if dep_info.revision_id: - try: - thumbnail_path = ( - flow_module.asset_management.get_thumbnail_file( - dep_info.revision_id - ) - ) - except flow_module.FlowError: - self.logger.warning( - f"No thumbnail path found for revision {dep_info.revision_id}" - ) - - entity = None - published_file_type = None - if dep_info.asset_id: - entity = { - "type": "Asset", - "id": dep_info.asset_id, - "name": None, - } - revision = flow_module.data.AssetRevision(dep_info.revision_id) - type_comps = revision.find_components( - type_id=flow_module.data.BASE_TYPE_ID - ) - type_ids = [c.type_id for c in type_comps] - published_file_type = None - if type_ids: - try: - display_name = flow_module.schema.get_schema_display_name( - type_ids[0] - ) - except flow_module.FlowError: - display_name = type_ids[0] - published_file_type = { - "type": "PublishedFileType", - "id": None, - "code": display_name, - } - - stub = { - "type": "PublishedFile", - "id": None, - "project": project, - "entity": entity, - "name": dep_info.component_name or dep_info.node_handle, - "task": None, - "published_file_type": published_file_type, - "published_file_type.PublishedFileType.code": ( - published_file_type["code"] if published_file_type else None - ), - "path": {"local_path": dep_info.file_path}, - "version_number": version_number, - "created_at": created_at, - "sg_flow_revision_id": dep_info.revision_id, - "sg_flow_asset_id": dep_info.asset_id, - "sg_flow_version_id": dep_info.version_id, - "sg_flow_blob_index": dep_info.blob_index, - "sg_flow_thumbnail_path": thumbnail_path, - } - result[dep_info.file_path] = stub - return result - - if bg_task_manager: - return scene_objects, bg_task_manager.add_task(build_published_file_stubs) - - return scene_objects, build_published_file_stubs() - - def get_published_files_for_items( - self, - items: list[FileItem], - bg_task_manager: Optional[Any] = None, - **kwargs: Any, - ) -> Any: - """ - Get all published file revisions for the given items using MEDM. - - Queries MEDM for all numbered versions of each item's asset, returning - them as PublishedFile-shaped dicts so the breakdown model can build its - version mapping and determine statuses. - - Args: - items: List of FileItem objects to get published files for. - bg_task_manager: If provided, the query runs async and the task id - is returned. Otherwise executes synchronously. - Returns: - If async, the background task id. Otherwise a list of dicts shaped - like SG PublishedFile entities, sorted newest-first per asset. - """ - - def _fetch_all_versions(): - flow_am_fw = self.load_framework("tk-framework-flowam_v1.x.x") - flow_module = flow_am_fw.import_module("flow") - - project = self.parent.context.project - result = [] - processed_assets = {} - - for item in items: - item_data = item.sg_data or {} - asset_id = item_data.get("sg_flow_asset_id") - if not asset_id: - continue - - name = item_data.get("name") - blob_index = item_data.get("sg_flow_blob_index", 0) - - if asset_id not in processed_assets: - try: - asset = flow_module.data.Asset(asset_id) - versions = list(asset.iterate_versions()) - processed_assets[asset_id] = (asset, versions) - except flow_module.FlowError: - self.logger.warning( - f"Failed to query versions for asset {asset_id}" - ) - continue - - asset, versions = processed_assets[asset_id] - published_file_type = self._get_published_file_type(asset, flow_module) - - for version in versions: - revision = version.revision - local_path = None - try: - local_path = revision.get_storage_source_path(blob_index) - except flow_module.FlowError: - pass - - created_at = version.created_at - - thumbnail_path = None - try: - thumbnail_path = ( - flow_module.asset_management.get_thumbnail_file(revision.id) - ) - except flow_module.FlowError: - self.logger.warning( - f"Failed to get thumbnail path for revision {revision.id}" - ) - - result.append( - { - "type": "PublishedFile", - "id": None, - "project": project, - "entity": { - "type": "Asset", - "id": asset_id, - "name": asset.name, - }, - "name": name, - "task": None, - "published_file_type": published_file_type, - "published_file_type.PublishedFileType.code": ( - published_file_type["code"] - if published_file_type - else None - ), - "path": {"local_path": local_path} if local_path else None, - "version_number": version.version_number, - "created_at": created_at, - "sg_flow_revision_id": revision.id, - "sg_flow_asset_id": asset_id, - "sg_flow_version_id": version.id, - "sg_flow_thumbnail_path": thumbnail_path, - } - ) - - return result - - if bg_task_manager: - return bg_task_manager.add_task(_fetch_all_versions) - return _fetch_all_versions() - - def get_latest_published_file( - self, - item: FileItem, - bg_task_manager: Optional[Any] = None, - **kwargs: Any, - ) -> Any: - """ - Get the latest published file (revision) for a single item using MEDM. - - Args: - item: FileItem to get the latest revision for. - bg_task_manager: If provided, the query runs async and the task id - is returned. Otherwise executes synchronously. - Returns: - If async, the background task id. Otherwise a dict shaped like a - SG PublishedFile entity representing the latest revision. - """ - - def _fetch_latest(): - flow_am_fw = self.load_framework("tk-framework-flowam_v1.x.x") - flow_module = flow_am_fw.import_module("flow") - - item_data = item.sg_data or {} - asset_id = item_data.get("sg_flow_asset_id") - if not asset_id: - return {} - - project = self.parent.context.project - blob_index = item_data.get("sg_flow_blob_index", 0) - - try: - asset = flow_module.data.Asset(asset_id) - latest_revision = asset.get_latest_revision() - if not latest_revision: - self.logger.warning( - f"No latest revision found for asset {asset_id}" - ) - return {} - except flow_module.FlowError as e: - self.logger.warning( - f"Failed to get latest revision for asset {asset_id}: {e}" - ) - return {} - - published_file_type = self._get_published_file_type(asset, flow_module) - - local_path = None - try: - local_path = latest_revision.get_storage_source_path(blob_index) - except flow_module.FlowError: - pass - - created_at = asset.created_at - - return { - "type": "PublishedFile", - "id": None, - "project": project, - "entity": { - "type": "Asset", - "id": asset_id, - "name": asset.name, - }, - "name": item_data.get("name"), - "task": None, - "published_file_type": published_file_type, - "published_file_type.PublishedFileType.code": ( - published_file_type["code"] if published_file_type else None - ), - "path": {"local_path": local_path} if local_path else None, - "version_number": asset.version_number, - "created_at": created_at, - "sg_flow_revision_id": latest_revision.id, - "sg_flow_asset_id": asset_id, - "sg_flow_version_id": asset.version_id, - } - - if bg_task_manager: - return bg_task_manager.add_task(_fetch_latest) - return _fetch_latest() - - def update_to_latest(self, items: list[FileItem]) -> list[FileItem]: - """Update the given items in the scene. - - Args: - items: list of file item object to update - Returns: - list of file item that were updated - """ - flow_am_fw = self.load_framework("tk-framework-flowam_v1.x.x") - flow_module = flow_am_fw.import_module("flow") - - items_to_update = [] - for file_item in items: - res = flow_module.asset_management.update_dependency( - file_item.sg_data["sg_flow_revision_id"], - node_handle=file_item.node_name, - ) - if res: - items_to_update.append(file_item) - - return items_to_update - - def update_to_revision( - self, - item: Optional[dict[str, Any]], - item_data: Optional[dict[str, Any]] = None, - ) -> bool: - """Update the item to a specific version. - - Args: - item: Dictionary representation of the FileItem to update - item_data: Dictionary of Flow Production Tracking data representing the target - published file revision to update to. - - Returns: - True if the item requires the data model to update, else False will not - trigger a model update. - """ - # Validate item_data contains the required revision ID - if not item_data or not item_data.get("sg_flow_revision_id"): - self.logger.warning( - "Cannot update to revision: item_data is missing or lacks sg_flow_revision_id" - ) - return False - - # Validate item structure - sg_data = item.get("sg_data") if item else None - if not sg_data or not sg_data.get("sg_flow_revision_id"): - self.logger.warning( - "Cannot update to revision: item sg_data is missing or lacks sg_flow_revision_id" - ) - return False - - flow_am_fw = self.load_framework("tk-framework-flowam_v1.x.x") - flow_module = flow_am_fw.import_module("flow") - - do_update = flow_module.asset_management.update_dependency( - revision_id=sg_data["sg_flow_revision_id"], - new_revision_id=item_data["sg_flow_revision_id"], - node_handle=item.get("node_name"), - ) - - return do_update diff --git a/info.yml b/info.yml index 32a691d..83388e4 100644 --- a/info.yml +++ b/info.yml @@ -126,13 +126,6 @@ configuration: create and execute actions. default_value: {} - enable_flowam: - type: bool - default_value: False - description: Set to True to enable the Flow Asset Management integration. - It requires three functions named `update_to_revision`, `update_to_latest`, - and `get_scene_objects_and_publishes` to be implemented in the scene operations hook. - # The Flow Production Tracking fields this app needs in order to operate correctly requires_shotgun_fields: # linked_projects.Asset is required for references in multiple Flow Production Tracking projects diff --git a/python/tk_multi_breakdown2/__init__.py b/python/tk_multi_breakdown2/__init__.py index 5387c57..83dc5a3 100644 --- a/python/tk_multi_breakdown2/__init__.py +++ b/python/tk_multi_breakdown2/__init__.py @@ -8,6 +8,7 @@ # agreement to the Shotgun Pipeline Toolkit Source Code License. All rights # not expressly granted therein are reserved by Autodesk, Inc. +from . import flowam # noqa: F401 from .api import BreakdownManager try: diff --git a/python/tk_multi_breakdown2/api/manager.py b/python/tk_multi_breakdown2/api/manager.py index 261e00c..7ff9897 100644 --- a/python/tk_multi_breakdown2/api/manager.py +++ b/python/tk_multi_breakdown2/api/manager.py @@ -24,6 +24,8 @@ def __init__(self, bundle): """Initialize the manager.""" self._bundle = bundle + _engine = sgtk.platform.current_engine() + self._flow_host = _engine.flow_host if _engine else None @sgtk.LogManager.log_timing def get_scene_objects(self, execute_in_main_thread=True): @@ -245,7 +247,7 @@ def get_latest_published_file( :param data_retriever: If provided, the api request will be async. The default value will execute the api request synchronously. :type data_retriever: ShotgunDataRetriever - :param bg_task_manager: If provided with enable_flowam, used for async execution. + :param bg_task_manager: Used for async execution. :type bg_task_manager: BackgroundTaskManager :return: The latest published file as a Flow Production Tracking entity dictionary if the request was @@ -257,14 +259,14 @@ def get_latest_published_file( if not item or not item.sg_data: return None if is_async else {} - if self._bundle.get_setting("enable_flowam"): - result = self._bundle.execute_hook_method( - "hook_scene_operations", - "get_latest_published_file", + if self._bundle.context.flow_project_id and self._flow_host: + result = self._bundle.flowam.get_latest_revision( item=item, bg_task_manager=bg_task_manager, ) if not is_async: + if not isinstance(result, dict): + result = {} item.latest_published_file = result return result @@ -304,7 +306,7 @@ def get_published_files_for_items( :param data_retriever: If provided, the api request will be async. The default value will execute the api request synchronously. :type data_retriever: ShotgunDataRetriever - :param bg_task_manager: If provided with enable_flowam, used for async execution. + :param bg_task_manager: Used for async execution. :type bg_task_manager: BackgroundTaskManager :return: If the request is async, then the request task id is returned, else the @@ -317,10 +319,8 @@ def get_published_files_for_items( if not items: return None if is_async else {} - if self._bundle.get_setting("enable_flowam"): - return self._bundle.execute_hook_method( - "hook_scene_operations", - "get_published_files_for_items", + if self._bundle.context.flow_project_id and self._flow_host: + return self._bundle.flowam.get_assets_for_items( items=items, bg_task_manager=bg_task_manager, ) @@ -358,7 +358,7 @@ def get_published_file_history( :param data_retriever: If provided, the api request will be async. The default value will execute the api request synchronously. :type data_retriever: ShotgunDataRetriever - :param bg_task_manager: If provided with enable_flowam, used for async execution. + :param bg_task_manager: Used for async execution. :type bg_task_manager: BackgroundTaskManager :return: If the request is async, then the request task id is returned, else the @@ -393,18 +393,26 @@ def update_to_latest_version(self, items): if not isinstance(items, list): items = [items] - if self._bundle.get_setting("enable_flowam"): - try: - return self._bundle.execute_hook_method( - "hook_scene_operations", - "update_to_latest", - items=items, - ) - except Exception as e: - self._bundle.logger.error( - f"Failed to execute hook method 'update_to_latest'. {e}" - ) - return [] + if self._bundle.context.flow_project_id and self._flow_host: + items_to_update = self._bundle.flowam.update_to_latest(items) + + # The FlowAM method performs the DCC-side update but does not update the + # Python FileItem model data. We do that here so callers always get a + # consistent, up-to-date list of updated FileItem objects regardless of + # which code path ran. A non-list return (including None) means the hook + # chose not to filter, so attempt to update all items. + if not isinstance(items_to_update, list): + items_to_update = items + + updated_items = [] + for item in items_to_update: + data = item.latest_published_file + if not data or not data.get("path", {}).get("local_path", None): + continue + item.sg_data = data + item.path = data["path"]["local_path"] + updated_items.append(item) + return updated_items # First try to execute the hook method to update items in batch for performance. try: @@ -499,13 +507,19 @@ def update_to_specific_version(self, item, sg_data): if not sg_data or not sg_data.get("path", {}).get("local_path", None): return False - if self._bundle.get_setting("enable_flowam"): - return self._bundle.execute_hook_method( - "hook_scene_operations", - "update_to_revision", + if self._bundle.context.flow_project_id and self._flow_host: + do_update = self._bundle.flowam.update_to_revision( item=item.to_dict(), item_data=sg_data, ) + if do_update is None: + # Default to True if the hook return value was not explictly set + do_update = True + + if do_update: + item.sg_data = sg_data + item.path = sg_data["path"]["local_path"] + return do_update item_dict = item.to_dict() item_dict["path"] = sg_data["path"]["local_path"] diff --git a/python/tk_multi_breakdown2/dialog.py b/python/tk_multi_breakdown2/dialog.py index 9b28ea9..f25d815 100644 --- a/python/tk_multi_breakdown2/dialog.py +++ b/python/tk_multi_breakdown2/dialog.py @@ -1349,6 +1349,12 @@ def _on_update_selected_to_latest(self): self._listen_for_events(False) try: ActionManager.execute_update_to_latest_action(file_items, self._file_model) + except Exception as e: + QtGui.QMessageBox.critical( + None, + "Scene Breakdown", + "Error: {}".format(e), + ) finally: self.__executing_bulk_action = False # Turn on event handling if it was on before diff --git a/python/tk_multi_breakdown2/file_history_model.py b/python/tk_multi_breakdown2/file_history_model.py index e2633ab..d892519 100644 --- a/python/tk_multi_breakdown2/file_history_model.py +++ b/python/tk_multi_breakdown2/file_history_model.py @@ -66,6 +66,7 @@ def __init__(self, parent, bg_task_manager): ShotgunModel.__init__(self, parent, bg_task_manager=bg_task_manager) self._app = sgtk.platform.current_bundle() + self._engine = sgtk.platform.current_engine() self.__manager = self._app.create_breakdown_manager() self.__bg_task_manager = bg_task_manager self.__pending_medm_history_task = None @@ -163,7 +164,7 @@ def load_data(self, parent_file): else -1 ) - if self._app.get_setting("enable_flowam"): + if self._app.context.flow_project_id and self._engine.flow_host: self._load_medm_history(parent_file) return diff --git a/python/tk_multi_breakdown2/file_item_model.py b/python/tk_multi_breakdown2/file_item_model.py index 4480731..1e92df3 100644 --- a/python/tk_multi_breakdown2/file_item_model.py +++ b/python/tk_multi_breakdown2/file_item_model.py @@ -132,6 +132,7 @@ def __init__( # ------------------------------------------------------------------------------------ self._app = sgtk.platform.current_bundle() + self._engine = sgtk.platform.current_engine() # Flag indicating if the model is dynamically loaded as it is retrieved async. False # will show a loader until all data is loaded in. @@ -694,7 +695,12 @@ def reload(self): # all async tasks are complete to reload the model. self.stop_timer() - if not self._app.get_setting("enable_flowam"): + if self._app.context.flow_project_id and self._engine.flow_host: + ( + self.__scene_objects, + self.__pending_published_file_data_request, + ) = self._app.flowam.get_scene_objects(self._bg_task_manager) + else: # Run the scan scene method in the main thread (not a background task) since this # may cause issues for certain DCCs self.__scene_objects = self._manager.get_scene_objects() @@ -711,17 +717,7 @@ def reload(self): bg_task_manager=self._bg_task_manager, ) ) - else: - ( - self.__scene_objects, - self.__pending_published_file_data_request, - ) = self._app.execute_hook_method( - "hook_scene_operations", - "get_scene_objects_and_publishes", - manager=self._manager, - published_file_fields=self._published_file_fields, - bg_task_manager=self._bg_task_manager, - ) + except Exception: # Reset on failure to reload self.__pending_published_file_data_request = None @@ -1192,7 +1188,7 @@ def _get_published_files_for_items(self, file_items, data_retriever=None): will execute synchronously. For async requests, the background task id will be returned, else the published file data will be returned for synchronous requests. - When enable_flowam is enabled, the bg_task_manager is used for async execution + When FlowAM is enabled, the bg_task_manager is used for async execution instead of the SG data_retriever, since the data comes from FlowAM rather than FPT. :param file_items: The file item objects to get the published file data for. @@ -1206,7 +1202,7 @@ def _get_published_files_for_items(self, file_items, data_retriever=None): :rtype: str | dict """ - if self._app.get_setting("enable_flowam"): + if self._app.context.flow_project_id and self._engine.flow_host: bg = self._bg_task_manager if data_retriever else None return self._manager.get_published_files_for_items( file_items, bg_task_manager=bg @@ -1549,7 +1545,7 @@ def _on_background_task_completed(self, uid, group_id, result): # For FlowAM items, the thumbnail path may already be resolved in the stub # data. Set it now so it's available when the model items are created. - if self._app.get_setting("enable_flowam"): + if self._app.context.flow_project_id and self._engine.flow_host: for file_item in self.__file_items: thumb = (file_item.sg_data or {}).get("sg_flow_thumbnail_path") if thumb: @@ -1564,7 +1560,7 @@ def _on_background_task_completed(self, uid, group_id, result): ) elif uid == self.__pending_latest_published_files_data_request: - # enable_flowam: latest published files came through bg_task_manager + # FlowAM: latest published files came through bg_task_manager self.__pending_latest_published_files_data_request = None self._handle_latest_published_files_result( result if isinstance(result, list) else [] diff --git a/python/tk_multi_breakdown2/flowam/__init__.py b/python/tk_multi_breakdown2/flowam/__init__.py new file mode 100644 index 0000000..ccf784c --- /dev/null +++ b/python/tk_multi_breakdown2/flowam/__init__.py @@ -0,0 +1,23 @@ +# Copyright (c) 2026 Shotgun Software Inc. +# +# CONFIDENTIAL AND PROPRIETARY +# +# This work is provided "AS IS" and subject to the Shotgun Pipeline Toolkit +# Source Code License included in this distribution package. See LICENSE. +# By accessing, using, copying or modifying this work you indicate your +# agreement to the Shotgun Pipeline Toolkit Source Code License. All rights +# not expressly granted therein are reserved by Shotgun Software Inc. + +"""FlowAM integration for the Scene Breakdown app. + +This package provides drop-in replacements for the standard Shotgun-based +Scene Breakdown models and actions, backed by Flow Asset Management (FlowAM). +""" + +from .reference import ( + get_assets_for_items, + get_latest_revision, + get_scene_objects, + update_to_latest, + update_to_revision, +) diff --git a/python/tk_multi_breakdown2/flowam/reference.py b/python/tk_multi_breakdown2/flowam/reference.py new file mode 100644 index 0000000..055e07a --- /dev/null +++ b/python/tk_multi_breakdown2/flowam/reference.py @@ -0,0 +1,511 @@ +# Copyright (c) 2026 Autodesk, Inc. +# +# CONFIDENTIAL AND PROPRIETARY +# +# This work is provided "AS IS" and subject to the Shotgun Pipeline Toolkit +# Source Code License included in this distribution package. See LICENSE. +# By accessing, using, copying or modifying this work you indicate your +# agreement to the Shotgun Pipeline Toolkit Source Code License. All rights +# not expressly granted therein are reserved by Autodesk, Inc. + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Optional + +import sgtk +from tank_vendor.flow_integration_sdk import ( + dependency, + exceptions, + globals, + objects, + schema, + utils, +) + +if TYPE_CHECKING: + from ..api import FileItem + + +def get_scene_objects( + bg_task_manager: Optional[Any], +) -> tuple[list[dict[str, Any]], Any]: + """ + Retrieve the scene objects and their associated published files + using Flow Asset Manager integration. + + Each dependency is mapped to a stub entry shaped like a SG PublishedFile + but populated with MEDM data (asset_id, revision_id, version_number, etc.) + so the breakdown app can build its file item model and determine statuses. + + Args: + bg_task_manager: Background task manager for async operations. If None, execute synchronously. + Returns: + A tuple of (scene_objects, published_file_data) where: + - scene_objects is a list of dictionaries with keys: node_name, node_type, path + - published_file_data is either a background task id or a dictionary mapping + file paths to a stub PublishedFile dict populated with MEDM data. + """ + + flow_dependencies = _get_dependencies() + + scene_objects = [ + { + "node_name": dep_info.node_handle, + "node_type": dep_info.node_type, + "path": dep_info.file_path, + } + for dep_info in flow_dependencies + ] + + def build_published_file_stubs(): + result = {} + for dep_info in flow_dependencies: + version_number = None + created_at = None + if dep_info.version_id: + version_number = dep_info.version_num + + thumbnail_path = None + if dep_info.revision_id: + rev = objects.FlowRevision.get_revision(dep_info.revision_id) + thumbnail_path = rev.get_thumbnail_file() + + entity = None + published_file_type = None + if dep_info.asset_id: + asset = objects.FlowAsset(dep_info.asset_id) + entity = { + "type": "Asset", + "id": dep_info.asset_id, + "name": asset.name, + } + revision = objects.FlowRevision.get_revision(dep_info.revision_id) + type_comps = revision.find_components(type_id=globals.BASE_TYPE_ID) + type_ids = [c.type_id for c in type_comps] + published_file_type = None + created_at = asset.created_at + if type_ids: + try: + display_name = schema.get_schema_display_name(type_ids[0]) + except exceptions.FlowError: + display_name = type_ids[0] + published_file_type = { + "type": "PublishedFileType", + "id": None, + "code": display_name, + } + + stub = { + "type": "PublishedFile", + "id": None, + "project": sgtk.platform.current_engine().context.project, + "entity": entity, + "name": dep_info.component_name or dep_info.node_handle, + "created_at": created_at, + "created_by.HumanUser.name": asset.created_by, + "description": asset.description, + "task": None, + "task.Task.sg_status_list": "No Status", + "tags": "No Tags", + "published_file_type": published_file_type, + "published_file_type.PublishedFileType.code": ( + published_file_type["code"] if published_file_type else None + ), + "path": {"local_path": dep_info.file_path}, + "version_number": version_number, + "sg_flow_revision_id": dep_info.revision_id, + "sg_flow_asset_id": dep_info.asset_id, + "sg_flow_version_id": dep_info.version_id, + "sg_flow_blob_index": dep_info.blob_index, + "sg_flow_thumbnail_path": thumbnail_path, + } + result[dep_info.file_path] = stub + return result + + if bg_task_manager: + return scene_objects, bg_task_manager.add_task(build_published_file_stubs) + + return scene_objects, build_published_file_stubs() + + +def _get_published_file_type(asset: objects.FlowAsset) -> Optional[dict[str, Any]]: + """Return a PublishedFileType-shaped stub derived from the asset's type_ids. + + Uses the first type_id in asset.type_ids and resolves its display name via + schema.get_schema_display_name. Returns None if no type is available. + + Args: + asset: A Flow Asset object exposing a type_ids attribute (list of str). + Returns: + A dict shaped like a SG PublishedFileType entity. + """ + if not asset or not getattr(asset, "type_ids", None): + return None + type_id = asset.type_ids[0] + try: + display_name = schema.get_schema_display_name(type_id) + except exceptions.FlowError: + display_name = type_id + return {"type": "PublishedFileType", "id": None, "code": display_name} + + +def _get_dependencies() -> list[dependency.DependencyData]: + """Return the list of asset dependencies found in current scene. + List will contain DependencyData objects which contain pertinent info + about each asset dependency instance found. + + The DependencyData node contains detailed information of the dependency such as + * node_handle (node name) + * node_type (e.g. "reference") + * file_path (local file path of dependency) + + Only the top level asset dependencies will be returned. + """ + # Introspect current scene and get dependency tree + # (this will include asset and local dependencies together) + dep_tree = sgtk.platform.current_engine().flow_host.get_dependency_tree() + + # Filter out only the top-level asset (internal) dependencies + asset_deps = dep_tree.get_internal_dependencies(top_level=True) + + return asset_deps + + +# ============================ +# Get revisions in FPT format +# ============================ +def get_latest_revision( + item: FileItem, + bg_task_manager: Optional[Any] = None, +) -> dict[str, Any] | "BackgroundTaskManager": + """ + Get the latest published file (revision) for a single item using MEDM. + + Args: + item: FileItem to get the latest revision for. + bg_task_manager: If provided, the query runs async and the task id + is returned. Otherwise executes synchronously. + Returns: + If async, the background task id. Otherwise a dict shaped like a + SG PublishedFile entity representing the latest revision. + """ + _bundle = sgtk.platform.current_bundle() + + def _fetch_latest(): + item_data = item.sg_data or {} + asset_id = item_data.get("sg_flow_asset_id") + if not asset_id: + raise exceptions.FlowError("No asset ID found for item") + + project = _bundle.context.project + + try: + asset = objects.FlowAsset(asset_id) + latest_revision = asset.get_latest_revision() + if not latest_revision: + _bundle.logger.warning(f"No latest revision found for asset {asset_id}") + raise exceptions.FlowError( + f"No latest revision found for asset {asset_id}" + ) + except exceptions.FlowError as e: + _bundle.logger.error( + f"Failed to get latest revision for asset {asset_id}: {e}" + ) + raise + + published_file_type = _get_published_file_type(asset) + + local_path = latest_revision.get_storage_component_path( + component_purpose=globals.SOURCE_PURPOSE + ) + + created_at = asset.created_at + + return { + "type": "PublishedFile", + "id": None, + "project": project, + "entity": { + "type": "Asset", + "id": asset_id, + "name": asset.name, + }, + "name": item_data.get("name"), + "task": None, + "task.Task.sg_status_list": "No Status", + "tags": "No Tags", + "published_file_type": published_file_type, + "published_file_type.PublishedFileType.code": ( + published_file_type["code"] if published_file_type else None + ), + "path": {"local_path": local_path} if local_path else None, + "version_number": asset.version_number, + "created_at": created_at, + "created_by.HumanUser.name": asset.created_by, + "description": asset.description, + "sg_flow_revision_id": latest_revision.id, + "sg_flow_asset_id": asset_id, + "sg_flow_version_id": asset.version_id, + } + + if bg_task_manager: + return bg_task_manager.add_task(_fetch_latest) + return _fetch_latest() + + +def get_assets_for_items( + items: list[FileItem], + bg_task_manager: Optional[Any] = None, +) -> list[dict[str, Any]] | "BackgroundTaskManager": + """ + Get all published file revisions for the given items using FlowAM. + + Queries FlowAM for all numbered versions of each item's asset, returning + them as PublishedFile-shaped dicts so the breakdown model can build its + version mapping and determine statuses. + + Args: + items: List of FileItem objects to get published files for. + bg_task_manager: If provided, the query runs async and the task id + is returned. Otherwise executes synchronously. + Returns: + If async, the background task id. Otherwise a list of dicts shaped + like SG PublishedFile entities, sorted newest-first per asset. + """ + + _bundle = sgtk.platform.current_bundle() + + def _fetch_all_versions(): + project = _bundle.context.project + result = [] + processed_assets = {} + + for item in items: + item_data = item.sg_data or {} + asset_id = item_data.get("sg_flow_asset_id") + if not asset_id: + continue + + name = item_data.get("name") + + if asset_id not in processed_assets: + try: + asset = objects.FlowAsset(asset_id) + versions = list(asset.iterate_versions()) + processed_assets[asset_id] = (asset, versions) + except exceptions.FlowError: + _bundle.logger.error( + f"Failed to query versions for asset {asset_id}" + ) + continue + + asset, versions = processed_assets[asset_id] + published_file_type = _get_published_file_type(asset) + + for version in versions: + revision = version.revision + local_path = revision.get_storage_component_path( + component_purpose=globals.SOURCE_PURPOSE + ) + + created_at = version.created_at + + thumbnail_path = None + try: + thumbnail_path = revision.get_thumbnail_file() + except exceptions.FlowError: + _bundle.logger.error( + f"Failed to get thumbnail path for revision {revision.id}" + ) + + result.append( + { + "type": "PublishedFile", + "id": None, + "project": project, + "entity": { + "type": "Asset", + "id": asset_id, + "name": asset.name, + }, + "name": name, + "task": None, + "task.Task.sg_status_list": "No Status", + "tags": "No Tags", + "published_file_type": published_file_type, + "published_file_type.PublishedFileType.code": ( + published_file_type["code"] if published_file_type else None + ), + "path": {"local_path": local_path} if local_path else None, + "version_number": version.version_number, + "created_at": created_at, + "created_by.HumanUser.name": version.created_by, + "description": revision.comment, + "sg_flow_revision_id": revision.id, + "sg_flow_asset_id": asset_id, + "sg_flow_version_id": version.id, + "sg_flow_thumbnail_path": thumbnail_path, + } + ) + + return result + + if bg_task_manager: + return bg_task_manager.add_task(_fetch_all_versions) + return _fetch_all_versions() + + +# ============================ +# Update depdendencies methods +# ============================ +def update_to_latest(items: list[FileItem]) -> list[FileItem]: + """Update the given items in the scene. + + Args: + items: list of file item object to update + Returns: + list of file item that were updated + """ + items_to_update = [] + for file_item in items: + res = _update_dependency( + file_item.sg_data["sg_flow_revision_id"], + node_handle=file_item.node_name, + ) + if res: + items_to_update.append(file_item) + + return items_to_update + + +def update_to_revision( + item: Optional[FileItem], + item_data: Optional[dict[str, Any]] = None, +) -> bool: + """Update the item to a specific version. + + Args: + item: Dictionary representation of the FileItem to update + item_data: Dictionary of Flow Production Tracking data representing the target + published file revision to update to. + + Returns: + True if the item requires the data model to update, else False will not + trigger a model update. + """ + _bundle = sgtk.platform.current_bundle() + # Validate item_data contains the required revision ID + if not item_data or not item_data.get("sg_flow_revision_id"): + _bundle.logger.warning( + "Cannot update to revision: item_data is missing or lacks sg_flow_revision_id" + ) + return False + + # Validate item structure + flowam_data = item.get("sg_data") if item else None + if not flowam_data or not flowam_data.get("sg_flow_revision_id"): + _bundle.logger.warning( + "Cannot update to revision: item sg_data is missing or lacks sg_flow_revision_id" + ) + return False + + do_update = _update_dependency( + revision_id=flowam_data["sg_flow_revision_id"], + new_revision_id=item_data["sg_flow_revision_id"], + node_handle=item.get("node_name"), + ) + + return do_update + + +def _update_dependency( + revision_id: str, + new_revision_id: str | None = None, + node_handle: str | None = None, +) -> bool: + """Given an asset dependency, update all relevant dependencies to that revision + within the scene to the latest version of that asset or a specific version if provided. + + Args: + revision_id: Revision id of original dependency. This can be a version id. + new_revision_id: Revision id to change to. If None, update to latest revision + of the same asset. This can be a version id. + node_handle: Unique dependency identifier. If not provided, all dependencies + of given revision id will be updated. + + Returns: + True if dependency(ies) were updated. + False if revision is already matching the specification and nothing was done. + + Raises: + EntityNotFoundError + FlowError + """ + if sgtk.platform.current_engine().name == "tk-desktop": + raise exceptions.FlowError( + "Updating dependency is not relevant outside of a DCC context." + ) + + try: + if objects.FlowVersion.is_version_id(revision_id): + input_type = "version" + rev = objects.FlowVersion(revision_id).revision + else: + input_type = "revision" + rev = objects.FlowRevision.get_revision(revision_id) + except exceptions.FlowError as exc: + msg = f"Invalid {input_type} id provided: {revision_id}." + raise exceptions.EntityNotFoundError( + entity_id=revision_id, details=msg + ) from exc + + if new_revision_id: + try: + if objects.FlowVersion.is_version_id(new_revision_id): + input_type = "version" + new_rev = objects.FlowVersion(new_revision_id).revision + else: + input_type = "revision" + new_rev = objects.FlowRevision.get_revision(new_revision_id) + except exceptions.FlowError as exc: + msg = f"Invalid new {input_type} id provided: {new_revision_id}." + raise exceptions.EntityNotFoundError( + entity_id=new_revision_id, details=msg + ) from exc + else: + # Determine the latest revision of same asset + asset = objects.FlowAsset(rev.asset_id) + new_rev = asset.get_latest_revision() + + if rev.revision_number == new_rev.revision_number: + # Already matching the spec, so nothing to be done + return False + + # Fetch source component of new revision + new_rev.fetch(component_purpose=globals.SOURCE_PURPOSE) + + file_seq_comp = new_rev.find_component( + type_id=schema.get_schema_id(globals.FILE_SEQ_TYPE) + ) + if file_seq_comp: + # Return a file path with embedded frame padding + new_path = utils.cleanpath( + new_rev.get_storage_dir(), file_seq_comp.properties["fileFormat"] + ) + else: + new_path = new_rev.get_storage_component_path( + component_purpose=globals.SOURCE_PURPOSE + ) + + # Get list of top-level asset dependencies in scene + orig_revision_id = rev.id + host = sgtk.platform.current_engine().flow_host + dep_tree = host.get_dependency_tree() + asset_deps = dep_tree.get_internal_dependencies(top_level=True) + for dep in asset_deps: + if dep.revision_id == orig_revision_id: + if node_handle and dep.node_handle != node_handle: + continue # skip if node handle doesn't match + host.update_dependency(dep, new_path) + + return True