Skip to content

refactor: Add read_from() and write_to() to TableMetadata #1523

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jul 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 2 additions & 7 deletions crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,10 +395,7 @@ impl Catalog for GlueCatalog {
.metadata;
let metadata_location = create_metadata_location(&location, 0)?;

self.file_io
.new_output(&metadata_location)?
.write(serde_json::to_vec(&metadata)?.into())
.await?;
metadata.write_to(&self.file_io, &metadata_location).await?;

let glue_table = convert_to_glue_table(
&table_name,
Expand Down Expand Up @@ -463,9 +460,7 @@ impl Catalog for GlueCatalog {
Some(table) => {
let metadata_location = get_metadata_location(&table.parameters)?;

let input_file = self.file_io.new_input(&metadata_location)?;
let metadata_content = input_file.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;

Table::builder()
.file_io(self.file_io())
Expand Down
8 changes: 2 additions & 6 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,10 +353,7 @@ impl Catalog for HmsCatalog {

let metadata_location = create_metadata_location(&location, 0)?;

self.file_io
.new_output(&metadata_location)?
.write(serde_json::to_vec(&metadata)?.into())
.await?;
metadata.write_to(&self.file_io, &metadata_location).await?;

let hive_table = convert_to_hive_table(
db_name.clone(),
Expand Down Expand Up @@ -406,8 +403,7 @@ impl Catalog for HmsCatalog {

let metadata_location = get_metadata_location(&hive_table.parameters)?;

let metadata_content = self.file_io.new_input(&metadata_location)?.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;

Table::builder()
.file_io(self.file_io())
Expand Down
1 change: 0 additions & 1 deletion crates/catalog/s3tables/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ async-trait = { workspace = true }
aws-config = { workspace = true }
aws-sdk-s3tables = "1.10.0"
iceberg = { workspace = true }
serde_json = { workspace = true }
typed-builder = { workspace = true }
uuid = { workspace = true, features = ["v4"] }

Expand Down
9 changes: 2 additions & 7 deletions crates/catalog/s3tables/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,10 +334,7 @@ impl Catalog for S3TablesCatalog {
let metadata = TableMetadataBuilder::from_table_creation(creation)?
.build()?
.metadata;
self.file_io
.new_output(&metadata_location)?
.write(serde_json::to_vec(&metadata)?.into())
.await?;
metadata.write_to(&self.file_io, &metadata_location).await?;

// update metadata location
self.s3tables_client
Expand Down Expand Up @@ -389,9 +386,7 @@ impl Catalog for S3TablesCatalog {
),
)
})?;
let input_file = self.file_io.new_input(metadata_location)?;
let metadata_content = input_file.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;

let table = Table::builder()
.identifier(table_ident.clone())
Expand Down
1 change: 0 additions & 1 deletion crates/catalog/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ repository = { workspace = true }
[dependencies]
async-trait = { workspace = true }
iceberg = { workspace = true }
serde_json = { workspace = true }
sqlx = { version = "0.8.1", features = ["any"], default-features = false }
typed-builder = { workspace = true }
uuid = { workspace = true, features = ["v4"] }
Expand Down
8 changes: 3 additions & 5 deletions crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,9 +642,7 @@ impl Catalog for SqlCatalog {
.try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
.map_err(from_sqlx_error)?;

let file = self.fileio.new_input(&tbl_metadata_location)?;
let metadata_content = file.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
let metadata = TableMetadata::read_from(&self.fileio, &tbl_metadata_location).await?;

Ok(Table::builder()
.file_io(self.fileio.clone())
Expand Down Expand Up @@ -708,8 +706,8 @@ impl Catalog for SqlCatalog {
Uuid::new_v4()
);

let file = self.fileio.new_output(&tbl_metadata_location)?;
file.write(serde_json::to_vec(&tbl_metadata)?.into())
tbl_metadata
.write_to(&self.fileio, &tbl_metadata_location)
.await?;

self.execute(&format!(
Expand Down
13 changes: 3 additions & 10 deletions crates/iceberg/src/catalog/memory/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,7 @@ impl Catalog for MemoryCatalog {
Uuid::new_v4()
);

self.file_io
.new_output(&metadata_location)?
.write(serde_json::to_vec(&metadata)?.into())
.await?;
metadata.write_to(&self.file_io, &metadata_location).await?;

root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?;

Expand All @@ -230,9 +227,7 @@ impl Catalog for MemoryCatalog {
let root_namespace_state = self.root_namespace_state.lock().await;

let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
let input_file = self.file_io.new_input(metadata_location)?;
let metadata_content = input_file.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;

Table::builder()
.file_io(self.file_io.clone())
Expand Down Expand Up @@ -284,9 +279,7 @@ impl Catalog for MemoryCatalog {
let mut root_namespace_state = self.root_namespace_state.lock().await;
root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?;

let input_file = self.file_io.new_input(metadata_location.clone())?;
let metadata_content = input_file.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;

Table::builder()
.file_io(self.file_io.clone())
Expand Down
71 changes: 71 additions & 0 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use super::{
SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
};
use crate::error::{Result, timestamp_ms_to_utc};
use crate::io::FileIO;
use crate::{Error, ErrorKind};

static MAIN_BRANCH: &str = "main";
Expand Down Expand Up @@ -464,6 +465,29 @@ impl TableMetadata {
self.encryption_keys.get(key_id)
}

/// Read table metadata from the given location.
pub async fn read_from(
    file_io: &FileIO,
    metadata_location: impl AsRef<str>,
) -> Result<TableMetadata> {
    // Fetch the raw JSON bytes, then deserialize them into TableMetadata;
    // both I/O and serde errors are converted into the crate's Result type by `?`.
    let bytes = file_io.new_input(metadata_location)?.read().await?;
    Ok(serde_json::from_slice(&bytes)?)
}

/// Write table metadata to the given location.
pub async fn write_to(
    &self,
    file_io: &FileIO,
    metadata_location: impl AsRef<str>,
) -> Result<()> {
    // Serialize to JSON first so a serde failure surfaces before any file is created.
    let payload = serde_json::to_vec(self)?;
    let output = file_io.new_output(metadata_location)?;
    output.write(payload.into()).await
}

/// Normalize this partition spec.
///
/// This is an internal method
Expand Down Expand Up @@ -1355,10 +1379,12 @@ mod tests {

use anyhow::Result;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use uuid::Uuid;

use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
use crate::TableCreation;
use crate::io::FileIOBuilder;
use crate::spec::table_metadata::TableMetadata;
use crate::spec::{
BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, PartitionStatisticsFile,
Expand Down Expand Up @@ -3050,4 +3076,49 @@ mod tests {
)])
);
}

#[tokio::test]
async fn test_table_metadata_io_read_write() {
    // Round-trip a known-good metadata document through write_to/read_from
    // against the local filesystem FileIO implementation.
    let temp_dir = TempDir::new().unwrap();
    let file_io = FileIOBuilder::new_fs_io().build().unwrap();

    // Start from an existing fixture so equality below is meaningful.
    let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");

    let metadata_location = format!("{}/metadata.json", temp_dir.path().to_str().unwrap());

    // Persist the metadata to disk.
    original_metadata
        .write_to(&file_io, &metadata_location)
        .await
        .unwrap();

    // The write must have produced a real file at the requested path.
    assert!(fs::metadata(&metadata_location).is_ok());

    // Load it back and confirm the round trip is lossless.
    let round_tripped = TableMetadata::read_from(&file_io, &metadata_location)
        .await
        .unwrap();
    assert_eq!(round_tripped, original_metadata);
}

#[tokio::test]
async fn test_table_metadata_io_read_nonexistent_file() {
    // Reading a path that does not exist must surface an error, not panic.
    let file_io = FileIOBuilder::new_fs_io().build().unwrap();

    let outcome = TableMetadata::read_from(&file_io, "/nonexistent/path/metadata.json").await;

    assert!(outcome.is_err());
}
}
Loading