From a65f77182b3a4600b5ba39a553298ce6ee4c9998 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 20 Jul 2023 16:23:15 +0200 Subject: [PATCH 1/2] eager database schedule on allocate --- libsqlx-server/src/http/admin.rs | 13 ++++++++----- libsqlx-server/src/http/user/extractors.rs | 2 +- libsqlx-server/src/linc/bus.rs | 4 ++++ libsqlx-server/src/main.rs | 2 +- libsqlx-server/src/manager.rs | 14 ++++++++++++-- libsqlx-server/src/meta.rs | 5 +++-- 6 files changed, 29 insertions(+), 11 deletions(-) diff --git a/libsqlx-server/src/http/admin.rs b/libsqlx-server/src/http/admin.rs index 9323bcdd..8a28f4bc 100644 --- a/libsqlx-server/src/http/admin.rs +++ b/libsqlx-server/src/http/admin.rs @@ -10,16 +10,17 @@ use hyper::server::accept::Accept; use serde::{Deserialize, Deserializer, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; -use crate::meta::Store; +use crate::linc::bus::Bus; +use crate::manager::Manager; use crate::allocation::config::{AllocConfig, DbConfig}; use crate::linc::NodeId; pub struct Config { - pub meta_store: Arc, + pub bus: Arc>>, } struct AdminServerState { - meta_store: Arc, + bus: Arc>>, } pub async fn run_admin_api(config: Config, listener: I) -> Result<()> @@ -28,7 +29,7 @@ where I::Conn: AsyncRead + AsyncWrite + Send + Unpin + 'static, { let state = AdminServerState { - meta_store: config.meta_store, + bus: config.bus, }; let app = Router::new() @@ -114,7 +115,9 @@ async fn allocate( }, }, }; - state.meta_store.allocate(&req.alloc_id, &config).await; + + let dispatcher = state.bus.clone(); + state.bus.handler().allocate(&req.alloc_id, &config, dispatcher).await; Ok(Json(AllocateResp {})) } diff --git a/libsqlx-server/src/http/user/extractors.rs b/libsqlx-server/src/http/user/extractors.rs index 962eb060..582b0fd6 100644 --- a/libsqlx-server/src/http/user/extractors.rs +++ b/libsqlx-server/src/http/user/extractors.rs @@ -20,7 +20,7 @@ impl FromRequestParts> for Database { let Ok(host_str) = std::str::from_utf8(host.as_bytes()) else {return Err(UserApiError::MissingHost)}; let db_name = parse_host(host_str)?; let db_id = DatabaseId::from_name(db_name); - let Some(sender) = state.manager.alloc(db_id, state.bus.clone()).await else { return Err(UserApiError::UnknownDatabase(db_name.to_owned())) }; + let Some(sender) = state.manager.schedule(db_id, state.bus.clone()).await else { return Err(UserApiError::UnknownDatabase(db_name.to_owned())) }; Ok(Database { sender }) } diff --git a/libsqlx-server/src/linc/bus.rs b/libsqlx-server/src/linc/bus.rs index 7c7b70dd..5072c8ae 100644 --- a/libsqlx-server/src/linc/bus.rs +++ b/libsqlx-server/src/linc/bus.rs @@ -31,6 +31,10 @@ impl Bus { self.node_id } + pub fn handler(&self) -> &H { + &self.handler + } + pub async fn incomming(self: &Arc, incomming: Inbound) { self.handler.handle(self.clone(), incomming).await; } diff --git a/libsqlx-server/src/main.rs b/libsqlx-server/src/main.rs index 454ae954..742be0b5 100644 --- a/libsqlx-server/src/main.rs +++ b/libsqlx-server/src/main.rs @@ -39,7 +39,7 @@ async fn spawn_admin_api( ) -> Result<()> { let admin_api_listener = TcpListener::bind(config.addr).await?; let fut = run_admin_api( - http::admin::Config { meta_store }, + http::admin::Config { manager: meta_store }, AddrIncoming::from_listener(admin_api_listener)?, ); set.spawn(fut); diff --git a/libsqlx-server/src/manager.rs b/libsqlx-server/src/manager.rs index 414e17bf..64f63437 100644 --- a/libsqlx-server/src/manager.rs +++ b/libsqlx-server/src/manager.rs @@ -6,6 +6,7 @@ use moka::future::Cache; use tokio::sync::mpsc; use tokio::task::JoinSet; +use crate::allocation::config::AllocConfig; use crate::allocation::{Allocation, AllocationMessage, Database}; use crate::hrana; use crate::linc::bus::Dispatch; @@ -31,7 +32,7 @@ impl Manager { } /// Returns a handle to an allocation, lazily initializing if it isn't already loaded. - pub async fn alloc( + pub async fn schedule( self: &Arc, database_id: DatabaseId, dispatcher: Arc, @@ -65,6 +66,15 @@ impl Manager { None } + + pub async fn allocate(self: &Arc, database_name: &str, meta: &AllocConfig, dispatcher: Arc) { + let id = self.store().allocate(database_name, meta).await; + self.schedule(id, dispatcher).await; + } + + pub fn store(&self) -> &Store { + &self.meta_store + } } #[async_trait::async_trait] @@ -72,7 +82,7 @@ impl Handler for Arc { async fn handle(&self, bus: Arc, msg: Inbound) { if let Some(sender) = self .clone() - .alloc(msg.enveloppe.database_id.unwrap(), bus.clone()) + .schedule(msg.enveloppe.database_id.unwrap(), bus.clone()) .await { let _ = sender.send(AllocationMessage::Inbound(msg)).await; diff --git a/libsqlx-server/src/meta.rs b/libsqlx-server/src/meta.rs index 0167497b..0770e7ed 100644 --- a/libsqlx-server/src/meta.rs +++ b/libsqlx-server/src/meta.rs @@ -56,16 +56,17 @@ impl Store { Self { meta_store } } - pub async fn allocate(&self, database_name: &str, meta: &AllocConfig) { + pub async fn allocate(&self, database_name: &str, meta: &AllocConfig) -> DatabaseId { //TODO: Handle conflict + let id = DatabaseId::from_name(database_name); block_in_place(|| { let meta_bytes = bincode::serialize(meta).unwrap(); - let id = DatabaseId::from_name(database_name); self.meta_store .compare_and_swap(id, None as Option<&[u8]>, Some(meta_bytes)) .unwrap() .unwrap(); }); + id } pub async fn deallocate(&self, _database_name: &str) { From 5d16027c9e24f1a95a1edf4f70d235bec0741eaf Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 20 Jul 2023 16:50:49 +0200 Subject: [PATCH 2/2] deallocate database --- libsqlx-server/src/allocation/mod.rs | 22 ++++++------- libsqlx-server/src/http/admin.rs | 47 ++++++++++++++++++---------- libsqlx-server/src/main.rs | 6 ++-- libsqlx-server/src/manager.rs | 18 +++++++++-- libsqlx-server/src/meta.rs | 8 ++--- 5 files changed, 63 insertions(+), 38 deletions(-) diff --git a/libsqlx-server/src/allocation/mod.rs b/libsqlx-server/src/allocation/mod.rs index a41f2cd7..7bea3ddd 100644 --- a/libsqlx-server/src/allocation/mod.rs +++ b/libsqlx-server/src/allocation/mod.rs @@ -590,14 +590,16 @@ impl ReplicaConnection { .unwrap(), BuilderStep::BeginRows => req.builder.begin_rows().unwrap(), BuilderStep::BeginRow => req.builder.begin_row().unwrap(), - BuilderStep::AddRowValue(v) => req.builder.add_row_value((&v).into()).unwrap(), + BuilderStep::AddRowValue(v) => { + req.builder.add_row_value((&v).into()).unwrap() + } BuilderStep::FinishRow => req.builder.finish_row().unwrap(), BuilderStep::FinishRows => req.builder.finish_rows().unwrap(), BuilderStep::Finnalize { is_txn, frame_no } => { let _ = req.builder.finnalize(is_txn, frame_no).unwrap(); finnalized = true; - }, - BuilderStep::FinnalizeError(e) => { + } + BuilderStep::FinnalizeError(e) => { req.builder.finnalize_error(e); finnalized = true; } @@ -625,15 +627,13 @@ impl ConnectionHandler for ReplicaConnection { // self.conn.writer().current_req.timeout.poll() let mut req = self.conn.writer().current_req.lock(); let should_abort_query = match &mut *req { - Some(ref mut req) => { - match req.timeout.as_mut().poll(cx) { - Poll::Ready(_) => { - req.builder.finnalize_error("request timed out".to_string()); - true - } - Poll::Pending => return Poll::Pending, + Some(ref mut req) => match req.timeout.as_mut().poll(cx) { + Poll::Ready(_) => { + req.builder.finnalize_error("request timed out".to_string()); + true } - } + Poll::Pending => return Poll::Pending, + }, None => return Poll::Ready(()), }; diff --git a/libsqlx-server/src/http/admin.rs b/libsqlx-server/src/http/admin.rs index 8a28f4bc..0e263ddf 100644 --- a/libsqlx-server/src/http/admin.rs +++ b/libsqlx-server/src/http/admin.rs @@ -1,19 +1,20 @@ -use std::sync::Arc; use std::str::FromStr; +use std::sync::Arc; use std::time::Duration; +use axum::extract::{Path, State}; +use axum::routing::{delete, post}; use axum::{Json, Router}; -use axum::routing::post; -use axum::extract::State; use color_eyre::eyre::Result; use hyper::server::accept::Accept; use serde::{Deserialize, Deserializer, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; -use crate::linc::bus::Bus; -use crate::manager::Manager; use crate::allocation::config::{AllocConfig, DbConfig}; +use crate::linc::bus::Bus; use crate::linc::NodeId; +use crate::manager::Manager; +use crate::meta::DatabaseId; pub struct Config { pub bus: Arc>>, @@ -28,12 +29,11 @@ where I: Accept, I::Conn: AsyncRead + AsyncWrite + Send + Unpin + 'static, { - let state = AdminServerState { - bus: config.bus, - }; + let state = AdminServerState { bus: config.bus }; let app = Router::new() .route("/manage/allocation", post(allocate).get(list_allocs)) + .route("/manage/allocation/:db_name", delete(deallocate)) .with_state(Arc::new(state)); axum::Server::builder(listener) .serve(app.into_make_service()) @@ -50,7 +50,7 @@ struct AllocateResp {} #[derive(Deserialize, Debug)] struct AllocateReq { - alloc_id: String, + database_name: String, max_conccurent_connection: Option, config: DbConfigReq, } @@ -61,7 +61,10 @@ pub enum DbConfigReq { Primary {}, Replica { primary_node_id: NodeId, - #[serde(deserialize_with = "deserialize_duration", default = "default_proxy_timeout")] + #[serde( + deserialize_with = "deserialize_duration", + default = "default_proxy_timeout" + )] proxy_request_timeout_duration: Duration, }, } @@ -79,8 +82,8 @@ where type Value = Duration; fn visit_str(self, v: &str) -> std::result::Result - where - E: serde::de::Error, + where + E: serde::de::Error, { match humantime::Duration::from_str(v) { Ok(d) => Ok(*d), @@ -91,7 +94,6 @@ where fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.write_str("a duration, in a string format") } - } deserializer.deserialize_str(Visitor) @@ -103,7 +105,7 @@ async fn allocate( ) -> Result, Json> { let config = AllocConfig { max_conccurent_connection: req.max_conccurent_connection.unwrap_or(16), - db_name: req.alloc_id.clone(), + db_name: req.database_name.clone(), db_config: match req.config { DbConfigReq::Primary {} => DbConfig::Primary {}, DbConfigReq::Replica { @@ -117,7 +119,18 @@ async fn allocate( }; let dispatcher = state.bus.clone(); - state.bus.handler().allocate(&req.alloc_id, &config, dispatcher).await; + let id = DatabaseId::from_name(&req.database_name); + state.bus.handler().allocate(id, &config, dispatcher).await; + + Ok(Json(AllocateResp {})) +} + +async fn deallocate( + State(state): State>, + Path(database_name): Path, +) -> Result, Json> { + let id = DatabaseId::from_name(&database_name); + state.bus.handler().deallocate(id).await; Ok(Json(AllocateResp {})) } @@ -136,7 +149,9 @@ async fn list_allocs( State(state): State>, ) -> Result, Json> { let allocs = state - .meta_store + .bus + .handler() + .store() .list_allocs() .await .into_iter() diff --git a/libsqlx-server/src/main.rs b/libsqlx-server/src/main.rs index 742be0b5..296c54c9 100644 --- a/libsqlx-server/src/main.rs +++ b/libsqlx-server/src/main.rs @@ -35,11 +35,11 @@ struct Args { async fn spawn_admin_api( set: &mut JoinSet>, config: &AdminApiConfig, - meta_store: Arc, + bus: Arc>>, ) -> Result<()> { let admin_api_listener = TcpListener::bind(config.addr).await?; let fut = run_admin_api( - http::admin::Config { manager: meta_store }, + http::admin::Config { bus }, AddrIncoming::from_listener(admin_api_listener)?, ); set.spawn(fut); @@ -98,7 +98,7 @@ async fn main() -> Result<()> { let bus = Arc::new(Bus::new(config.cluster.id, manager.clone())); spawn_cluster_networking(&mut join_set, &config.cluster, bus.clone()).await?; - spawn_admin_api(&mut join_set, &config.admin_api, store.clone()).await?; + spawn_admin_api(&mut join_set, &config.admin_api, bus.clone()).await?; spawn_user_api(&mut join_set, &config.user_api, manager, bus).await?; join_set.join_next().await; diff --git a/libsqlx-server/src/manager.rs b/libsqlx-server/src/manager.rs index 64f63437..69d1376f 100644 --- a/libsqlx-server/src/manager.rs +++ b/libsqlx-server/src/manager.rs @@ -67,9 +67,21 @@ impl Manager { None } - pub async fn allocate(self: &Arc, database_name: &str, meta: &AllocConfig, dispatcher: Arc) { - let id = self.store().allocate(database_name, meta).await; - self.schedule(id, dispatcher).await; + pub async fn allocate( + self: &Arc, + database_id: DatabaseId, + meta: &AllocConfig, + dispatcher: Arc, + ) { + self.store().allocate(database_id, meta).await; + self.schedule(database_id, dispatcher).await; + } + + pub async fn deallocate(&self, database_id: DatabaseId) { + self.meta_store.deallocate(database_id).await; + self.cache.remove(&database_id).await; + let db_path = self.db_path.join("dbs").join(database_id.to_string()); + tokio::fs::remove_dir_all(db_path).await.unwrap(); } pub fn store(&self) -> &Store { diff --git a/libsqlx-server/src/meta.rs b/libsqlx-server/src/meta.rs index 0770e7ed..0d61d04f 100644 --- a/libsqlx-server/src/meta.rs +++ b/libsqlx-server/src/meta.rs @@ -56,9 +56,8 @@ impl Store { Self { meta_store } } - pub async fn allocate(&self, database_name: &str, meta: &AllocConfig) -> DatabaseId { + pub async fn allocate(&self, id: DatabaseId, meta: &AllocConfig) { //TODO: Handle conflict - let id = DatabaseId::from_name(database_name); block_in_place(|| { let meta_bytes = bincode::serialize(meta).unwrap(); self.meta_store @@ -66,11 +65,10 @@ impl Store { .unwrap() .unwrap(); }); - id } - pub async fn deallocate(&self, _database_name: &str) { - todo!() + pub async fn deallocate(&self, id: DatabaseId) { + block_in_place(|| self.meta_store.remove(id).unwrap()); } pub async fn meta(&self, database_id: &DatabaseId) -> Option {