Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

deallocate #539

Merged
merged 2 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions libsqlx-server/src/allocation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,14 +590,16 @@ impl ReplicaConnection {
.unwrap(),
BuilderStep::BeginRows => req.builder.begin_rows().unwrap(),
BuilderStep::BeginRow => req.builder.begin_row().unwrap(),
BuilderStep::AddRowValue(v) => req.builder.add_row_value((&v).into()).unwrap(),
BuilderStep::AddRowValue(v) => {
req.builder.add_row_value((&v).into()).unwrap()
}
BuilderStep::FinishRow => req.builder.finish_row().unwrap(),
BuilderStep::FinishRows => req.builder.finish_rows().unwrap(),
BuilderStep::Finnalize { is_txn, frame_no } => {
let _ = req.builder.finnalize(is_txn, frame_no).unwrap();
finnalized = true;
},
BuilderStep::FinnalizeError(e) => {
}
BuilderStep::FinnalizeError(e) => {
req.builder.finnalize_error(e);
finnalized = true;
}
Expand Down Expand Up @@ -625,15 +627,13 @@ impl ConnectionHandler for ReplicaConnection {
// self.conn.writer().current_req.timeout.poll()
let mut req = self.conn.writer().current_req.lock();
let should_abort_query = match &mut *req {
Some(ref mut req) => {
match req.timeout.as_mut().poll(cx) {
Poll::Ready(_) => {
req.builder.finnalize_error("request timed out".to_string());
true
}
Poll::Pending => return Poll::Pending,
Some(ref mut req) => match req.timeout.as_mut().poll(cx) {
Poll::Ready(_) => {
req.builder.finnalize_error("request timed out".to_string());
true
}
}
Poll::Pending => return Poll::Pending,
},
None => return Poll::Ready(()),
};

Expand Down
52 changes: 35 additions & 17 deletions libsqlx-server/src/http/admin.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,39 @@
use std::sync::Arc;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use axum::extract::{Path, State};
use axum::routing::{delete, post};
use axum::{Json, Router};
use axum::routing::post;
use axum::extract::State;
use color_eyre::eyre::Result;
use hyper::server::accept::Accept;
use serde::{Deserialize, Deserializer, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};

use crate::meta::Store;
use crate::allocation::config::{AllocConfig, DbConfig};
use crate::linc::bus::Bus;
use crate::linc::NodeId;
use crate::manager::Manager;
use crate::meta::DatabaseId;

pub struct Config {
pub meta_store: Arc<Store>,
pub bus: Arc<Bus<Arc<Manager>>>,
}

struct AdminServerState {
meta_store: Arc<Store>,
bus: Arc<Bus<Arc<Manager>>>,
}

pub async fn run_admin_api<I>(config: Config, listener: I) -> Result<()>
where
I: Accept<Error = std::io::Error>,
I::Conn: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
let state = AdminServerState {
meta_store: config.meta_store,
};
let state = AdminServerState { bus: config.bus };

let app = Router::new()
.route("/manage/allocation", post(allocate).get(list_allocs))
.route("/manage/allocation/:db_name", delete(deallocate))
.with_state(Arc::new(state));
axum::Server::builder(listener)
.serve(app.into_make_service())
Expand All @@ -49,7 +50,7 @@ struct AllocateResp {}

#[derive(Deserialize, Debug)]
struct AllocateReq {
alloc_id: String,
database_name: String,
max_conccurent_connection: Option<u32>,
config: DbConfigReq,
}
Expand All @@ -60,7 +61,10 @@ pub enum DbConfigReq {
Primary {},
Replica {
primary_node_id: NodeId,
#[serde(deserialize_with = "deserialize_duration", default = "default_proxy_timeout")]
#[serde(
deserialize_with = "deserialize_duration",
default = "default_proxy_timeout"
)]
proxy_request_timeout_duration: Duration,
},
}
Expand All @@ -78,8 +82,8 @@ where
type Value = Duration;

fn visit_str<E>(self, v: &str) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
where
E: serde::de::Error,
{
match humantime::Duration::from_str(v) {
Ok(d) => Ok(*d),
Expand All @@ -90,7 +94,6 @@ where
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.write_str("a duration, in a string format")
}

}

deserializer.deserialize_str(Visitor)
Expand All @@ -102,7 +105,7 @@ async fn allocate(
) -> Result<Json<AllocateResp>, Json<ErrorResponse>> {
let config = AllocConfig {
max_conccurent_connection: req.max_conccurent_connection.unwrap_or(16),
db_name: req.alloc_id.clone(),
db_name: req.database_name.clone(),
db_config: match req.config {
DbConfigReq::Primary {} => DbConfig::Primary {},
DbConfigReq::Replica {
Expand All @@ -114,7 +117,20 @@ async fn allocate(
},
},
};
state.meta_store.allocate(&req.alloc_id, &config).await;

let dispatcher = state.bus.clone();
let id = DatabaseId::from_name(&req.database_name);
state.bus.handler().allocate(id, &config, dispatcher).await;

Ok(Json(AllocateResp {}))
}

async fn deallocate(
State(state): State<Arc<AdminServerState>>,
Path(database_name): Path<String>,
) -> Result<Json<AllocateResp>, Json<ErrorResponse>> {
let id = DatabaseId::from_name(&database_name);
state.bus.handler().deallocate(id).await;

Ok(Json(AllocateResp {}))
}
Expand All @@ -133,7 +149,9 @@ async fn list_allocs(
State(state): State<Arc<AdminServerState>>,
) -> Result<Json<ListAllocResp>, Json<ErrorResponse>> {
let allocs = state
.meta_store
.bus
.handler()
.store()
.list_allocs()
.await
.into_iter()
Expand Down
2 changes: 1 addition & 1 deletion libsqlx-server/src/http/user/extractors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl FromRequestParts<Arc<UserApiState>> for Database {
let Ok(host_str) = std::str::from_utf8(host.as_bytes()) else {return Err(UserApiError::MissingHost)};
let db_name = parse_host(host_str)?;
let db_id = DatabaseId::from_name(db_name);
let Some(sender) = state.manager.alloc(db_id, state.bus.clone()).await else { return Err(UserApiError::UnknownDatabase(db_name.to_owned())) };
let Some(sender) = state.manager.schedule(db_id, state.bus.clone()).await else { return Err(UserApiError::UnknownDatabase(db_name.to_owned())) };

Ok(Database { sender })
}
Expand Down
4 changes: 4 additions & 0 deletions libsqlx-server/src/linc/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ impl<H: Handler> Bus<H> {
self.node_id
}

pub fn handler(&self) -> &H {
&self.handler
}

pub async fn incomming(self: &Arc<Self>, incomming: Inbound) {
self.handler.handle(self.clone(), incomming).await;
}
Expand Down
6 changes: 3 additions & 3 deletions libsqlx-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ struct Args {
async fn spawn_admin_api(
set: &mut JoinSet<Result<()>>,
config: &AdminApiConfig,
meta_store: Arc<Store>,
bus: Arc<Bus<Arc<Manager>>>,
) -> Result<()> {
let admin_api_listener = TcpListener::bind(config.addr).await?;
let fut = run_admin_api(
http::admin::Config { meta_store },
http::admin::Config { bus },
AddrIncoming::from_listener(admin_api_listener)?,
);
set.spawn(fut);
Expand Down Expand Up @@ -98,7 +98,7 @@ async fn main() -> Result<()> {
let bus = Arc::new(Bus::new(config.cluster.id, manager.clone()));

spawn_cluster_networking(&mut join_set, &config.cluster, bus.clone()).await?;
spawn_admin_api(&mut join_set, &config.admin_api, store.clone()).await?;
spawn_admin_api(&mut join_set, &config.admin_api, bus.clone()).await?;
spawn_user_api(&mut join_set, &config.user_api, manager, bus).await?;

join_set.join_next().await;
Expand Down
26 changes: 24 additions & 2 deletions libsqlx-server/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use moka::future::Cache;
use tokio::sync::mpsc;
use tokio::task::JoinSet;

use crate::allocation::config::AllocConfig;
use crate::allocation::{Allocation, AllocationMessage, Database};
use crate::hrana;
use crate::linc::bus::Dispatch;
Expand All @@ -31,7 +32,7 @@ impl Manager {
}

/// Returns a handle to an allocation, lazily initializing if it isn't already loaded.
pub async fn alloc(
pub async fn schedule(
self: &Arc<Self>,
database_id: DatabaseId,
dispatcher: Arc<dyn Dispatch>,
Expand Down Expand Up @@ -65,14 +66,35 @@ impl Manager {

None
}

pub async fn allocate(
self: &Arc<Self>,
database_id: DatabaseId,
meta: &AllocConfig,
dispatcher: Arc<dyn Dispatch>,
) {
self.store().allocate(database_id, meta).await;
self.schedule(database_id, dispatcher).await;
}

pub async fn deallocate(&self, database_id: DatabaseId) {
self.meta_store.deallocate(database_id).await;
self.cache.remove(&database_id).await;
let db_path = self.db_path.join("dbs").join(database_id.to_string());
tokio::fs::remove_dir_all(db_path).await.unwrap();
}

pub fn store(&self) -> &Store {
&self.meta_store
}
}

#[async_trait::async_trait]
impl Handler for Arc<Manager> {
async fn handle(&self, bus: Arc<dyn Dispatch>, msg: Inbound) {
if let Some(sender) = self
.clone()
.alloc(msg.enveloppe.database_id.unwrap(), bus.clone())
.schedule(msg.enveloppe.database_id.unwrap(), bus.clone())
.await
{
let _ = sender.send(AllocationMessage::Inbound(msg)).await;
Expand Down
7 changes: 3 additions & 4 deletions libsqlx-server/src/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,19 @@ impl Store {
Self { meta_store }
}

pub async fn allocate(&self, database_name: &str, meta: &AllocConfig) {
pub async fn allocate(&self, id: DatabaseId, meta: &AllocConfig) {
//TODO: Handle conflict
block_in_place(|| {
let meta_bytes = bincode::serialize(meta).unwrap();
let id = DatabaseId::from_name(database_name);
self.meta_store
.compare_and_swap(id, None as Option<&[u8]>, Some(meta_bytes))
.unwrap()
.unwrap();
});
}

pub async fn deallocate(&self, _database_name: &str) {
todo!()
pub async fn deallocate(&self, id: DatabaseId) {
block_in_place(|| self.meta_store.remove(id).unwrap());
}

pub async fn meta(&self, database_id: &DatabaseId) -> Option<AllocConfig> {
Expand Down