Skip to content

Commit 36b503a

Browse files
committed
fix: enhance message deserialization to prevent MESSAGE_COERCION_FAILURE (#85)
- Add comprehensive error handling and recovery in _deserialize_channel_values - Enhance _recursive_deserialize with detailed error messages for debugging - Document the message serialization/deserialization process extensively - Add tests to verify proper handling of LangChain message objects - Ensure messages stored in serialized format are properly reconstructed - Test partial deserialization failures with recovery - Verify error logging (not silent failures) - Test channel isolation (failure in one channel doesn't affect others) - Ensure fallback behavior returns original data when deserialization fails
1 parent b1eb0b6 commit 36b503a

File tree

5 files changed

+894
-13
lines changed

5 files changed

+894
-13
lines changed

langgraph/checkpoint/redis/base.py

Lines changed: 66 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import base64
22
import binascii
3+
import logging
34
import random
45
from abc import abstractmethod
56
from typing import Any, Dict, Generic, List, Optional, Sequence, Tuple, Union, cast
@@ -27,6 +28,8 @@
2728
from .jsonplus_redis import JsonPlusRedisSerializer
2829
from .types import IndexType, RedisClientType
2930

31+
logger = logging.getLogger(__name__)
32+
3033
REDIS_KEY_SEPARATOR = ":"
3134
CHECKPOINT_PREFIX = "checkpoint"
3235
CHECKPOINT_BLOB_PREFIX = "checkpoint_blob"
@@ -305,25 +308,78 @@ def _deserialize_channel_values(
305308
306309
When channel values are stored inline in the checkpoint, they're in their
307310
serialized form. This method deserializes them back to their original types.
311+
312+
This specifically handles LangChain message objects that may be stored in their
313+
serialized format: {'lc': 1, 'type': 'constructor', 'id': [...], 'kwargs': {...}}
314+
and ensures they are properly reconstructed as message objects.
308315
"""
309316
if not channel_values:
310317
return {}
311318

312-
# Apply recursive deserialization to handle nested structures and LangChain objects
313-
return self._recursive_deserialize(channel_values)
319+
try:
320+
# Apply recursive deserialization to handle nested structures and LangChain objects
321+
return self._recursive_deserialize(channel_values)
322+
except Exception as e:
323+
logger.warning(
324+
f"Error deserializing channel values, attempting recovery: {e}"
325+
)
326+
# Attempt to recover by processing each channel individually
327+
recovered = {}
328+
for key, value in channel_values.items():
329+
try:
330+
recovered[key] = self._recursive_deserialize(value)
331+
except Exception as inner_e:
332+
logger.error(
333+
f"Failed to deserialize channel '{key}': {inner_e}. "
334+
f"Value will be returned as-is."
335+
)
336+
recovered[key] = value
337+
return recovered
314338

315339
def _recursive_deserialize(self, obj: Any) -> Any:
316-
"""Recursively deserialize LangChain objects and nested structures."""
340+
"""Recursively deserialize LangChain objects and nested structures.
341+
342+
This method specifically handles the deserialization of LangChain message objects
343+
that may be stored in their serialized format to prevent MESSAGE_COERCION_FAILURE.
344+
345+
Args:
346+
obj: The object to deserialize, which may be a dict, list, or primitive.
347+
348+
Returns:
349+
The deserialized object, with LangChain objects properly reconstructed.
350+
"""
317351
if isinstance(obj, dict):
318352
# Check if this is a LangChain serialized object
319353
if obj.get("lc") in (1, 2) and obj.get("type") == "constructor":
320-
# Use the serde's reviver to reconstruct the object
321-
if hasattr(self.serde, "_reviver"):
322-
return self.serde._reviver(obj)
323-
elif hasattr(self.serde, "_revive_if_needed"):
324-
return self.serde._revive_if_needed(obj)
325-
else:
326-
# Fallback: return as-is if serde doesn't have reviver
354+
try:
355+
# Use the serde's reviver to reconstruct the object
356+
if hasattr(self.serde, "_reviver"):
357+
return self.serde._reviver(obj)
358+
elif hasattr(self.serde, "_revive_if_needed"):
359+
return self.serde._revive_if_needed(obj)
360+
else:
361+
# Log warning if serde doesn't have reviver
362+
logger.warning(
363+
"Serializer does not have a reviver method. "
364+
"LangChain object may not be properly deserialized. "
365+
f"Object ID: {obj.get('id')}"
366+
)
367+
return obj
368+
except Exception as e:
369+
# Provide detailed error message for debugging
370+
obj_id = obj.get("id", "unknown")
371+
obj_type = (
372+
obj.get("id", ["unknown"])[-1]
373+
if isinstance(obj.get("id"), list)
374+
else "unknown"
375+
)
376+
logger.error(
377+
f"Failed to deserialize LangChain object of type '{obj_type}'. "
378+
f"This may cause MESSAGE_COERCION_FAILURE. Error: {e}. "
379+
f"Object structure: lc={obj.get('lc')}, type={obj.get('type')}, "
380+
f"id={obj_id}"
381+
)
382+
# Return the object as-is to prevent complete failure
327383
return obj
328384
# Recursively process nested dicts
329385
return {k: self._recursive_deserialize(v) for k, v in obj.items()}

langgraph/checkpoint/redis/jsonplus_redis.py

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,27 @@
99

1010

1111
class JsonPlusRedisSerializer(JsonPlusSerializer):
12-
"""Redis-optimized serializer using orjson for faster JSON processing."""
12+
"""Redis-optimized serializer using orjson for faster JSON processing.
13+
14+
This serializer handles the conversion of LangChain objects (including messages)
15+
to and from their serialized format. It specifically addresses the MESSAGE_COERCION_FAILURE
16+
issue by ensuring that LangChain message objects stored in their serialized format
17+
(with 'lc', 'type', 'constructor' fields) are properly reconstructed as message objects
18+
rather than being left as raw dictionaries.
19+
20+
The serialized format for LangChain objects looks like:
21+
{
22+
'lc': 1, # LangChain version marker
23+
'type': 'constructor',
24+
'id': ['langchain', 'schema', 'messages', 'HumanMessage'],
25+
'kwargs': {'content': '...', 'type': 'human', 'id': '...'}
26+
}
27+
28+
This serializer ensures such objects are properly deserialized back to their
29+
original message object form (e.g., HumanMessage, AIMessage) to prevent
30+
downstream errors when the application expects message objects with specific
31+
attributes and methods.
32+
"""
1333

1434
SENTINEL_FIELDS = [
1535
"thread_id",
@@ -39,16 +59,31 @@ def loads(self, data: bytes) -> Any:
3959
return super().loads(data)
4060

4161
def _revive_if_needed(self, obj: Any) -> Any:
42-
"""Recursively apply reviver to handle LangChain serialized objects."""
62+
"""Recursively apply reviver to handle LangChain serialized objects.
63+
64+
This method is crucial for preventing MESSAGE_COERCION_FAILURE by ensuring
65+
that LangChain message objects stored in their serialized format are properly
66+
reconstructed. Without this, messages would remain as dictionaries with
67+
'lc', 'type', and 'constructor' fields, causing errors when the application
68+
expects actual message objects with 'role' and 'content' attributes.
69+
70+
Args:
71+
obj: The object to potentially revive, which may be a dict, list, or primitive.
72+
73+
Returns:
74+
The revived object with LangChain objects properly reconstructed.
75+
"""
4376
if isinstance(obj, dict):
4477
# Check if this is a LangChain serialized object
4578
if obj.get("lc") in (1, 2) and obj.get("type") == "constructor":
4679
# Use parent's reviver method to reconstruct the object
80+
# This converts {'lc': 1, 'type': 'constructor', ...} back to
81+
# the actual LangChain object (e.g., HumanMessage, AIMessage)
4782
return self._reviver(obj)
4883
# Recursively process nested dicts
4984
return {k: self._revive_if_needed(v) for k, v in obj.items()}
5085
elif isinstance(obj, list):
51-
# Recursively process lists
86+
# Recursively process lists (e.g., lists of messages)
5287
return [self._revive_if_needed(item) for item in obj]
5388
else:
5489
# Return primitives as-is

0 commit comments

Comments
 (0)