diff --git a/Cargo.lock b/Cargo.lock index 53a34d6f..ed3c9c27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2589,6 +2589,7 @@ dependencies = [ "bytemuck", "bytes 1.4.0", "bytesize", + "chrono", "clap", "color-eyre", "either", diff --git a/libsqlx-server/Cargo.toml b/libsqlx-server/Cargo.toml index 1ce8c6c8..243cb0ea 100644 --- a/libsqlx-server/Cargo.toml +++ b/libsqlx-server/Cargo.toml @@ -14,6 +14,7 @@ bincode = "1.3.3" bytemuck = { version = "1.13.1", features = ["derive"] } bytes = { version = "1.4.0", features = ["serde"] } bytesize = { version = "1.2.0", features = ["serde"] } +chrono = { version = "0.4.26", features = ["serde"] } clap = { version = "4.3.11", features = ["derive"] } color-eyre = "0.6.2" either = "1.8.1" diff --git a/libsqlx-server/src/hrana/error.rs b/libsqlx-server/src/hrana/error.rs index 2324887a..8f8711a1 100644 --- a/libsqlx-server/src/hrana/error.rs +++ b/libsqlx-server/src/hrana/error.rs @@ -18,13 +18,11 @@ pub enum HranaError { } impl HranaError { - pub fn code(&self) -> Option<&str>{ + pub fn code(&self) -> Option<&str> { match self { HranaError::Stmt(e) => Some(e.code()), HranaError::StreamResponse(e) => Some(e.code()), - HranaError::Stream(_) - | HranaError::Libsqlx(_) - | HranaError::Proto(_) => None, + HranaError::Stream(_) | HranaError::Libsqlx(_) | HranaError::Proto(_) => None, } } } diff --git a/libsqlx-server/src/http/admin.rs b/libsqlx-server/src/http/admin.rs index 2bac534c..e90ecfe2 100644 --- a/libsqlx-server/src/http/admin.rs +++ b/libsqlx-server/src/http/admin.rs @@ -4,10 +4,13 @@ use std::sync::Arc; use std::time::Duration; use axum::extract::{Path, State}; +use axum::response::IntoResponse; use axum::routing::{delete, post}; use axum::{Json, Router}; +use chrono::{DateTime, Utc}; use color_eyre::eyre::Result; use hyper::server::accept::Accept; +use hyper::StatusCode; use serde::{Deserialize, Deserializer, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -15,7 +18,35 @@ use crate::allocation::config::{AllocConfig, DbConfig}; use crate::linc::bus::Bus; use crate::linc::NodeId; use crate::manager::Manager; -use crate::meta::DatabaseId; +use crate::meta::{AllocationError, DatabaseId}; + +impl IntoResponse for crate::error::Error { + fn into_response(self) -> axum::response::Response { + #[derive(Serialize)] + struct ErrorBody { + message: String, + } + + let mut resp = Json(ErrorBody { + message: self.to_string(), + }) + .into_response(); + *resp.status_mut() = match self { + crate::error::Error::Libsqlx(_) + | crate::error::Error::InjectorExited + | crate::error::Error::ConnectionClosed + | crate::error::Error::Io(_) + | crate::error::Error::AllocationClosed + | crate::error::Error::Internal(_) + | crate::error::Error::Heed(_) => StatusCode::INTERNAL_SERVER_ERROR, + crate::error::Error::Allocation(AllocationError::AlreadyExist(_)) => { + StatusCode::BAD_REQUEST + } + }; + + resp + } +} pub struct Config { pub bus: Arc>>, @@ -47,7 +78,19 @@ where struct ErrorResponse {} #[derive(Serialize, Debug)] -struct AllocateResp {} +#[serde(rename_all = "lowercase")] +enum DbType { + Primary, + Replica, +} + +#[derive(Serialize, Debug)] +struct AllocationSummaryView { + created_at: DateTime, + database_name: String, + #[serde(rename = "type")] + ty: DbType, +} #[derive(Deserialize, Debug)] struct AllocateReq { @@ -134,7 +177,7 @@ const fn default_txn_timeout() -> HumanDuration { async fn allocate( State(state): State>, Json(req): Json, -) -> Result, Json> { +) -> crate::Result> { let config = AllocConfig { max_conccurent_connection: req.max_conccurent_connection.unwrap_or(16), db_name: req.database_name.clone(), @@ -164,19 +207,26 @@ async fn allocate( let dispatcher = state.bus.clone(); let id = DatabaseId::from_name(&req.database_name); - state.bus.handler().allocate(id, &config, dispatcher).await; + let meta = state.bus.handler().allocate(id, config, dispatcher).await?; - Ok(Json(AllocateResp {})) + Ok(Json(AllocationSummaryView { + created_at: meta.created_at, + database_name: meta.config.db_name, + ty: match meta.config.db_config { + DbConfig::Primary {..} => DbType::Primary, + DbConfig::Replica {..} => DbType::Replica, + } + })) } async fn deallocate( State(state): State>, Path(database_name): Path, -) -> Result, Json> { +) -> crate::Result<()> { let id = DatabaseId::from_name(&database_name); - state.bus.handler().deallocate(id).await; + state.bus.handler().deallocate(id).await?; - Ok(Json(AllocateResp {})) + Ok(()) } #[derive(Serialize, Debug)] @@ -191,15 +241,16 @@ struct AllocView { async fn list_allocs( State(state): State>, -) -> Result, Json> { +) -> crate::Result> { let allocs = state .bus .handler() .store() - .list_allocs() - .unwrap() + .list_allocs()? .into_iter() - .map(|cfg| AllocView { id: cfg.db_name }) + .map(|meta| AllocView { + id: meta.config.db_name, + }) .collect(); Ok(Json(ListAllocResp { allocs })) diff --git a/libsqlx-server/src/http/user/mod.rs b/libsqlx-server/src/http/user/mod.rs index 3653377b..7b43d36d 100644 --- a/libsqlx-server/src/http/user/mod.rs +++ b/libsqlx-server/src/http/user/mod.rs @@ -5,8 +5,8 @@ use axum::response::IntoResponse; use axum::routing::post; use axum::{Json, Router}; use color_eyre::Result; -use hyper::StatusCode; use hyper::server::accept::Accept; +use hyper::StatusCode; use serde::Serialize; use tokio::io::{AsyncRead, AsyncWrite}; @@ -30,12 +30,12 @@ impl IntoResponse for HranaError { fn into_response(self) -> axum::response::Response { let (message, code) = match self.code() { Some(code) => (self.to_string(), code.to_owned()), - None => ("internal error, please check the logs".to_owned(), "INTERNAL_ERROR".to_owned()), - }; - let resp = ErrorResponseBody { - message, - code, + None => ( + "internal error, please check the logs".to_owned(), + "INTERNAL_ERROR".to_owned(), + ), }; + let resp = ErrorResponseBody { message, code }; let mut resp = Json(resp).into_response(); *resp.status_mut() = StatusCode::BAD_REQUEST; resp diff --git a/libsqlx-server/src/manager.rs b/libsqlx-server/src/manager.rs index 686e6fca..2f0cafa1 100644 --- a/libsqlx-server/src/manager.rs +++ b/libsqlx-server/src/manager.rs @@ -12,7 +12,7 @@ use crate::compactor::CompactionQueue; use crate::linc::bus::Dispatch; use crate::linc::handler::Handler; use crate::linc::Inbound; -use crate::meta::{DatabaseId, Store}; +use crate::meta::{AllocMeta, DatabaseId, Store}; use crate::replica_commit_store::ReplicaCommitStore; pub struct Manager { @@ -52,14 +52,14 @@ impl Manager { return Ok(Some(sender.clone())); } - if let Some(config) = self.meta_store.meta(&database_id)? { + if let Some(meta) = self.meta_store.meta(&database_id)? { let path = self.db_path.join("dbs").join(database_id.to_string()); tokio::fs::create_dir_all(&path).await?; let (alloc_sender, inbox) = mpsc::channel(MAX_ALLOC_MESSAGE_QUEUE_LEN); let alloc = Allocation { inbox, database: Database::from_config( - &config, + &meta.config, path, dispatcher.clone(), self.compaction_queue.clone(), @@ -67,9 +67,9 @@ impl Manager { )?, connections_futs: JoinSet::new(), next_conn_id: 0, - max_concurrent_connections: config.max_conccurent_connection, + max_concurrent_connections: meta.config.max_conccurent_connection, dispatcher, - db_name: config.db_name, + db_name: meta.config.db_name, connections: HashMap::new(), }; @@ -86,12 +86,13 @@ impl Manager { pub async fn allocate( self: &Arc, database_id: DatabaseId, - meta: &AllocConfig, + config: AllocConfig, dispatcher: Arc, - ) -> crate::Result<()> { - self.store().allocate(&database_id, meta)?; + ) -> crate::Result { + let meta = self.store().allocate(&database_id, config)?; self.schedule(database_id, dispatcher).await?; - Ok(()) + + Ok(meta) } pub async fn deallocate(&self, database_id: DatabaseId) -> crate::Result<()> { diff --git a/libsqlx-server/src/meta.rs b/libsqlx-server/src/meta.rs index baefaeaf..a2a3ac87 100644 --- a/libsqlx-server/src/meta.rs +++ b/libsqlx-server/src/meta.rs @@ -1,6 +1,7 @@ use std::fmt; use std::mem::size_of; +use chrono::{DateTime, Utc}; use heed::bytemuck::{Pod, Zeroable}; use heed_types::{OwnedType, SerdeBincode}; use itertools::Itertools; @@ -11,9 +12,15 @@ use tokio::task::block_in_place; use crate::allocation::config::AllocConfig; +#[derive(Debug, Serialize, Deserialize)] +pub struct AllocMeta { + pub config: AllocConfig, + pub created_at: DateTime, +} + pub struct Store { env: heed::Env, - alloc_config_db: heed::Database, SerdeBincode>, + alloc_config_db: heed::Database, SerdeBincode>, } #[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Hash, Clone, Copy, Pod, Zeroable)] @@ -73,7 +80,7 @@ impl Store { }) } - pub fn allocate(&self, id: &DatabaseId, meta: &AllocConfig) -> crate::Result<()> { + pub fn allocate(&self, id: &DatabaseId, config: AllocConfig) -> crate::Result { block_in_place(|| { let mut txn = self.env.write_txn()?; if self @@ -82,14 +89,19 @@ impl Store { .get(&txn, id)? .is_some() { - Err(AllocationError::AlreadyExist(meta.db_name.clone()))?; + Err(AllocationError::AlreadyExist(config.db_name.clone()))?; }; - self.alloc_config_db.put(&mut txn, id, meta)?; + let meta = AllocMeta { + config, + created_at: Utc::now(), + }; + + self.alloc_config_db.put(&mut txn, id, &meta)?; txn.commit()?; - Ok(()) + Ok(meta) }) } @@ -103,14 +115,14 @@ impl Store { }) } - pub fn meta(&self, id: &DatabaseId) -> crate::Result> { + pub fn meta(&self, id: &DatabaseId) -> crate::Result> { block_in_place(|| { let txn = self.env.read_txn()?; Ok(self.alloc_config_db.get(&txn, id)?) }) } - pub fn list_allocs(&self) -> crate::Result> { + pub fn list_allocs(&self) -> crate::Result> { block_in_place(|| { let txn = self.env.read_txn()?; let res = self