diff --git a/hooks/tk-multi-publish2/flowam/collector.py b/hooks/tk-multi-publish2/flowam/collector.py
new file mode 100644
index 00000000..f19ccb84
--- /dev/null
+++ b/hooks/tk-multi-publish2/flowam/collector.py
@@ -0,0 +1,161 @@
+# Copyright (c) 2025 Shotgun Software Inc.
+#
+# CONFIDENTIAL AND PROPRIETARY
+#
+# This work is provided "AS IS" and subject to the Shotgun Pipeline Toolkit
+# Source Code License included in this distribution package. See LICENSE.
+# By accessing, using, copying or modifying this work you indicate your
+# agreement to the Shotgun Pipeline Toolkit Source Code License. All rights
+# not expressly granted therein are reserved by Shotgun Software Inc.
+
+import os
+
+import sgtk
+
+HookBaseClass = sgtk.get_hook_baseclass()
+
+
+class FlowDesktopFileCollector(HookBaseClass):
+ """
+ Collector that operates on the Flow Production Tracking Desktop publish
+ workflow. Should inherit from the basic collector hook in the
+ ``tk-multi-publish2`` app. The collector setting for this hook should look
+ something like this::
+
+ collector: "{self}/collector.py:{engine}/tk-multi-publish2/flowam/collector.py"
+
+ """
+
+ def process_file(self, settings, parent_item, path):
+ """
+ Analyzes the given file or folder and creates publish items.
+ Blocks DCC-specific files that must be published from within their applications.
+
+ Args:
+ settings (dict): Configured settings for this collector
+ parent_item: Root item instance
+ path: Path of the file
+
+ Returns:
+ The created file item, or None if path is a folder or blocked DCC file
+ """
+
+ # Check if this is a DCC file that should not be published from desktop
+ file_info = self.parent.util.get_file_path_components(path)
+ extension = file_info["extension"]
+
+ # Define DCC file extensions that cannot be published from desktop
+ # These files require their specific DCC application to be open for proper publishing
+ dcc_extensions = [
+ # Maya
+ "ma",
+ "mb",
+ # Nuke
+ "nk",
+ "nkple",
+ # Houdini
+ "hip",
+ "hipnc",
+ "hiplc",
+ # 3ds Max
+ "max",
+ # Hiero
+ "hrox",
+ # Photoshop
+ "psd",
+ "psb",
+ # VRED
+ "vpb",
+ "vpe",
+ "osb",
+ # Alias
+ "wire",
+ # After Effects
+ "aep",
+ "aet",
+ ]
+
+ if extension in dcc_extensions:
+ # Get the file type display name
+ file_type = "Unknown"
+ for display_name, type_info in self.common_file_info.items():
+ if extension in type_info["extensions"]:
+ file_type = display_name
+ break
+
+ # Log an error message that will be visible to the user
+ self.logger.error(
+ "Cannot publish {file_type} files from Desktop. "
+ "Please publish from within the application instead.".format(
+ file_type=file_type
+ ),
+ extra={
+ "action_show_more_info": {
+ "label": "Learn More",
+ "text": (
+ "DCC files must be published from within their application.
"
+ "Files like Maya scenes (.ma, .mb), Nuke scripts (.nk, .nkple), Houdini scenes "
+ "(.hip, .hipnc, .hiplc), 3ds Max scenes (.max), Photoshop images (.psd, .psb), "
+ "and other DCC-specific formats contain application-specific data that requires "
+ "the DCC to be open for proper publishing.
"
+ "The Desktop Publisher is designed for publishing rendered images, textures, "
+ "Alembic caches, and other standalone files."
+ ),
+ }
+ },
+ )
+ return None
+
+ return self._collect_file(parent_item, path)
+
+ def process_current_session(self, settings, parent_item):
+ """
+ Collect publishable items for desktop publishing from the current session.
+
+ This method performs the following:
+ 1. Captures the current app context and locks it to this publisher instance
+ 2. Reads the revision_id from environment variables (if publishing a new revision of an existing asset)
+ 3. Collects publishable items based on the desktop workflow
+
+ Args:
+ settings (dict): Configured settings for this collector
+ parent_item: Root item instance
+ """
+
+ # Get app context and set it to the parent item of current publish session
+ app_context = self.parent.context
+ parent_item.context = app_context
+
+ # Determine the appropriate env var based on context level
+ task = parent_item.context.task
+ revision_id_env_var = None
+
+ if task:
+ # Task-level context
+ task_id = task["id"]
+ revision_id_env_var = f"TK_FLOWAM_REVISION_ID_{task_id}"
+ else:
+ # Project-level context
+ project = parent_item.context.project
+ if project:
+ project_id = project["id"]
+ revision_id_env_var = f"TK_FLOWAM_REVISION_ID_PROJECT_{project_id}"
+
+ # Capture revision_id from environment variable and store it on root item (parent_item)
+ # so all child file items can access it via item.parent.properties. This identifies
+ # the existing AM asset we're publishing new revisions to.
+ if revision_id_env_var and revision_id_env_var in os.environ:
+ revision_id = os.environ[revision_id_env_var]
+ parent_item.properties["am_revision_id"] = revision_id
+
+ context_type = "task" if task else "project"
+ context_id = task["id"] if task else project["id"]
+ self.logger.debug(
+ f"Captured revision_id {revision_id} from environment variable for {context_type} {context_id}"
+ )
+
+ # Clean up environment variable after capturing
+ os.environ.pop(revision_id_env_var)
+ self.logger.debug(
+ f"Cleaned up environment variable {revision_id_env_var} after capturing revision_id"
+ )
diff --git a/hooks/tk-multi-publish2/flowam/icons/flow.png b/hooks/tk-multi-publish2/flowam/icons/flow.png
new file mode 100644
index 00000000..ef451528
Binary files /dev/null and b/hooks/tk-multi-publish2/flowam/icons/flow.png differ
diff --git a/hooks/tk-multi-publish2/flowam/publish_to_flow.py b/hooks/tk-multi-publish2/flowam/publish_to_flow.py
new file mode 100644
index 00000000..cb16ef1e
--- /dev/null
+++ b/hooks/tk-multi-publish2/flowam/publish_to_flow.py
@@ -0,0 +1,349 @@
+# Copyright (c) 2025 Shotgun Software Inc.
+#
+# CONFIDENTIAL AND PROPRIETARY
+#
+# This work is provided "AS IS" and subject to the Shotgun Pipeline Toolkit
+# Source Code License included in this distribution package. See LICENSE.
+# By accessing, using, copying or modifying this work you indicate your
+# agreement to the Shotgun Pipeline Toolkit Source Code License. All rights
+# not expressly granted therein are reserved by Shotgun Software Inc.
+
+import os
+import pprint
+
+import sgtk
+
+HookBaseClass = sgtk.get_hook_baseclass()
+
+
+class DesktopFlowPublishPlugin(HookBaseClass):
+ """
+ Self-contained desktop publish plugin for Flow Asset Management integration.
+
+ Subclasses ``publish_file.py`` directly via the hook chain::
+
+ hook: "{self}/publish_file.py:{engine}/tk-multi-publish2/flowam/publish_to_flow.py"
+
+ Handles the full desktop publish workflow: framework loading, project
+ validation, revision validation, and publishing via
+ ``create_generic_workfile`` (new asset) or ``publish_generic_revision``
+ (existing asset) in the Flow AM SDK.
+
+ The DCC counterpart lives in
+ ``tk-multi-publish2/hooks/flowam/publish_to_flow.py``
+ (``DccFlowPublishPlugin``). Shared logic (properties, ``publish``,
+ ``finalize``, ``get_publish_user``) is duplicated between the two so
+ each plugin can evolve independently across separate release cycles.
+ When updating shared logic, apply the change to both files.
+ """
+
+ def __init__(self, *args, **kwargs):
+ """Initialize the plugin."""
+ super().__init__(*args, **kwargs)
+ self.flow_module = None
+ self.sg_flow_am_id = None
+
+ ############################################################################
+ # standard publish plugin properties
+
+ @property
+ def icon(self):
+ return os.path.join(self.disk_location, "icons", "flow.png")
+
+ @property
+ def name(self):
+ return "Publish to Flow AM"
+
+ @property
+ def description(self):
+ return """
+ Publishes the file to Flow Production Tracking and Asset Manager. A Publish entry
+ will be created in Flow Production Tracking which will include a reference
+ to the file's current path on disk. Other users will be able to access the
+ published file via the Loader so long as they have
+ access to the file's location on disk.
+
+
" f"{error_msg}\n" "",
+ }
+ },
+ )
+ return False
+
+ return True
+
+ def publish(self, settings, item):
+ """Publish the item to Flow AM."""
+ try:
+ pub_info = self._publish_to_flow(item)
+
+ # Check if user cancelled (child process return None)
+ if pub_info is None:
+ raise self.parent.base_hooks.PublishCancelledException(
+ "User cancelled the publish to Flow AM."
+ )
+ self.logger.info("Publish to Flow AM successful")
+
+ # Store publish info for downstream plugins (e.g., alembic derivative)
+ item.properties["am_publish_info"] = pub_info
+ item.properties["entity"] = item.context.entity or item.context.project
+ item.properties["task"] = item.context.task
+
+ self.logger.info("Publish registered!")
+ self.logger.debug(
+ "Flow AM Publish info...",
+ extra={
+ "action_show_more_info": {
+ "label": "Flow AM Publish Info",
+ "tooltip": "Show the complete Flow AM Publish info",
+ "text": "%s" % (pprint.pformat(pub_info.__dict__),), + } + }, + ) + except self.parent.base_hooks.PublishCancelledException: + # Re-raise cancellation exception without logging as error + # The dialog will handle this and show "Publish Cancelled" + raise + except Exception as e: + self.logger.error( + "Failed to publish to Flow AM", + extra={ + "action_show_more_info": { + "label": "Error Details", + "text": "
" f"{e}\n" "",
+ }
+ },
+ )
+ raise
+
+ def finalize(self, settings, item):
+ # Override base class no-op: Flow AM publishes do not produce
+ # sg_publish_data, so the base finalize (which reads it) would fail.
+ pass
+
+ def get_publish_user(self, settings, item):
+ """
+ Get the user that will be associated with this publish.
+
+ If publish_user is not defined as a ``property`` or ``local_property``,
+ this method will return ``None``.
+
+ :param settings: This plugin instance's configured settings
+ :param item: The item to determine the publish template for
+
+ :return: A user entity dictionary or ``None`` if not defined.
+ """
+ return item.context.user
+
+ ############################################################################
+ # protected methods
+
+ def _publish_to_flow(self, item):
+ """
+ Delegates to ``_publish_revision`` or ``_create_new_asset`` depending
+ on whether a revision ID is present on the parent item.
+ """
+ flow_args = dict(
+ comment=item.description or "",
+ thumbnail_path=item.get_thumbnail_as_path(),
+ )
+
+ # Read from parent item - revision_id applies to entire publish session
+ revision_id = None
+ if item.parent:
+ revision_id = item.parent.properties.get("am_revision_id")
+
+ if revision_id:
+ pub_info = self._publish_revision(item, flow_args, revision_id)
+ else:
+ pub_info = self._create_new_asset(item, flow_args)
+
+ return pub_info
+
+ def _get_generic_inputs(self, item) -> dict:
+ """
+ Build the SG entity inputs required by the Flow AM SDK for generic
+ asset creation. Called by ``_create_new_asset`` to populate
+ ``CreateGenericInputs``.
+ """
+ sg_flow_am_id = sgtk.platform.current_engine().context.project["sg_flow_am_id"]
+ entity = item.context.entity or item.context.project
+ entity_type = entity["type"]
+ # When creating from project context, sg entity related parameters are not relevant
+ sg_entity_type = entity_type if entity_type != "Project" else None
+ sg_entity_name = entity["name"] if entity_type != "Project" else None
+ sg_pipeline_step = (
+ item.context.step["name"] if entity_type != "Project" else None
+ )
+ sg_task_name = item.context.task["name"] if entity_type != "Project" else None
+
+ return dict(
+ am_project_id=sg_flow_am_id,
+ sg_entity_name=sg_entity_name,
+ sg_entity_type=sg_entity_type,
+ sg_pipeline_step=sg_pipeline_step,
+ sg_task_name=sg_task_name,
+ source_path=item.get_property("path"),
+ )
+
+ def _publish_revision(self, item, flow_args: dict, revision_id: str):
+ """
+ Publish a new revision of an existing generic asset via the Flow AM SDK.
+ Called by ``_publish_to_flow`` when a revision ID is present on the
+ parent item, indicating we are updating an existing asset rather than
+ creating a new one.
+ """
+ self.logger.info(
+ f"Publishing new revision of existing generic asset (revision_id: {revision_id})"
+ )
+
+ # Prepare GenericPublishInputs
+ flow_args.update(
+ {
+ "am_asset_id": revision_id, # Revision id can be used as an asset id in MEDM
+ "source_path": item.get_property("path"),
+ }
+ )
+ publish_inputs = self.flow_module.asset_management.GenericPublishInputs(
+ **flow_args
+ )
+
+ self.logger.debug(
+ "Calling publish_generic_revision with:",
+ extra={
+ "action_show_more_info": {
+ "label": "See contents",
+ "text": "" f"{pprint.pformat(flow_args)}\n" "",
+ }
+ },
+ )
+
+ # Note: If this fails, the exception propagates to publish() which
+ # handles error logging.
+ pub_info = self.flow_module.asset_management.publish_generic_revision(
+ publish_inputs,
+ )
+ return pub_info
+
+ def _create_new_asset(self, item, flow_args: dict):
+ """
+ Create a new generic asset via the Flow AM SDK. Called by
+ ``_publish_to_flow`` when no revision ID is present on the parent
+ item, indicating this is a first-time publish rather than a revision.
+ Delegates SG entity resolution to ``_get_generic_inputs``.
+ """
+ self.logger.info("Creating new generic asset")
+
+ create_args = self._get_generic_inputs(item)
+ create_args.update(
+ {
+ "comment": flow_args.get("comment", ""),
+ "thumbnail_path": flow_args.get("thumbnail_path", ""),
+ }
+ )
+ create_inputs = self.flow_module.asset_management.CreateGenericInputs(
+ **create_args
+ )
+
+ self.logger.debug(
+ "Calling create_generic_workfile with:",
+ extra={
+ "action_show_more_info": {
+ "label": "See contents",
+ "text": "" f"{pprint.pformat(create_inputs)}\n" "",
+ }
+ },
+ )
+
+ # Note: If this fails, the exception propagates to publish() which
+ # handles error logging.
+ pub_info = self.flow_module.asset_management.create_generic_workfile(
+ create_inputs,
+ )
+ return pub_info