diff --git a/SpiffWorkflow/bpmn/serializer/default/workflow.py b/SpiffWorkflow/bpmn/serializer/default/workflow.py
index a26ceb0b..d2c34c16 100644
--- a/SpiffWorkflow/bpmn/serializer/default/workflow.py
+++ b/SpiffWorkflow/bpmn/serializer/default/workflow.py
@@ -20,11 +20,65 @@
 from uuid import UUID
 
 from SpiffWorkflow.bpmn.specs.mixins.subworkflow_task import SubWorkflowTask
+from SpiffWorkflow.util.deep_merge import DeepMerge
 from ..helpers.bpmn_converter import BpmnConverter
 
 
 class TaskConverter(BpmnConverter):
 
+    def to_dict(self, task):
+        # Optimize serialization by storing only local changes (delta)
+        # instead of the full materialized data when task has a parent
+
+        if task.parent is None:
+            data = task.data
+            delta = {}
+        else:
+            data = {}
+            delta = {
+                'updates': DeepMerge.get_updated_keys(task.parent.data, task.data),
+                'deletions': DeepMerge.get_deleted_keys(task.parent.data, task.data),
+            }
+
+        return {
+            'id': str(task.id),
+            'parent': str(task._parent) if task.parent is not None else None,
+            'children': [ str(child) for child in task._children ],
+            'last_state_change': task.last_state_change,
+            'state': task.state,
+            'task_spec': task.task_spec.name,
+            'triggered': task.triggered,
+            'internal_data': self.registry.convert(task.internal_data),
+            'data': data,
+            'delta': delta
+        }
+
+    def from_dict(self, dct, workflow):
+
+        task_spec = workflow.spec.task_specs.get(dct['task_spec'])
+        task = self.target_class(workflow, task_spec, state=dct['state'], id=UUID(dct['id']))
+        task._parent = UUID(dct['parent']) if dct['parent'] is not None else None
+        task._children = [UUID(child) for child in dct['children']]
+        task.last_state_change = dct['last_state_change']
+        task.triggered = dct['triggered']
+        task.internal_data = self.registry.restore(dct['internal_data'])
+
+        delta = dct.get('delta')
+        if delta and task.parent is not None:
+            data = DeepMerge.merge({}, task.parent.data)
+            data.update(self.registry.restore(delta.get('updates', {})))
+            for key in delta.get('deletions', {}):
+                if key in data:
+                    del data[key]
+        else:
+            data = self.registry.restore(dct['data'])
+
+        task.data = data
+
+        return task
+
+
+class SimpleTaskConverter(BpmnConverter):
+
     def to_dict(self, task):
         return {
             'id': str(task.id),
@@ -49,7 +103,6 @@ def from_dict(self, dct, workflow):
         task.data = self.registry.restore(dct['data'])
         return task
 
-
 class BpmnEventConverter(BpmnConverter):
 
     def to_dict(self, event):
diff --git a/SpiffWorkflow/bpmn/serializer/helpers/encoder.py b/SpiffWorkflow/bpmn/serializer/helpers/encoder.py
new file mode 100644
index 00000000..1d6afd30
--- /dev/null
+++ b/SpiffWorkflow/bpmn/serializer/helpers/encoder.py
@@ -0,0 +1,19 @@
+import json
+from types import ModuleType
+
+
+def create_encoder(registry, user_encoder_cls=None):
+    base = user_encoder_cls or json.JSONEncoder
+
+    class SpiffEncoder(base):
+        def default(self, obj):
+            typename = registry.typenames.get(type(obj))
+            if typename is not None:
+                return registry.convert_to_dict[typename](obj)
+            if callable(obj) or isinstance(obj, ModuleType):
+                return None
+            if isinstance(obj, set):
+                return list(obj)
+            return super().default(obj)
+
+    return SpiffEncoder
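Not part of the patch: a minimal sketch of how the generated encoder class might be exercised on its own. MiniRegistry is a hypothetical stand-in exposing only the typenames and convert_to_dict attributes that create_encoder reads, and the import assumes a checkout of this branch is installed.

    import json
    from datetime import datetime
    from uuid import UUID, uuid4

    from SpiffWorkflow.bpmn.serializer.helpers.encoder import create_encoder


    class MiniRegistry:
        """Hypothetical stand-in for DefaultRegistry: just enough for create_encoder."""
        def __init__(self):
            self.typenames = {UUID: 'UUID', datetime: 'datetime'}
            self.convert_to_dict = {
                'UUID': lambda v: {'typename': 'UUID', 'value': str(v)},
                'datetime': lambda v: {'typename': 'datetime', 'value': v.isoformat()},
            }


    encoder_cls = create_encoder(MiniRegistry())

    # default() only runs for objects json.dumps cannot handle natively: registered types
    # go through convert_to_dict, sets become lists, callables and modules become null.
    payload = {'id': uuid4(), 'when': datetime.now(), 'tags': {'a', 'b'}, 'handler': print}
    print(json.dumps(payload, cls=encoder_cls))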
diff --git a/SpiffWorkflow/bpmn/serializer/helpers/registry.py b/SpiffWorkflow/bpmn/serializer/helpers/registry.py
index caaa0b71..4e60d2a7 100644
--- a/SpiffWorkflow/bpmn/serializer/helpers/registry.py
+++ b/SpiffWorkflow/bpmn/serializer/helpers/registry.py
@@ -17,6 +17,7 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # 02110-1301 USA
 
+from types import ModuleType
 from uuid import UUID
 from datetime import datetime, timedelta
 
@@ -32,6 +33,7 @@ class DefaultRegistry(DictionaryConverter):
 
     def __init__(self):
         super().__init__()
+        self._encoder_mode = False
         self.register(UUID, lambda v: { 'value': str(v) }, lambda v: UUID(v['value']))
         self.register(datetime, lambda v: { 'value': v.isoformat() }, lambda v: datetime.fromisoformat(v['value']))
         self.register(timedelta, lambda v: { 'days': v.days, 'seconds': v.seconds }, lambda v: timedelta(**v))
@@ -45,9 +47,20 @@ def convert(self, obj):
 
         Returns:
             the result of `convert` conversion after preprocessing
         """
+        if self._encoder_mode:
+            return self._convert_for_encoder(obj)
         cleaned = self.clean(obj)
         return super().convert(cleaned)
 
+    def _convert_for_encoder(self, obj):
+        typename = self.typenames.get(obj.__class__)
+        if typename in self.convert_to_dict:
+            return self.convert_to_dict[typename](obj)
+        elif isinstance(obj, dict):
+            return self.clean(obj)
+        else:
+            return obj
+
     def clean(self, obj):
         """A method that can be used to preprocess an object before conversion to a dict.
@@ -60,6 +73,6 @@ def clean(self, obj):
         Returns:
             the preprocessed object
         """
         if isinstance(obj, dict):
-            return dict((k, v) for k, v in obj.items() if not callable(v))
+            return dict((k, v) for k, v in obj.items() if not callable(v) and not isinstance(v, ModuleType))
         else:
             return obj
diff --git a/SpiffWorkflow/bpmn/serializer/workflow.py b/SpiffWorkflow/bpmn/serializer/workflow.py
index a13af7d0..6ac7fcd6 100644
--- a/SpiffWorkflow/bpmn/serializer/workflow.py
+++ b/SpiffWorkflow/bpmn/serializer/workflow.py
@@ -21,6 +21,7 @@
 
 from .migration.version_migration import MIGRATIONS
 from .helpers import DefaultRegistry
+from .helpers.encoder import create_encoder
 from .config import DEFAULT_CONFIG
 
@@ -97,6 +98,7 @@ def __init__(self, registry=None, version=VERSION, json_encoder_cls=None, json_d
         self.json_encoder_cls = json_encoder_cls
         self.json_decoder_cls = json_decoder_cls
         self.VERSION = version
+        self._encoder_cls = create_encoder(self.registry, json_encoder_cls)
 
     def serialize_json(self, workflow, use_gzip=False):
         """Serialize the dictionary representation of the workflow to JSON.
@@ -108,9 +110,13 @@ def serialize_json(self, workflow, use_gzip=False):
         Returns:
             a JSON dump of the dictionary representation or a gzipped version of it
         """
-        dct = self.to_dict(workflow)
-        dct[self.VERSION_KEY] = self.VERSION
-        json_str = json.dumps(dct, cls=self.json_encoder_cls)
+        self.registry._encoder_mode = True
+        try:
+            dct = self.to_dict(workflow)
+            dct[self.VERSION_KEY] = self.VERSION
+            json_str = json.dumps(dct, cls=self._encoder_cls)
+        finally:
+            self.registry._encoder_mode = False
         return gzip.compress(json_str.encode('utf-8')) if use_gzip else json_str
 
     def deserialize_json(self, serialization, use_gzip=False):
diff --git a/SpiffWorkflow/bpmn/workflow.py b/SpiffWorkflow/bpmn/workflow.py
index ad542abd..96015265 100644
--- a/SpiffWorkflow/bpmn/workflow.py
+++ b/SpiffWorkflow/bpmn/workflow.py
@@ -164,6 +164,12 @@ def do_engine_steps(self, will_complete_task=None, did_complete_task=None):
         :param will_complete_task: Callback that will be called prior to completing a task
         :param did_complete_task: Callback that will be called after completing a task
         """
+        count = self._do_engine_steps(will_complete_task, did_complete_task)
+        while count > 0:
+            count = self._do_engine_steps(will_complete_task, did_complete_task)
+
+    def _do_engine_steps(self, will_complete_task=None, did_complete_task=None):
+
         def update_workflow(wf):
             count = 0
             # Wanted to use the iterator method here, but at least this is a shorter list
@@ -187,8 +193,7 @@ def update_workflow(wf):
                     task.task_spec._update(task)
 
         count = update_workflow(self)
-        if count > 0 or len(self.get_active_subprocesses()) > len(active_subprocesses):
-            self.do_engine_steps(will_complete_task, did_complete_task)
+        return count > 0 or len(self.get_active_subprocesses()) > len(active_subprocesses)
 
     def refresh_waiting_tasks(self, will_refresh_task=None, did_refresh_task=None):
         """
diff --git a/SpiffWorkflow/task.py b/SpiffWorkflow/task.py
index bf0d0f58..5a1ff2ba 100644
--- a/SpiffWorkflow/task.py
+++ b/SpiffWorkflow/task.py
@@ -314,8 +314,8 @@ def _assign_new_thread_id(self, recursive=True):
         return self.thread_id
 
     def _inherit_data(self):
-        """Copies the data from the parent."""
-        self.set_data(**deepcopy(self.parent.data))
+        """Inherits data from the parent."""
+        self.data = DeepMerge.merge(self.data, self.parent.data)
 
     def _set_internal_data(self, **kwargs):
         """Defines the given attribute/value pairs in this task's internal data."""
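For context on the _inherit_data change above: a standalone sketch (not part of the patch) of what the shallow merge means for data sharing between a parent and a child task, using only DeepMerge as modified below. The dictionaries are made up; the assertions reflect the patched merge behavior for a child that starts with empty data.

    from SpiffWorkflow.util.deep_merge import DeepMerge

    parent_data = {'item': {'alphabet': ['a', 'b', 'c'], 'status': 0}, 'count': 1}

    # Roughly what Task._inherit_data now does when the child's data starts out empty
    child_data = DeepMerge.merge({}, parent_data)

    child_data['count'] = 2
    assert parent_data['count'] == 1  # rebinding a top-level key leaves the parent alone

    assert child_data['item'] is not parent_data['item']  # dict values get a one-level copy...
    assert child_data['item']['alphabet'] is parent_data['item']['alphabet']  # ...but deeper objects are shared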
diff --git a/SpiffWorkflow/util/deep_merge.py b/SpiffWorkflow/util/deep_merge.py
index 53b5d174..dc406e48 100644
--- a/SpiffWorkflow/util/deep_merge.py
+++ b/SpiffWorkflow/util/deep_merge.py
@@ -17,7 +17,7 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 # 02110-1301 USA
 
-from copy import deepcopy
+from copy import copy
 
 class DeepMerge(object):
     # Merges two deeply nested json-like dictionaries,
@@ -44,9 +44,18 @@ def merge(a, b, path=None):
             elif isinstance(a[key], list) and isinstance(b[key], list):
                 DeepMerge.merge_array(a[key], b[key], path + [str(key)])
             else:
-                a[key] = deepcopy(b[key])  # Just overwrite the value in a.
+                # Shallow copy only for mutable types (dict/list/set)
+                # Immutable types (str/int/float/bool/None/tuple) don't need copying
+                if isinstance(b[key], (dict, list, set)):
+                    a[key] = copy(b[key])
+                else:
+                    a[key] = b[key]
         else:
-            a[key] = deepcopy(b[key])
+            # Shallow copy only for mutable types
+            if isinstance(b[key], (dict, list, set)):
+                a[key] = copy(b[key])
+            else:
+                a[key] = b[key]
         return a
 
     @staticmethod
@@ -65,3 +74,13 @@ def merge_array(a, b, path=None):
 
         # Trim a back to the length of b.  In the end, the two arrays should match
         del a[len(b):]
+
+    @staticmethod
+    def get_updated_keys(a, b):
+        """Return a dict of the keys/values in b that are new or differ from a"""
+        return dict((key, b[key]) for key in b if key not in a or b[key] != a[key])
+
+    @staticmethod
+    def get_deleted_keys(a, b):
+        """Return a list of the keys in a that do not exist in b"""
+        return [key for key in a if key not in b]
diff --git a/SpiffWorkflow/version.py b/SpiffWorkflow/version.py
index f45b23f5..7a242d52 100644
--- a/SpiffWorkflow/version.py
+++ b/SpiffWorkflow/version.py
@@ -1 +1 @@
-__version__ = '3.1.2'
+__version__ = '3.1.3a2'
diff --git a/tests/SpiffWorkflow/bpmn/data/performance_test.bpmn b/tests/SpiffWorkflow/bpmn/data/performance_test.bpmn
new file mode 100644
index 00000000..53dea863
--- /dev/null
+++ b/tests/SpiffWorkflow/bpmn/data/performance_test.bpmn
@@ -0,0 +1,159 @@
[The XML markup of this new 159-line BPMN file did not survive extraction; only its text content remains. The process (Process_3no3Cw9) initializes a 20-element list of item dicts in a script task (`items = [item]*20`), runs a multi-instance activity over `items` that sets `out_item = my_item` and `out_item["my_index"] = i`, passes data through DataObjectReference_04mspxz and DataObjectReference_07sb1pi via `items`/`items2`, and ends with a script task that sets `results = items2`.]
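To make the delta mechanism concrete, here is a small round-trip sketch (not part of the patch) pairing the new DeepMerge helpers with the rebuild logic from TaskConverter.from_dict. It is simplified: the real converter also passes values through the registry's convert/restore methods.

    from SpiffWorkflow.util.deep_merge import DeepMerge

    parent = {'status': 0, 'items': [1, 2, 3], 'note': 'keep'}
    child = {'status': 1, 'items': [1, 2, 3], 'extra': 'new'}   # 'status' changed, 'note' deleted

    # What TaskConverter.to_dict stores for a child task instead of its full data dict
    delta = {
        'updates': DeepMerge.get_updated_keys(parent, child),    # {'status': 1, 'extra': 'new'}
        'deletions': DeepMerge.get_deleted_keys(parent, child),  # ['note']
    }

    # What TaskConverter.from_dict does to rebuild the child's data from parent plus delta
    rebuilt = DeepMerge.merge({}, parent)
    rebuilt.update(delta['updates'])
    for key in delta['deletions']:
        rebuilt.pop(key, None)

    assert rebuilt == child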
diff --git a/tests/SpiffWorkflow/bpmn/test_performance_test.py b/tests/SpiffWorkflow/bpmn/test_performance_test.py
new file mode 100644
index 00000000..8a3527dd
--- /dev/null
+++ b/tests/SpiffWorkflow/bpmn/test_performance_test.py
@@ -0,0 +1,281 @@
+"""
+Performance tests for performance_test.bpmn.
+Measures execution and serialization time across different item counts.
+"""
+import os
+import time
+
+from SpiffWorkflow.bpmn.workflow import BpmnWorkflow
+from .BpmnWorkflowTestCase import BpmnWorkflowTestCase
+
+
+class PerformanceTest(BpmnWorkflowTestCase):
+    """
+    Measure workflow execution and serialization/deserialization time for various item counts.
+    """
+
+    def _create_workflow_with_item_count(self, count):
+        """
+        Create a workflow from performance_test.bpmn with modified item count.
+
+        Args:
+            count: Number of items to create (replaces the hardcoded 20)
+
+        Returns:
+            BpmnWorkflow instance ready to execute
+        """
+        # Read the original BPMN file
+        bpmn_path = os.path.join(
+            os.path.dirname(__file__),
+            'data',
+            'performance_test.bpmn'
+        )
+        with open(bpmn_path, 'r') as f:
+            bpmn_content = f.read()
+
+        # Replace the item count
+        modified_content = bpmn_content.replace(
+            'items = [item]*20',
+            f'items = [item]*{count}'
+        )
+
+        # Write to a temporary file in the data directory
+        tmp_filename = f'_temp_performance_test_{count}.bpmn'
+        tmp_path = os.path.join(
+            os.path.dirname(__file__),
+            'data',
+            tmp_filename
+        )
+        with open(tmp_path, 'w') as f:
+            f.write(modified_content)
+
+        try:
+            # Load the workflow spec
+            spec, subprocesses = self.load_workflow_spec(
+                tmp_filename,
+                'Process_3no3Cw9',
+                validate=False
+            )
+            workflow = BpmnWorkflow(spec, subprocesses)
+        finally:
+            # Clean up the temporary file
+            if os.path.exists(tmp_path):
+                os.unlink(tmp_path)
+
+        return workflow
+
+    def test_performance_20_items(self):
+        """Measure execution and serialization time with 20 items."""
+        workflow = self._create_workflow_with_item_count(20)
+
+        # Measure execution time
+        start_execution = time.time()
+        workflow.do_engine_steps()
+        end_execution = time.time()
+        execution_time = end_execution - start_execution
+
+        # Verify workflow completed
+        self.assertTrue(workflow.completed)
+
+        # Measure serialization
+        start_serialize = time.time()
+        state = self.serializer.serialize_json(workflow)
+        end_serialize = time.time()
+        serialize_time = end_serialize - start_serialize
+
+        # Measure deserialization
+        start_deserialize = time.time()
+        restored_workflow = self.serializer.deserialize_json(state)
+        end_deserialize = time.time()
+        deserialize_time = end_deserialize - start_deserialize
+
+        # Verify deserialization worked
+        self.assertTrue(restored_workflow.completed)
+
+        # Print results
+        print("\n" + "="*80)
+        print("PERFORMANCE TEST (performance_test.bpmn)")
+        print("="*80)
+        print(f"  20 items:")
+        print(f"    Execution:       {execution_time:.6f} seconds")
+        print(f"    Serialization:   {serialize_time:.6f} seconds")
+        print(f"    Deserialization: {deserialize_time:.6f} seconds")
+        print(f"    Total:           {execution_time + serialize_time + deserialize_time:.6f} seconds")
+        print("="*80)
+
+    def test_performance_100_items(self):
+        """Measure execution and serialization time with 100 items."""
+        workflow = self._create_workflow_with_item_count(100)
+
+        # Measure execution time
+        start_execution = time.time()
+        workflow.do_engine_steps()
+        end_execution = time.time()
+        execution_time = end_execution - start_execution
+
+        # Verify workflow completed
+        self.assertTrue(workflow.completed)
+
+        # Measure serialization
+        start_serialize = time.time()
+        state = self.serializer.serialize_json(workflow)
+        end_serialize = time.time()
+        serialize_time = end_serialize - start_serialize
+
+        # Measure deserialization
+        start_deserialize = time.time()
+        restored_workflow = self.serializer.deserialize_json(state)
+        end_deserialize = time.time()
+        deserialize_time = end_deserialize - start_deserialize
+
+        # Verify deserialization worked
+        self.assertTrue(restored_workflow.completed)
+
+        # Print results
+        print("\n" + "="*80)
+        print("PERFORMANCE TEST (performance_test.bpmn)")
+        print("="*80)
+        print(f"  100 items:")
+        print(f"    Execution:       {execution_time:.6f} seconds")
+        print(f"    Serialization:   {serialize_time:.6f} seconds")
+        print(f"    Deserialization: {deserialize_time:.6f} seconds")
+        print(f"    Total:           {execution_time + serialize_time + deserialize_time:.6f} seconds")
+        print("="*80)
+
+    def test_performance_200_items(self):
+        """Measure execution and serialization time with 200 items."""
+        workflow = self._create_workflow_with_item_count(200)
+
+        # Measure execution time
+        start_execution = time.time()
+        workflow.do_engine_steps()
+        end_execution = time.time()
+        execution_time = end_execution - start_execution
+
+        # Verify workflow completed
+        self.assertTrue(workflow.completed)
+
+        # Measure serialization
+        start_serialize = time.time()
+        state = self.serializer.serialize_json(workflow)
+        end_serialize = time.time()
+        serialize_time = end_serialize - start_serialize
+
+        # Measure deserialization
+        start_deserialize = time.time()
+        restored_workflow = self.serializer.deserialize_json(state)
+        end_deserialize = time.time()
+        deserialize_time = end_deserialize - start_deserialize
+
+        # Verify deserialization worked
+        self.assertTrue(restored_workflow.completed)
+
+        # Print results
+        print("\n" + "="*80)
+        print("PERFORMANCE TEST (performance_test.bpmn)")
+        print("="*80)
+        print(f"  200 items:")
+        print(f"    Execution:       {execution_time:.6f} seconds")
+        print(f"    Serialization:   {serialize_time:.6f} seconds")
+        print(f"    Deserialization: {deserialize_time:.6f} seconds")
+        print(f"    Total:           {execution_time + serialize_time + deserialize_time:.6f} seconds")
+        print("="*80)
+
+    def test_performance_300_items(self):
+        """Measure execution and serialization time with 300 items."""
+        workflow = self._create_workflow_with_item_count(300)
+
+        # Measure execution time
+        start_execution = time.time()
+        workflow.do_engine_steps()
+        end_execution = time.time()
+        execution_time = end_execution - start_execution
+
+        # Verify workflow completed
+        self.assertTrue(workflow.completed)
+
+        # Measure serialization
+        start_serialize = time.time()
+        state = self.serializer.serialize_json(workflow)
+        end_serialize = time.time()
+        serialize_time = end_serialize - start_serialize
+
+        # Measure deserialization
+        start_deserialize = time.time()
+        restored_workflow = self.serializer.deserialize_json(state)
+        end_deserialize = time.time()
+        deserialize_time = end_deserialize - start_deserialize
+
+        # Verify deserialization worked
+        self.assertTrue(restored_workflow.completed)
+
+        # Print results
+        print("\n" + "="*80)
+        print("PERFORMANCE TEST (performance_test.bpmn)")
+        print("="*80)
+        print(f"  300 items:")
+        print(f"    Execution:       {execution_time:.6f} seconds")
+        print(f"    Serialization:   {serialize_time:.6f} seconds")
+        print(f"    Deserialization: {deserialize_time:.6f} seconds")
+        print(f"    Total:           {execution_time + serialize_time + deserialize_time:.6f} seconds")
+        print("="*80)
+
+    def test_performance_periodic_serialization_300_items(self):
+        """Measure execution and periodic serialization time with 300 items."""
+        workflow = self._create_workflow_with_item_count(300)
+
+        # Track serialization metrics
+        serialization_checkpoints = []
+        tasks_completed = 0
+        checkpoint_interval = 10  # Serialize every 10 task completions
+
+        def did_complete_task(task):
+            nonlocal tasks_completed
+            tasks_completed += 1
+
+            # Serialize at checkpoints
+            if tasks_completed % checkpoint_interval == 0:
+                start_serialize = time.time()
+                state = self.serializer.serialize_json(workflow)
+                end_serialize = time.time()
+                serialize_time = end_serialize - start_serialize
+
+                serialization_checkpoints.append({
+                    'steps': tasks_completed,
+                    'tasks': len(workflow.tasks),
+                    'time': serialize_time
+                })
+
+        # Measure execution time with periodic serialization
+        start_execution = time.time()
+        workflow.do_engine_steps(did_complete_task=did_complete_task)
+        end_execution = time.time()
+        execution_time = end_execution - start_execution
+
+        # Verify workflow completed
+        self.assertTrue(workflow.completed)
+
+        # Calculate summary metrics
+        total_serialization_time = sum(cp['time'] for cp in serialization_checkpoints)
+        num_serializations = len(serialization_checkpoints)
+        avg_serialization_time = total_serialization_time / num_serializations if num_serializations > 0 else 0
+        overhead_percentage = (total_serialization_time / execution_time * 100) if execution_time > 0 else 0
+
+        # Print results
+        print("\n" + "="*80)
+        print("PERIODIC SERIALIZATION TEST (performance_test.bpmn)")
+        print("="*80)
+        print(f"  300 items (serialize every {checkpoint_interval} steps):")
+        print(f"    Execution time: {execution_time:.6f} seconds")
+        print(f"")
+        print(f"    Serialization checkpoints:")
+        for cp in serialization_checkpoints:
+            print(f"      After {cp['steps']:3d} steps ({cp['tasks']:4d} tasks): {cp['time']:.6f} seconds")
+        print(f"")
+        print(f"    Total serialization time:   {total_serialization_time:.6f} seconds")
+        print(f"    Serialization overhead:     {overhead_percentage:.1f}% of execution time")
+        print(f"    Number of serializations:   {num_serializations}")
+        print(f"    Average per serialization:  {avg_serialization_time:.6f} seconds")
+        print("="*80)
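A note on running these tests: the timing output above only appears when pytest's output capture is disabled, e.g. something like `pytest -s tests/SpiffWorkflow/bpmn/test_performance_test.py` from the repository root; the exact invocation depends on the project's test setup.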