Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Multi-tenancy config boilerplate #520

Merged
merged 4 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 54 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libsqlx-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ thiserror = "1.0.43"
tokio = { version = "1.29.1", features = ["full"] }
tokio-stream = "0.1.14"
tokio-util = "0.7.8"
toml = "0.7.6"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
uuid = { version = "1.4.0", features = ["v4"] }
Expand Down
8 changes: 4 additions & 4 deletions libsqlx-server/src/allocation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub enum AllocationMessage {
HranaPipelineReq {
req: PipelineRequestBody,
ret: oneshot::Sender<crate::Result<PipelineResponseBody>>,
}
},
}

pub enum Database {
Expand Down Expand Up @@ -87,8 +87,9 @@ pub struct ConnectionHandle {

impl ConnectionHandle {
pub async fn exec<F, R>(&self, f: F) -> crate::Result<R>
where F: for<'a> FnOnce(&'a mut (dyn libsqlx::Connection + 'a)) -> R + Send + 'static,
R: Send + 'static,
where
F: for<'a> FnOnce(&'a mut (dyn libsqlx::Connection + 'a)) -> R + Send + 'static,
R: Send + 'static,
{
let (sender, ret) = oneshot::channel();
let cb = move |conn: &mut dyn libsqlx::Connection| {
Expand Down Expand Up @@ -154,7 +155,6 @@ impl Allocation {
exec: exec_sender,
exit: close_sender,
}

}

fn next_conn_id(&mut self) -> u32 {
Expand Down
98 changes: 98 additions & 0 deletions libsqlx-server/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use std::net::SocketAddr;
use std::path::PathBuf;

use serde::Deserialize;
use serde::de::Visitor;

#[derive(Deserialize, Debug, Clone)]
pub struct Config {
/// Database path
#[serde(default = "default_db_path")]
pub db_path: PathBuf,
/// Cluster configuration
pub cluster: ClusterConfig,
/// User API configuration
pub user_api: UserApiConfig,
/// Admin API configuration
pub admin_api: AdminApiConfig,
}

impl Config {
pub fn validate(&self) -> color_eyre::Result<()> {
// TODO: implement validation
Ok(())
}
}

#[derive(Deserialize, Debug, Clone)]
pub struct ClusterConfig {
/// Address to bind this node to
#[serde(default = "default_linc_addr")]
pub addr: SocketAddr,
/// List of peers in the format `<node_id>:<node_addr>`
pub peers: Vec<Peer>,
}

#[derive(Deserialize, Debug, Clone)]
pub struct UserApiConfig {
#[serde(default = "default_user_addr")]
pub addr: SocketAddr,
}

#[derive(Deserialize, Debug, Clone)]
pub struct AdminApiConfig {
#[serde(default = "default_admin_addr")]
pub addr: SocketAddr,
}

fn default_db_path() -> PathBuf {
PathBuf::from("data.sqld")
}

fn default_admin_addr() -> SocketAddr {
"0.0.0.0:8081".parse().unwrap()
}

fn default_user_addr() -> SocketAddr {
"0.0.0.0:8080".parse().unwrap()
}

fn default_linc_addr() -> SocketAddr {
"0.0.0.0:5001".parse().unwrap()
}

#[derive(Debug, Clone)]
struct Peer {
id: u64,
addr: String,
}

impl<'de> Deserialize<'de> for Peer {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de> {
struct V;

impl Visitor<'_> for V {
type Value = Peer;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a string in the format <node_id>:<node_addr>")
}

fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error, {

let mut iter = v.split(":");
let Some(id) = iter.next() else { return Err(E::custom("node id is missing")) };
let Ok(id) = id.parse::<u64>() else { return Err(E::custom("failed to parse node id")) };
let Some(addr) = iter.next() else { return Err(E::custom("node address is missing")) };
Ok(Peer { id, addr: addr.to_string() })
}
}

deserializer.deserialize_str(V)
}
}

12 changes: 9 additions & 3 deletions libsqlx-server/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
use tokio::sync::{mpsc, oneshot};

use crate::hrana::http::proto::{PipelineRequestBody, PipelineResponseBody};
use crate::allocation::AllocationMessage;
use crate::hrana::http::proto::{PipelineRequestBody, PipelineResponseBody};

pub struct Database {
pub sender: mpsc::Sender<AllocationMessage>,
}

impl Database {
pub async fn hrana_pipeline(&self, req: PipelineRequestBody) -> crate::Result<PipelineResponseBody> {
pub async fn hrana_pipeline(
&self,
req: PipelineRequestBody,
) -> crate::Result<PipelineResponseBody> {
let (sender, ret) = oneshot::channel();
self.sender.send(AllocationMessage::HranaPipelineReq { req, ret: sender }).await.unwrap();
self.sender
.send(AllocationMessage::HranaPipelineReq { req, ret: sender })
.await
.unwrap();
ret.await.unwrap()
}
}
30 changes: 16 additions & 14 deletions libsqlx-server/src/hrana/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::{proto, ProtocolError, Version};
use color_eyre::eyre::anyhow;
use libsqlx::analysis::Statement;
use libsqlx::program::{Cond, Program, Step};
use libsqlx::query::{Query, Params};
use libsqlx::query::{Params, Query};
use libsqlx::result_builder::{StepResult, StepResultsBuilder};

fn proto_cond_to_cond(cond: &proto::BatchCond, max_step_i: usize) -> color_eyre::Result<Cond> {
Expand Down Expand Up @@ -73,11 +73,13 @@ pub async fn execute_batch(
db: &ConnectionHandle,
pgm: Program,
) -> color_eyre::Result<proto::BatchResult> {
let builder = db.exec(move |conn| -> color_eyre::Result<_> {
let mut builder = HranaBatchProtoBuilder::default();
conn.execute_program(pgm, &mut builder)?;
Ok(builder)
}).await??;
let builder = db
.exec(move |conn| -> color_eyre::Result<_> {
let mut builder = HranaBatchProtoBuilder::default();
conn.execute_program(pgm, &mut builder)?;
Ok(builder)
})
.await??;

Ok(builder.into_ret())
}
Expand All @@ -104,18 +106,18 @@ pub fn proto_sequence_to_program(sql: &str) -> color_eyre::Result<Program> {
})
.collect();

Ok(Program {
steps,
})
Ok(Program { steps })
}

pub async fn execute_sequence(conn: &ConnectionHandle, pgm: Program) -> color_eyre::Result<()> {
let builder = conn.exec(move |conn| -> color_eyre::Result<_> {
let mut builder = StepResultsBuilder::default();
conn.execute_program(pgm, &mut builder)?;
let builder = conn
.exec(move |conn| -> color_eyre::Result<_> {
let mut builder = StepResultsBuilder::default();
conn.execute_program(pgm, &mut builder)?;

Ok(builder)
}).await??;
Ok(builder)
})
.await??;

builder
.into_ret()
Expand Down
11 changes: 7 additions & 4 deletions libsqlx-server/src/hrana/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ fn handle_index() -> color_eyre::Result<hyper::Response<hyper::Body>> {
pub async fn handle_pipeline<F, Fut>(
server: &Server,
req: PipelineRequestBody,
mk_conn: F
mk_conn: F,
) -> color_eyre::Result<PipelineResponseBody>
where F: FnOnce() -> Fut,
Fut: Future<Output = crate::Result<ConnectionHandle>>,
where
F: FnOnce() -> Fut,
Fut: Future<Output = crate::Result<ConnectionHandle>>,
{
let mut stream_guard = stream::acquire(server, req.baton.as_deref(), mk_conn).await?;

Expand All @@ -73,7 +74,9 @@ where F: FnOnce() -> Fut,
Ok(resp_body)
}

async fn read_request_json<T: DeserializeOwned>(req: hyper::Request<hyper::Body>) -> color_eyre::Result<T> {
async fn read_request_json<T: DeserializeOwned>(
req: hyper::Request<hyper::Body>,
) -> color_eyre::Result<T> {
let req_body = hyper::body::to_bytes(req.into_body())
.await
.context("Could not read request body")?;
Expand Down
Loading