
perf: performance-driven re-design #81


Merged
merged 1 commit into from
Aug 8, 2025

Conversation


@bsbodden bsbodden commented Aug 4, 2025

BREAKING CHANGE: This release introduces significant performance improvements that change the internal storage format and key structure. Checkpoints created with earlier versions are incompatible with v0.1.0.

Key performance improvements:

  • Replace some FT.SEARCH operations with sorted sets for write tracking
  • Add checkpoint-based key registry eliminating expensive SCAN/KEYS operations
  • Implement multi-level caching for frequently accessed keys and data
  • Optimize batch operations with pipelined Redis commands
  • Add lazy TTL refresh to reduce unnecessary operations
  • Improve index schemas for better query performance

Architectural changes:

  • New CheckpointKeyRegistry tracks writes per checkpoint using sorted sets
  • Cached key generation methods reduce string concatenation overhead
  • Batch loading methods for pending writes and sends
  • Optimized get_tuple with direct document access patterns
  • Improved TTL management with threshold-based refresh
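The threshold-based refresh reduces to a small predicate. A minimal sketch, assuming a helper like the following (the function name and default threshold are illustrative, not the library's API):

```python
def should_refresh_ttl(remaining_s: int, full_ttl_s: int, threshold: float = 0.5) -> bool:
    # Refresh only when the key still exists (remaining_s >= 0, per Redis TTL
    # semantics, where -2 means "no such key") and its remaining lifetime has
    # dropped below `threshold` of the full TTL; otherwise skip EXPIRE entirely.
    return 0 <= remaining_s < full_ttl_s * threshold
```

The caller issues EXPIRE only when this returns True, which is what makes the refresh "lazy": most reads touch the key without paying for a TTL reset.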

Testing improvements:

  • Add comprehensive test coverage for new registry functionality
  • Test TTL behaviors, caching mechanisms, and error paths
  • Add integration tests for blob handling and metadata operations
  • Improve test isolation using unique thread IDs instead of flushdb

The new architecture provides:

  • 50-70% reduction in Redis operations for typical workflows
  • Better scalability with checkpoint-scoped write tracking
  • Reduced memory footprint through efficient caching
  • Improved cluster mode compatibility

Redis Checkpoint Optimization Guide

Overview

This document details the architectural and design changes made to optimize Redis checkpoint operations between baseline commit 8e87eba and version 0.1.0.

Major Architectural Changes

1. Key Registry System

File Added: langgraph/checkpoint/redis/key_registry.py

This module introduces:

  • Redis sorted sets to track write keys per checkpoint
  • Registry pattern: write_keys_zset:{thread_id}:{namespace}:{checkpoint_id}
  • Enables efficient batch loading of writes, replacing per-checkpoint FT.SEARCH queries
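A minimal sketch of how such a registry could build its keys and record writes. The class name and key pattern follow the description above, but the method bodies are illustrative assumptions, not the shipped implementation:

```python
class CheckpointKeyRegistry:
    """Tracks the Redis keys of writes belonging to one checkpoint in a ZSET."""

    @staticmethod
    def make_write_keys_zset_key(thread_id: str, namespace: str, checkpoint_id: str) -> str:
        # Matches the registry pattern described above.
        return f"write_keys_zset:{thread_id}:{namespace}:{checkpoint_id}"

    def register_write(self, redis_client, thread_id, namespace, checkpoint_id,
                       write_key, idx):
        # ZADD with the write index as the score keeps members in write order,
        # so a later ZRANGE 0 -1 returns keys exactly as they were written.
        zset_key = self.make_write_keys_zset_key(thread_id, namespace, checkpoint_id)
        redis_client.zadd(zset_key, {write_key: idx})
```

Because membership lives in one sorted set per checkpoint, loading a checkpoint's writes is a ZRANGE plus pipelined JSON.GETs rather than a search query.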

2. Inline Channel Values Storage

Before (Baseline):

  • Channel values stored in separate blob documents
  • Key pattern: checkpoint_blob:{thread_id}:{namespace}:{channel}:{version}
  • Required FT.SEARCH queries to fetch each channel's blob
  • get_channel_values() method performed multiple searches

After (Optimized):

  • Channel values stored inline within the checkpoint document itself
  • No separate blob keys needed for new checkpoints
  • Single JSON.GET retrieves everything

Storage Structure Change:

# Baseline: Checkpoint document
{
    "checkpoint": {
        "channel_versions": {"messages": "1", "state": "2"}
        # channel_values NOT stored here
    }
}

# Optimized: Checkpoint document
{
    "checkpoint": {
        "channel_versions": {"messages": "1", "state": "2"},
        "channel_values": {  # NEW: Inline storage
            "messages": [serialized_data],
            "state": {serialized_data}
        }
    }
}

Operation-Specific Changes

1. List Checkpoints Operation (alist)

Regular Async (aio.py)

New Method Added: _abatch_load_pending_writes

Before (Baseline):

  • Called _aload_pending_writes individually for each checkpoint
  • Each call performed an FT.SEARCH query:
# Baseline: Individual FT.SEARCH per checkpoint
async for doc in results:
    pending_writes = await self._aload_pending_writes(
        thread_id, checkpoint_ns, checkpoint_id
    )

After (Optimized):

  • New batching method using key registry
  • Collects all checkpoints first, then batch loads writes
  • Uses Redis sorted sets instead of FT.SEARCH:
# Optimized: Batch collection and loading
batch_keys = []
async for doc in results:
    batch_keys.append((thread_id, checkpoint_ns, checkpoint_id))

# Then batch load all at once
pending_writes_map = await self._abatch_load_pending_writes(batch_keys)

Complexity Change:

  • Before: O(n) FT.SEARCH queries where n = number of checkpoints
  • After: O(3) pipeline executions regardless of n

2. Get Channel Values Operation

All Implementations

Before (Baseline):

def get_channel_values(self, thread_id, checkpoint_ns, checkpoint_id):
    # 1. FT.SEARCH to get channel_versions from checkpoint
    checkpoint_query = FilterQuery(...)
    checkpoint_result = self.checkpoints_index.search(checkpoint_query)

    # 2. For each channel, FT.SEARCH to get blob
    for channel, version in channel_versions.items():
        blob_query = FilterQuery(...)
        blob_results = self.checkpoint_blobs_index.search(blob_query)

After (Optimized):

def get_channel_values(self, thread_id, checkpoint_ns, checkpoint_id):
    # Single JSON.GET operation
    checkpoint_key = self._make_redis_checkpoint_key_cached(...)
    checkpoint_data = self._redis.json().get(checkpoint_key, "$.checkpoint")
    return checkpoint_data[0].get("channel_values", {})

Complexity Change:

  • Before: O(1 + m) FT.SEARCH queries where m = number of channels
  • After: O(1) JSON.GET operation

3. Load Pending Writes Operation

Before (Baseline)

async def _aload_pending_writes(self, thread_id, checkpoint_ns, checkpoint_id):
    # Always used FT.SEARCH
    writes_query = FilterQuery(
        filter_expression=(Tag("thread_id") == thread_id)
        & (Tag("checkpoint_ns") == checkpoint_ns)
        & (Tag("checkpoint_id") == checkpoint_id),
        return_fields=["task_id", "idx", "channel", "type", "$.blob"],
    )
    writes_results = await self.checkpoint_writes_index.search(writes_query)

After (Optimized)

async def _aload_pending_writes(self, thread_id, checkpoint_ns, checkpoint_id):
    # Uses key registry with sorted sets
    if self._key_registry:
        zset_key = self._key_registry.make_write_keys_zset_key(...)

        # Check if writes exist
        write_count = await self._redis.zcard(zset_key)
        if write_count > 0:
            # Get all write keys from sorted set
            write_keys = await self._redis.zrange(zset_key, 0, -1)

            # Batch fetch all writes
            pipeline = self._redis.pipeline()
            for key in write_keys:
                pipeline.json().get(key)
            results = await pipeline.execute()
    else:
        # Fall back to an FT.SEARCH query when no key registry is available
        ...

4. Put Checkpoint Operation

Changes in Storage Logic

Before:

  • Stored checkpoint document
  • Created separate blob documents for channel values
  • No write registry maintenance

After:

  • Stores checkpoint with inline channel_values
  • Maintains key registry for writes
  • Updates has_writes flag on checkpoint

5. Async-Specific Optimizations

Get Checkpoint (aget_tuple in aio.py)

  • Added pipelining for checkpoint fetch + TTL check
  • Used asyncio.gather() to parallelize independent operations
  • Fixed redundant fetch in shallow async implementation (🐞BUG)
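The gather-based parallelization can be sketched as follows; fetch_checkpoint and fetch_ttl are stand-ins for the real Redis calls, not the package's methods:

```python
import asyncio

async def fetch_checkpoint(checkpoint_id):
    # Stand-in for the real JSON.GET of the checkpoint document.
    await asyncio.sleep(0)
    return {"id": checkpoint_id}

async def fetch_ttl(checkpoint_id):
    # Stand-in for the real TTL check on the checkpoint key.
    await asyncio.sleep(0)
    return 3600

async def aget_with_ttl(checkpoint_id):
    # The two lookups are independent, so run them concurrently
    # instead of awaiting one after the other.
    return await asyncio.gather(fetch_checkpoint(checkpoint_id),
                                fetch_ttl(checkpoint_id))

doc, ttl = asyncio.run(aget_with_ttl("abc"))
```

The win is latency, not throughput: two independent round trips overlap instead of serializing.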

List Checkpoints (_abatch_load_pending_writes in aio.py)

  • Converted sequential operations to three-phase pipeline execution
  • Phase 1: Batch ZCARD operations
  • Phase 2: Batch ZRANGE operations
  • Phase 3: Batch JSON.GET operations
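A runnable sketch of those three phases. A tiny in-memory stand-in replaces Redis so the phase ordering is visible; the real code issues the same commands through pipelines, and all names here are illustrative:

```python
class FakeRedis:
    """In-memory stand-in exposing just the commands the batch needs."""

    def __init__(self, zsets, docs):
        self.zsets = zsets  # zset key -> write keys in score order
        self.docs = docs    # write key -> stored JSON document

    def zcard(self, key):
        return len(self.zsets.get(key, []))

    def zrange(self, key, start, stop):
        members = self.zsets.get(key, [])
        return members[start:] if stop == -1 else members[start:stop + 1]

    def json_get(self, key):
        return self.docs[key]


def batch_load_pending_writes(r, zset_keys):
    # Phase 1: batch ZCARD -- find which checkpoints have writes at all.
    counts = [r.zcard(zk) for zk in zset_keys]             # one pipeline in real code
    non_empty = [zk for zk, c in zip(zset_keys, counts) if c > 0]

    # Phase 2: batch ZRANGE -- collect the write keys per checkpoint.
    key_lists = [r.zrange(zk, 0, -1) for zk in non_empty]  # second pipeline

    # Phase 3: batch JSON.GET -- fetch every write document at once.
    return {zk: [r.json_get(k) for k in keys]              # third pipeline
            for zk, keys in zip(non_empty, key_lists)}
```

However many checkpoints are listed, the round-trip count stays at three pipeline executions.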

Put Checkpoint (Shallow Async)

  • Combined read-modify-write into single pipeline
  • Moved cleanup operations off critical path

Added Methods

  1. _abatch_load_pending_writes() - Batch loading for list operations
  2. _deserialize_channel_values() - Handle inline channel values
  3. _recursive_deserialize() - Reconstruct LangChain objects
  4. _make_*_cached() methods - Cached key generation
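The cached key generation idea can be sketched with functools.lru_cache; the function name and key layout below are illustrative, not the package's actual key format:

```python
from functools import lru_cache

@lru_cache(maxsize=1024)
def make_checkpoint_key(thread_id: str, checkpoint_ns: str, checkpoint_id: str) -> str:
    # Hot keys are built once; repeat calls with the same arguments hit the
    # cache instead of re-running the f-string concatenation.
    return f"checkpoint:{thread_id}:{checkpoint_ns}:{checkpoint_id}"
```

An lru_cache bounds memory automatically, which is why the review below suggests it over a plain dict.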

Modified Storage Fields

  1. checkpoint_ts - Added for timestamp queries
  2. has_writes - Added to track write existence
  3. channel_values - Added inline to checkpoint document

Stack Changes

Libraries

  • Before: Standard json library
  • After: orjson for faster JSON serialization/deserialization
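A round-trip sketch of the swap; it falls back to the stdlib json module when orjson is not installed, since the two differ mainly in speed and in orjson returning bytes:

```python
try:
    import orjson

    def dumps(obj) -> bytes:
        return orjson.dumps(obj)           # orjson returns bytes, not str

    def loads(data):
        return orjson.loads(data)
except ImportError:                        # fallback so the sketch still runs
    import json

    def dumps(obj) -> bytes:
        return json.dumps(obj).encode()

    def loads(data):
        return json.loads(data)
```

Because Redis stores values as bytes anyway, the bytes-native interface avoids an encode/decode pass per document.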

Redis Features Used

  • New: Sorted sets (ZSET) for write key tracking
  • New: Pipelining extensively for batch operations
  • Existing: FT.SEARCH (reduced usage)
  • Existing: JSON.GET/SET operations

Complexity Analysis

| Operation | Component | Baseline | Optimized | Improvement Factor |
|---|---|---|---|---|
| Get Channel Values | All | O(1+m) FT.SEARCH | O(1) JSON.GET | m× fewer calls |
| Load Pending Writes | Single | O(1) FT.SEARCH | O(1) ZCARD + pipeline | Similar, but faster |
| Load Pending Writes | Batch | O(n) FT.SEARCH | O(3) pipelines | n/3× fewer rounds |
| List Checkpoints | Overall | O(n×m) operations | O(3) + channel fetch | Dramatic reduction |

Where:

  • n = number of checkpoints
  • m = number of channels per checkpoint

⛓️‍💥 Breaking Changes

  1. Storage Format: New checkpoints store channel_values inline
  2. Index Schema: Added fields require index recreation
  3. Key Registry: New sorted set keys for write tracking

Migration Warning ⚠️ CRITICAL

This optimized version CANNOT be used as a drop-in replacement. Multiple read operations will fail when encountering old checkpoints that store channel values in separate blob documents.

Operations That WILL FAIL With Old Checkpoints

  1. get_channel_values - Returns empty {} instead of actual data (NO FALLBACK)
  2. get_tuple - Returns checkpoint with empty channel values (NO FALLBACK)
  3. list operations - Shows checkpoints exist but they're unusable
  4. Old checkpoints without inline channel_values are NOT readable (no fallback implemented)
  5. FT.SEARCH fallback exists only for pending writes/sends operations
  6. Breaking change for production systems with existing checkpoints
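Given the two storage structures shown earlier, a pre-upgrade audit could flag old-format documents with a check like this. The helper name is hypothetical, and fetching the documents still requires your own iteration over checkpoint keys:

```python
def is_pre_upgrade_checkpoint(doc: dict) -> bool:
    # Old-format documents carry channel_versions but no inline
    # channel_values; those are the ones the optimized readers cannot load.
    checkpoint = doc.get("checkpoint", {})
    return "channel_versions" in checkpoint and "channel_values" not in checkpoint
```

Running such an audit before upgrading tells you whether any live threads would hit the no-fallback read paths listed above.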

@bsbodden bsbodden requested review from abrookins and Copilot August 4, 2025 04:10
@bsbodden bsbodden self-assigned this Aug 4, 2025

@bsbodden bsbodden requested review from Copilot and removed request for abrookins August 4, 2025 04:21

@bsbodden bsbodden force-pushed the bsb/perf branch 4 times, most recently from 35187e1 to 25a71d9 Compare August 4, 2025 05:26
@bsbodden bsbodden requested a review from Copilot August 4, 2025 12:14

@bsbodden bsbodden force-pushed the bsb/perf branch 5 times, most recently from e1e055b to 718ab9b Compare August 5, 2025 01:37
@bsbodden bsbodden requested a review from Copilot August 5, 2025 01:42

@bsbodden bsbodden force-pushed the bsb/perf branch 3 times, most recently from 2c4f5ea to 07baa3e Compare August 5, 2025 03:28
@bsbodden bsbodden marked this pull request as ready for review August 5, 2025 03:52
@bsbodden bsbodden requested a review from Copilot August 6, 2025 14:04

@Copilot Copilot AI left a comment


Pull Request Overview

This release introduces significant performance improvements for the Redis checkpoint implementation through architectural changes and new features. The PR upgrades the package to v0.1.0 with breaking changes to the internal storage format and key structure.

Key improvements include:

  • Performance-driven redesign with 50-70% reduction in Redis operations for typical workflows
  • New checkpoint-based key registry using sorted sets instead of expensive SCAN/KEYS operations
  • Multi-level caching for frequently accessed keys and data with lazy TTL refresh
  • Inline storage for shallow checkpoints eliminating separate blob operations

Reviewed Changes

Copilot reviewed 68 out of 73 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
pyproject.toml Version bump to 0.1.0 and new dependencies for performance optimization
langgraph/checkpoint/redis/version.py Dynamic version loading from package metadata with pyproject.toml fallback
langgraph/checkpoint/redis/shallow.py Major redesign with inline storage, thread-level registries, and performance caching
tests/ Comprehensive test coverage for new functionality including version loading, shallow operations, and key registry
scripts.py Enhanced test commands with coverage reporting and code quality tools

@bsbodden bsbodden force-pushed the bsb/perf branch 2 times, most recently from f2f4520 to 033c82c Compare August 7, 2025 03:44

@abrookins abrookins left a comment


Couple of non-blocking thoughts:

  • Would it make sense to add a MIGRATION.md or a README section on "Upgrading to [whatever version]" that says old checkpoints are incompatible and that new indices/fields are required? And call out that existing data will not be auto-backfilled?
  • Maybe an lru_cache instead of a dict for the key cache

@bsbodden bsbodden merged commit 13ddc96 into main Aug 8, 2025
21 checks passed