diff --git a/aggregate-messages-servicebus-correlationid/default/manifest.json b/aggregate-messages-servicebus-correlationid/default/manifest.json new file mode 100644 index 0000000..6be0a3e --- /dev/null +++ b/aggregate-messages-servicebus-correlationid/default/manifest.json @@ -0,0 +1,92 @@ +{ + "id": "default", + "title": "Aggregate messages from Azure Service Bus by CorrelationId", + "summary": "Aggregates messages from an Azure Service Bus queue by grouping them using the CorrelationId property. Designed to replicate BizTalk Server Aggregator pattern in Azure Logic Apps Standard. Processes messages in batches, decodes flat file content using an XSD schema, and returns aggregated results.", + "description": "This workflow template implements the Aggregator enterprise integration pattern for Azure Logic Apps Standard, designed to facilitate migration from BizTalk Server to Azure. It retrieves messages from an Azure Service Bus queue in batches, decodes flat file content using XSD schemas (compatible with BizTalk flat file schemas), and groups messages by their CorrelationId property. The template uses built-in Service Bus operations for optimal performance and includes comprehensive error handling, making it ideal for organizations modernizing their BizTalk Server integration solutions.", + "prerequisites": "**Azure Service Bus**: You need an Azure Service Bus namespace with a non-session queue configured. **Flat file schema**: You need an XSD schema uploaded to the logic app's Artifacts/Schemas folder for message decoding (supports BizTalk flat file schemas). **Connection**: You need to configure an Azure Service Bus connection in your logic app. For more information, see [Azure Service Bus connector overview](https://learn.microsoft.com/connectors/servicebus/).", + "kinds": [ + "stateful" + ], + "artifacts": [ + { + "type": "workflow", + "file": "workflow.json" + } + ], + "images": { + "light": "workflow-light", + "dark": "workflow-dark" + }, + "parameters": [ + { + "name": "ServiceBusQueueName_#workflowname#", + "displayName": "Azure Service Bus queue name", + "type": "String", + "default": "your-queue-name", + "description": "The name of the Azure Service Bus queue to monitor for incoming messages. This queue must exist in your Service Bus namespace and should have sessions disabled.", + "required": true + }, + { + "name": "MaxBatchSize_#workflowname#", + "displayName": "Maximum batch size", + "type": "Int", + "default": "10", + "description": "The maximum number of messages to retrieve and process in a single batch. Valid range is 1-100. Adjust this value based on your message size and throughput requirements.", + "required": true + }, + { + "name": "FlatFileSchemaName_#workflowname#", + "displayName": "Flat file schema name", + "type": "String", + "default": "Invoice.xsd", + "description": "The name of the flat file schema (XSD) to use for decoding message content. This schema must be uploaded to your logic app's Artifacts/Schemas folder before deployment.", + "required": true + }, + { + "name": "DefaultCorrelationId_#workflowname#", + "displayName": "Default CorrelationId", + "type": "String", + "default": "NO_CORRELATION_ID", + "description": "The fallback value to use when a message does not have a CorrelationId property. Messages with this value will be grouped together.", + "required": false + }, + { + "name": "ServiceBusConnectionName_#workflowname#", + "displayName": "Azure Service Bus connection name", + "type": "String", + "default": "serviceBus", + "description": "The name of the Azure Service Bus connection reference in your logic app's connections.json file. This connection provides authentication credentials for accessing the Service Bus namespace.", + "required": true + }, + { + "name": "EnableSequentialProcessing_#workflowname#", + "displayName": "Enable sequential processing", + "type": "Bool", + "default": "true", + "description": "When set to true, processes messages sequentially (concurrency=1) to ensure order. When set to false, allows parallel processing for higher throughput. Use true for scenarios requiring strict message ordering.", + "required": false + }, + { + "name": "ResponseStatusCode_#workflowname#", + "displayName": "HTTP response status code", + "type": "Int", + "default": "200", + "description": "The HTTP status code to return in the response for successful processing. Standard value is 200 (OK).", + "required": false + }, + { + "name": "ResponseContentType_#workflowname#", + "displayName": "HTTP response content type", + "type": "String", + "default": "application/json", + "description": "The Content-Type header value for the HTTP response. Use 'application/json' for JSON-formatted responses.", + "required": false + } + ], + "connections": { + "serviceBus_#workflowname#": { + "connectorId": "/serviceProviders/serviceBus", + "kind": "inapp" + } + } +} \ No newline at end of file diff --git a/aggregate-messages-servicebus-correlationid/workflow-dark.png b/aggregate-messages-servicebus-correlationid/default/workflow-dark.png similarity index 100% rename from aggregate-messages-servicebus-correlationid/workflow-dark.png rename to aggregate-messages-servicebus-correlationid/default/workflow-dark.png diff --git a/aggregate-messages-servicebus-correlationid/workflow-light.png b/aggregate-messages-servicebus-correlationid/default/workflow-light.png similarity index 100% rename from aggregate-messages-servicebus-correlationid/workflow-light.png rename to aggregate-messages-servicebus-correlationid/default/workflow-light.png diff --git a/aggregate-messages-servicebus-correlationid/default/workflow.json b/aggregate-messages-servicebus-correlationid/default/workflow.json new file mode 100644 index 0000000..a9f38e0 --- /dev/null +++ b/aggregate-messages-servicebus-correlationid/default/workflow.json @@ -0,0 +1,428 @@ +{ + "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#", + "contentVersion": "1.0.0.0", + "parameters": { + "ServiceBusQueueName_#workflowname#": { + "type": "string", + "defaultValue": "insbcorrelation", + "metadata": { + "description": "Name of the Azure Service Bus queue to monitor for incoming messages" + } + }, + "MaxBatchSize_#workflowname#": { + "type": "int", + "defaultValue": 10, + "metadata": { + "description": "Maximum number of messages to retrieve and process in a single batch (1-100)" + } + }, + "FlatFileSchemaName_#workflowname#": { + "type": "string", + "defaultValue": "Invoice.xsd", + "metadata": { + "description": "Name of the flat file schema (XSD) to use for message decoding" + } + }, + "DefaultCorrelationId_#workflowname#": { + "type": "string", + "defaultValue": "NO_CORRELATION_ID", + "metadata": { + "description": "Default value to use when a message does not have a CorrelationId" + } + }, + "ServiceBusConnectionName_#workflowname#": { + "type": "string", + "defaultValue": "serviceBus_#workflowname#", + "metadata": { + "description": "Name of the Azure Service Bus connection defined in connections.json" + } + }, + "EnableSequentialProcessing_#workflowname#": { + "type": "bool", + "defaultValue": true, + "metadata": { + "description": "When true, processes messages sequentially (concurrency=1); when false, allows parallel processing" + } + }, + "ResponseStatusCode_#workflowname#": { + "type": "int", + "defaultValue": 200, + "metadata": { + "description": "HTTP status code for successful response" + } + }, + "ResponseContentType_#workflowname#": { + "type": "string", + "defaultValue": "application/json", + "metadata": { + "description": "Content-Type header for the HTTP response" + } + } + }, + "actions": { + "Process_Batch_Messages": { + "type": "Foreach", + "foreach": "@triggerBody()", + "actions": { + "Process_Message_Scope": { + "type": "Scope", + "actions": { + "Get_CorrelationId": { + "type": "Compose", + "inputs": "@coalesce(items('Process_Batch_Messages')?['correlationId'], parameters('DefaultCorrelationId_#workflowname#'))", + "metadata": { + "description": "Extract CorrelationId from Azure Service Bus message properties. Falls back to DefaultCorrelationId if not present" + } + }, + "Decode_Flat_File": { + "type": "FlatFileDecoding", + "inputs": { + "content": "@items('Process_Batch_Messages')?['contentData']", + "schema": { + "source": "LogicApp", + "name": "@parameters('FlatFileSchemaName_#workflowname#')" + } + }, + "runAfter": { + "Get_CorrelationId": [ + "SUCCEEDED" + ] + }, + "metadata": { + "description": "Decode flat file message content using the configured XSD schema. Converts positional text format to XML" + } + }, + "Get_Current_Group": { + "type": "Compose", + "inputs": "@if(contains(variables('CorrelationGroups'), outputs('Get_CorrelationId')), variables('CorrelationGroups')[outputs('Get_CorrelationId')], json('[]'))", + "runAfter": { + "Decode_Flat_File": [ + "SUCCEEDED" + ] + }, + "metadata": { + "description": "Retrieve existing messages for current CorrelationId from dictionary. Returns empty array if this is the first message for this CorrelationId" + } + }, + "Append_To_Group": { + "type": "Compose", + "inputs": "@union(outputs('Get_Current_Group'), array(body('Decode_Flat_File')))", + "runAfter": { + "Get_Current_Group": [ + "SUCCEEDED" + ] + }, + "metadata": { + "description": "Append newly decoded message to existing messages array for this CorrelationId using union operation" + } + }, + "Build_Updated_Groups": { + "type": "Compose", + "inputs": "@setProperty(variables('CorrelationGroups'), outputs('Get_CorrelationId'), outputs('Append_To_Group'))", + "runAfter": { + "Append_To_Group": [ + "SUCCEEDED" + ] + }, + "metadata": { + "description": "Build updated CorrelationGroups dictionary with new message array for current CorrelationId using setProperty function" + } + }, + "Update_CorrelationGroups": { + "type": "SetVariable", + "inputs": { + "name": "CorrelationGroups", + "value": "@outputs('Build_Updated_Groups')" + }, + "runAfter": { + "Build_Updated_Groups": [ + "SUCCEEDED" + ] + }, + "metadata": { + "description": "Update CorrelationGroups variable with new dictionary containing the appended message" + } + }, + "Check_If_New_CorrelationId": { + "type": "If", + "expression": { + "and": [ + { + "not": { + "contains": [ + "@variables('ProcessedCorrelationIds')", + "@outputs('Get_CorrelationId')" + ] + } + } + ] + }, + "actions": { + "Add_To_ProcessedIds": { + "type": "AppendToArrayVariable", + "inputs": { + "name": "ProcessedCorrelationIds", + "value": "@outputs('Get_CorrelationId')" + }, + "metadata": { + "description": "Add CorrelationId to tracking array to preserve order of first occurrence" + } + } + }, + "else": { + "actions": {} + }, + "runAfter": { + "Update_CorrelationGroups": [ + "SUCCEEDED" + ] + }, + "metadata": { + "description": "Check if this is the first message with this CorrelationId in current batch. If new, add to ProcessedCorrelationIds tracking array" + } + } + } + }, + "Handle_Scope_Error": { + "type": "If", + "expression": { + "and": [ + { + "equals": [ + "@result('Process_Message_Scope')[0]['status']", + "Failed" + ] + } + ] + }, + "actions": { + "Log_Error": { + "type": "Compose", + "inputs": { + "ErrorType": "MessageProcessingFailed", + "CorrelationId": "@coalesce(items('Process_Batch_Messages')?['correlationId'], 'UNKNOWN')", + "MessageId": "@items('Process_Batch_Messages')?['messageId']", + "ErrorDetails": "@result('Process_Message_Scope')[0]", + "Timestamp": "@utcNow()" + }, + "metadata": { + "description": "Compose error log entry with message details and scope failure information for troubleshooting" + } + } + }, + "else": { + "actions": {} + }, + "runAfter": { + "Process_Message_Scope": [ + "SUCCEEDED", + "FAILED", + "SKIPPED", + "TIMEDOUT" + ] + }, + "metadata": { + "description": "Error handler for Process_Message_Scope. Captures failures from FlatFileDecoding or other processing steps. Workflow continues processing remaining messages" + } + } + }, + "runAfter": { + "Initialize_ProcessedCorrelationIds": [ + "SUCCEEDED" + ] + }, + "runtimeConfiguration": { + "concurrency": { + "repetitions": 1 + } + }, + "metadata": { + "description": "Iterate through all messages in batch. Process each message: extract CorrelationId, decode flat file, and group by CorrelationId. Sequential processing (concurrency=1) ensures correct order" + } + }, + "Build_Aggregated_Messages_Scope": { + "type": "Scope", + "actions": { + "Build_Aggregated_Messages": { + "type": "Foreach", + "foreach": "@variables('ProcessedCorrelationIds')", + "actions": { + "Get_Messages_For_CorrelationId": { + "type": "Compose", + "inputs": "@variables('CorrelationGroups')[items('Build_Aggregated_Messages')]", + "metadata": { + "description": "Retrieve all messages for current CorrelationId from CorrelationGroups dictionary" + } + }, + "Build_Result_Object": { + "type": "Compose", + "inputs": { + "CorrelationId": "@items('Build_Aggregated_Messages')", + "MessageCount": "@length(outputs('Get_Messages_For_CorrelationId'))", + "Messages": "@outputs('Get_Messages_For_CorrelationId')" + }, + "runAfter": { + "Get_Messages_For_CorrelationId": [ + "SUCCEEDED" + ] + }, + "metadata": { + "description": "Build result object containing CorrelationId, count of messages, and array of decoded message bodies" + } + }, + "Append_To_Results": { + "type": "AppendToArrayVariable", + "inputs": { + "name": "AggregatedResults", + "value": "@outputs('Build_Result_Object')" + }, + "runAfter": { + "Build_Result_Object": [ + "SUCCEEDED" + ] + }, + "metadata": { + "description": "Add aggregated result object to final results array" + } + } + }, + "runtimeConfiguration": { + "concurrency": { + "repetitions": 1 + } + }, + "metadata": { + "description": "Build final aggregated results array. Iterates through ProcessedCorrelationIds (preserving order) and creates result objects with message groups" + } + } + }, + "runAfter": { + "Process_Batch_Messages": [ + "SUCCEEDED", + "FAILED" + ] + }, + "metadata": { + "description": "Scope for aggregation process with error handling support" + } + }, + "Handle_Aggregation_Error": { + "type": "If", + "expression": { + "and": [ + { + "equals": [ + "@result('Build_Aggregated_Messages_Scope')[0]['status']", + "Failed" + ] + } + ] + }, + "actions": { + "Log_Aggregation_Error": { + "type": "Compose", + "inputs": { + "ErrorType": "AggregationFailed", + "ErrorDetails": "@result('Build_Aggregated_Messages_Scope')[0]", + "ProcessedCorrelationIds": "@variables('ProcessedCorrelationIds')", + "PartialResults": "@variables('AggregatedResults')", + "Timestamp": "@utcNow()" + }, + "metadata": { + "description": "Log aggregation error with context including partial results and correlation IDs" + } + } + }, + "else": { + "actions": {} + }, + "runAfter": { + "Build_Aggregated_Messages_Scope": [ + "SUCCEEDED", + "FAILED", + "SKIPPED", + "TIMEDOUT" + ] + }, + "metadata": { + "description": "Error handler for aggregation scope. Logs errors but allows workflow to continue and return response" + } + }, + "Response": { + "type": "Response", + "kind": "http", + "inputs": { + "statusCode": "@parameters('ResponseStatusCode_#workflowname#')", + "headers": { + "Content-Type": "@parameters('ResponseContentType_#workflowname#')" + }, + "body": { + "ProcessedBatchSize": "@length(triggerBody())", + "UniqueCorrelationIds": "@length(variables('AggregatedResults'))", + "AggregatedMessages": "@variables('AggregatedResults')", + "ProcessingTimestamp": "@utcNow()", + "Configuration": { + "QueueName": "@parameters('ServiceBusQueueName_#workflowname#')", + "MaxBatchSize": "@parameters('MaxBatchSize_#workflowname#')", + "SchemaName": "@parameters('FlatFileSchemaName_#workflowname#')", + "SequentialProcessing": "@parameters('EnableSequentialProcessing_#workflowname#')" + } + } + }, + "runAfter": { + "Handle_Aggregation_Error": [ + "SUCCEEDED", + "FAILED", + "SKIPPED" + ] + }, + "metadata": { + "description": "Return HTTP response with aggregated results. Includes batch statistics, grouped messages by CorrelationId, processing timestamp, and configuration details" + } + }, + "Initialize_ProcessedCorrelationIds": { + "type": "InitializeVariable", + "inputs": { + "variables": [ + { + "name": "CorrelationGroups", + "type": "object", + "value": {} + }, + { + "name": "AggregatedResults", + "type": "array", + "value": [] + }, + { + "name": "ProcessedCorrelationIds", + "type": "array", + "value": [] + } + ] + }, + "runAfter": {} + } + }, + "outputs": {}, + "triggers": { + "When_messages_are_available_in_a_queue": { + "type": "ServiceProvider", + "inputs": { + "parameters": { + "queueName": "@parameters('ServiceBusQueueName_#workflowname#')", + "isSessionsEnabled": false, + "maxMessageCount": "@parameters('MaxBatchSize_#workflowname#')" + }, + "serviceProviderConfiguration": { + "connectionName": "serviceBus_#workflowname#", + "operationId": "peekLockQueueMessagesV2", + "serviceProviderId": "/serviceProviders/serviceBus" + } + }, + "metadata": { + "description": "Built-in Azure Service Bus trigger (ServiceProvider). Retrieves batch of messages using peek-lock from non-session queue. Uses peekLockQueueMessagesV2 for better performance in Azure Logic Apps Standard" + } + } + } +} \ No newline at end of file diff --git a/aggregate-messages-servicebus-correlationid/manifest.json b/aggregate-messages-servicebus-correlationid/manifest.json index f474cfa..e9ec7d6 100644 --- a/aggregate-messages-servicebus-correlationid/manifest.json +++ b/aggregate-messages-servicebus-correlationid/manifest.json @@ -1,7 +1,26 @@ { + "id": "aggregate-messages-servicebus-correlationid", "title": "Aggregate messages from Azure Service Bus by CorrelationId", - "description": "Aggregates messages from an Azure Service Bus queue by grouping them using the CorrelationId property. Designed to replicate BizTalk Server Aggregator pattern in Azure Logic Apps Standard. Processes messages in batches, decodes flat file content using an XSD schema, and returns aggregated results.", - "prerequisites": "**Azure Service Bus**: You need an Azure Service Bus namespace with a non-session queue configured. **Flat file schema**: You need an XSD schema uploaded to the logic app's Artifacts/Schemas folder for message decoding (supports BizTalk flat file schemas). **Connection**: You need to configure an Azure Service Bus connection in your logic app. For more information, see [Azure Service Bus connector overview](https://learn.microsoft.com/connectors/servicebus/).", + "summary": "Aggregates messages from an Azure Service Bus queue by grouping them using the CorrelationId property. Designed to replicate BizTalk Server Aggregator pattern in Azure Logic Apps Standard. Processes messages in batches, decodes flat file content using an XSD schema, and returns aggregated results.", + "skus": [ + "standard" + ], + "workflows": { + "default": { + "name": "Aggregate_Messages_Servicebus_Correlationid" + } + }, + "featuredConnectors": [ + { + "id": "/serviceProviders/serviceBus", + "kind": "inapp" + } + ], + "details": { + "By": "Microsoft", + "Type": "Workflow", + "Trigger": "Event" + }, "tags": [ "Azure Service Bus", "Aggregator", @@ -11,112 +30,5 @@ "Message Grouping", "Integration Pattern", "BizTalk Migration" - ], - "skus": [ - "standard" - ], - "kinds": [ - "stateful" - ], - "detailsDescription": "This workflow template implements the Aggregator enterprise integration pattern for Azure Logic Apps Standard, designed to facilitate migration from BizTalk Server to Azure. It retrieves messages from an Azure Service Bus queue in batches, decodes flat file content using XSD schemas (compatible with BizTalk flat file schemas), and groups messages by their CorrelationId property. The template uses built-in Service Bus operations for optimal performance and includes comprehensive error handling, making it ideal for organizations modernizing their BizTalk Server integration solutions.", - "details": [ - { - "name": "By", - "value": "Microsoft" - }, - { - "name": "Type", - "value": "Workflow" - }, - { - "name": "Trigger", - "value": "Azure Service Bus - When messages are available" - } - ], - "artifacts": [ - { - "type": "workflow", - "file": "workflow.json" - }, - { - "type": "workflow", - "file": "parameters.json" - } - ], - "images": { - "light": "workflow-light", - "dark": "workflow-dark" - }, - "parameters": [ - { - "name": "ServiceBusQueueName_#workflowname#", - "displayName": "Azure Service Bus queue name", - "type": "String", - "default": "your-queue-name", - "description": "The name of the Azure Service Bus queue to monitor for incoming messages. This queue must exist in your Service Bus namespace and should have sessions disabled.", - "required": true - }, - { - "name": "MaxBatchSize_#workflowname#", - "displayName": "Maximum batch size", - "type": "Int", - "default": "10", - "description": "The maximum number of messages to retrieve and process in a single batch. Valid range is 1-100. Adjust this value based on your message size and throughput requirements.", - "required": true - }, - { - "name": "FlatFileSchemaName_#workflowname#", - "displayName": "Flat file schema name", - "type": "String", - "default": "Invoice.xsd", - "description": "The name of the flat file schema (XSD) to use for decoding message content. This schema must be uploaded to your logic app's Artifacts/Schemas folder before deployment.", - "required": true - }, - { - "name": "DefaultCorrelationId_#workflowname#", - "displayName": "Default CorrelationId", - "type": "String", - "default": "NO_CORRELATION_ID", - "description": "The fallback value to use when a message does not have a CorrelationId property. Messages with this value will be grouped together.", - "required": false - }, - { - "name": "ServiceBusConnectionName_#workflowname#", - "displayName": "Azure Service Bus connection name", - "type": "String", - "default": "serviceBus", - "description": "The name of the Azure Service Bus connection reference in your logic app's connections.json file. This connection provides authentication credentials for accessing the Service Bus namespace.", - "required": true - }, - { - "name": "EnableSequentialProcessing_#workflowname#", - "displayName": "Enable sequential processing", - "type": "Bool", - "default": "true", - "description": "When set to true, processes messages sequentially (concurrency=1) to ensure order. When set to false, allows parallel processing for higher throughput. Use true for scenarios requiring strict message ordering.", - "required": false - }, - { - "name": "ResponseStatusCode_#workflowname#", - "displayName": "HTTP response status code", - "type": "Int", - "default": "200", - "description": "The HTTP status code to return in the response for successful processing. Standard value is 200 (OK).", - "required": false - }, - { - "name": "ResponseContentType_#workflowname#", - "displayName": "HTTP response content type", - "type": "String", - "default": "application/json", - "description": "The Content-Type header value for the HTTP response. Use 'application/json' for JSON-formatted responses.", - "required": false - } - ], - "connections": [ - { - "connectorId": "serviceBus_#workflowname#", - "kind": "inapp" - } ] -} +} \ No newline at end of file diff --git a/aggregate-messages-servicebus-correlationid/workflow.json b/aggregate-messages-servicebus-correlationid/workflow.json deleted file mode 100644 index 32444d7..0000000 --- a/aggregate-messages-servicebus-correlationid/workflow.json +++ /dev/null @@ -1,431 +0,0 @@ -{ - "definition": { - "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#", - "contentVersion": "1.0.0.0", - "parameters": { - "ServiceBusQueueName": { - "type": "string", - "defaultValue": "insbcorrelation", - "metadata": { - "description": "Name of the Azure Service Bus queue to monitor for incoming messages" - } - }, - "MaxBatchSize": { - "type": "int", - "defaultValue": 10, - "metadata": { - "description": "Maximum number of messages to retrieve and process in a single batch (1-100)" - } - }, - "FlatFileSchemaName": { - "type": "string", - "defaultValue": "Invoice.xsd", - "metadata": { - "description": "Name of the flat file schema (XSD) to use for message decoding" - } - }, - "DefaultCorrelationId": { - "type": "string", - "defaultValue": "NO_CORRELATION_ID", - "metadata": { - "description": "Default value to use when a message does not have a CorrelationId" - } - }, - "ServiceBusConnectionName": { - "type": "string", - "defaultValue": "serviceBus", - "metadata": { - "description": "Name of the Azure Service Bus connection defined in connections.json" - } - }, - "EnableSequentialProcessing": { - "type": "bool", - "defaultValue": true, - "metadata": { - "description": "When true, processes messages sequentially (concurrency=1); when false, allows parallel processing" - } - }, - "ResponseStatusCode": { - "type": "int", - "defaultValue": 200, - "metadata": { - "description": "HTTP status code for successful response" - } - }, - "ResponseContentType": { - "type": "string", - "defaultValue": "application/json", - "metadata": { - "description": "Content-Type header for the HTTP response" - } - } - }, - "actions": { - "Process_Batch_Messages": { - "type": "Foreach", - "foreach": "@triggerBody()", - "actions": { - "Process_Message_Scope": { - "type": "Scope", - "actions": { - "Get_CorrelationId": { - "type": "Compose", - "inputs": "@coalesce(items('Process_Batch_Messages')?['correlationId'], parameters('DefaultCorrelationId'))", - "metadata": { - "description": "Extract CorrelationId from Azure Service Bus message properties. Falls back to DefaultCorrelationId if not present" - } - }, - "Decode_Flat_File": { - "type": "FlatFileDecoding", - "inputs": { - "content": "@items('Process_Batch_Messages')?['contentData']", - "schema": { - "source": "LogicApp", - "name": "@parameters('FlatFileSchemaName')" - } - }, - "runAfter": { - "Get_CorrelationId": [ - "SUCCEEDED" - ] - }, - "metadata": { - "description": "Decode flat file message content using the configured XSD schema. Converts positional text format to XML" - } - }, - "Get_Current_Group": { - "type": "Compose", - "inputs": "@if(contains(variables('CorrelationGroups'), outputs('Get_CorrelationId')), variables('CorrelationGroups')[outputs('Get_CorrelationId')], json('[]'))", - "runAfter": { - "Decode_Flat_File": [ - "SUCCEEDED" - ] - }, - "metadata": { - "description": "Retrieve existing messages for current CorrelationId from dictionary. Returns empty array if this is the first message for this CorrelationId" - } - }, - "Append_To_Group": { - "type": "Compose", - "inputs": "@union(outputs('Get_Current_Group'), array(body('Decode_Flat_File')))", - "runAfter": { - "Get_Current_Group": [ - "SUCCEEDED" - ] - }, - "metadata": { - "description": "Append newly decoded message to existing messages array for this CorrelationId using union operation" - } - }, - "Build_Updated_Groups": { - "type": "Compose", - "inputs": "@setProperty(variables('CorrelationGroups'), outputs('Get_CorrelationId'), outputs('Append_To_Group'))", - "runAfter": { - "Append_To_Group": [ - "SUCCEEDED" - ] - }, - "metadata": { - "description": "Build updated CorrelationGroups dictionary with new message array for current CorrelationId using setProperty function" - } - }, - "Update_CorrelationGroups": { - "type": "SetVariable", - "inputs": { - "name": "CorrelationGroups", - "value": "@outputs('Build_Updated_Groups')" - }, - "runAfter": { - "Build_Updated_Groups": [ - "SUCCEEDED" - ] - }, - "metadata": { - "description": "Update CorrelationGroups variable with new dictionary containing the appended message" - } - }, - "Check_If_New_CorrelationId": { - "type": "If", - "expression": { - "and": [ - { - "not": { - "contains": [ - "@variables('ProcessedCorrelationIds')", - "@outputs('Get_CorrelationId')" - ] - } - } - ] - }, - "actions": { - "Add_To_ProcessedIds": { - "type": "AppendToArrayVariable", - "inputs": { - "name": "ProcessedCorrelationIds", - "value": "@outputs('Get_CorrelationId')" - }, - "metadata": { - "description": "Add CorrelationId to tracking array to preserve order of first occurrence" - } - } - }, - "else": { - "actions": {} - }, - "runAfter": { - "Update_CorrelationGroups": [ - "SUCCEEDED" - ] - }, - "metadata": { - "description": "Check if this is the first message with this CorrelationId in current batch. If new, add to ProcessedCorrelationIds tracking array" - } - } - } - }, - "Handle_Scope_Error": { - "type": "If", - "expression": { - "and": [ - { - "equals": [ - "@result('Process_Message_Scope')[0]['status']", - "Failed" - ] - } - ] - }, - "actions": { - "Log_Error": { - "type": "Compose", - "inputs": { - "ErrorType": "MessageProcessingFailed", - "CorrelationId": "@coalesce(items('Process_Batch_Messages')?['correlationId'], 'UNKNOWN')", - "MessageId": "@items('Process_Batch_Messages')?['messageId']", - "ErrorDetails": "@result('Process_Message_Scope')[0]", - "Timestamp": "@utcNow()" - }, - "metadata": { - "description": "Compose error log entry with message details and scope failure information for troubleshooting" - } - } - }, - "else": { - "actions": {} - }, - "runAfter": { - "Process_Message_Scope": [ - "SUCCEEDED", - "FAILED", - "SKIPPED", - "TIMEDOUT" - ] - }, - "metadata": { - "description": "Error handler for Process_Message_Scope. Captures failures from FlatFileDecoding or other processing steps. Workflow continues processing remaining messages" - } - } - }, - "runAfter": { - "Initialize_ProcessedCorrelationIds": [ - "SUCCEEDED" - ] - }, - "runtimeConfiguration": { - "concurrency": { - "repetitions": 1 - } - }, - "metadata": { - "description": "Iterate through all messages in batch. Process each message: extract CorrelationId, decode flat file, and group by CorrelationId. Sequential processing (concurrency=1) ensures correct order" - } - }, - "Build_Aggregated_Messages_Scope": { - "type": "Scope", - "actions": { - "Build_Aggregated_Messages": { - "type": "Foreach", - "foreach": "@variables('ProcessedCorrelationIds')", - "actions": { - "Get_Messages_For_CorrelationId": { - "type": "Compose", - "inputs": "@variables('CorrelationGroups')[items('Build_Aggregated_Messages')]", - "metadata": { - "description": "Retrieve all messages for current CorrelationId from CorrelationGroups dictionary" - } - }, - "Build_Result_Object": { - "type": "Compose", - "inputs": { - "CorrelationId": "@items('Build_Aggregated_Messages')", - "MessageCount": "@length(outputs('Get_Messages_For_CorrelationId'))", - "Messages": "@outputs('Get_Messages_For_CorrelationId')" - }, - "runAfter": { - "Get_Messages_For_CorrelationId": [ - "SUCCEEDED" - ] - }, - "metadata": { - "description": "Build result object containing CorrelationId, count of messages, and array of decoded message bodies" - } - }, - "Append_To_Results": { - "type": "AppendToArrayVariable", - "inputs": { - "name": "AggregatedResults", - "value": "@outputs('Build_Result_Object')" - }, - "runAfter": { - "Build_Result_Object": [ - "SUCCEEDED" - ] - }, - "metadata": { - "description": "Add aggregated result object to final results array" - } - } - }, - "runtimeConfiguration": { - "concurrency": { - "repetitions": 1 - } - }, - "metadata": { - "description": "Build final aggregated results array. Iterates through ProcessedCorrelationIds (preserving order) and creates result objects with message groups" - } - } - }, - "runAfter": { - "Process_Batch_Messages": [ - "SUCCEEDED", - "FAILED" - ] - }, - "metadata": { - "description": "Scope for aggregation process with error handling support" - } - }, - "Handle_Aggregation_Error": { - "type": "If", - "expression": { - "and": [ - { - "equals": [ - "@result('Build_Aggregated_Messages_Scope')[0]['status']", - "Failed" - ] - } - ] - }, - "actions": { - "Log_Aggregation_Error": { - "type": "Compose", - "inputs": { - "ErrorType": "AggregationFailed", - "ErrorDetails": "@result('Build_Aggregated_Messages_Scope')[0]", - "ProcessedCorrelationIds": "@variables('ProcessedCorrelationIds')", - "PartialResults": "@variables('AggregatedResults')", - "Timestamp": "@utcNow()" - }, - "metadata": { - "description": "Log aggregation error with context including partial results and correlation IDs" - } - } - }, - "else": { - "actions": {} - }, - "runAfter": { - "Build_Aggregated_Messages_Scope": [ - "SUCCEEDED", - "FAILED", - "SKIPPED", - "TIMEDOUT" - ] - }, - "metadata": { - "description": "Error handler for aggregation scope. Logs errors but allows workflow to continue and return response" - } - }, - "Response": { - "type": "Response", - "kind": "http", - "inputs": { - "statusCode": "@parameters('ResponseStatusCode')", - "headers": { - "Content-Type": "@parameters('ResponseContentType')" - }, - "body": { - "ProcessedBatchSize": "@length(triggerBody())", - "UniqueCorrelationIds": "@length(variables('AggregatedResults'))", - "AggregatedMessages": "@variables('AggregatedResults')", - "ProcessingTimestamp": "@utcNow()", - "Configuration": { - "QueueName": "@parameters('ServiceBusQueueName')", - "MaxBatchSize": "@parameters('MaxBatchSize')", - "SchemaName": "@parameters('FlatFileSchemaName')", - "SequentialProcessing": "@parameters('EnableSequentialProcessing')" - } - } - }, - "runAfter": { - "Handle_Aggregation_Error": [ - "SUCCEEDED", - "FAILED", - "SKIPPED" - ] - }, - "metadata": { - "description": "Return HTTP response with aggregated results. Includes batch statistics, grouped messages by CorrelationId, processing timestamp, and configuration details" - } - }, - "Initialize_ProcessedCorrelationIds": { - "type": "InitializeVariable", - "inputs": { - "variables": [ - { - "name": "CorrelationGroups", - "type": "object", - "value": {} - }, - { - "name": "AggregatedResults", - "type": "array", - "value": [] - }, - { - "name": "ProcessedCorrelationIds", - "type": "array", - "value": [] - } - ] - }, - "runAfter": {} - } - }, - "outputs": {}, - "triggers": { - "When_messages_are_available_in_a_queue": { - "type": "ServiceProvider", - "inputs": { - "parameters": { - "queueName": "@parameters('ServiceBusQueueName')", - "isSessionsEnabled": false, - "maxMessageCount": "@parameters('MaxBatchSize')" - }, - "serviceProviderConfiguration": { - "connectionName": "serviceBus", - "operationId": "peekLockQueueMessagesV2", - "serviceProviderId": "/serviceProviders/serviceBus" - } - }, - "metadata": { - "description": "Built-in Azure Service Bus trigger (ServiceProvider). Retrieves batch of messages using peek-lock from non-session queue. Uses peekLockQueueMessagesV2 for better performance in Azure Logic Apps Standard" - } - } - } - }, - "kind": "Stateful" -} \ No newline at end of file