Skip to content

Commit f0e74e1

Browse files
Merge pull request #26438 from wdberkeley/v25.1.x-iceberg-dbx-backport
Backport #26426 to v25.1.x
2 parents 792b119 + 52af8d0 commit f0e74e1

17 files changed

+119
-45
lines changed

src/v/datalake/catalog_schema_manager.cc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,21 @@
2020
namespace datalake {
2121

2222
namespace {
23+
24+
// NB: Use the concrete type not the alias iceberg::table_properties_t
25+
// so changes to the alias can't silently destroy the deep copy semantics
26+
// of this function.
27+
chunked_hash_map<ss::sstring, ss::sstring, sstring_hash, sstring_eq>
28+
copy_properties(
29+
const chunked_hash_map<ss::sstring, ss::sstring, sstring_hash, sstring_eq>&
30+
props) {
31+
chunked_hash_map<ss::sstring, ss::sstring, sstring_hash, sstring_eq> result;
32+
for (const auto& [key, value] : props) {
33+
result.emplace(key, value);
34+
}
35+
return result;
36+
}
37+
2338
schema_manager::errc log_and_convert_catalog_err(
2439
iceberg::catalog::errc e, std::string_view log_msg) {
2540
switch (e) {
@@ -146,6 +161,7 @@ simple_schema_manager::ensure_table_schema(
146161
table_location_prefix_(),
147162
fmt::join(table_id.ns, "/"),
148163
table_id.table)),
164+
.properties = std::nullopt,
149165
});
150166

151167
co_return std::nullopt;
@@ -164,6 +180,7 @@ simple_schema_manager::get_table_info(
164180
.schema = it->second.schema.copy(),
165181
.partition_spec = it->second.partition_spec.copy(),
166182
.location = it->second.location,
183+
.properties = it->second.properties.transform(copy_properties),
167184
};
168185
}
169186

@@ -281,6 +298,7 @@ catalog_schema_manager::get_table_info(
281298
.schema = cur_schema->copy(),
282299
.partition_spec = cur_spec->copy(),
283300
.location = table.location,
301+
.properties = table.properties.transform(copy_properties),
284302
};
285303
}
286304

