55 changes: 54 additions & 1 deletion SpiffWorkflow/bpmn/serializer/default/workflow.py
@@ -20,11 +20,65 @@
from uuid import UUID

from SpiffWorkflow.bpmn.specs.mixins.subworkflow_task import SubWorkflowTask
from SpiffWorkflow.util.deep_merge import DeepMerge

from ..helpers.bpmn_converter import BpmnConverter

class TaskConverter(BpmnConverter):

def to_dict(self, task):
        # Optimize serialization: when the task has a parent, store only the local
        # changes (a delta against the parent's data) rather than the fully materialized data.

if task.parent is None:
data = task.data
delta = {}
else:
data = {}
delta = {
'updates': DeepMerge.get_updated_keys(task.parent.data, task.data),
'deletions': DeepMerge.get_deleted_keys(task.parent.data, task.data),
}

return {
'id': str(task.id),
'parent': str(task._parent) if task.parent is not None else None,
'children': [ str(child) for child in task._children ],
'last_state_change': task.last_state_change,
'state': task.state,
'task_spec': task.task_spec.name,
'triggered': task.triggered,
'internal_data': self.registry.convert(task.internal_data),
'data': data,
'delta': delta
}

def from_dict(self, dct, workflow):

task_spec = workflow.spec.task_specs.get(dct['task_spec'])
task = self.target_class(workflow, task_spec, state=dct['state'], id=UUID(dct['id']))
task._parent = UUID(dct['parent']) if dct['parent'] is not None else None
task._children = [UUID(child) for child in dct['children']]
task.last_state_change = dct['last_state_change']
task.triggered = dct['triggered']
task.internal_data = self.registry.restore(dct['internal_data'])

delta = dct.get('delta')
if delta and task.parent is not None:
data = DeepMerge.merge({}, task.parent.data)
data.update(self.registry.restore(delta.get('updates', {})))
            for key in delta.get('deletions', []):
if key in data:
del data[key]
else:
data = self.registry.restore(dct['data'])

task.data = data

return task

class SimpleTaskConverter(BpmnConverter):

def to_dict(self, task):
return {
'id': str(task.id),
@@ -49,7 +103,6 @@ def from_dict(self, dct, workflow):
task.data = self.registry.restore(dct['data'])
return task


class BpmnEventConverter(BpmnConverter):

def to_dict(self, event):
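
A sketch of the round-trip the new TaskConverter performs, using the DeepMerge helpers added later in this diff; the sample dicts are hypothetical and not part of the change:

from SpiffWorkflow.util.deep_merge import DeepMerge

parent_data = {'a': 1, 'b': 2, 'c': 3}
task_data = {'a': 1, 'b': 20, 'd': 4}

# to_dict: persist only the delta against the parent's data
delta = {
    'updates': DeepMerge.get_updated_keys(parent_data, task_data),    # {'b': 20, 'd': 4}
    'deletions': DeepMerge.get_deleted_keys(parent_data, task_data),  # ['c']
}

# from_dict: rebuild the task's data from the parent plus the delta
data = DeepMerge.merge({}, parent_data)
data.update(delta['updates'])
for key in delta['deletions']:
    data.pop(key, None)
assert data == task_data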
19 changes: 19 additions & 0 deletions SpiffWorkflow/bpmn/serializer/helpers/encoder.py
@@ -0,0 +1,19 @@
import json
from types import ModuleType


def create_encoder(registry, user_encoder_cls=None):
base = user_encoder_cls or json.JSONEncoder

class SpiffEncoder(base):
def default(self, obj):
typename = registry.typenames.get(type(obj))
if typename is not None:
return registry.convert_to_dict[typename](obj)
if callable(obj) or isinstance(obj, ModuleType):
return None
if isinstance(obj, set):
return list(obj)
return super().default(obj)

return SpiffEncoder
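
A usage sketch for the factory above; it assumes DefaultRegistry (from helpers/registry.py) is importable as shown, and the payload is hypothetical:

import json
from datetime import datetime

from SpiffWorkflow.bpmn.serializer.helpers.encoder import create_encoder
from SpiffWorkflow.bpmn.serializer.helpers.registry import DefaultRegistry

encoder_cls = create_encoder(DefaultRegistry())

payload = {
    'when': datetime(2024, 1, 1),  # registered type: converted through the registry
    'tags': {'a', 'b'},            # sets are emitted as lists
    'callback': print,             # callables serialize to null
}
print(json.dumps(payload, cls=encoder_cls, sort_keys=True))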
15 changes: 14 additions & 1 deletion SpiffWorkflow/bpmn/serializer/helpers/registry.py
@@ -17,6 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

from types import ModuleType
from uuid import UUID
from datetime import datetime, timedelta

@@ -32,6 +33,7 @@ class DefaultRegistry(DictionaryConverter):
def __init__(self):

super().__init__()
self._encoder_mode = False
self.register(UUID, lambda v: { 'value': str(v) }, lambda v: UUID(v['value']))
self.register(datetime, lambda v: { 'value': v.isoformat() }, lambda v: datetime.fromisoformat(v['value']))
self.register(timedelta, lambda v: { 'days': v.days, 'seconds': v.seconds }, lambda v: timedelta(**v))
@@ -45,9 +47,20 @@ def convert(self, obj):
Returns:
the result of `convert` conversion after preprocessing
"""
if self._encoder_mode:
return self._convert_for_encoder(obj)
cleaned = self.clean(obj)
return super().convert(cleaned)

def _convert_for_encoder(self, obj):
typename = self.typenames.get(obj.__class__)
if typename in self.convert_to_dict:
return self.convert_to_dict[typename](obj)
elif isinstance(obj, dict):
return self.clean(obj)
else:
return obj

def clean(self, obj):
"""A method that can be used to preprocess an object before conversion to a dict.

@@ -60,6 +73,6 @@ def clean(self, obj):
the preprocessed object
"""
if isinstance(obj, dict):
-            return dict((k, v) for k, v in obj.items() if not callable(v))
+            return dict((k, v) for k, v in obj.items() if not callable(v) and not isinstance(v, ModuleType))
else:
return obj
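
A sketch of what the new flag changes (hypothetical data; the exact output of the default path depends on DictionaryConverter):

from datetime import datetime

from SpiffWorkflow.bpmn.serializer.helpers.registry import DefaultRegistry

registry = DefaultRegistry()
data = {'when': datetime(2024, 1, 1), 'helper': len}

# Default path: clean() drops the callable, then registered types are deep-converted.
eager = registry.convert(data)

# Encoder path: clean() still drops the callable, but the datetime passes through
# untouched, to be converted later by the SpiffEncoder during json.dumps.
registry._encoder_mode = True
lazy = registry.convert(data)
assert isinstance(lazy['when'], datetime)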
12 changes: 9 additions & 3 deletions SpiffWorkflow/bpmn/serializer/workflow.py
@@ -21,6 +21,7 @@

from .migration.version_migration import MIGRATIONS
from .helpers import DefaultRegistry
from .helpers.encoder import create_encoder

from .config import DEFAULT_CONFIG

@@ -97,6 +98,7 @@ def __init__(self, registry=None, version=VERSION, json_encoder_cls=None, json_decoder_cls=None):
self.json_encoder_cls = json_encoder_cls
self.json_decoder_cls = json_decoder_cls
self.VERSION = version
self._encoder_cls = create_encoder(self.registry, json_encoder_cls)

def serialize_json(self, workflow, use_gzip=False):
"""Serialize the dictionary representation of the workflow to JSON.
@@ -108,9 +110,13 @@ def serialize_json(self, workflow, use_gzip=False):
Returns:
a JSON dump of the dictionary representation or a gzipped version of it
"""
-        dct = self.to_dict(workflow)
-        dct[self.VERSION_KEY] = self.VERSION
-        json_str = json.dumps(dct, cls=self.json_encoder_cls)
+        self.registry._encoder_mode = True
+        try:
+            dct = self.to_dict(workflow)
+            dct[self.VERSION_KEY] = self.VERSION
+            json_str = json.dumps(dct, cls=self._encoder_cls)
+        finally:
+            self.registry._encoder_mode = False
return gzip.compress(json_str.encode('utf-8')) if use_gzip else json_str

def deserialize_json(self, serialization, use_gzip=False):
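
A hedged usage sketch, assuming the class defined in this module is BpmnWorkflowSerializer, that a default registry is built when none is given, and that `workflow` is an already-constructed BpmnWorkflow:

from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer

serializer = BpmnWorkflowSerializer()
json_str = serializer.serialize_json(workflow)                 # JSON text
gz_bytes = serializer.serialize_json(workflow, use_gzip=True)  # gzipped bytes
restored = serializer.deserialize_json(json_str)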
9 changes: 7 additions & 2 deletions SpiffWorkflow/bpmn/workflow.py
@@ -164,6 +164,12 @@ def do_engine_steps(self, will_complete_task=None, did_complete_task=None):
:param will_complete_task: Callback that will be called prior to completing a task
:param did_complete_task: Callback that will be called after completing a task
"""
        # _do_engine_steps reports whether any task advanced; repeat until quiescent.
        while self._do_engine_steps(will_complete_task, did_complete_task):
            pass

def _do_engine_steps(self, will_complete_task=None, did_complete_task=None):

def update_workflow(wf):
count = 0
# Wanted to use the iterator method here, but at least this is a shorter list
@@ -187,8 +193,7 @@ def update_workflow(wf):
task.task_spec._update(task)

count = update_workflow(self)
-        if count > 0 or len(self.get_active_subprocesses()) > len(active_subprocesses):
-            self.do_engine_steps(will_complete_task, did_complete_task)
+        return count > 0 or len(self.get_active_subprocesses()) > len(active_subprocesses)

def refresh_waiting_tasks(self, will_refresh_task=None, did_refresh_task=None):
"""
4 changes: 2 additions & 2 deletions SpiffWorkflow/task.py
@@ -314,8 +314,8 @@ def _assign_new_thread_id(self, recursive=True):
return self.thread_id

def _inherit_data(self):
"""Copies the data from the parent."""
self.set_data(**deepcopy(self.parent.data))
"""Inherits data from the parent."""
self.data = DeepMerge.merge(self.data, self.parent.data)

def _set_internal_data(self, **kwargs):
"""Defines the given attribute/value pairs in this task's internal data."""
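
One consequence of switching from deepcopy to DeepMerge.merge here: the child receives shallow copies of the parent's containers, so objects nested below the first level remain shared between parent and child data (see the DeepMerge sketch below).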
25 changes: 22 additions & 3 deletions SpiffWorkflow/util/deep_merge.py
@@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

-from copy import deepcopy
+from copy import copy

class DeepMerge(object):
# Merges two deeply nested json-like dictionaries,
@@ -44,9 +44,18 @@ def merge(a, b, path=None):
elif isinstance(a[key], list) and isinstance(b[key], list):
DeepMerge.merge_array(a[key], b[key], path + [str(key)])
else:
-                a[key] = deepcopy(b[key]) # Just overwrite the value in a.
+                # Shallow copy only for mutable types (dict/list/set)
+                # Immutable types (str/int/float/bool/None/tuple) don't need copying
+                if isinstance(b[key], (dict, list, set)):
+                    a[key] = copy(b[key])
+                else:
+                    a[key] = b[key]
else:
-            a[key] = deepcopy(b[key])
+            # Shallow copy only for mutable types
+            if isinstance(b[key], (dict, list, set)):
+                a[key] = copy(b[key])
+            else:
+                a[key] = b[key]
return a

@staticmethod
@@ -65,3 +74,13 @@ def merge_array(a, b, path=None):

# Trim a back to the length of b. In the end, the two arrays should match
del a[len(b):]

@staticmethod
def get_updated_keys(a, b):
"""get a list of keys from b that are different from a"""
return dict((key, b[key]) for key in b if key not in a or b[key] != a[key])

@staticmethod
def get_deleted_keys(a, b):
"""get a list of keys from a that do not exist in b"""
return [key for key in a if key not in b]
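
The shallow-copy strategy above trades isolation for speed: top-level containers are duplicated, but their elements remain shared with the source. A small sketch of the resulting aliasing, with hypothetical data:

from SpiffWorkflow.util.deep_merge import DeepMerge

a = {}
b = {'items': [{'n': 1}]}
DeepMerge.merge(a, b)

assert a['items'] is not b['items']    # the list itself was copied...
assert a['items'][0] is b['items'][0]  # ...but its elements are shared
b['items'][0]['n'] = 99
assert a['items'][0]['n'] == 99        # mutation through b is visible in a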
2 changes: 1 addition & 1 deletion SpiffWorkflow/version.py
@@ -1 +1 @@
-__version__ = '3.1.2'
+__version__ = '3.1.3a2'