
Commit b1619c5

fix(iceberg): fix rewrite-files partition-spec-id (#54)
* fix(iceberg): fix rewrite-files partition-spec-id
* fix(docker): update docker file
* add test
* update minio
* Revert "update minio"

  This reverts commit 4464d90.
1 parent: 7c7c84c

4 files changed: +117 −6 lines changed

crates/iceberg/src/spec/manifest/data_file.rs

Lines changed: 5 additions & 0 deletions
@@ -236,6 +236,11 @@ impl DataFile {
         self.sort_order_id
     }

+    /// Get the partition spec id of the data file.
+    pub fn partition_spec_id(&self) -> i32 {
+        self.partition_spec_id
+    }
+
     pub(crate) fn rewrite_partition(&mut self, partition: Struct) {
         self.partition = partition;
     }

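Note: the new accessor simply exposes the partition_spec_id field that DataFile already carries, so callers can tell which spec a file was written under. As a minimal sketch (the group_by_spec helper below is hypothetical, not part of the crate), a caller could use it to bucket files by spec before building per-spec manifests:

use std::collections::HashMap;

use iceberg::spec::DataFile;

// Hypothetical helper: group data files by the partition spec id they were
// written under, so each group can go into a manifest whose declared spec
// matches its entries.
fn group_by_spec(files: Vec<DataFile>) -> HashMap<i32, Vec<DataFile>> {
    let mut buckets: HashMap<i32, Vec<DataFile>> = HashMap::new();
    for file in files {
        // partition_spec_id() is the accessor added in this commit.
        buckets.entry(file.partition_spec_id()).or_default().push(file);
    }
    buckets
}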
crates/iceberg/src/transaction/rewrite_files.rs

Lines changed: 1 addition & 5 deletions
@@ -287,11 +287,7 @@ impl SnapshotProduceOperation for RewriteFilesOperation {
         {
             let mut manifest_writer = snapshot_produce.new_manifest_writer(
                 &ManifestContentType::Data,
-                snapshot_produce
-                    .tx
-                    .current_table
-                    .metadata()
-                    .default_partition_spec_id(),
+                manifest_file.partition_spec_id,
             )?;

             for entry in manifest.entries() {

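Note: this is the core fix. Previously, every rewritten manifest was stamped with the table's current default partition spec id, so entries copied from a manifest written under an older spec could land in a manifest whose declared spec did not match their partition tuples. Using manifest_file.partition_spec_id keeps each rewritten manifest consistent with the spec its entries were actually written under. A minimal sketch of the invariant the integration test below verifies, assuming a table handle obtained from an earlier commit:

// Sketch (assumed context): after a rewrite commit, every manifest in the
// current snapshot should carry the spec id of the files it describes,
// not necessarily the table's current default spec id.
async fn assert_manifest_spec_ids(table: &iceberg::table::Table, expected: i32) {
    let snapshot = table.metadata().current_snapshot().unwrap();
    let manifest_list = snapshot
        .load_manifest_list(table.file_io(), table.metadata())
        .await
        .unwrap();
    for manifest_file in manifest_list.entries() {
        assert_eq!(manifest_file.partition_spec_id, expected);
    }
}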
crates/integration_tests/testdata/spark/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ ENV SPARK_VERSION=3.5.5
 ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12
 ENV ICEBERG_VERSION=1.6.0

-RUN curl --retry 5 -s -C - https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
+RUN curl --retry 5 -s -C - https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
     && tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
     && rm -rf spark-${SPARK_VERSION}-bin-hadoop3.tgz

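Note: dlcdn.apache.org generally mirrors only current releases, so a pinned version such as Spark 3.5.5 can disappear from it once a newer patch release ships; archive.apache.org/dist retains old releases, which keeps the pinned download URL stable for the test image.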
crates/integration_tests/tests/shared_tests/rewrite_files_test.rs

Lines changed: 110 additions & 0 deletions
@@ -21,6 +21,8 @@ use std::sync::Arc;

 use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
 use futures::TryStreamExt;
+use iceberg::spec::DataFile;
+use iceberg::table::Table;
 use iceberg::transaction::Transaction;
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
 use iceberg::writer::file_writer::location_generator::{
@@ -458,3 +460,111 @@ async fn test_sequence_number_in_manifest_entry() {
         }
     }
 }
+
+#[tokio::test]
+async fn test_partition_spec_id_in_manifest() {
+    let fixture = get_shared_containers();
+    let rest_catalog = RestCatalog::new(fixture.catalog_config.clone());
+    let ns = random_ns().await;
+    let schema = test_schema();
+
+    let table_creation = TableCreation::builder()
+        .name("t1".to_string())
+        .schema(schema.clone())
+        .build();
+
+    let mut table = rest_catalog
+        .create_table(ns.name(), table_creation)
+        .await
+        .unwrap();
+
+    // Create the writer and write the data
+    let schema: Arc<arrow_schema::Schema> = Arc::new(
+        table
+            .metadata()
+            .current_schema()
+            .as_ref()
+            .try_into()
+            .unwrap(),
+    );
+    let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+    let file_name_generator = DefaultFileNameGenerator::new(
+        "test".to_string(),
+        None,
+        iceberg::spec::DataFileFormat::Parquet,
+    );
+
+    // commit result
+    let mut data_files_vec = Vec::default();
+
+    async fn build_data_file_f(
+        schema: Arc<arrow_schema::Schema>,
+        table: &Table,
+        location_generator: DefaultLocationGenerator,
+        file_name_generator: DefaultFileNameGenerator,
+    ) -> DataFile {
+        let parquet_writer_builder = ParquetWriterBuilder::new(
+            WriterProperties::default(),
+            table.metadata().current_schema().clone(),
+            table.file_io().clone(),
+            location_generator,
+            file_name_generator,
+        );
+        let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
+        let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
+        let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
+        let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
+        let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
+        let batch = RecordBatch::try_new(schema.clone(), vec![
+            Arc::new(col1) as ArrayRef,
+            Arc::new(col2) as ArrayRef,
+            Arc::new(col3) as ArrayRef,
+        ])
+        .unwrap();
+        data_file_writer.write(batch.clone()).await.unwrap();
+        data_file_writer.close().await.unwrap()[0].clone()
+    }
+
+    for _ in 0..10 {
+        let data_file = build_data_file_f(
+            schema.clone(),
+            &table,
+            location_generator.clone(),
+            file_name_generator.clone(),
+        )
+        .await;
+        data_files_vec.push(data_file.clone());
+        let tx = Transaction::new(&table);
+        let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
+        append_action.add_data_files(vec![data_file]).unwrap();
+        let tx = append_action.apply().await.unwrap();
+        table = tx.commit(&rest_catalog).await.unwrap();
+    }
+
+    let last_data_files = data_files_vec.last().unwrap();
+    let partition_spec_id = last_data_files.partition_spec_id();
+
+    // remove the data files by RewriteAction
+    for data_file in &data_files_vec {
+        let tx = Transaction::new(&table);
+        let mut rewrite_action = tx.rewrite_files(None, vec![]).unwrap();
+        rewrite_action = rewrite_action
+            .delete_files(vec![data_file.clone()])
+            .unwrap();
+        let tx = rewrite_action.apply().await.unwrap();
+        table = tx.commit(&rest_catalog).await.unwrap();
+    }
+
+    // TODO: test update partition spec
+    // Verify that the partition spec ID is correctly set
+
+    let last_snapshot = table.metadata().current_snapshot().unwrap();
+    let manifest_list = last_snapshot
+        .load_manifest_list(table.file_io(), table.metadata())
+        .await
+        .unwrap();
+    assert_eq!(manifest_list.entries().len(), 1);
+    for manifest_file in manifest_list.entries() {
+        assert_eq!(manifest_file.partition_spec_id, partition_spec_id);
+    }
+}
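Note: the test exercises a single, unchanged partition spec: all ten appended files share one spec, so the single surviving manifest after the rewrites is expected to carry that same partition_spec_id. The TODO in the test marks spec evolution (rewriting files after the table's default spec changes) as not yet covered.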
