Skip to content

Commit e887cb0

Browse files
author
KJ Tsanaktsidis
committed
Implement validation of slot numbers in #with
This is achieved by having the #with method "lock" the yielded connection to a particular slot, and having middleware which implements that locking.
1 parent 4554b4b commit e887cb0

File tree

4 files changed

+68
-4
lines changed

4 files changed

+68
-4
lines changed

lib/redis_client/cluster.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,15 +93,19 @@ def multi(watch: nil, &block)
9393
::RedisClient::Cluster::Transaction.new(@router, @command_builder).execute(watch: watch, &block)
9494
end
9595

96-
def with(key: nil, hashtag: nil, write: true, retry_count: 0, &block)
96+
def with(key: nil, hashtag: nil, write: true, retry_count: 0)
9797
key = process_with_arguments(key, hashtag)
9898

9999
node_key = @router.find_node_key_by_key(key, primary: write)
100100
node = @router.find_node(node_key)
101101
# Calling #with checks out the underlying connection if this is a pooled connection
102102
# Calling it through #try_delegate ensures we handle any redirections and retry the entire
103103
# transaction if so.
104-
@router.try_delegate(node, :with, retry_count: retry_count, &block)
104+
@router.try_delegate(node, :with, retry_count: retry_count) do |conn|
105+
conn.locked_to_key_slot(key) do
106+
yield conn
107+
end
108+
end
105109
end
106110

107111
def pubsub

lib/redis_client/cluster/node.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
require 'redis_client/cluster/node/random_replica'
99
require 'redis_client/cluster/node/random_replica_or_primary'
1010
require 'redis_client/cluster/node/latency_replica'
11+
require 'redis_client/cluster/pinning'
1112

1213
class RedisClient
1314
class Cluster
@@ -79,13 +80,15 @@ def []=(index, element)
7980
end
8081

8182
class SingleNodeRedisClient < ::RedisClient
83+
include Pinning::ClientMixin
8284
end
8385

8486
class Config < ::RedisClient::Config
8587
def initialize(cluster_commands:, scale_read: false, middlewares: nil, **kwargs)
8688
@scale_read = scale_read
8789
@cluster_commands = cluster_commands
8890
middlewares ||= []
91+
middlewares.unshift Pinning::ClientMiddleware
8992
middlewares.unshift ErrorIdentification::Middleware
9093
super(
9194
middlewares: middlewares,

lib/redis_client/cluster/pinning.rb

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# frozen_string_literal: true
2+
3+
class RedisClient
4+
class Cluster
5+
module Pinning
6+
module ClientMixin
7+
attr_reader :locked_key_slot
8+
9+
# This gets called when handing out connections in Cluster#with to lock the returned
10+
# connections to a given slot.
11+
def locked_to_key_slot(key_slot)
12+
raise ArgumentError, 'recursive slot locking is not allowed' if @locked_key_slot
13+
14+
begin
15+
@locked_key_slot = key_slot
16+
yield
17+
ensure
18+
@locked_key_slot = nil
19+
end
20+
end
21+
end
22+
23+
# This middleware is what actually enforces the slot locking above.
24+
module ClientMiddleware
25+
def initialize(client)
26+
@client = client
27+
super
28+
end
29+
30+
def assert_slot_valid!(command, config) # rubocop:disable Metrics/AbcSize
31+
return unless @client.locked_key_slot
32+
return unless config.cluster_commands.loaded?
33+
34+
keys = config.cluster_commands.extract_all_keys(command)
35+
key_slots = keys.map { |k| ::RedisClient::Cluster::KeySlotConverter.convert(k) }
36+
locked_slot = ::RedisClient::Cluster::KeySlotConverter.convert(@client.locked_key_slot)
37+
return if key_slots.all? { |slot| slot == locked_slot }
38+
39+
key_slot_pairs = keys.zip(key_slots).map { |key, slot| "#{key} => #{slot}" }.join(', ')
40+
raise ::RedisClient::Cluster::Transaction::ConsistencyError, <<~MESSAGE
41+
Connection is pinned to slot #{locked_slot} (via key #{@client.locked_key_slot}). \
42+
However, command #{command.inspect} has keys hashing to slots #{key_slot_pairs}. \
43+
Transactions in redis cluster must only refer to keys hashing to the same slot.
44+
MESSAGE
45+
end
46+
47+
def call(command, config)
48+
assert_slot_valid!(command, config)
49+
super
50+
end
51+
52+
def call_pipelined(command, config)
53+
assert_slot_valid!(command, config)
54+
super
55+
end
56+
end
57+
end
58+
end
59+
end

test/redis_client/test_cluster.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -533,8 +533,6 @@ def test_pinning_two_keys
533533
end
534534

535535
def test_pinning_cross_slot
536-
skip 'This is not implemented yet!'
537-
538536
assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
539537
@client.with(hashtag: 'slot1') do |conn|
540538
conn.call('GET', '{slot2}')

0 commit comments

Comments
 (0)