@@ -41,7 +41,7 @@ use crate::client::{
41
41
use crate :: types:: {
42
42
CatalogConfig , CommitTableRequest , CommitTableResponse , CreateTableRequest ,
43
43
ListNamespaceResponse , ListTableResponse , LoadTableResponse , NamespaceSerde ,
44
- RenameTableRequest ,
44
+ RegisterTableRequest , RenameTableRequest ,
45
45
} ;
46
46
47
47
const ICEBERG_REST_SPEC_VERSION : & str = "0.14.1" ;
@@ -100,6 +100,10 @@ impl RestCatalogConfig {
100
100
self . url_prefixed ( & [ "tables" , "rename" ] )
101
101
}
102
102
103
+ fn register_table_endpoint ( & self , ns : & NamespaceIdent ) -> String {
104
+ self . url_prefixed ( & [ "namespaces" , & ns. to_url_string ( ) , "register" ] )
105
+ }
106
+
103
107
fn table_endpoint ( & self , table : & TableIdent ) -> String {
104
108
self . url_prefixed ( & [
105
109
"namespaces" ,
@@ -237,7 +241,7 @@ struct RestContext {
237
241
pub struct RestCatalog {
238
242
/// User config is stored as-is and never be changed.
239
243
///
240
- /// It's could be different from the config fetched from the server and used at runtime.
244
+ /// It could be different from the config fetched from the server and used at runtime.
241
245
user_config : RestCatalogConfig ,
242
246
ctx : OnceCell < RestContext > ,
243
247
}
@@ -745,10 +749,86 @@ impl Catalog for RestCatalog {
745
749
_table_ident : & TableIdent ,
746
750
_metadata_location : String ,
747
751
) -> Result < Table > {
748
- Err ( Error :: new (
749
- ErrorKind :: FeatureUnsupported ,
750
- "Registering a table is not supported yet" ,
751
- ) )
752
+ let contex = self . context ( ) . await ?;
753
+
754
+ let request = contex
755
+ . client
756
+ . request (
757
+ Method :: POST ,
758
+ contex
759
+ . config
760
+ . register_table_endpoint ( _table_ident. namespace ( ) ) ,
761
+ )
762
+ . json ( & RegisterTableRequest {
763
+ name : _table_ident. name . clone ( ) ,
764
+ metadata_location : _metadata_location. clone ( ) ,
765
+ } )
766
+ . build ( ) ?;
767
+
768
+ let http_response = contex. client . query_catalog ( request) . await ?;
769
+
770
+ let response: LoadTableResponse = match http_response. status ( ) {
771
+ StatusCode :: OK => {
772
+ deserialize_catalog_response :: < LoadTableResponse > ( http_response) . await ?
773
+ }
774
+ StatusCode :: BAD_REQUEST => {
775
+ return Err ( Error :: new (
776
+ ErrorKind :: Unexpected ,
777
+ "Unexpected error while registering table." ,
778
+ ) ) ;
779
+ }
780
+ StatusCode :: UNAUTHORIZED => {
781
+ return Err ( Error :: new (
782
+ ErrorKind :: Unexpected ,
783
+ "Authenticated user does not have the necessary permissions." ,
784
+ ) ) ;
785
+ }
786
+ StatusCode :: FORBIDDEN => {
787
+ return Err ( Error :: new (
788
+ ErrorKind :: Unexpected ,
789
+ "Authenticated user does not have the necessary permissions." ,
790
+ ) ) ;
791
+ }
792
+ StatusCode :: NOT_FOUND => {
793
+ return Err ( Error :: new (
794
+ ErrorKind :: NamespaceNotFound ,
795
+ "The namespace specified does not exist." ,
796
+ ) ) ;
797
+ }
798
+ StatusCode :: CONFLICT => {
799
+ return Err ( Error :: new (
800
+ ErrorKind :: TableAlreadyExists ,
801
+ "The given table already exists." ,
802
+ ) ) ;
803
+ }
804
+ StatusCode :: SERVICE_UNAVAILABLE => {
805
+ return Err ( Error :: new (
806
+ ErrorKind :: Unexpected ,
807
+ "The service is not ready to handle the request." ,
808
+ ) ) ;
809
+ }
810
+ StatusCode :: INTERNAL_SERVER_ERROR => {
811
+ return Err ( Error :: new (
812
+ ErrorKind :: Unexpected ,
813
+ "An unknown server-side problem occurred; the commit state is unknown." ,
814
+ ) ) ;
815
+ }
816
+ _ => return Err ( deserialize_unexpected_catalog_error ( http_response) . await ) ,
817
+ } ;
818
+
819
+ let metadata_location = response. metadata_location . as_ref ( ) . ok_or ( Error :: new (
820
+ ErrorKind :: DataInvalid ,
821
+ "Metadata location missing in `register_table` response!" ,
822
+ ) ) ?;
823
+
824
+ let file_io = self . load_file_io ( Some ( metadata_location) , None ) . await ?;
825
+
826
+ Table :: builder ( )
827
+ . identifier ( _table_ident. clone ( ) )
828
+ . file_io ( file_io)
829
+ . metadata ( response. metadata )
830
+ . metadata_location ( metadata_location. clone ( ) )
831
+ . build ( )
752
832
}
753
833
754
834
async fn update_table ( & self , mut commit : TableCommit ) -> Result < Table > {
@@ -2457,4 +2537,87 @@ mod tests {
2457
2537
update_table_mock. assert_async ( ) . await ;
2458
2538
load_table_mock. assert_async ( ) . await ;
2459
2539
}
2540
+
2541
+ #[ tokio:: test]
2542
+ async fn test_register_table ( ) {
2543
+ let mut server = Server :: new_async ( ) . await ;
2544
+
2545
+ let config_mock = create_config_mock ( & mut server) . await ;
2546
+
2547
+ let register_table_mock = server
2548
+ . mock ( "POST" , "/v1/namespaces/ns1/register" )
2549
+ . with_status ( 200 )
2550
+ . with_body_from_file ( format ! (
2551
+ "{}/testdata/{}" ,
2552
+ env!( "CARGO_MANIFEST_DIR" ) ,
2553
+ "load_table_response.json"
2554
+ ) )
2555
+ . create_async ( )
2556
+ . await ;
2557
+
2558
+ let catalog = RestCatalog :: new ( RestCatalogConfig :: builder ( ) . uri ( server. url ( ) ) . build ( ) ) ;
2559
+ let table_ident =
2560
+ TableIdent :: new ( NamespaceIdent :: new ( "ns1" . to_string ( ) ) , "test1" . to_string ( ) ) ;
2561
+ let metadata_location = String :: from (
2562
+ "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json" ,
2563
+ ) ;
2564
+
2565
+ let table = catalog
2566
+ . register_table ( & table_ident, metadata_location)
2567
+ . await
2568
+ . unwrap ( ) ;
2569
+
2570
+ assert_eq ! (
2571
+ & TableIdent :: from_strs( vec![ "ns1" , "test1" ] ) . unwrap( ) ,
2572
+ table. identifier( )
2573
+ ) ;
2574
+ assert_eq ! (
2575
+ "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json" ,
2576
+ table. metadata_location( ) . unwrap( )
2577
+ ) ;
2578
+
2579
+ config_mock. assert_async ( ) . await ;
2580
+ register_table_mock. assert_async ( ) . await ;
2581
+ }
2582
+
2583
+ #[ tokio:: test]
2584
+ async fn test_register_table_404 ( ) {
2585
+ let mut server = Server :: new_async ( ) . await ;
2586
+
2587
+ let config_mock = create_config_mock ( & mut server) . await ;
2588
+
2589
+ let register_table_mock = server
2590
+ . mock ( "POST" , "/v1/namespaces/ns1/register" )
2591
+ . with_status ( 404 )
2592
+ . with_body (
2593
+ r#"
2594
+ {
2595
+ "error": {
2596
+ "message": "The namespace specified does not exist",
2597
+ "type": "NoSuchNamespaceErrorException",
2598
+ "code": 404
2599
+ }
2600
+ }
2601
+ "# ,
2602
+ )
2603
+ . create_async ( )
2604
+ . await ;
2605
+
2606
+ let catalog = RestCatalog :: new ( RestCatalogConfig :: builder ( ) . uri ( server. url ( ) ) . build ( ) ) ;
2607
+
2608
+ let table_ident =
2609
+ TableIdent :: new ( NamespaceIdent :: new ( "ns1" . to_string ( ) ) , "test1" . to_string ( ) ) ;
2610
+ let metadata_location = String :: from (
2611
+ "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json" ,
2612
+ ) ;
2613
+ let table = catalog
2614
+ . register_table ( & table_ident, metadata_location)
2615
+ . await ;
2616
+
2617
+ assert ! ( table. is_err( ) ) ;
2618
+ assert ! ( table. err( ) . unwrap( ) . message( ) . contains( "does not exist" ) ) ;
2619
+
2620
+ config_mock. assert_async ( ) . await ;
2621
+ register_table_mock. assert_async ( ) . await ;
2622
+ }
2460
2623
}
0 commit comments