Skip to content

Commit 61f1d13

Browse files
authored
Merge pull request #151 from jaysonsantos/distributed-client
Add distributed client
2 parents d888b6b + 8fb9633 commit 61f1d13

File tree

11 files changed

+429
-116
lines changed

11 files changed

+429
-116
lines changed

CHANGELOG.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
CHANGELOG
2+
---------
3+
4+
v0.17
5+
`````
6+
7+
- moved bmemcached.Client to bmemcached.ReplicantClient *but keeps backward compatibility*
8+
- added DistributedClient to distribute keys over servers using consistent hashing

MANIFEST.in

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@ exclude **/__pycache__/**
33
recursive-include docs *
44
recursive-include tests *
55
prune docs/_build
6+
include README.rst
7+
include CHANGELOG.rst

bmemcached/__init__.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
__all__ = ('Client', 'ReplicatingClient', )
1+
__all__ = ('Client', 'ReplicatingClient', 'DistributedClient', )
22

3-
from bmemcached.client import Client
4-
from bmemcached.client.replicating import ReplicatingClient
3+
from bmemcached.client import Client, ReplicatingClient, DistributedClient

bmemcached/client/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
from bmemcached.client.constants import SOCKET_TIMEOUT
22

33
from .replicating import ReplicatingClient
4+
from .distributed import DistributedClient
5+
6+
__all__ = ('Client', 'ReplicatingClient', 'DistributedClient', )
47

58

69
# Keep compatibility with old versions

bmemcached/client/distributed.py

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
from collections import defaultdict
2+
from uhashring import HashRing
3+
4+
from bmemcached.client import SOCKET_TIMEOUT
5+
from bmemcached.client.mixin import ClientMixin
6+
from bmemcached.compat import pickle
7+
8+
9+
class DistributedClient(ClientMixin):
10+
"""This is intended to be a client class which implement standard cache interface that common libs do...
11+
12+
It tries to distribute keys over the specified servers using `HashRing` consistent hash.
13+
"""
14+
def __init__(self, servers=('127.0.0.1:11211',), username=None, password=None, compression=None,
15+
socket_timeout=SOCKET_TIMEOUT, pickle_protocol=0, pickler=pickle.Pickler, unpickler=pickle.Unpickler):
16+
super(DistributedClient, self).__init__(servers, username, password, compression, socket_timeout,
17+
pickle_protocol, pickler, unpickler)
18+
self._ring = HashRing(self._servers)
19+
20+
def _get_server(self, key):
21+
return self._ring.get_node(key)
22+
23+
def delete(self, key, cas=0):
24+
"""
25+
Delete a key/value from server. If key does not exist, it returns True.
26+
27+
:param key: Key's name to be deleted
28+
:param cas: CAS of the key
29+
:return: True in case o success and False in case of failure.
30+
"""
31+
server = self._get_server(key)
32+
return server.delete(key, cas)
33+
34+
def delete_multi(self, keys):
35+
servers = defaultdict(list)
36+
for key in keys:
37+
server_key = self._get_server(key)
38+
servers[server_key].append(key)
39+
return all([server.delete_multi(keys_) for server, keys_ in servers.items()])
40+
41+
def set(self, key, value, time=0, compress_level=-1):
42+
"""
43+
Set a value for a key on server.
44+
45+
:param key: Key's name
46+
:type key: str
47+
:param value: A value to be stored on server.
48+
:type value: object
49+
:param time: Time in seconds that your key will expire.
50+
:type time: int
51+
:param compress_level: How much to compress.
52+
0 = no compression, 1 = fastest, 9 = slowest but best,
53+
-1 = default compression level.
54+
:type compress_level: int
55+
:return: True in case of success and False in case of failure
56+
:rtype: bool
57+
"""
58+
server = self._get_server(key)
59+
return server.set(key, value, time, compress_level)
60+
61+
def set_multi(self, mappings, time=0, compress_level=-1):
62+
"""
63+
Set multiple keys with it's values on server.
64+
65+
:param mappings: A dict with keys/values
66+
:type mappings: dict
67+
:param time: Time in seconds that your key will expire.
68+
:type time: int
69+
:param compress_level: How much to compress.
70+
0 = no compression, 1 = fastest, 9 = slowest but best,
71+
-1 = default compression level.
72+
:type compress_level: int
73+
:return: True in case of success and False in case of failure
74+
:rtype: bool
75+
"""
76+
returns = []
77+
if not mappings:
78+
return False
79+
server_mappings = defaultdict(dict)
80+
for key, value in mappings.items():
81+
server_key = self._get_server(key)
82+
server_mappings[server_key].update([(key, value)])
83+
for server, m in server_mappings.items():
84+
returns.append(server.set_multi(m, time, compress_level))
85+
86+
return all(returns)
87+
88+
def add(self, key, value, time=0, compress_level=-1):
89+
"""
90+
Add a key/value to server ony if it does not exist.
91+
92+
:param key: Key's name
93+
:type key: six.string_types
94+
:param value: A value to be stored on server.
95+
:type value: object
96+
:param time: Time in seconds that your key will expire.
97+
:type time: int
98+
:param compress_level: How much to compress.
99+
0 = no compression, 1 = fastest, 9 = slowest but best,
100+
-1 = default compression level.
101+
:type compress_level: int
102+
:return: True if key is added False if key already exists
103+
:rtype: bool
104+
"""
105+
server = self._get_server(key)
106+
return server.add(key, value, time, compress_level)
107+
108+
def replace(self, key, value, time=0, compress_level=-1):
109+
"""
110+
Replace a key/value to server ony if it does exist.
111+
112+
:param key: Key's name
113+
:type key: six.string_types
114+
:param value: A value to be stored on server.
115+
:type value: object
116+
:param time: Time in seconds that your key will expire.
117+
:type time: int
118+
:param compress_level: How much to compress.
119+
0 = no compression, 1 = fastest, 9 = slowest but best,
120+
-1 = default compression level.
121+
:type compress_level: int
122+
:return: True if key is replace False if key does not exists
123+
:rtype: bool
124+
"""
125+
server = self._get_server(key)
126+
return server.replace(key, value, time, compress_level)
127+
128+
def get(self, key, get_cas=False):
129+
"""
130+
Get a key from server.
131+
132+
:param key: Key's name
133+
:type key: six.string_types
134+
:param get_cas: If true, return (value, cas), where cas is the new CAS value.
135+
:type get_cas: boolean
136+
:return: Returns a key data from server.
137+
:rtype: object
138+
"""
139+
server = self._get_server(key)
140+
value, cas = server.get(key)
141+
if value is not None:
142+
if get_cas:
143+
return value, cas
144+
return value
145+
146+
if get_cas:
147+
return None, None
148+
149+
def get_multi(self, keys, get_cas=False):
150+
"""
151+
Get multiple keys from server.
152+
153+
:param keys: A list of keys to from server.
154+
:type keys: list
155+
:param get_cas: If get_cas is true, each value is (data, cas), with each result's CAS value.
156+
:type get_cas: boolean
157+
:return: A dict with all requested keys.
158+
:rtype: dict
159+
"""
160+
servers = defaultdict(list)
161+
d = {}
162+
for key in keys:
163+
server_key = self._get_server(key)
164+
servers[server_key].append(key)
165+
for server, keys in servers.items():
166+
results = server.get_multi(keys)
167+
if not get_cas:
168+
# Remove CAS data
169+
for key, (value, cas) in results.items():
170+
results[key] = value
171+
d.update(results)
172+
return d
173+
174+
def gets(self, key):
175+
server = self._get_server(key)
176+
return server.get(key)
177+
178+
def cas(self, key, value, cas, time=0, compress_level=-1):
179+
"""
180+
Set a value for a key on server if its CAS value matches cas.
181+
182+
:param key: Key's name
183+
:type key: six.string_types
184+
:param value: A value to be stored on server.
185+
:type value: object
186+
:param cas: The CAS value previously obtained from a call to get*.
187+
:type cas: int
188+
:param time: Time in seconds that your key will expire.
189+
:type time: int
190+
:param compress_level: How much to compress.
191+
0 = no compression, 1 = fastest, 9 = slowest but best,
192+
-1 = default compression level.
193+
:type compress_level: int
194+
:return: True in case of success and False in case of failure
195+
:rtype: bool
196+
"""
197+
server = self._get_server(key)
198+
return server.cas(key, value, cas, time, compress_level)
199+
200+
def incr(self, key, value):
201+
"""
202+
Increment a key, if it exists, returns it's actual value, if it don't, return 0.
203+
204+
:param key: Key's name
205+
:type key: six.string_types
206+
:param value: Number to be incremented
207+
:type value: int
208+
:return: Actual value of the key on server
209+
:rtype: int
210+
"""
211+
server = self._get_server(key)
212+
return server.incr(key, value)
213+
214+
def decr(self, key, value):
215+
"""
216+
Decrement a key, if it exists, returns it's actual value, if it don't, return 0.
217+
Minimum value of decrement return is 0.
218+
219+
:param key: Key's name
220+
:type key: six.string_types
221+
:param value: Number to be decremented
222+
:type value: int
223+
:return: Actual value of the key on server
224+
:rtype: int
225+
"""
226+
server = self._get_server(key)
227+
return server.decr(key, value)

0 commit comments

Comments
 (0)