|
2 | 2 | from crate.client import connect
|
3 | 3 | from crate.client.exceptions import ProgrammingError
|
4 | 4 |
|
5 |
| -from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath |
| 5 | +from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath, assert_busy |
6 | 6 |
|
7 | 7 | ROLLING_UPGRADES_V4 = (
|
8 | 8 | # 4.0.0 -> 4.0.1 -> 4.0.2 don't support rolling upgrades due to a bug
|
|
41 | 41 | UpgradePath('5.8.x', '5.9.x'),
|
42 | 42 | UpgradePath('5.9.x', '5.10.x'),
|
43 | 43 | UpgradePath('5.10.x', '6.0.x'),
|
44 |
| - UpgradePath('6.0.x', 'latest-nightly'), |
| 44 | + UpgradePath('6.0.x', 'branch:jeeminso/jeeminso/lr-broken-subscription'), |
45 | 45 | )
|
46 | 46 |
|
47 | 47 |
|
@@ -88,6 +88,10 @@ def _test_rolling_upgrade(self, path, nodes):
|
88 | 88 | }
|
89 | 89 | cluster = self._new_cluster(path.from_version, nodes, settings=settings)
|
90 | 90 | cluster.start()
|
| 91 | + replica_cluster = None |
| 92 | + if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10: |
| 93 | + replica_cluster = self._new_cluster(path.from_version, 1, settings=settings, explicit_discovery=False) |
| 94 | + replica_cluster.start() |
91 | 95 | with connect(cluster.node().http_url, error_trace=True) as conn:
|
92 | 96 | c = conn.cursor()
|
93 | 97 | c.execute("create user arthur with (password = 'secret')")
|
@@ -152,6 +156,32 @@ def _test_rolling_upgrade(self, path, nodes):
|
152 | 156 | # Add the shards of the new partition primaries
|
153 | 157 | expected_active_shards += shards
|
154 | 158 |
|
| 159 | + # Set up tables for logical replications |
| 160 | + def num_docs_x(cursor): |
| 161 | + cursor.execute("select count(*) from doc.x") |
| 162 | + return cursor.fetchall()[0][0] |
| 163 | + def num_docs_rx(cursor): |
| 164 | + cursor.execute("select count(*) from doc.rx") |
| 165 | + return cursor.fetchall()[0][0] |
| 166 | + |
| 167 | + if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10: |
| 168 | + c.execute("create table doc.x (a int) clustered into 1 shards with (number_of_replicas=0)") |
| 169 | + c.execute("create publication p for table doc.x") |
| 170 | + with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn: |
| 171 | + rc = replica_conn.cursor() |
| 172 | + transport_port = cluster.node().addresses.transport.port |
| 173 | + replica_transport_port = replica_cluster.node().addresses.transport.port |
| 174 | + assert 4300 <= transport_port <= 4310 and 4300 <= replica_transport_port <= 4310 |
| 175 | + rc.execute("create table doc.rx (a int) clustered into 1 shards with (number_of_replicas=0)") |
| 176 | + rc.execute("create publication rp for table doc.rx") |
| 177 | + rc.execute(f"create subscription rs connection 'crate://localhost:{transport_port}?user=crate&sslmode=sniff' publication p") |
| 178 | + wait_for_active_shards(rc, 2) # doc.rx created via create-table and doc.x that is subscribed |
| 179 | + assert_busy(lambda: self.assertEqual(num_docs_x(rc), 0)) |
| 180 | + c.execute(f"create subscription s connection 'crate://localhost:{replica_transport_port}?user=crate&sslmode=sniff' publication rp") |
| 181 | + expected_active_shards += 2 |
| 182 | + wait_for_active_shards(c, expected_active_shards) |
| 183 | + assert_busy(lambda: self.assertEqual(num_docs_rx(c), 0)) |
| 184 | + |
155 | 185 | for idx, node in enumerate(cluster):
|
156 | 186 | # Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node.
|
157 | 187 | # Otherwise upgraded node simply requests N-1 columns from old version with N columns and it always works.
|
@@ -282,6 +312,25 @@ def _test_rolling_upgrade(self, path, nodes):
|
282 | 312 | c.execute("select version['created'] from information_schema.table_partitions where table_name = 't3' and values['a'] = ?", [idx])
|
283 | 313 | self.assertEqual(c.fetchall(), [[partition_version]])
|
284 | 314 |
|
| 315 | + # Ensure logical replications works |
| 316 | + if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 10: |
| 317 | + with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn: |
| 318 | + rc = replica_conn.cursor() |
| 319 | + |
| 320 | + # Cannot drop replicated tables |
| 321 | + with self.assertRaises(ProgrammingError): |
| 322 | + rc.execute("drop table doc.x") |
| 323 | + c.execute("drop table doc.rx") |
| 324 | + |
| 325 | + count = num_docs_x(rc) |
| 326 | + count2 = num_docs_rx(c) |
| 327 | + |
| 328 | + c.execute("insert into doc.x values (1)") |
| 329 | + rc.execute("insert into doc.rx values (1)") |
| 330 | + |
| 331 | + assert_busy(lambda: self.assertEqual(num_docs_x(rc), count + 1)) |
| 332 | + assert_busy(lambda: self.assertEqual(num_docs_rx(c), count2 + 1)) |
| 333 | + |
285 | 334 | # Finally validate that all shards (primaries and replicas) of all partitions are started
|
286 | 335 | # and writes into the partitioned table while upgrading were successful
|
287 | 336 | with connect(cluster.node().http_url, error_trace=True) as conn:
|
|
0 commit comments