Skip to content

Commit fa48f97

Browse files
committed
Remove protocol versions >5 from MAX_SUPPORTED
Without removing associated code yet, this patch just ensure we begin negotiating protocol version 5, then fallback to 4, reducing the amount of protocol errors that the Scylla server sends when it sees the unsupported versions. Refs: #244 Signed-off-by: Yaniv Kaul <[email protected]>
1 parent c7ca1c6 commit fa48f97

18 files changed

+1403
-156
lines changed

cassandra/__init__.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,9 @@ class ProtocolVersion(object):
170170
DSE private protocol v2, supported in DSE 6.0+
171171
"""
172172

173-
SUPPORTED_VERSIONS = (DSE_V2, DSE_V1, V6, V5, V4, V3)
173+
SUPPORTED_VERSIONS = (V5, V4, V3)
174174
"""
175-
A tuple of all supported protocol versions
175+
A tuple of all supported protocol versions for ScyllaDB, including future v5 support.
176176
"""
177177

178178
BETA_VERSIONS = (V6,)
@@ -227,10 +227,6 @@ def uses_keyspace_flag(cls, version):
227227
def has_continuous_paging_support(cls, version):
228228
return version >= cls.DSE_V1
229229

230-
@classmethod
231-
def has_continuous_paging_next_pages(cls, version):
232-
return version >= cls.DSE_V2
233-
234230
@classmethod
235231
def has_checksumming_support(cls, version):
236232
return cls.V5 <= version < cls.DSE_V1

cassandra/cluster.py

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
from cassandra.connection import (ConnectionException, ConnectionShutdown,
5252
ConnectionHeartbeat, ProtocolVersionUnsupported,
5353
EndPoint, DefaultEndPoint, DefaultEndPointFactory,
54-
ContinuousPagingState, SniEndPointFactory, ConnectionBusy)
54+
SniEndPointFactory, ConnectionBusy)
5555
from cassandra.cqltypes import UserType
5656
import cassandra.cqltypes as types
5757
from cassandra.encoder import Encoder
@@ -672,7 +672,7 @@ class Cluster(object):
672672
server will be automatically used.
673673
"""
674674

