Skip to content

Commit 5c342f3

Browse files
committed
add register table in rest catalog
1 parent 145afdf commit 5c342f3

File tree

3 files changed

+212
-6
lines changed

3 files changed

+212
-6
lines changed

crates/catalog/rest/src/catalog.rs

Lines changed: 169 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use crate::client::{
4141
use crate::types::{
4242
CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest,
4343
ListNamespaceResponse, ListTableResponse, LoadTableResponse, NamespaceSerde,
44-
RenameTableRequest,
44+
RegisterTableRequest, RenameTableRequest,
4545
};
4646

4747
const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1";
@@ -100,6 +100,10 @@ impl RestCatalogConfig {
100100
self.url_prefixed(&["tables", "rename"])
101101
}
102102

103+
fn register_table_endpoint(&self, ns: &NamespaceIdent) -> String {
104+
self.url_prefixed(&["namespaces", &ns.to_url_string(), "register"])
105+
}
106+
103107
fn table_endpoint(&self, table: &TableIdent) -> String {
104108
self.url_prefixed(&[
105109
"namespaces",
@@ -237,7 +241,7 @@ struct RestContext {
237241
pub struct RestCatalog {
238242
/// User config is stored as-is and never be changed.
239243
///
240-
/// It's could be different from the config fetched from the server and used at runtime.
244+
/// It could be different from the config fetched from the server and used at runtime.
241245
user_config: RestCatalogConfig,
242246
ctx: OnceCell<RestContext>,
243247
}
@@ -745,10 +749,86 @@ impl Catalog for RestCatalog {
745749
_table_ident: &TableIdent,
746750
_metadata_location: String,
747751
) -> Result<Table> {
748-
Err(Error::new(
749-
ErrorKind::FeatureUnsupported,
750-
"Registering a table is not supported yet",
751-
))
752+
let context = self.context().await?;
753+
754+
let request = context
755+
.client
756+
.request(
757+
Method::POST,
758+
context
759+
.config
760+
.register_table_endpoint(_table_ident.namespace()),
761+
)
762+
.json(&RegisterTableRequest {
763+
name: _table_ident.name.clone(),
764+
metadata_location: _metadata_location.clone(),
765+
})
766+
.build()?;
767+
768+
let http_response = context.client.query_catalog(request).await?;
769+
770+
let response: LoadTableResponse = match http_response.status() {
771+
StatusCode::OK => {
772+
deserialize_catalog_response::<LoadTableResponse>(http_response).await?
773+
}
774+
StatusCode::BAD_REQUEST => {
775+
return Err(Error::new(
776+
ErrorKind::Unexpected,
777+
"Unexpected error while registering table.",
778+
));
779+
}
780+
StatusCode::UNAUTHORIZED => {
781+
return Err(Error::new(
782+
ErrorKind::Unexpected,
783+
"Authenticated user does not have the necessary permissions.",
784+
));
785+
}
786+
StatusCode::FORBIDDEN => {
787+
return Err(Error::new(
788+
ErrorKind::Unexpected,
789+
"Authenticated user does not have the necessary permissions.",
790+
));
791+
}
792+
StatusCode::NOT_FOUND => {
793+
return Err(Error::new(
794+
ErrorKind::NamespaceNotFound,
795+
"The namespace specified does not exist.",
796+
));
797+
}
798+
StatusCode::CONFLICT => {
799+
return Err(Error::new(
800+
ErrorKind::TableAlreadyExists,
801+
"The given table already exists.",
802+
));
803+
}
804+
StatusCode::SERVICE_UNAVAILABLE => {
805+
return Err(Error::new(
806+
ErrorKind::Unexpected,
807+
"The service is not ready to handle the request.",
808+
));
809+
}
810+
StatusCode::INTERNAL_SERVER_ERROR => {
811+
return Err(Error::new(
812+
ErrorKind::Unexpected,
813+
"An unknown server-side problem occurred; the commit state is unknown.",
814+
));
815+
}
816+
_ => return Err(deserialize_unexpected_catalog_error(http_response).await),
817+
};
818+
819+
let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
820+
ErrorKind::DataInvalid,
821+
"Metadata location missing in `register_table` response!",
822+
))?;
823+
824+
let file_io = self.load_file_io(Some(metadata_location), None).await?;
825+
826+
Table::builder()
827+
.identifier(_table_ident.clone())
828+
.file_io(file_io)
829+
.metadata(response.metadata)
830+
.metadata_location(metadata_location.clone())
831+
.build()
752832
}
753833

754834
async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
@@ -2457,4 +2537,87 @@ mod tests {
24572537
update_table_mock.assert_async().await;
24582538
load_table_mock.assert_async().await;
24592539
}
2540+
2541+
#[tokio::test]
2542+
async fn test_register_table() {
2543+
let mut server = Server::new_async().await;
2544+
2545+
let config_mock = create_config_mock(&mut server).await;
2546+
2547+
let register_table_mock = server
2548+
.mock("POST", "/v1/namespaces/ns1/register")
2549+
.with_status(200)
2550+
.with_body_from_file(format!(
2551+
"{}/testdata/{}",
2552+
env!("CARGO_MANIFEST_DIR"),
2553+
"load_table_response.json"
2554+
))
2555+
.create_async()
2556+
.await;
2557+
2558+
let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2559+
let table_ident =
2560+
TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2561+
let metadata_location = String::from(
2562+
"s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2563+
);
2564+
2565+
let table = catalog
2566+
.register_table(&table_ident, metadata_location)
2567+
.await
2568+
.unwrap();
2569+
2570+
assert_eq!(
2571+
&TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
2572+
table.identifier()
2573+
);
2574+
assert_eq!(
2575+
"s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2576+
table.metadata_location().unwrap()
2577+
);
2578+
2579+
config_mock.assert_async().await;
2580+
register_table_mock.assert_async().await;
2581+
}
2582+
2583+
#[tokio::test]
2584+
async fn test_register_table_404() {
2585+
let mut server = Server::new_async().await;
2586+
2587+
let config_mock = create_config_mock(&mut server).await;
2588+
2589+
let register_table_mock = server
2590+
.mock("POST", "/v1/namespaces/ns1/register")
2591+
.with_status(404)
2592+
.with_body(
2593+
r#"
2594+
{
2595+
"error": {
2596+
"message": "The namespace specified does not exist",
2597+
"type": "NoSuchNamespaceErrorException",
2598+
"code": 404
2599+
}
2600+
}
2601+
"#,
2602+
)
2603+
.create_async()
2604+
.await;
2605+
2606+
let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2607+
2608+
let table_ident =
2609+
TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string());
2610+
let metadata_location = String::from(
2611+
"s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
2612+
);
2613+
let table = catalog
2614+
.register_table(&table_ident, metadata_location)
2615+
.await;
2616+
2617+
assert!(table.is_err());
2618+
assert!(table.err().unwrap().message().contains("does not exist"));
2619+
2620+
config_mock.assert_async().await;
2621+
register_table_mock.assert_async().await;
2622+
}
24602623
}

crates/catalog/rest/src/types.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,3 +191,10 @@ pub(super) struct CommitTableResponse {
191191
pub(super) metadata_location: String,
192192
pub(super) metadata: TableMetadata,
193193
}
194+
195+
#[derive(Debug, Serialize, Deserialize)]
196+
#[serde(rename_all = "kebab-case")]
197+
pub(super) struct RegisterTableRequest {
198+
pub(super) name: String,
199+
pub(super) metadata_location: String,
200+
}

crates/catalog/rest/tests/rest_catalog_test.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,3 +407,39 @@ async fn test_list_empty_multi_level_namespace() {
407407
.unwrap();
408408
assert!(nss.is_empty());
409409
}
410+
411+
#[tokio::test]
412+
async fn test_register_table() {
413+
let catalog = get_catalog().await;
414+
415+
// Create namespace
416+
let ns = NamespaceIdent::from_strs(["ns"]).unwrap();
417+
catalog.create_namespace(&ns, HashMap::new()).await.unwrap();
418+
419+
// Create the table, store the metadata location, drop the table
420+
let empty_schema = Schema::builder().build().unwrap();
421+
let table_creation = TableCreation::builder()
422+
.name("t1".to_string())
423+
.schema(empty_schema)
424+
.build();
425+
426+
let table = catalog.create_table(&ns, table_creation).await.unwrap();
427+
428+
let metadata_location = table.metadata_location().unwrap();
429+
catalog.drop_table(table.identifier()).await.unwrap();
430+
431+
let new_table_identifier = TableIdent::from_strs(["ns", "t2"]).unwrap();
432+
let table_registered = catalog
433+
.register_table(&new_table_identifier, metadata_location.to_string())
434+
.await
435+
.unwrap();
436+
437+
assert_eq!(
438+
table.metadata_location(),
439+
table_registered.metadata_location()
440+
);
441+
assert_ne!(
442+
table.identifier().to_string(),
443+
table_registered.identifier().to_string()
444+
);
445+
}

0 commit comments

Comments
 (0)