Skip to content

Commit 57c57da

Browse files
authored
[WIP] Semi-automatic type-serialization (#109)
1 parent 53be9b6 commit 57c57da

File tree

17 files changed

+376
-14
lines changed

17 files changed

+376
-14
lines changed

azure/durable_functions/models/DurableOrchestrationClient.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from .OrchestrationRuntimeStatus import OrchestrationRuntimeStatus
1414
from ..models import DurableOrchestrationBindings
1515
from .utils.http_utils import get_async_request, post_async_request, delete_async_request
16+
from azure.functions._durable_functions import _serialize_custom_object
1617

1718

1819
class DurableOrchestrationClient:
@@ -432,8 +433,27 @@ def _create_http_response(status_code: int, body: Any) -> func.HttpResponse:
432433
return func.HttpResponse(**response_args)
433434

434435
@staticmethod
435-
def _get_json_input(client_input: object) -> object:
436-
return json.dumps(client_input) if client_input is not None else None
436+
def _get_json_input(client_input: object) -> str:
437+
"""Serialize the orchestrator input.
438+
439+
Parameters
440+
----------
441+
client_input: object
442+
The client's input, which we need to serialize
443+
444+
Returns
445+
-------
446+
str
447+
A string representing the JSON-serialization of `client_input`
448+
449+
Exceptions
450+
----------
451+
TypeError
452+
If the JSON serialization failed, see `serialize_custom_object`
453+
"""
454+
if client_input is not None:
455+
return json.dumps(client_input, default=_serialize_custom_object)
456+
return None
437457

438458
@staticmethod
439459
def _replace_url_origin(request_url, value_url):

azure/durable_functions/models/DurableOrchestrationContext.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from ..models.TokenSource import TokenSource
1111
from ..tasks import call_activity_task, task_all, task_any, call_activity_with_retry_task, \
1212
wait_for_external_event_task, continue_as_new, new_uuid, call_http
13+
from azure.functions._durable_functions import _deserialize_custom_object
1314

1415

1516
class DurableOrchestrationContext:
@@ -79,6 +80,8 @@ def from_json(cls, json_string: str):
7980
DurableOrchestrationContext
8081
New instance of the durable orchestration context class
8182
"""
83+
# We should consider parsing the `Input` field here as well,
84+
# intead of doing so lazily when `get_input` is called.
8285
json_dict = json.loads(json_string)
8386
return cls(**json_dict)
8487

@@ -165,7 +168,8 @@ def call_sub_orchestrator(self,
165168

166169
def get_input(self) -> str:
167170
"""Get the orchestration input."""
168-
return self._input
171+
return None if self._input is None else json.loads(self._input,
172+
object_hook=_deserialize_custom_object)
169173

170174
def new_uuid(self) -> str:
171175
"""Create a new UUID that is safe for replay within an orchestration or operation.

azure/durable_functions/models/actions/CallActivityAction.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
from .Action import Action
44
from .ActionType import ActionType
55
from ..utils.json_utils import add_attrib
6+
from json import dumps
7+
from azure.functions._durable_functions import _serialize_custom_object
68

79

810
class CallActivityAction(Action):
@@ -13,7 +15,8 @@ class CallActivityAction(Action):
1315

1416
def __init__(self, function_name: str, input_=None):
1517
self.function_name: str = function_name
16-
self.input_ = input_
18+
# It appears that `.input_` needs to be JSON-serializable at this point
19+
self.input_ = dumps(input_, default=_serialize_custom_object)
1720

1821
if not self.function_name:
1922
raise ValueError("function_name cannot be empty")

azure/durable_functions/tasks/task_utilities.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
from ..models.history import HistoryEventType
3+
from azure.functions._durable_functions import _deserialize_custom_object
34

45

56
def should_suspend(partial_result) -> bool:
@@ -15,12 +16,14 @@ def parse_history_event(directive_result):
1516
if event_type is None:
1617
raise ValueError("EventType is not found in task object")
1718

19+
# We provide the ability to deserialize custom objects, because the output of this
20+
# will be passed directly to the orchestrator as the output of some activity
1821
if event_type == HistoryEventType.EVENT_RAISED:
19-
return json.loads(directive_result.Input)
22+
return json.loads(directive_result.Input, object_hook=_deserialize_custom_object)
2023
if event_type == HistoryEventType.SUB_ORCHESTRATION_INSTANCE_CREATED:
21-
return json.loads(directive_result.Result)
24+
return json.loads(directive_result.Result, object_hook=_deserialize_custom_object)
2225
if event_type == HistoryEventType.TASK_COMPLETED:
23-
return json.loads(directive_result.Result)
26+
return json.loads(directive_result.Result, object_hook=_deserialize_custom_object)
2427
return None
2528

2629

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import logging
2+
3+
def main(name):
4+
"""Activity function performing a specific step in the chain
5+
6+
Parameters
7+
----------
8+
name : str
9+
Name of the item to be hello'ed at
10+
11+
Returns
12+
-------
13+
str
14+
Returns a welcome string
15+
"""
16+
logging.warning(f"Activity Triggered: {name}")
17+
return name
18+
19+
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"scriptFile": "__init__.py",
3+
"bindings": [
4+
{
5+
"name": "name",
6+
"type": "activityTrigger",
7+
"direction": "in",
8+
"datatype": "string"
9+
}
10+
],
11+
"disabled": false
12+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import logging
2+
3+
import azure.functions as func
4+
import azure.durable_functions as df
5+
from ..shared_code.MyClasses import SerializableClass # TODO: this import is highlight 'red' in VSCode, but works at runtime
6+
7+
def orchestrator_function(context: df.DurableOrchestrationContext):
8+
"""This function provides the core function chaining orchestration logic
9+
10+
Parameters
11+
----------
12+
context: DurableOrchestrationContext
13+
This context has the past history and the durable orchestration API's to
14+
create orchestrations
15+
16+
Returns
17+
-------
18+
int
19+
The number contained in the input
20+
"""
21+
input_: SerializableClass = context.get_input()
22+
number: int = input_.show_number()
23+
24+
25+
# throwaway, seems necessary for the orchestration not to fail
26+
value = yield context.call_activity("DurableActivity", SerializableClass(24))
27+
return 11
28+
29+
main = df.Orchestrator.create(orchestrator_function)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"scriptFile": "__init__.py",
3+
"bindings": [
4+
{
5+
"name": "context",
6+
"type": "orchestrationTrigger",
7+
"direction": "in"
8+
}
9+
],
10+
"disabled": false
11+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import logging
2+
3+
from azure.durable_functions import DurableOrchestrationClient
4+
import azure.functions as func
5+
from ..shared_code.MyClasses import SerializableClass
6+
7+
8+
async def main(req: func.HttpRequest, starter: str, message):
9+
"""This function starts up the orchestrator from an HTTP endpoint
10+
11+
starter: str
12+
A JSON-formatted string describing the orchestration context
13+
14+
message:
15+
An azure functions http output binding, it enables us to establish
16+
an http response.
17+
18+
Parameters
19+
----------
20+
req: func.HttpRequest
21+
An HTTP Request object, it can be used to parse URL
22+
parameters.
23+
"""
24+
25+
26+
function_name = req.route_params.get('functionName')
27+
logging.info(starter)
28+
client = DurableOrchestrationClient(starter)
29+
instance_id = await client.start_new(function_name, client_input=SerializableClass(11))
30+
response = client.create_check_status_response(req, instance_id)
31+
message.set(response)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
{
2+
"scriptFile": "__init__.py",
3+
"bindings": [
4+
{
5+
"authLevel": "anonymous",
6+
"name": "req",
7+
"type": "httpTrigger",
8+
"direction": "in",
9+
"route": "orchestrators/{functionName}",
10+
"methods": [
11+
"post",
12+
"get"
13+
]
14+
},
15+
{
16+
"direction": "out",
17+
"name": "message",
18+
"type": "http"
19+
},
20+
{
21+
"name": "starter",
22+
"type": "orchestrationClient",
23+
"direction": "in",
24+
"datatype": "string"
25+
}
26+
]
27+
}

0 commit comments

Comments
 (0)