Skip to content

feat(iceberg): introduce remove schemas #1115

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,12 @@ pub enum TableUpdate {
/// Snapshot id to remove partition statistics for.
snapshot_id: i64,
},
/// Remove schemas
#[serde(rename_all = "kebab-case")]
RemoveSchemas {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

/// Schema IDs to remove.
schema_ids: Vec<i32>,
},
}

impl TableUpdate {
Expand Down Expand Up @@ -525,6 +531,7 @@ impl TableUpdate {
TableUpdate::RemovePartitionStatistics { snapshot_id } => {
Ok(builder.remove_partition_statistics(snapshot_id))
}
TableUpdate::RemoveSchemas { schema_ids } => builder.remove_schemas(&schema_ids),
}
}
}
Expand Down Expand Up @@ -2047,4 +2054,19 @@ mod tests {
},
)
}

/// Round-trips the `remove-schemas` table update through JSON and checks that it
/// maps to `TableUpdate::RemoveSchemas` with the listed schema ids.
#[test]
fn test_remove_schema_update() {
    // Wire representation of the update.
    let json = r#"
{
"action": "remove-schemas",
"schema-ids": [1, 2]
}
"#;
    // In-memory value the JSON above must correspond to.
    let expected = TableUpdate::RemoveSchemas {
        schema_ids: vec![1, 2],
    };

    test_serde_json(json, expected);
}
}
81 changes: 81 additions & 0 deletions crates/iceberg/src/spec/table_metadata_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,37 @@ impl TableMetadataBuilder {
/// Returns the largest key of `metadata.sort_orders`, or `None` when there are no
/// sort orders.
fn highest_sort_order_id(&self) -> Option<i64> {
    let sort_order_ids = self.metadata.sort_orders.keys().copied();
    sort_order_ids.max()
}

/// Remove schemas by their ids from the table metadata.
///
/// Ids that are not present in the metadata are ignored. A `TableUpdate::RemoveSchemas`
/// change is recorded only when at least one schema was actually removed, and it lists
/// exactly the removed ids in ascending order. If nothing is removed (empty input or
/// only unknown ids), the builder is returned unchanged.
///
/// # Errors
///
/// Returns an `ErrorKind::DataInvalid` error if `schema_id_to_remove` contains the
/// current schema id; the current schema must never be removed.
pub fn remove_schemas(mut self, schema_id_to_remove: &[i32]) -> Result<Self> {
    if schema_id_to_remove.contains(&self.metadata.current_schema_id) {
        return Err(Error::new(
            ErrorKind::DataInvalid,
            "Cannot remove current schema",
        ));
    }

    // Drop the requested schemas, remembering which ids really existed.
    let mut removed_schemas = Vec::with_capacity(schema_id_to_remove.len());
    self.metadata.schemas.retain(|id, _schema| {
        let remove = schema_id_to_remove.contains(id);
        if remove {
            removed_schemas.push(*id);
        }
        !remove
    });

    // Nothing was removed: do not record a no-op change.
    if removed_schemas.is_empty() {
        return Ok(self);
    }

    // Sort so the recorded change does not depend on the map's iteration order.
    removed_schemas.sort_unstable();
    self.changes.push(TableUpdate::RemoveSchemas {
        schema_ids: removed_schemas,
    });

    Ok(self)
}
}

impl From<TableMetadataBuildResult> for TableMetadata {
Expand Down Expand Up @@ -2412,4 +2443,54 @@ mod tests {
table.metadata().current_snapshot_id().unwrap()
);
}

/// Removing schema id 0 from a builder created by `builder_without_changes` must fail.
#[test]
fn test_active_schema_cannot_be_removed() {
    let outcome = builder_without_changes(FormatVersion::V2).remove_schemas(&[0]);
    // Panics (failing the test) if the removal unexpectedly succeeded.
    outcome.unwrap_err();
}

/// Loads `TableMetadataV2Valid.json`, checks that removing the current schema (id 1)
/// is rejected, then removes schema 0 and verifies the resulting metadata and the
/// single recorded `RemoveSchemas` change.
#[test]
fn test_remove_schemas() {
    let metadata_file = File::open(format!(
        "{}/testdata/table_metadata/{}",
        env!("CARGO_MANIFEST_DIR"),
        "TableMetadataV2Valid.json"
    ))
    .unwrap();
    let parsed: TableMetadata = serde_json::from_reader(BufReader::new(metadata_file)).unwrap();

    let table = Table::builder()
        .metadata(parsed)
        .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
        .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
        .file_io(FileIOBuilder::new("memory").build().unwrap())
        .build()
        .unwrap();

    assert_eq!(2, table.metadata().schemas.len());

    // The active schema (id 1) must not be removable.
    table
        .metadata()
        .clone()
        .into_builder(None)
        .remove_schemas(&[1])
        .unwrap_err();

    // Removing the inactive schema (id 0) succeeds and leaves only schema 1.
    let build_result = table
        .metadata()
        .clone()
        .into_builder(None)
        .remove_schemas(&[0])
        .unwrap()
        .build()
        .unwrap();
    assert_eq!(1, build_result.metadata.schemas.len());
    assert_eq!(1, build_result.metadata.current_schema_id);
    assert_eq!(1, build_result.metadata.current_schema().schema_id());
    assert_eq!(1, build_result.changes.len());

    // The only recorded change must be the removal of schema 0.
    let TableUpdate::RemoveSchemas {
        schema_ids: remove_schema_ids,
    } = &build_result.changes[0]
    else {
        unreachable!("Expected RemoveSchema change")
    };
    assert_eq!(remove_schema_ids, &[0]);
}
}
Loading