src/v/datalake/catalog_schema_manager.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class schema_manager {
4040
iceberg::schema schema;
4141
iceberg::partition_spec partition_spec;
4242
iceberg::uri location;
43+
std::optional<iceberg::table_properties_t> properties;
4344

4445
// Fills the field IDs of the given type with those in the current
4546
// schema. Returns true on success.

src/v/datalake/datalake_manager.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333

3434
constexpr std::chrono::milliseconds translation_jitter{500};
3535
constexpr std::chrono::milliseconds translation_jitter_base{5000};
36-
static constexpr std::string_view iceberg_data_path_prefix = "data";
3736

3837
namespace datalake {
3938

@@ -702,7 +701,6 @@ datalake_manager::handle_translator_state_change(const model::ntp& ntp) {
702701
std::move(record_translator),
703702
std::move(table_creator),
704703
_location_provider,
705-
remote_path{iceberg_data_path_prefix},
706704
*reservations,
707705
_topic_table,
708706
_features,

src/v/datalake/partitioning_writer.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ partitioning_writer::finish() && {
136136

137137
files.push_back(partitioned_file{
138138
.local_file = std::move(file_res.value()),
139-
.table_location = remote_prefix_,
139+
.data_location = remote_prefix_,
140140
.schema_id = schema_id_,
141141
.partition_spec_id = spec_.spec_id,
142142
.partition_key = std::move(pk),

src/v/datalake/partitioning_writer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class partitioning_writer {
5858

5959
struct partitioned_file {
6060
local_file_metadata local_file;
61-
remote_path table_location;
61+
remote_path data_location;
6262
iceberg::schema::id_t schema_id;
6363
iceberg::partition_spec::id_t partition_spec_id;
6464
iceberg::partition_key partition_key;

src/v/datalake/record_multiplexer.cc

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,22 @@
2828
namespace datalake {
2929

3030
namespace {
31+
32+
// Get the data location for the table. Some catalogs require using the property
33+
// `write.data.path`. Otherwise, it defaults to <table location>/data.
34+
iceberg::uri get_data_location(const schema_manager::table_info& table_info) {
35+
static constexpr std::string_view write_data_path_prop = "write.data.path";
36+
37+
if (table_info.properties.has_value()) {
38+
auto it = table_info.properties->find(write_data_path_prop);
39+
if (it != table_info.properties->end()) {
40+
return iceberg::uri(it->second);
41+
}
42+
}
43+
44+
return iceberg::uri(fmt::format("{}/data", table_info.location));
45+
}
46+
3147
template<typename Func>
3248
requires requires(Func f, model::record_batch batch) {
3349
{ f(std::move(batch)) } -> std::same_as<ss::future<ss::stop_iteration>>;
@@ -258,9 +274,9 @@ ss::future<ss::stop_iteration> record_multiplexer::do_multiplex(
258274
co_return ss::stop_iteration::yes;
259275
}
260276

261-
auto table_remote_path = _location_provider.from_uri(
262-
load_res.value().location);
263-
if (!table_remote_path) {
277+
auto data_location = get_data_location(load_res.value());
278+
auto data_remote_path = _location_provider.from_uri(data_location);
279+
if (!data_remote_path) {
264280
vlog(
265281
_log.warn,
266282
"Error getting location prefix for {} while creating writer "
@@ -278,7 +294,7 @@ ss::future<ss::stop_iteration> record_multiplexer::do_multiplex(
278294
load_res.value().schema.schema_id,
279295
std::move(record_type.type),
280296
std::move(load_res.value().partition_spec),
281-
std::move(table_remote_path.value())));
297+
std::move(data_remote_path.value())));
282298
writer_iter = iter;
283299
}
284300

@@ -476,9 +492,9 @@ record_multiplexer::handle_invalid_record(
476492
co_return writer_error::unknown_error;
477493
}
478494

479-
auto table_remote_path = _location_provider.from_uri(
480-
load_res.value().location);
481-
if (!table_remote_path) {
495+
auto data_location = get_data_location(load_res.value());
496+
auto data_remote_path = _location_provider.from_uri(data_location);
497+
if (!data_remote_path) {
482498
vlog(
483499
_log.warn,
484500
"Error getting location prefix for {} while creating writer "
@@ -493,7 +509,7 @@ record_multiplexer::handle_invalid_record(
493509
load_res.value().schema.schema_id,
494510
std::move(record_type.type),
495511
std::move(load_res.value().partition_spec),
496-
std::move(table_remote_path.value()));
512+
std::move(data_remote_path.value()));
497513
}
498514

499515
int64_t estimated_size = (key ? key->size_bytes() : 0)

src/v/datalake/tests/translation_task_test.cc

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,6 @@ TEST_F(TranslateTaskTest, TestHappyPathTranslation) {
199199
auto result = std::move(task)
200200
.finish(
201201
translation_task::custom_partitioning_enabled::yes,
202-
datalake::remote_path("test/location/1"),
203202
test_rcn,
204203
as)
205204
.get();
@@ -230,7 +229,6 @@ TEST_F(TranslateTaskTest, TestDataFileMissing) {
230229
auto result = std::move(task)
231230
.finish(
232231
translation_task::custom_partitioning_enabled::yes,
233-
datalake::remote_path("test/location/1"),
234232
test_rcn,
235233
as)
236234
.get();
@@ -265,7 +263,6 @@ TEST_F(TranslateTaskTest, TestUploadError) {
265263
auto result = std::move(task)
266264
.finish(
267265
translation_task::custom_partitioning_enabled::yes,
268-
datalake::remote_path("test/location/1"),
269266
test_rcn,
270267
as)
271268
.get();

src/v/datalake/translation/deps.cc

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,6 @@ class partition_translation_context : public translation_context {
493493
std::unique_ptr<record_translator> record_translator,
494494
std::unique_ptr<table_creator> table_creator,
495495
location_provider location_provider,
496-
remote_path upload_path_prefix,
497496
scheduling::reservations_tracker& reservations,
498497
ss::sharded<cluster::topic_table>* topics,
499498
ss::sharded<features::feature_table>* features,
@@ -507,7 +506,6 @@ class partition_translation_context : public translation_context {
507506
, _record_translator(std::move(record_translator))
508507
, _table_creator(std::move(table_creator))
509508
, _location_provider(std::move(location_provider))
510-
, _upload_path_prefix(std::move(upload_path_prefix))
511509
, _reservations(reservations)
512510
, _topics(topics->local())
513511
, _features(features->local())
@@ -605,8 +603,8 @@ class partition_translation_context : public translation_context {
605603
}
606604
vlog(datalake_log.debug, "[{}] finishing translation", _ntp);
607605
auto task = std::exchange(_in_progress_translation, std::nullopt);
608-
auto result = co_await std::move(task.value())
609-
.finish(_cp_enabled, _upload_path_prefix, rcn, as);
606+
auto result
607+
= co_await std::move(task.value()).finish(_cp_enabled, rcn, as);
610608

611609
if (result.has_error()) {
612610
co_return map_error_code(result.error());
@@ -668,7 +666,6 @@ class partition_translation_context : public translation_context {
668666
std::unique_ptr<record_translator> _record_translator;
669667
std::unique_ptr<table_creator> _table_creator;
670668
location_provider _location_provider;
671-
remote_path _upload_path_prefix;
672669
scheduling::reservations_tracker& _reservations;
673670
cluster::topic_table& _topics;
674671
features::feature_table& _features;
@@ -690,7 +687,6 @@ translation_context::make_default_translation_context(
690687
std::unique_ptr<record_translator> record_translator,
691688
std::unique_ptr<table_creator> table_creator,
692689
location_provider location_provider,
693-
remote_path upload_path_prefix,
694690
scheduling::reservations_tracker& reservations,
695691
ss::sharded<cluster::topic_table>* topics,
696692
ss::sharded<features::feature_table>* features,
@@ -705,7 +701,6 @@ translation_context::make_default_translation_context(
705701
std::move(record_translator),
706702
std::move(table_creator),
707703
std::move(location_provider),
708-
std::move(upload_path_prefix),
709704
reservations,
710705
topics,
711706
features,

src/v/datalake/translation/deps.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,6 @@ class translation_context {
294294
std::unique_ptr<record_translator>,
295295
std::unique_ptr<table_creator>,
296296
location_provider,
297-
remote_path,
298297
scheduling::reservations_tracker&,
299298
ss::sharded<cluster::topic_table>*,
300299
ss::sharded<features::feature_table>*,

src/v/datalake/translation_task.cc

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,10 @@ ss::future<checked<remote_path, translation_task::errc>> execute_single_upload(
6565
prefix_logger& logger,
6666
cloud_data_io& _cloud_io,
6767
const partitioning_writer::partitioned_file& file,
68-
const remote_path& remote_path_prefix,
6968
retry_chain_node& parent_rcn,
7069
lazy_abort_source& lazy_as) {
7170
auto file_remote_path = remote_path{
72-
file.table_location / remote_path_prefix / file.partition_key_path
71+
file.data_location / file.partition_key_path
7372
/ file.local_file.path().filename()};
7473

7574
// (Approximate because I'm ignoring backoff) ~5 retries at 1
@@ -126,7 +125,6 @@ upload_files(
126125
cloud_data_io& _cloud_io,
127126
const chunked_vector<partitioning_writer::partitioned_file>& files,
128127
translation_task::custom_partitioning_enabled is_custom_partitioning_enabled,
129-
const remote_path& remote_path_prefix,
130128
retry_chain_node& rcn,
131129
lazy_abort_source& lazy_as) {
132130
chunked_vector<coordinator::data_file> ret;
@@ -135,7 +133,7 @@ upload_files(
135133
std::optional<translation_task::errc> upload_error;
136134
for (auto& file : files) {
137135
auto r = co_await execute_single_upload(
138-
logger, _cloud_io, file, remote_path_prefix, rcn, lazy_as);
136+
logger, _cloud_io, file, rcn, lazy_as);
139137

140138
if (r.has_error()) {
141139
vlog(
@@ -280,7 +278,6 @@ ss::future<
280278
checked<coordinator::translated_offset_range, translation_task::errc>>
281279
translation_task::finish(
282280
custom_partitioning_enabled is_custom_partitioning_enabled,
283-
const remote_path& remote_path_prefix,
284281
retry_chain_node& rcn,
285282
ss::abort_source& as) && {
286283
auto mux_result = co_await std::move(_multiplexer).finish();
@@ -331,7 +328,6 @@ translation_task::finish(
331328
*_cloud_io,
332329
write_result.data_files,
333330
is_custom_partitioning_enabled,
334-
remote_path_prefix,
335331
rcn,
336332
lazy_as);
337333
if (upload_res.has_error()) {
@@ -347,7 +343,6 @@ translation_task::finish(
347343
*_cloud_io,
348344
write_result.dlq_files,
349345
is_custom_partitioning_enabled,
350-
remote_path_prefix,
351346
rcn,
352347
lazy_as);
353348
if (dlq_upload_res.has_error()) {

0 commit comments

Comments
 (0)