Skip to content

Commit be339a0

Browse files
authored
fix(iceberg): Introduce new data sequence for RewriteFilesAction (#51)
* feat(iceberg): rewrite_files support use_starting_sequence_number
* chore(test): add test_sequence_number_in_manifest_entry
1 parent 8faccbc commit be339a0

File tree

3 files changed

+166
-22
lines changed

3 files changed

+166
-22
lines changed

crates/iceberg/src/transaction/rewrite_files.rs

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ use crate::spec::{
3434
Operation,
3535
};
3636

37+
pub const USE_STARTING_SEQUENCE_NUMBER: &str = "use-starting-sequence-number";
38+
pub const USE_STARTING_SEQUENCE_NUMBER_DEFAULT: bool = true;
39+
3740
/// Transaction action for rewriting files.
3841
pub struct RewriteFilesAction<'a> {
3942
snapshot_produce_action: SnapshotProduceAction<'a>,
@@ -74,15 +77,40 @@ impl<'a> RewriteFilesAction<'a> {
7477
.and_then(|s| s.parse().ok())
7578
.unwrap_or(MANIFEST_MERGE_ENABLED_DEFAULT);
7679

80+
// Whether the compaction should use the sequence number of the snapshot at compaction start time for
81+
// new data files, instead of using the sequence number of the newly produced snapshot.
82+
// This avoids commit conflicts with updates that add newer equality deletes at a higher sequence number.
83+
let use_starting_sequence_number = tx
84+
.current_table
85+
.metadata()
86+
.properties()
87+
.get(USE_STARTING_SEQUENCE_NUMBER)
88+
.and_then(|s| s.parse().ok())
89+
.unwrap_or(USE_STARTING_SEQUENCE_NUMBER_DEFAULT);
90+
91+
let mut snapshot_produce_action = SnapshotProduceAction::new(
92+
tx,
93+
snapshot_id,
94+
key_metadata,
95+
commit_uuid,
96+
snapshot_properties,
97+
)
98+
.unwrap();
99+
100+
if use_starting_sequence_number {
101+
if let Some(snapshot) = snapshot_produce_action
102+
.tx
103+
.base_table
104+
.metadata()
105+
.current_snapshot()
106+
{
107+
snapshot_produce_action
108+
.set_new_data_file_sequence_number(snapshot.sequence_number());
109+
}
110+
}
111+
77112
Ok(Self {
78-
snapshot_produce_action: SnapshotProduceAction::new(
79-
tx,
80-
snapshot_id,
81-
key_metadata,
82-
commit_uuid,
83-
snapshot_properties,
84-
)
85-
.unwrap(),
113+
snapshot_produce_action,
86114
target_size_bytes,
87115
min_count_to_merge,
88116
merge_enabled,
@@ -92,18 +120,18 @@ impl<'a> RewriteFilesAction<'a> {
92120
/// Add data files to the snapshot.
93121
94122
pub fn add_data_files(
95-
&mut self,
123+
mut self,
96124
data_files: impl IntoIterator<Item = DataFile>,
97-
) -> Result<&mut Self> {
125+
) -> Result<Self> {
98126
self.snapshot_produce_action.add_data_files(data_files)?;
99127
Ok(self)
100128
}
101129

102130
/// Add the data files to be removed from the snapshot.
103131
pub fn delete_files(
104-
&mut self,
132+
mut self,
105133
remove_data_files: impl IntoIterator<Item = DataFile>,
106-
) -> Result<&mut Self> {
134+
) -> Result<Self> {
107135
self.snapshot_produce_action
108136
.delete_files(remove_data_files)?;
109137
Ok(self)
@@ -123,6 +151,13 @@ impl<'a> RewriteFilesAction<'a> {
123151
.await
124152
}
125153
}
154+
155+
pub fn new_data_file_sequence_number(mut self, seq: i64) -> Result<Self> {
156+
self.snapshot_produce_action
157+
.set_new_data_file_sequence_number(seq);
158+
159+
Ok(self)
160+
}
126161
}
127162

128163
impl SnapshotProduceOperation for RewriteFilesOperation {

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ pub(crate) struct SnapshotProduceAction<'a> {
8989
// It starts from 0 and increments for each new manifest file.
9090
// Note: This counter is limited to the range of (0..u64::MAX).
9191
manifest_counter: RangeFrom<u64>,
92+
93+
new_data_file_sequence_number: Option<i64>,
9294
}
9395

9496
impl<'a> SnapshotProduceAction<'a> {
@@ -112,6 +114,7 @@ impl<'a> SnapshotProduceAction<'a> {
112114
removed_delete_file_paths: HashSet::new(),
113115
manifest_counter: (0..),
114116
key_metadata,
117+
new_data_file_sequence_number: None,
115118
})
116119
}
117120

@@ -252,6 +255,7 @@ impl<'a> SnapshotProduceAction<'a> {
252255
async fn write_added_manifest(
253256
&mut self,
254257
added_data_files: Vec<DataFile>,
258+
data_seq: Option<i64>,
255259
) -> Result<ManifestFile> {
256260
let snapshot_id = self.snapshot_id;
257261
let format_version = self.tx.current_table.metadata().format_version();
@@ -279,7 +283,9 @@ impl<'a> SnapshotProduceAction<'a> {
279283
let manifest_entries = added_data_files.into_iter().map(|data_file| {
280284
let builder = ManifestEntry::builder()
281285
.status(crate::spec::ManifestStatus::Added)
282-
.data_file(data_file);
286+
.data_file(data_file)
287+
.sequence_number_opt(data_seq);
288+
283289
if format_version == FormatVersion::V1 {
284290
builder.snapshot_id(snapshot_id).build()
285291
} else {
@@ -362,13 +368,18 @@ impl<'a> SnapshotProduceAction<'a> {
362368
let mut manifest_files = vec![];
363369
let data_files = std::mem::take(&mut self.added_data_files);
364370
let added_delete_files = std::mem::take(&mut self.added_delete_files);
371+
365372
if !data_files.is_empty() {
366-
let added_manifest = self.write_added_manifest(data_files).await?;
373+
let added_manifest = self
374+
.write_added_manifest(data_files, self.new_data_file_sequence_number)
375+
.await?;
367376
manifest_files.push(added_manifest);
368377
}
369378

370379
if !added_delete_files.is_empty() {
371-
let added_delete_manifest = self.write_added_manifest(added_delete_files).await?;
380+
let added_delete_manifest = self
381+
.write_added_manifest(added_delete_files, self.new_data_file_sequence_number)
382+
.await?;
372383
manifest_files.push(added_delete_manifest);
373384
}
374385

@@ -479,6 +490,10 @@ impl<'a> SnapshotProduceAction<'a> {
479490
)?;
480491
Ok(self.tx)
481492
}
493+
494+
pub fn set_new_data_file_sequence_number(&mut self, new_data_file_sequence_number: i64) {
495+
self.new_data_file_sequence_number = Some(new_data_file_sequence_number);
496+
}
482497
}
483498

484499
pub(crate) struct MergeManifestProcess {

crates/integration_tests/tests/shared_tests/rewrite_files_test.rs

Lines changed: 101 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -165,11 +165,13 @@ async fn test_rewrite_data_files() {
165165

166166
// commit result again
167167
let tx = Transaction::new(&table);
168-
let mut rewrite_action = tx.rewrite_files(None, vec![]).unwrap();
169-
rewrite_action
168+
let rewrite_action = tx
169+
.rewrite_files(None, vec![])
170+
.unwrap()
170171
.add_data_files(data_file_rewrite.clone())
172+
.unwrap()
173+
.delete_files(data_file.clone())
171174
.unwrap();
172-
rewrite_action.delete_files(data_file.clone()).unwrap();
173175

174176
let tx = rewrite_action.apply().await.unwrap();
175177
let table = tx.commit(&rest_catalog).await.unwrap();
@@ -279,9 +281,13 @@ async fn test_multiple_file_rewrite() {
279281
let data_file2 = data_file_writer.close().await.unwrap();
280282

281283
let tx = Transaction::new(&table);
282-
let mut rewrite_action = tx.rewrite_files(None, vec![]).unwrap();
283-
rewrite_action.add_data_files(data_file1.clone()).unwrap();
284-
rewrite_action.add_data_files(data_file2.clone()).unwrap();
284+
let rewrite_action = tx
285+
.rewrite_files(None, vec![])
286+
.unwrap()
287+
.add_data_files(data_file1.clone())
288+
.unwrap()
289+
.add_data_files(data_file2.clone())
290+
.unwrap();
285291
let tx = rewrite_action.apply().await.unwrap();
286292
let table = tx.commit(&rest_catalog).await.unwrap();
287293

@@ -357,10 +363,98 @@ async fn test_rewrite_nonexistent_file() {
357363
let nonexistent_data_file = valid_data_file.clone();
358364

359365
let tx = Transaction::new(&table);
360-
let mut rewrite_action = tx.rewrite_files(None, vec![]).unwrap();
366+
let rewrite_action = tx.rewrite_files(None, vec![]).unwrap();
361367

362368
// Attempt to delete the nonexistent file
363369
let result = rewrite_action.delete_files(nonexistent_data_file);
364370

365371
assert!(result.is_ok());
366372
}
373+
374+
#[tokio::test]
375+
async fn test_sequence_number_in_manifest_entry() {
376+
let fixture = get_shared_containers();
377+
let rest_catalog = RestCatalog::new(fixture.catalog_config.clone());
378+
let ns = random_ns().await;
379+
let schema = test_schema();
380+
381+
let table_creation = TableCreation::builder()
382+
.name("t3".to_string())
383+
.schema(schema.clone())
384+
.build();
385+
386+
let table = rest_catalog
387+
.create_table(ns.name(), table_creation)
388+
.await
389+
.unwrap();
390+
391+
let schema: Arc<arrow_schema::Schema> = Arc::new(
392+
table
393+
.metadata()
394+
.current_schema()
395+
.as_ref()
396+
.try_into()
397+
.unwrap(),
398+
);
399+
let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
400+
let file_name_generator = DefaultFileNameGenerator::new(
401+
"test".to_string(),
402+
None,
403+
iceberg::spec::DataFileFormat::Parquet,
404+
);
405+
let parquet_writer_builder = ParquetWriterBuilder::new(
406+
WriterProperties::default(),
407+
table.metadata().current_schema().clone(),
408+
table.file_io().clone(),
409+
location_generator.clone(),
410+
file_name_generator.clone(),
411+
);
412+
let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
413+
let mut data_file_writer = data_file_writer_builder.clone().build().await.unwrap();
414+
let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
415+
let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
416+
let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
417+
let batch = RecordBatch::try_new(schema.clone(), vec![
418+
Arc::new(col1) as ArrayRef,
419+
Arc::new(col2) as ArrayRef,
420+
Arc::new(col3) as ArrayRef,
421+
])
422+
.unwrap();
423+
data_file_writer.write(batch.clone()).await.unwrap();
424+
let data_file1 = data_file_writer.close().await.unwrap();
425+
426+
let mut data_file_writer = data_file_writer_builder.clone().build().await.unwrap();
427+
data_file_writer.write(batch.clone()).await.unwrap();
428+
let data_file2 = data_file_writer.close().await.unwrap();
429+
430+
// Commit with sequence number
431+
432+
let tx = Transaction::new(&table);
433+
let rewrite_action = tx
434+
.rewrite_files(None, vec![])
435+
.unwrap()
436+
.add_data_files(data_file1.clone())
437+
.unwrap()
438+
.add_data_files(data_file2.clone())
439+
.unwrap();
440+
// Set sequence number to 12345
441+
let rewrite_action = rewrite_action.new_data_file_sequence_number(12345).unwrap();
442+
let tx = rewrite_action.apply().await.unwrap();
443+
let table = tx.commit(&rest_catalog).await.unwrap();
444+
445+
// Verify manifest entry has correct sequence number
446+
let snapshot = table.metadata().current_snapshot().unwrap();
447+
let manifest_list = snapshot
448+
.load_manifest_list(table.file_io(), table.metadata())
449+
.await
450+
.unwrap();
451+
452+
assert_eq!(manifest_list.entries().len(), 1);
453+
454+
for manifest_file in manifest_list.entries() {
455+
let manifest = manifest_file.load_manifest(table.file_io()).await.unwrap();
456+
for entry in manifest.entries() {
457+
assert_eq!(entry.sequence_number(), Some(12345));
458+
}
459+
}
460+
}

0 commit comments

Comments
 (0)