diff --git a/app.py b/app.py index dc6789da..32404a6e 100644 --- a/app.py +++ b/app.py @@ -8,6 +8,8 @@ # agreement to the Shotgun Pipeline Toolkit Source Code License. All rights # not expressly granted therein are reserved by Autodesk, Inc. +from types import ModuleType + import sgtk @@ -21,6 +23,7 @@ def init_app(self): """Called as the application is being initialized.""" tk_multi_breakdown2 = self.import_module("tk_multi_breakdown2") + self._flowam = tk_multi_breakdown2.flowam # Store a reference to manager class to expose its functionality at the application level. self._manager_class = tk_multi_breakdown2.BreakdownManager @@ -141,3 +144,15 @@ def _on_dialog_close(self, dialog): elif dialog == self._current_panel: self.log_debug("Current panel has been closed, clearing reference.") self._current_panel = None + + @property + def flowam(self) -> ModuleType: + """ + Access to the FlowAM integration module for this app. This module provides + drop-in replacements for the standard Shotgun-based Scene Breakdown models and actions, + backed by Flow Asset Management (FlowAM). + + :returns: The FlowAM integration module for this app + :rtype: :mod:`tk_multi_breakdown2.flowam` + """ + return self._flowam diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 0602b723..0b4e8cd9 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -43,3 +43,4 @@ jobs: additional_repositories: - name: tk-framework-qtwidgets - name: tk-framework-shotgunutils + tk_core_ref: ticket/sg-43461/migrate-host-base diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 00000000..61ca777a --- /dev/null +++ b/codecov.yml @@ -0,0 +1,3 @@ +ignore: + # flowam and other files not covered by unit tests + - "**python/tk_multi_breakdown2/flowam/*" diff --git a/hooks/tk-mari_scene_operations.py b/hooks/tk-mari_scene_operations.py index 4b97eca8..dab7afdf 100644 --- a/hooks/tk-mari_scene_operations.py +++ b/hooks/tk-mari_scene_operations.py @@ -8,8 +8,6 @@ # agreement to the Shotgun Pipeline Toolkit Source Code License. All rights # not expressly granted therein are reserved by Autodesk, Inc. -import os - import sgtk from sgtk import TankError diff --git a/hooks/tk-maya_scene_operations.py b/hooks/tk-maya_scene_operations.py index 5841bb44..a61ae684 100644 --- a/hooks/tk-maya_scene_operations.py +++ b/hooks/tk-maya_scene_operations.py @@ -23,6 +23,8 @@ class BreakdownSceneOperations(HookBaseClass): This implementation handles detection of maya references and file texture nodes. """ + __callback_ids = [] + def scan_scene(self): """ The scan scene method is executed once at startup and its purpose is @@ -104,7 +106,6 @@ def update(self, item): self.logger.debug( "File Texture %s: Updating to version %s" % (node_name, path) ) - file_name = cmds.getAttr("%s.fileTextureName" % node_name) cmds.setAttr("%s.fileTextureName" % node_name, path, type="string") def register_scene_change_callback(self, scene_change_callback): @@ -151,4 +152,7 @@ def unregister_scene_change_callback(self): """Unregister the scene change callbacks by disconnecting any signals.""" for callback_id in self.__callback_ids: - OpenMaya.MSceneMessage.removeCallback(callback_id) + try: + OpenMaya.MSceneMessage.removeCallback(callback_id) + except RuntimeError: + pass diff --git a/info.yml b/info.yml index 3b86c7c3..83388e49 100644 --- a/info.yml +++ b/info.yml @@ -126,7 +126,7 @@ configuration: create and execute actions. default_value: {} -# The Flow Production Tracking fields that this app needs in order to operate correctly +# The Flow Production Tracking fields this app needs in order to operate correctly requires_shotgun_fields: # linked_projects.Asset is required for references in multiple Flow Production Tracking projects diff --git a/python/tk_multi_breakdown2/__init__.py b/python/tk_multi_breakdown2/__init__.py index 5387c573..83dc5a3e 100644 --- a/python/tk_multi_breakdown2/__init__.py +++ b/python/tk_multi_breakdown2/__init__.py @@ -8,6 +8,7 @@ # agreement to the Shotgun Pipeline Toolkit Source Code License. All rights # not expressly granted therein are reserved by Autodesk, Inc. +from . import flowam # noqa: F401 from .api import BreakdownManager try: diff --git a/python/tk_multi_breakdown2/actions.py b/python/tk_multi_breakdown2/actions.py index bb71e891..a5a8827d 100644 --- a/python/tk_multi_breakdown2/actions.py +++ b/python/tk_multi_breakdown2/actions.py @@ -58,7 +58,7 @@ def add_update_to_specific_version_action(file_item, model, sg_data, parent=None :rtype: QtGui.QAction """ - if not sg_data.get("version_number"): + if sg_data.get("version_number") is None: return action = UpdateToSpecificVersionAction( @@ -186,6 +186,7 @@ def execute(self): index, [self._model.FILE_ITEM_ROLE, self._model.FILE_ITEM_SG_DATA_ROLE], ) + self._model.reload() class UpdateToSpecificVersionAction(Action): @@ -226,3 +227,4 @@ def execute(self): index, [self._model.FILE_ITEM_ROLE, self._model.FILE_ITEM_SG_DATA_ROLE], ) + self._model.reload() diff --git a/python/tk_multi_breakdown2/api/manager.py b/python/tk_multi_breakdown2/api/manager.py index 7af584ff..7ff98970 100644 --- a/python/tk_multi_breakdown2/api/manager.py +++ b/python/tk_multi_breakdown2/api/manager.py @@ -8,6 +8,8 @@ # agreement to the Shotgun Pipeline Toolkit Source Code License. All rights # not expressly granted therein are reserved by Autodesk, Inc. +from typing import Any, Optional + import sgtk from tank.errors import TankHookMethodDoesNotExistError @@ -22,6 +24,8 @@ def __init__(self, bundle): """Initialize the manager.""" self._bundle = bundle + _engine = sgtk.platform.current_engine() + self._flow_host = _engine.flow_host if _engine else None @sgtk.LogManager.log_timing def get_scene_objects(self, execute_in_main_thread=True): @@ -228,22 +232,43 @@ def get_history_published_file_filters(self): return self._bundle.get_setting("history_published_file_filters", []) - def get_latest_published_file(self, item, data_retriever=None, extra_fields=None): + def get_latest_published_file( + self, + item: FileItem, + data_retriever: Optional[Any] = None, + extra_fields: Optional[list[str]] = None, + bg_task_manager: Optional[Any] = None, + ) -> Any: """ Get the latest available published file according to the current item context. :param item: :class`FileItem` object we want to get the latest published file :type item: FileItem - :param data_retreiver: If provided, the api request will be async. The default value + :param data_retriever: If provided, the api request will be async. The default value will execute the api request synchronously. :type data_retriever: ShotgunDataRetriever + :param bg_task_manager: Used for async execution. + :type bg_task_manager: BackgroundTaskManager :return: The latest published file as a Flow Production Tracking entity dictionary if the request was synchronous, else the request background task id if the request was async. """ + is_async = data_retriever or bg_task_manager + if not item or not item.sg_data: - return None if data_retriever else {} + return None if is_async else {} + + if self._bundle.context.flow_project_id and self._flow_host: + result = self._bundle.flowam.get_latest_revision( + item=item, + bg_task_manager=bg_task_manager, + ) + if not is_async: + if not isinstance(result, dict): + result = {} + item.latest_published_file = result + return result fields = self.get_published_file_fields() if extra_fields: @@ -267,24 +292,38 @@ def get_latest_published_file(self, item, data_retriever=None, extra_fields=None return result def get_published_files_for_items( - self, items, data_retriever=None, extra_fields=None - ): + self, + items: list[FileItem], + data_retriever: Optional[Any] = None, + extra_fields: Optional[list[str]] = None, + bg_task_manager: Optional[Any] = None, + ) -> Any: """ Get all published files (history) for the given items. :param items: the list of :class`FileItem` we want to get published files for. :type items: List[FileItem] - :param data_retreiver: If provided, the api request will be async. The default value + :param data_retriever: If provided, the api request will be async. The default value will execute the api request synchronously. :type data_retriever: ShotgunDataRetriever + :param bg_task_manager: Used for async execution. + :type bg_task_manager: BackgroundTaskManager :return: If the request is async, then the request task id is returned, else the published file data result from the api request. :rtype: str | dict """ + is_async = data_retriever or bg_task_manager + if not items: - return None if data_retriever else {} + return None if is_async else {} + + if self._bundle.context.flow_project_id and self._flow_host: + return self._bundle.flowam.get_assets_for_items( + items=items, + bg_task_manager=bg_task_manager, + ) fields = self.get_published_file_fields() if extra_fields: @@ -301,7 +340,13 @@ def get_published_files_for_items( published_file_filters=filters, ) - def get_published_file_history(self, item, extra_fields=None, data_retriever=None): + def get_published_file_history( + self, + item: FileItem, + extra_fields: Optional[list[str]] = None, + data_retriever: Optional[Any] = None, + bg_task_manager: Optional[Any] = None, + ) -> Any: """ Get the published history for the selected item. It will gather all the published files with the same context than the current item (project, name, task, ...) @@ -310,9 +355,11 @@ def get_published_file_history(self, item, extra_fields=None, data_retriever=Non :type item: FileItem :param extra_fields: A list of Flow Production Tracking fields to append to the Flow Production Tracking query fields. :type extra_fields: List[str] - :param data_retreiver: If provided, the api request will be async. The default value + :param data_retriever: If provided, the api request will be async. The default value will execute the api request synchronously. :type data_retriever: ShotgunDataRetriever + :param bg_task_manager: Used for async execution. + :type bg_task_manager: BackgroundTaskManager :return: If the request is async, then the request task id is returned, else the published file history. @@ -323,7 +370,10 @@ def get_published_file_history(self, item, extra_fields=None, data_retriever=Non return [] result = self.get_published_files_for_items( - [item], data_retriever=data_retriever, extra_fields=extra_fields + [item], + data_retriever=data_retriever, + extra_fields=extra_fields, + bg_task_manager=bg_task_manager, ) if result and isinstance(result, list): item.latest_published_file = result[0] @@ -343,6 +393,27 @@ def update_to_latest_version(self, items): if not isinstance(items, list): items = [items] + if self._bundle.context.flow_project_id and self._flow_host: + items_to_update = self._bundle.flowam.update_to_latest(items) + + # The FlowAM method performs the DCC-side update but does not update the + # Python FileItem model data. We do that here so callers always get a + # consistent, up-to-date list of updated FileItem objects regardless of + # which code path ran. A non-list return (including None) means the hook + # chose not to filter, so attempt to update all items. + if not isinstance(items_to_update, list): + items_to_update = items + + updated_items = [] + for item in items_to_update: + data = item.latest_published_file + if not data or not data.get("path", {}).get("local_path", None): + continue + item.sg_data = data + item.path = data["path"]["local_path"] + updated_items.append(item) + return updated_items + # First try to execute the hook method to update items in batch for performance. try: return self.update_items_to_latest_version(items) @@ -436,6 +507,20 @@ def update_to_specific_version(self, item, sg_data): if not sg_data or not sg_data.get("path", {}).get("local_path", None): return False + if self._bundle.context.flow_project_id and self._flow_host: + do_update = self._bundle.flowam.update_to_revision( + item=item.to_dict(), + item_data=sg_data, + ) + if do_update is None: + # Default to True if the hook return value was not explictly set + do_update = True + + if do_update: + item.sg_data = sg_data + item.path = sg_data["path"]["local_path"] + return do_update + item_dict = item.to_dict() item_dict["path"] = sg_data["path"]["local_path"] if item_dict["extra_data"] is None: diff --git a/python/tk_multi_breakdown2/dialog.py b/python/tk_multi_breakdown2/dialog.py index 87bf6e12..f25d815f 100644 --- a/python/tk_multi_breakdown2/dialog.py +++ b/python/tk_multi_breakdown2/dialog.py @@ -872,8 +872,7 @@ def _show_history_item_context_menu(self, view, index, pos): # passed in references the file history item. if isinstance(index.model(), QtGui.QSortFilterProxyModel): index = index.model().mapToSource(index) - history_item = index.model().itemFromIndex(index) - sg_data = history_item.get_sg_data() + sg_data = index.data(FileHistoryModel.SG_DATA_ROLE) update_action = ActionManager.add_update_to_specific_version_action( file_item_to_update, self._file_model, sg_data, None @@ -1350,6 +1349,12 @@ def _on_update_selected_to_latest(self): self._listen_for_events(False) try: ActionManager.execute_update_to_latest_action(file_items, self._file_model) + except Exception as e: + QtGui.QMessageBox.critical( + None, + "Scene Breakdown", + "Error: {}".format(e), + ) finally: self.__executing_bulk_action = False # Turn on event handling if it was on before diff --git a/python/tk_multi_breakdown2/file_history_model.py b/python/tk_multi_breakdown2/file_history_model.py index 453db6fa..d8925193 100644 --- a/python/tk_multi_breakdown2/file_history_model.py +++ b/python/tk_multi_breakdown2/file_history_model.py @@ -8,6 +8,8 @@ # agreement to the Shotgun Pipeline Toolkit Source Code License. All rights # not expressly granted therein are reserved by Autodesk, Inc. +from typing import Any + import sgtk from sgtk.platform.qt import QtCore, QtGui @@ -64,7 +66,10 @@ def __init__(self, parent, bg_task_manager): ShotgunModel.__init__(self, parent, bg_task_manager=bg_task_manager) self._app = sgtk.platform.current_bundle() + self._engine = sgtk.platform.current_engine() self.__manager = self._app.create_breakdown_manager() + self.__bg_task_manager = bg_task_manager + self.__pending_medm_history_task = None # Store parent file item data self.__parent_sg_data = None @@ -135,6 +140,11 @@ def is_current(self, history_sg_data): if not self.parent_entity: return False + # For FlowAM items (id is None), compare by revision id + parent_revision_id = self.parent_entity.get("sg_flow_revision_id") + if parent_revision_id is not None: + return parent_revision_id == history_sg_data.get("sg_flow_revision_id") + return self.parent_entity.get("id") == history_sg_data.get("id") def load_data(self, parent_file): @@ -154,6 +164,10 @@ def load_data(self, parent_file): else -1 ) + if self._app.context.flow_project_id and self._engine.flow_host: + self._load_medm_history(parent_file) + return + fields = self.__manager.get_published_file_fields() fields += get_ui_published_file_fields(self._app) filters = [ @@ -226,6 +240,124 @@ def _populate_item(self, item, sg_data): # Set up the methods to call to retrieve the data for the specified role. self.set_data_for_role_methods(item, sg_data) + def _load_medm_history(self, parent_file: Any) -> None: + """ + Load published file history from the FlowAM integration via hook_scene_operations. + + :param parent_file: The parent file item to load history for. + :type parent_file: FileItem + """ + + self.clear() + + # Cancel any in-flight task + if self.__pending_medm_history_task is not None: + try: + self.__bg_task_manager.stop_task(self.__pending_medm_history_task) + except Exception: + pass + self.__pending_medm_history_task = None + + self.__pending_medm_history_task = self.__manager.get_published_file_history( + parent_file, + bg_task_manager=self.__bg_task_manager, + ) + + if self.__bg_task_manager: + self.__bg_task_manager.task_completed.connect( + self.__on_medm_history_task_completed + ) + self.__bg_task_manager.task_failed.connect( + self.__on_medm_history_task_failed + ) + + def _populate_medm_items(self, version_list: list[dict]) -> None: + """ + Populate the model with history items returned by the FlowAM integration. + + :param version_list: List of published file dictionaries. + :type version_list: list[dict] + """ + + self.clear() + + for sg_data in version_list: + item = QtGui.QStandardItem() + item.setData(sg_data, self.SG_DATA_ROLE) + + self._populate_item(item, sg_data) + + # Override the thumbnail set by _populate_item (via role_methods) with + # the resolved local path provided by the FlowAM integration, since the + # hook does not know about sg_flow_thumbnail_path. + thumbnail_path = sg_data.get("sg_flow_thumbnail_path") + if thumbnail_path: + item.setData( + QtGui.QIcon(thumbnail_path), + self.VIEW_ITEM_THUMBNAIL_ROLE, + ) + + self.appendRow(item) + + def __on_medm_history_task_completed( + self, uid: str, group_id: Any, result: Any + ) -> None: + """ + Slot called when the FlowAM history background task completes successfully. + + :param uid: The unique id of the task that was completed. + :param group_id: The group id of the task. + :param result: The result data from the task. + """ + + if uid != self.__pending_medm_history_task: + return + + self.__pending_medm_history_task = None + + try: + self.__bg_task_manager.task_completed.disconnect( + self.__on_medm_history_task_completed + ) + self.__bg_task_manager.task_failed.disconnect( + self.__on_medm_history_task_failed + ) + except Exception: + pass + + self._populate_medm_items(result or []) + + def __on_medm_history_task_failed( + self, uid: str, group_id: Any, msg: str, stack_trace: str + ) -> None: + """ + Slot called when the FlowAM history background task fails. + + :param uid: The unique id of the task that failed. + :param group_id: The group id of the task. + :param msg: The error message. + :param stack_trace: The stack trace of the error. + """ + + if uid != self.__pending_medm_history_task: + return + + self.__pending_medm_history_task = None + + try: + self.__bg_task_manager.task_completed.disconnect( + self.__on_medm_history_task_completed + ) + self.__bg_task_manager.task_failed.disconnect( + self.__on_medm_history_task_failed + ) + except Exception: + pass + + self._app.logger.warning( + "Failed to load FlowAM history for file item. %s\n%s" % (msg, stack_trace) + ) + def _set_tooltip(self, item, sg_item): """ Override base method to ensure no tooltip is set from the model. Let the delegate diff --git a/python/tk_multi_breakdown2/file_item_model.py b/python/tk_multi_breakdown2/file_item_model.py index 33cd9e43..1e92df3f 100644 --- a/python/tk_multi_breakdown2/file_item_model.py +++ b/python/tk_multi_breakdown2/file_item_model.py @@ -132,6 +132,7 @@ def __init__( # ------------------------------------------------------------------------------------ self._app = sgtk.platform.current_bundle() + self._engine = sgtk.platform.current_engine() # Flag indicating if the model is dynamically loaded as it is retrieved async. False # will show a loader until all data is loaded in. @@ -694,23 +695,30 @@ def reload(self): # all async tasks are complete to reload the model. self.stop_timer() - # Run the scan scene method in the main thread (not a background task) since this - # may cause issues for certain DCCs - self.__scene_objects = self._manager.get_scene_objects() - - # Make an async request to get the published files for the references in the scene. - # This will omit any objects from the scene that do not have a - # Flow Production Tracking Published File. Some files can come from other projects - # so we cannot rely on templates, and instead need to query Flow Production Tracking. - file_paths = [o["path"] for o in self.__scene_objects] - self.__pending_published_file_data_request = ( - self._manager.get_published_files_from_file_paths( - file_paths, - extra_fields=self._published_file_fields, - bg_task_manager=self._bg_task_manager, + if self._app.context.flow_project_id and self._engine.flow_host: + ( + self.__scene_objects, + self.__pending_published_file_data_request, + ) = self._app.flowam.get_scene_objects(self._bg_task_manager) + else: + # Run the scan scene method in the main thread (not a background task) since this + # may cause issues for certain DCCs + self.__scene_objects = self._manager.get_scene_objects() + + # Make an async request to get the published files for the references in the scene. + # This will omit any objects from the scene that do not have a + # Flow Production Tracking Published File. Some files can come from other projects + # so we cannot rely on templates, and instead need to query Flow Production Tracking. + file_paths = [o["path"] for o in self.__scene_objects] + self.__pending_published_file_data_request = ( + self._manager.get_published_files_from_file_paths( + file_paths, + extra_fields=self._published_file_fields, + bg_task_manager=self._bg_task_manager, + ) ) - ) - except: + + except Exception: # Reset on failure to reload self.__pending_published_file_data_request = None finally: @@ -1180,6 +1188,9 @@ def _get_published_files_for_items(self, file_items, data_retriever=None): will execute synchronously. For async requests, the background task id will be returned, else the published file data will be returned for synchronous requests. + When FlowAM is enabled, the bg_task_manager is used for async execution + instead of the SG data_retriever, since the data comes from FlowAM rather than FPT. + :param file_items: The file item objects to get the published file data for. :type file_items: List[FileItem] :param data_retriever: The Shotgun data retriever to make the api request async, if @@ -1191,6 +1202,12 @@ def _get_published_files_for_items(self, file_items, data_retriever=None): :rtype: str | dict """ + if self._app.context.flow_project_id and self._engine.flow_host: + bg = self._bg_task_manager if data_retriever else None + return self._manager.get_published_files_for_items( + file_items, bg_task_manager=bg + ) + return self._manager.get_published_files_for_items( file_items, data_retriever=data_retriever ) @@ -1420,6 +1437,28 @@ def __get_index_from_item(self, item): # ---------------------------------------------------------------------------------------- # Background task and Data Retriever callbacks + def _handle_latest_published_files_result( + self, published_file_data: list[dict] + ) -> None: + """ + Process the latest published files data to update the model. + + This is called from both the SG data retriever and the bg task manager + callbacks when the latest published files data is received. + + :param published_file_data: List of published file data dicts. + :type published_file_data: list[dict] + """ + + published_files_mapping = self._get_published_files_mapping(published_file_data) + + if self.__is_reloading: + self._build_model_from_file_items(published_files_mapping) + if self.dynamic_loading: + self._finish_reload() + else: + self._update_latest_published_files(published_files_mapping) + def _on_data_retriever_work_completed(self, uid, request_type, data): """ Slot triggered when the data-retriever has finished doing some work. The data retriever is currently @@ -1463,19 +1502,7 @@ def _on_data_retriever_work_completed(self, uid, request_type, data): elif uid == self.__pending_latest_published_files_data_request: self.__pending_latest_published_files_data_request = None - published_files_mapping = self._get_published_files_mapping( - data.get("sg", []) - ) - - if self.__is_reloading: - self._build_model_from_file_items(published_files_mapping) - if self.dynamic_loading: - # Emit signals that data has finished loading. Any data still loading will - # be dynamically populated as it is retrieved (e.g. thumbnails). - self._finish_reload() - else: - # Only update the latest published file data - self._update_latest_published_files(published_files_mapping) + self._handle_latest_published_files_result(data.get("sg", [])) def _on_data_retriever_work_failed(self, uid, error_msg): """ @@ -1516,6 +1543,14 @@ def _on_background_task_completed(self, uid, group_id, result): self.__scene_objects, result ) + # For FlowAM items, the thumbnail path may already be resolved in the stub + # data. Set it now so it's available when the model items are created. + if self._app.context.flow_project_id and self._engine.flow_host: + for file_item in self.__file_items: + thumb = (file_item.sg_data or {}).get("sg_flow_thumbnail_path") + if thumb: + file_item.thumbnail_path = thumb + # Make an async request to get all published file data necessary to determine the # latest published file per file item. Get all info in a single request. self.__pending_latest_published_files_data_request = ( @@ -1524,6 +1559,13 @@ def _on_background_task_completed(self, uid, group_id, result): ) ) + elif uid == self.__pending_latest_published_files_data_request: + # FlowAM: latest published files came through bg_task_manager + self.__pending_latest_published_files_data_request = None + self._handle_latest_published_files_result( + result if isinstance(result, list) else [] + ) + def _on_background_task_failed(self, uid, group_id, msg, stack_trace): """ Callback triggered when the background manager failed to complete a task. @@ -1538,6 +1580,11 @@ def _on_background_task_failed(self, uid, group_id, msg, stack_trace): self.__pending_published_file_data_request = None self._finish_reload() + elif uid == self.__pending_latest_published_files_data_request: + self.__pending_latest_published_files_data_request = None + if self.__is_reloading: + self._finish_reload() + if msg: raise Exception(msg) diff --git a/python/tk_multi_breakdown2/flowam/__init__.py b/python/tk_multi_breakdown2/flowam/__init__.py new file mode 100644 index 00000000..ccf784c5 --- /dev/null +++ b/python/tk_multi_breakdown2/flowam/__init__.py @@ -0,0 +1,23 @@ +# Copyright (c) 2026 Shotgun Software Inc. +# +# CONFIDENTIAL AND PROPRIETARY +# +# This work is provided "AS IS" and subject to the Shotgun Pipeline Toolkit +# Source Code License included in this distribution package. See LICENSE. +# By accessing, using, copying or modifying this work you indicate your +# agreement to the Shotgun Pipeline Toolkit Source Code License. All rights +# not expressly granted therein are reserved by Shotgun Software Inc. + +"""FlowAM integration for the Scene Breakdown app. + +This package provides drop-in replacements for the standard Shotgun-based +Scene Breakdown models and actions, backed by Flow Asset Management (FlowAM). +""" + +from .reference import ( + get_assets_for_items, + get_latest_revision, + get_scene_objects, + update_to_latest, + update_to_revision, +) diff --git a/python/tk_multi_breakdown2/flowam/reference.py b/python/tk_multi_breakdown2/flowam/reference.py new file mode 100644 index 00000000..055e07aa --- /dev/null +++ b/python/tk_multi_breakdown2/flowam/reference.py @@ -0,0 +1,511 @@ +# Copyright (c) 2026 Autodesk, Inc. +# +# CONFIDENTIAL AND PROPRIETARY +# +# This work is provided "AS IS" and subject to the Shotgun Pipeline Toolkit +# Source Code License included in this distribution package. See LICENSE. +# By accessing, using, copying or modifying this work you indicate your +# agreement to the Shotgun Pipeline Toolkit Source Code License. All rights +# not expressly granted therein are reserved by Autodesk, Inc. + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Optional + +import sgtk +from tank_vendor.flow_integration_sdk import ( + dependency, + exceptions, + globals, + objects, + schema, + utils, +) + +if TYPE_CHECKING: + from ..api import FileItem + + +def get_scene_objects( + bg_task_manager: Optional[Any], +) -> tuple[list[dict[str, Any]], Any]: + """ + Retrieve the scene objects and their associated published files + using Flow Asset Manager integration. + + Each dependency is mapped to a stub entry shaped like a SG PublishedFile + but populated with MEDM data (asset_id, revision_id, version_number, etc.) + so the breakdown app can build its file item model and determine statuses. + + Args: + bg_task_manager: Background task manager for async operations. If None, execute synchronously. + Returns: + A tuple of (scene_objects, published_file_data) where: + - scene_objects is a list of dictionaries with keys: node_name, node_type, path + - published_file_data is either a background task id or a dictionary mapping + file paths to a stub PublishedFile dict populated with MEDM data. + """ + + flow_dependencies = _get_dependencies() + + scene_objects = [ + { + "node_name": dep_info.node_handle, + "node_type": dep_info.node_type, + "path": dep_info.file_path, + } + for dep_info in flow_dependencies + ] + + def build_published_file_stubs(): + result = {} + for dep_info in flow_dependencies: + version_number = None + created_at = None + if dep_info.version_id: + version_number = dep_info.version_num + + thumbnail_path = None + if dep_info.revision_id: + rev = objects.FlowRevision.get_revision(dep_info.revision_id) + thumbnail_path = rev.get_thumbnail_file() + + entity = None + published_file_type = None + if dep_info.asset_id: + asset = objects.FlowAsset(dep_info.asset_id) + entity = { + "type": "Asset", + "id": dep_info.asset_id, + "name": asset.name, + } + revision = objects.FlowRevision.get_revision(dep_info.revision_id) + type_comps = revision.find_components(type_id=globals.BASE_TYPE_ID) + type_ids = [c.type_id for c in type_comps] + published_file_type = None + created_at = asset.created_at + if type_ids: + try: + display_name = schema.get_schema_display_name(type_ids[0]) + except exceptions.FlowError: + display_name = type_ids[0] + published_file_type = { + "type": "PublishedFileType", + "id": None, + "code": display_name, + } + + stub = { + "type": "PublishedFile", + "id": None, + "project": sgtk.platform.current_engine().context.project, + "entity": entity, + "name": dep_info.component_name or dep_info.node_handle, + "created_at": created_at, + "created_by.HumanUser.name": asset.created_by, + "description": asset.description, + "task": None, + "task.Task.sg_status_list": "No Status", + "tags": "No Tags", + "published_file_type": published_file_type, + "published_file_type.PublishedFileType.code": ( + published_file_type["code"] if published_file_type else None + ), + "path": {"local_path": dep_info.file_path}, + "version_number": version_number, + "sg_flow_revision_id": dep_info.revision_id, + "sg_flow_asset_id": dep_info.asset_id, + "sg_flow_version_id": dep_info.version_id, + "sg_flow_blob_index": dep_info.blob_index, + "sg_flow_thumbnail_path": thumbnail_path, + } + result[dep_info.file_path] = stub + return result + + if bg_task_manager: + return scene_objects, bg_task_manager.add_task(build_published_file_stubs) + + return scene_objects, build_published_file_stubs() + + +def _get_published_file_type(asset: objects.FlowAsset) -> Optional[dict[str, Any]]: + """Return a PublishedFileType-shaped stub derived from the asset's type_ids. + + Uses the first type_id in asset.type_ids and resolves its display name via + schema.get_schema_display_name. Returns None if no type is available. + + Args: + asset: A Flow Asset object exposing a type_ids attribute (list of str). + Returns: + A dict shaped like a SG PublishedFileType entity. + """ + if not asset or not getattr(asset, "type_ids", None): + return None + type_id = asset.type_ids[0] + try: + display_name = schema.get_schema_display_name(type_id) + except exceptions.FlowError: + display_name = type_id + return {"type": "PublishedFileType", "id": None, "code": display_name} + + +def _get_dependencies() -> list[dependency.DependencyData]: + """Return the list of asset dependencies found in current scene. + List will contain DependencyData objects which contain pertinent info + about each asset dependency instance found. + + The DependencyData node contains detailed information of the dependency such as + * node_handle (node name) + * node_type (e.g. "reference") + * file_path (local file path of dependency) + + Only the top level asset dependencies will be returned. + """ + # Introspect current scene and get dependency tree + # (this will include asset and local dependencies together) + dep_tree = sgtk.platform.current_engine().flow_host.get_dependency_tree() + + # Filter out only the top-level asset (internal) dependencies + asset_deps = dep_tree.get_internal_dependencies(top_level=True) + + return asset_deps + + +# ============================ +# Get revisions in FPT format +# ============================ +def get_latest_revision( + item: FileItem, + bg_task_manager: Optional[Any] = None, +) -> dict[str, Any] | "BackgroundTaskManager": + """ + Get the latest published file (revision) for a single item using MEDM. + + Args: + item: FileItem to get the latest revision for. + bg_task_manager: If provided, the query runs async and the task id + is returned. Otherwise executes synchronously. + Returns: + If async, the background task id. Otherwise a dict shaped like a + SG PublishedFile entity representing the latest revision. + """ + _bundle = sgtk.platform.current_bundle() + + def _fetch_latest(): + item_data = item.sg_data or {} + asset_id = item_data.get("sg_flow_asset_id") + if not asset_id: + raise exceptions.FlowError("No asset ID found for item") + + project = _bundle.context.project + + try: + asset = objects.FlowAsset(asset_id) + latest_revision = asset.get_latest_revision() + if not latest_revision: + _bundle.logger.warning(f"No latest revision found for asset {asset_id}") + raise exceptions.FlowError( + f"No latest revision found for asset {asset_id}" + ) + except exceptions.FlowError as e: + _bundle.logger.error( + f"Failed to get latest revision for asset {asset_id}: {e}" + ) + raise + + published_file_type = _get_published_file_type(asset) + + local_path = latest_revision.get_storage_component_path( + component_purpose=globals.SOURCE_PURPOSE + ) + + created_at = asset.created_at + + return { + "type": "PublishedFile", + "id": None, + "project": project, + "entity": { + "type": "Asset", + "id": asset_id, + "name": asset.name, + }, + "name": item_data.get("name"), + "task": None, + "task.Task.sg_status_list": "No Status", + "tags": "No Tags", + "published_file_type": published_file_type, + "published_file_type.PublishedFileType.code": ( + published_file_type["code"] if published_file_type else None + ), + "path": {"local_path": local_path} if local_path else None, + "version_number": asset.version_number, + "created_at": created_at, + "created_by.HumanUser.name": asset.created_by, + "description": asset.description, + "sg_flow_revision_id": latest_revision.id, + "sg_flow_asset_id": asset_id, + "sg_flow_version_id": asset.version_id, + } + + if bg_task_manager: + return bg_task_manager.add_task(_fetch_latest) + return _fetch_latest() + + +def get_assets_for_items( + items: list[FileItem], + bg_task_manager: Optional[Any] = None, +) -> list[dict[str, Any]] | "BackgroundTaskManager": + """ + Get all published file revisions for the given items using FlowAM. + + Queries FlowAM for all numbered versions of each item's asset, returning + them as PublishedFile-shaped dicts so the breakdown model can build its + version mapping and determine statuses. + + Args: + items: List of FileItem objects to get published files for. + bg_task_manager: If provided, the query runs async and the task id + is returned. Otherwise executes synchronously. + Returns: + If async, the background task id. Otherwise a list of dicts shaped + like SG PublishedFile entities, sorted newest-first per asset. + """ + + _bundle = sgtk.platform.current_bundle() + + def _fetch_all_versions(): + project = _bundle.context.project + result = [] + processed_assets = {} + + for item in items: + item_data = item.sg_data or {} + asset_id = item_data.get("sg_flow_asset_id") + if not asset_id: + continue + + name = item_data.get("name") + + if asset_id not in processed_assets: + try: + asset = objects.FlowAsset(asset_id) + versions = list(asset.iterate_versions()) + processed_assets[asset_id] = (asset, versions) + except exceptions.FlowError: + _bundle.logger.error( + f"Failed to query versions for asset {asset_id}" + ) + continue + + asset, versions = processed_assets[asset_id] + published_file_type = _get_published_file_type(asset) + + for version in versions: + revision = version.revision + local_path = revision.get_storage_component_path( + component_purpose=globals.SOURCE_PURPOSE + ) + + created_at = version.created_at + + thumbnail_path = None + try: + thumbnail_path = revision.get_thumbnail_file() + except exceptions.FlowError: + _bundle.logger.error( + f"Failed to get thumbnail path for revision {revision.id}" + ) + + result.append( + { + "type": "PublishedFile", + "id": None, + "project": project, + "entity": { + "type": "Asset", + "id": asset_id, + "name": asset.name, + }, + "name": name, + "task": None, + "task.Task.sg_status_list": "No Status", + "tags": "No Tags", + "published_file_type": published_file_type, + "published_file_type.PublishedFileType.code": ( + published_file_type["code"] if published_file_type else None + ), + "path": {"local_path": local_path} if local_path else None, + "version_number": version.version_number, + "created_at": created_at, + "created_by.HumanUser.name": version.created_by, + "description": revision.comment, + "sg_flow_revision_id": revision.id, + "sg_flow_asset_id": asset_id, + "sg_flow_version_id": version.id, + "sg_flow_thumbnail_path": thumbnail_path, + } + ) + + return result + + if bg_task_manager: + return bg_task_manager.add_task(_fetch_all_versions) + return _fetch_all_versions() + + +# ============================ +# Update depdendencies methods +# ============================ +def update_to_latest(items: list[FileItem]) -> list[FileItem]: + """Update the given items in the scene. + + Args: + items: list of file item object to update + Returns: + list of file item that were updated + """ + items_to_update = [] + for file_item in items: + res = _update_dependency( + file_item.sg_data["sg_flow_revision_id"], + node_handle=file_item.node_name, + ) + if res: + items_to_update.append(file_item) + + return items_to_update + + +def update_to_revision( + item: Optional[FileItem], + item_data: Optional[dict[str, Any]] = None, +) -> bool: + """Update the item to a specific version. + + Args: + item: Dictionary representation of the FileItem to update + item_data: Dictionary of Flow Production Tracking data representing the target + published file revision to update to. + + Returns: + True if the item requires the data model to update, else False will not + trigger a model update. + """ + _bundle = sgtk.platform.current_bundle() + # Validate item_data contains the required revision ID + if not item_data or not item_data.get("sg_flow_revision_id"): + _bundle.logger.warning( + "Cannot update to revision: item_data is missing or lacks sg_flow_revision_id" + ) + return False + + # Validate item structure + flowam_data = item.get("sg_data") if item else None + if not flowam_data or not flowam_data.get("sg_flow_revision_id"): + _bundle.logger.warning( + "Cannot update to revision: item sg_data is missing or lacks sg_flow_revision_id" + ) + return False + + do_update = _update_dependency( + revision_id=flowam_data["sg_flow_revision_id"], + new_revision_id=item_data["sg_flow_revision_id"], + node_handle=item.get("node_name"), + ) + + return do_update + + +def _update_dependency( + revision_id: str, + new_revision_id: str | None = None, + node_handle: str | None = None, +) -> bool: + """Given an asset dependency, update all relevant dependencies to that revision + within the scene to the latest version of that asset or a specific version if provided. + + Args: + revision_id: Revision id of original dependency. This can be a version id. + new_revision_id: Revision id to change to. If None, update to latest revision + of the same asset. This can be a version id. + node_handle: Unique dependency identifier. If not provided, all dependencies + of given revision id will be updated. + + Returns: + True if dependency(ies) were updated. + False if revision is already matching the specification and nothing was done. + + Raises: + EntityNotFoundError + FlowError + """ + if sgtk.platform.current_engine().name == "tk-desktop": + raise exceptions.FlowError( + "Updating dependency is not relevant outside of a DCC context." + ) + + try: + if objects.FlowVersion.is_version_id(revision_id): + input_type = "version" + rev = objects.FlowVersion(revision_id).revision + else: + input_type = "revision" + rev = objects.FlowRevision.get_revision(revision_id) + except exceptions.FlowError as exc: + msg = f"Invalid {input_type} id provided: {revision_id}." + raise exceptions.EntityNotFoundError( + entity_id=revision_id, details=msg + ) from exc + + if new_revision_id: + try: + if objects.FlowVersion.is_version_id(new_revision_id): + input_type = "version" + new_rev = objects.FlowVersion(new_revision_id).revision + else: + input_type = "revision" + new_rev = objects.FlowRevision.get_revision(new_revision_id) + except exceptions.FlowError as exc: + msg = f"Invalid new {input_type} id provided: {new_revision_id}." + raise exceptions.EntityNotFoundError( + entity_id=new_revision_id, details=msg + ) from exc + else: + # Determine the latest revision of same asset + asset = objects.FlowAsset(rev.asset_id) + new_rev = asset.get_latest_revision() + + if rev.revision_number == new_rev.revision_number: + # Already matching the spec, so nothing to be done + return False + + # Fetch source component of new revision + new_rev.fetch(component_purpose=globals.SOURCE_PURPOSE) + + file_seq_comp = new_rev.find_component( + type_id=schema.get_schema_id(globals.FILE_SEQ_TYPE) + ) + if file_seq_comp: + # Return a file path with embedded frame padding + new_path = utils.cleanpath( + new_rev.get_storage_dir(), file_seq_comp.properties["fileFormat"] + ) + else: + new_path = new_rev.get_storage_component_path( + component_purpose=globals.SOURCE_PURPOSE + ) + + # Get list of top-level asset dependencies in scene + orig_revision_id = rev.id + host = sgtk.platform.current_engine().flow_host + dep_tree = host.get_dependency_tree() + asset_deps = dep_tree.get_internal_dependencies(top_level=True) + for dep in asset_deps: + if dep.revision_id == orig_revision_id: + if node_handle and dep.node_handle != node_handle: + continue # skip if node handle doesn't match + host.update_dependency(dep, new_path) + + return True