675-
protocol_version = ProtocolVersion.DSE_V2
675+
protocol_version = ProtocolVersion.V5
676676
"""
677677
The maximum version of the native protocol to use.
678678
@@ -2692,7 +2692,6 @@ def __init__(self, cluster, hosts, keyspace=None):
26922692
raise NoHostAvailable(msg, [h.address for h in hosts])
26932693

26942694
self.session_id = uuid.uuid4()
2695-
self._graph_paging_available = self._check_graph_paging_available()
26962695

26972696
if self.cluster.column_encryption_policy is not None:
26982697
try:
@@ -2889,26 +2888,10 @@ def execute_graph_async(self, query, parameters=None, trace=False, execution_pro
28892888
def _maybe_set_graph_paging(self, execution_profile):
28902889
graph_paging = execution_profile.continuous_paging_options
28912890
if execution_profile.continuous_paging_options is _NOT_SET:
2892-
graph_paging = ContinuousPagingOptions() if self._graph_paging_available else None
2891+
graph_paging = None
28932892

28942893
execution_profile.continuous_paging_options = graph_paging
28952894

2896-
def _check_graph_paging_available(self):
2897-
"""Verify if we can enable graph paging. This executed only once when the session is created."""
2898-
2899-
if not ProtocolVersion.has_continuous_paging_next_pages(self._protocol_version):
2900-
return False
2901-
2902-
for host in self.cluster.metadata.all_hosts():
2903-
if host.dse_version is None:
2904-
return False
2905-
2906-
version = Version(host.dse_version)
2907-
if version < _GRAPH_PAGING_MIN_DSE_VERSION:
2908-
return False
2909-
2910-
return True
2911-
29122895
def _resolve_execution_profile_options(self, execution_profile):
29132896
"""
29142897
Determine the GraphSON protocol and row factory for a graph query. This is useful
@@ -3053,13 +3036,7 @@ def _create_response_future(self, query, parameters, trace, custom_payload,
30533036
else:
30543037
timestamp = None
30553038

3056-
supports_continuous_paging_state = (
3057-
ProtocolVersion.has_continuous_paging_next_pages(self._protocol_version)
3058-
)
3059-
if continuous_paging_options and supports_continuous_paging_state:
3060-
continuous_paging_state = ContinuousPagingState(continuous_paging_options.max_queue_size)
3061-
else:
3062-
continuous_paging_state = None
3039+
continuous_paging_state = None
30633040

30643041
if isinstance(query, SimpleStatement):
30653042
query_string = query.query_string

cassandra/connection.py

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -444,32 +444,6 @@ class CrcMismatchException(ConnectionException):
444444
pass
445445

446446

447-
class ContinuousPagingState(object):
448-
"""
449-
A class for specifying continuous paging state, only supported starting with DSE_V2.
450-
"""
451-
452-
num_pages_requested = None
453-
"""
454-
How many pages we have already requested
455-
"""
456-
457-
num_pages_received = None
458-
"""
459-
How many pages we have already received
460-
"""
461-
462-
max_queue_size = None
463-
"""
464-
The max queue size chosen by the user via the options
465-
"""
466-
467-
def __init__(self, max_queue_size):
468-
self.num_pages_requested = max_queue_size # the initial query requests max_queue_size
469-
self.num_pages_received = 0
470-
self.max_queue_size = max_queue_size
471-
472-
473447
class ContinuousPagingSession(object):
474448
def __init__(self, stream_id, decoder, row_factory, connection, state):
475449
self.stream_id = stream_id

cassandra/protocol.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -615,8 +615,6 @@ def _write_query_params(self, f, protocol_version):
615615
def _write_paging_options(self, f, paging_options, protocol_version):
616616
write_int(f, paging_options.max_pages)
617617
write_int(f, paging_options.max_pages_per_second)
618-
if ProtocolVersion.has_continuous_paging_next_pages(protocol_version):
619-
write_int(f, paging_options.max_queue_size)
620618

621619

622620
class QueryMessage(_QueryMessage):
@@ -1050,12 +1048,10 @@ def send_body(self, f, protocol_version):
10501048
if self.op_type == ReviseRequestMessage.RevisionType.PAGING_BACKPRESSURE:
10511049
if self.next_pages <= 0:
10521050
raise UnsupportedOperation("Continuous paging backpressure requires next_pages > 0")
1053-
elif not ProtocolVersion.has_continuous_paging_next_pages(protocol_version):
1051+
else:
10541052
raise UnsupportedOperation(
10551053
"Continuous paging backpressure may only be used with protocol version "
10561054
"ProtocolVersion.DSE_V2 or higher. Consider setting Cluster.protocol_version to ProtocolVersion.DSE_V2.")
1057-
else:
1058-
write_int(f, self.next_pages)
10591055

10601056

10611057
class _ProtocolHandler(object):

tests/integration/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ def get_supported_protocol_versions():
192192
4.0(C*) -> 6(beta),5,4,3
193193
` """
194194
if CASSANDRA_VERSION >= Version('4.0-beta5'):
195-
return (3, 4, 5, 6)
195+
return (3, 4, 5)
196196
if CASSANDRA_VERSION >= Version('4.0-a'):
197197
return (3, 4, 5)
198198
elif CASSANDRA_VERSION >= Version('3.10'):
@@ -261,7 +261,6 @@ def _id_and_mark(f):
261261
local = local_decorator_creator()
262262
notprotocolv1 = unittest.skipUnless(PROTOCOL_VERSION > 1, 'Protocol v1 not supported')
263263
greaterthanprotocolv3 = unittest.skipUnless(PROTOCOL_VERSION >= 4, 'Protocol versions less than 4 are not supported')
264-
protocolv6 = unittest.skipUnless(6 in get_supported_protocol_versions(), 'Protocol versions less than 6 are not supported')
265264

266265
greaterthancass20 = unittest.skipUnless(CASSANDRA_VERSION >= Version('2.1'), 'Cassandra version 2.1 or greater required')
267266
greaterthancass21 = unittest.skipUnless(CASSANDRA_VERSION >= Version('2.2'), 'Cassandra version 2.2 or greater required')
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Copyright DataStax, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from cassandra import cluster
16+
from cassandra.cluster import ContinuousPagingOptions
17+
from cassandra.datastax.graph.fluent import DseGraph
18+
from cassandra.graph import VertexProperty
19+
20+
from tests.integration.advanced.graph import (
21+
GraphUnitTestCase, ClassicGraphSchema, CoreGraphSchema,
22+
VertexLabel, GraphTestConfiguration
23+
)
24+
from tests.integration.advanced.graph.fluent import (
25+
BaseExplicitExecutionTest, create_traversal_profiles, check_equality_base)
26+
27+
import unittest
28+
29+
30+
class ContinuousPagingOptionsForTests(ContinuousPagingOptions):
31+
def __init__(self,
32+
page_unit=ContinuousPagingOptions.PagingUnit.ROWS, max_pages=1, # max_pages=1
33+
max_pages_per_second=0, max_queue_size=4):
34+
super(ContinuousPagingOptionsForTests, self).__init__(page_unit, max_pages, max_pages_per_second,
35+
max_queue_size)
36+
37+
38+
def reset_paging_options():
39+
cluster.ContinuousPagingOptions = ContinuousPagingOptions
40+

0 commit comments

Comments
 (0)