From 8997c3f83442239f62af9036676d2a7a718745e4 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 20 Jul 2023 18:40:03 +0200 Subject: [PATCH 1/5] add compact method to LibsqlDatabase --- libsqlx-server/src/allocation/replica.rs | 34 ++++++------- libsqlx/src/database/libsql/mod.rs | 4 ++ .../database/libsql/replication_log/logger.rs | 49 ++++++++++++------- 3 files changed, 48 insertions(+), 39 deletions(-) diff --git a/libsqlx-server/src/allocation/replica.rs b/libsqlx-server/src/allocation/replica.rs index 297d27fb..441a3d40 100644 --- a/libsqlx-server/src/allocation/replica.rs +++ b/libsqlx-server/src/allocation/replica.rs @@ -1,7 +1,7 @@ use std::ops::Deref; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::{Context, Poll, ready}; use std::time::Duration; use futures::Future; @@ -11,22 +11,16 @@ use libsqlx::proxy::{WriteProxyConnection, WriteProxyDatabase}; use libsqlx::result_builder::{Column, QueryBuilderConfig, ResultBuilder}; use libsqlx::{DescribeResponse, Frame, FrameNo, Injector}; use parking_lot::Mutex; -use tokio::{ - sync::mpsc, - task::block_in_place, - time::{timeout, Sleep}, -}; +use tokio::sync::mpsc; +use tokio::task::block_in_place; +use tokio::time::{timeout, Sleep}; +use crate::linc::bus::Dispatch; use crate::linc::proto::{BuilderStep, ProxyResponse}; +use crate::linc::proto::{Enveloppe, Frames, Message}; use crate::linc::Inbound; -use crate::{ - linc::{ - bus::Dispatch, - proto::{Enveloppe, Frames, Message}, - NodeId, Outbound, - }, - meta::DatabaseId, -}; +use crate::linc::{NodeId, Outbound}; +use crate::meta::DatabaseId; use super::{ConnectionHandler, ExecFn}; @@ -227,7 +221,7 @@ impl ReplicaConnection { .builder .finish_step(affected_row_count, last_insert_rowid) .unwrap(), - BuilderStep::StepError(e) => req + BuilderStep::StepError(_e) => req .builder .step_error(todo!("handle proxy step error")) .unwrap(), @@ -274,15 +268,15 @@ impl ReplicaConnection { impl ConnectionHandler for ReplicaConnection { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> { // we are currently handling a request on this connection - // self.conn.writer().current_req.timeout.poll() let mut req = self.conn.writer().current_req.lock(); let should_abort_query = match &mut *req { - Some(ref mut req) => match req.timeout.as_mut().poll(cx) { - Poll::Ready(_) => { + Some(ref mut req) => { + ready!(req.timeout.as_mut().poll(cx)); + // the request has timedout, we finalize the builder with a error, and clean the + // current request. 
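// `ready!` expands to an early `return Poll::Pending` when the timeout future is not
// ready, which is exactly what the removed `Poll::Pending => return Poll::Pending` arm
// did; reaching the next line therefore means the per-request timeout has fired.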
req.builder.finnalize_error("request timed out".to_string()); true - } - Poll::Pending => return Poll::Pending, + }, None => return Poll::Ready(()), }; diff --git a/libsqlx/src/database/libsql/mod.rs b/libsqlx/src/database/libsql/mod.rs index 382a4177..a6fdceb1 100644 --- a/libsqlx/src/database/libsql/mod.rs +++ b/libsqlx/src/database/libsql/mod.rs @@ -132,6 +132,10 @@ impl LibsqlDatabase { Ok(Self::new(db_path, ty)) } + pub fn compact_log(&self) { + self.ty.logger.compact(); + } + pub fn logger(&self) -> Arc { self.ty.logger.clone() } diff --git a/libsqlx/src/database/libsql/replication_log/logger.rs b/libsqlx/src/database/libsql/replication_log/logger.rs index 7bcfb0bf..4f899a0d 100644 --- a/libsqlx/src/database/libsql/replication_log/logger.rs +++ b/libsqlx/src/database/libsql/replication_log/logger.rs @@ -143,11 +143,12 @@ unsafe impl WalHook for ReplicationLoggerHook { std::process::abort(); } - if let Err(e) = ctx.logger.log_file.write().maybe_compact( - &*ctx.logger.compactor, - ntruncate, - &ctx.logger.db_path, - ) { + if let Err(e) = ctx + .logger + .log_file + .write() + .maybe_compact(&*ctx.logger.compactor, &ctx.logger.db_path) + { tracing::error!("fatal error: {e}, exiting"); std::process::abort() } @@ -425,6 +426,10 @@ impl LogFile { } } + pub fn can_compact(&mut self) -> bool { + self.header.frame_count > 0 && self.uncommitted_frame_count == 0 + } + pub fn read_header(file: &File) -> crate::Result { let mut buf = [0; size_of::()]; file.read_exact_at(&mut buf, 0)?; @@ -563,27 +568,24 @@ impl LogFile { Ok(frame) } - fn maybe_compact( - &mut self, - compactor: &dyn LogCompactor, - size_after: u32, - path: &Path, - ) -> anyhow::Result<()> { - if compactor.should_compact(self) { - return self.do_compaction(compactor, size_after, path); + fn maybe_compact(&mut self, compactor: &dyn LogCompactor, path: &Path) -> anyhow::Result<()> { + if self.can_compact() && compactor.should_compact(self) { + return self.do_compaction(compactor, path); } Ok(()) } - fn do_compaction( - &mut self, - compactor: &dyn LogCompactor, - size_after: u32, - path: &Path, - ) -> anyhow::Result<()> { + fn do_compaction(&mut self, compactor: &dyn LogCompactor, path: &Path) -> anyhow::Result<()> { tracing::info!("performing log compaction"); let temp_log_path = path.join("temp_log"); + let last_frame = self + .rev_frames_iter()? 
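            // frames are yielded newest-first by the reverse iterator, so the first item
            // is the most recently committed frame; its `size_after` header field records
            // the database size at that commit, which is the value that previously had to
            // be passed in as the `size_after` parameter.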
+ .next() + .expect("there should be at least one frame to perform compaction")?; + let size_after = last_frame.header().size_after; + assert!(size_after != 0); + let file = OpenOptions::new() .read(true) .write(true) @@ -916,6 +918,15 @@ impl ReplicationLogger { pub fn get_frame(&self, frame_no: FrameNo) -> Result { self.log_file.read().frame(frame_no) } + + pub fn compact(&self) { + let mut log_file = self.log_file.write(); + if log_file.can_compact() { + log_file + .do_compaction(&*self.compactor, &self.db_path) + .unwrap(); + } + } } fn checkpoint_db(data_path: &Path) -> crate::Result<()> { From 2b3316ebaa7e106473832991bea1908041b8abdf Mon Sep 17 00:00:00 2001 From: ad hoc Date: Fri, 21 Jul 2023 10:21:53 +0200 Subject: [PATCH 2/5] implement should_compact --- libsqlx-server/src/allocation/mod.rs | 28 +++++++++++++++---- .../database/libsql/replication_log/logger.rs | 23 +++++++++------ 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/libsqlx-server/src/allocation/mod.rs b/libsqlx-server/src/allocation/mod.rs index 8c7ba873..836bbb50 100644 --- a/libsqlx-server/src/allocation/mod.rs +++ b/libsqlx-server/src/allocation/mod.rs @@ -4,7 +4,7 @@ use std::future::poll_fn; use std::path::PathBuf; use std::sync::Arc; use std::task::{Context, Poll}; -use std::time::Instant; +use std::time::{Instant, Duration}; use either::Either; use libsqlx::libsql::{LibsqlDatabase, LogCompactor, LogFile}; @@ -55,19 +55,31 @@ pub enum Database { }, } -struct Compactor; +struct Compactor { + max_log_size: usize, + last_compacted_at: Instant, + compact_interval: Option, +} impl LogCompactor for Compactor { - fn should_compact(&self, _log: &LogFile) -> bool { - false + fn should_compact(&self, log: &LogFile) -> bool { + let mut should_compact = false; + if let Some(compact_interval)= self.compact_interval { + should_compact |= self.last_compacted_at.elapsed() >= compact_interval + } + + should_compact |= log.size() >= self.max_log_size; + + should_compact } fn compact( - &self, + &mut self, _log: LogFile, _path: std::path::PathBuf, _size_after: u32, ) -> Result<(), Box> { + self.last_compacted_at = Instant::now(); todo!() } } @@ -79,7 +91,11 @@ impl Database { let (sender, receiver) = tokio::sync::watch::channel(0); let db = LibsqlDatabase::new_primary( path, - Compactor, + Compactor { + max_log_size: usize::MAX, + last_compacted_at: Instant::now(), + compact_interval: None, + }, false, Box::new(move |fno| { let _ = sender.send(fno); diff --git a/libsqlx/src/database/libsql/replication_log/logger.rs b/libsqlx/src/database/libsql/replication_log/logger.rs index 4f899a0d..cbb9acca 100644 --- a/libsqlx/src/database/libsql/replication_log/logger.rs +++ b/libsqlx/src/database/libsql/replication_log/logger.rs @@ -9,7 +9,7 @@ use std::sync::Arc; use anyhow::{bail, ensure}; use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable}; use bytes::{Bytes, BytesMut}; -use parking_lot::RwLock; +use parking_lot::{RwLock, Mutex}; use rusqlite::ffi::{ libsql_wal as Wal, sqlite3, PgHdr, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR, SQLITE_OK, }; @@ -147,7 +147,7 @@ unsafe impl WalHook for ReplicationLoggerHook { .logger .log_file .write() - .maybe_compact(&*ctx.logger.compactor, &ctx.logger.db_path) + .maybe_compact(&mut *ctx.logger.compactor.lock(), &ctx.logger.db_path) { tracing::error!("fatal error: {e}, exiting"); std::process::abort() @@ -568,7 +568,7 @@ impl LogFile { Ok(frame) } - fn maybe_compact(&mut self, compactor: &dyn LogCompactor, path: &Path) -> anyhow::Result<()> { + fn maybe_compact(&mut self, 
compactor: &mut dyn LogCompactor, path: &Path) -> anyhow::Result<()> { if self.can_compact() && compactor.should_compact(self) { return self.do_compaction(compactor, path); } @@ -576,7 +576,7 @@ impl LogFile { Ok(()) } - fn do_compaction(&mut self, compactor: &dyn LogCompactor, path: &Path) -> anyhow::Result<()> { + fn do_compaction(&mut self, compactor: &mut dyn LogCompactor, path: &Path) -> anyhow::Result<()> { tracing::info!("performing log compaction"); let temp_log_path = path.join("temp_log"); let last_frame = self @@ -631,6 +631,11 @@ impl LogFile { self.file.set_len(0)?; Self::new(self.file) } + + /// return the size in bytes of the log + pub fn size(&self) -> usize { + size_of::() + Frame::SIZE * self.header().frame_count as usize + } } #[cfg(target_os = "macos")] @@ -730,7 +735,7 @@ pub trait LogCompactor: Sync + Send + 'static { fn should_compact(&self, log: &LogFile) -> bool; /// Compact the given snapshot fn compact( - &self, + &mut self, log: LogFile, path: PathBuf, size_after: u32, @@ -740,7 +745,7 @@ pub trait LogCompactor: Sync + Send + 'static { #[cfg(test)] impl LogCompactor for () { fn compact( - &self, + &mut self, _file: LogFile, _path: PathBuf, _size_after: u32, @@ -758,7 +763,7 @@ pub type FrameNotifierCb = Box; pub struct ReplicationLogger { pub generation: Generation, pub log_file: RwLock, - compactor: Box, + compactor: Box>, db_path: PathBuf, /// a notifier channel other tasks can subscribe to, and get notified when new frames become /// available. @@ -822,7 +827,7 @@ impl ReplicationLogger { Ok(Self { generation: Generation::new(generation_start_frame_no), - compactor: Box::new(compactor), + compactor: Box::new(Mutex::new(compactor)), log_file: RwLock::new(log_file), db_path, new_frame_notifier, @@ -923,7 +928,7 @@ impl ReplicationLogger { let mut log_file = self.log_file.write(); if log_file.can_compact() { log_file - .do_compaction(&*self.compactor, &self.db_path) + .do_compaction(&mut *self.compactor.lock(), &self.db_path) .unwrap(); } } From b9cb88ec02e1974211ffeb2a0bf53dbbab9e92ae Mon Sep 17 00:00:00 2001 From: ad hoc Date: Fri, 21 Jul 2023 10:39:29 +0200 Subject: [PATCH 3/5] periodic compaction --- libsqlx-server/src/allocation/mod.rs | 58 +++++++++++++++++------- libsqlx-server/src/allocation/primary.rs | 2 +- 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/libsqlx-server/src/allocation/mod.rs b/libsqlx-server/src/allocation/mod.rs index 836bbb50..d71f128c 100644 --- a/libsqlx-server/src/allocation/mod.rs +++ b/libsqlx-server/src/allocation/mod.rs @@ -2,8 +2,9 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::future::poll_fn; use std::path::PathBuf; +use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::{Context, Poll, ready}; use std::time::{Instant, Duration}; use either::Either; @@ -13,6 +14,7 @@ use libsqlx::proxy::WriteProxyDatabase; use libsqlx::{Database as _, InjectableDatabase}; use tokio::sync::{mpsc, oneshot}; use tokio::task::{block_in_place, JoinSet}; +use tokio::time::Interval; use crate::allocation::primary::FrameStreamer; use crate::hrana; @@ -46,7 +48,10 @@ pub enum AllocationMessage { } pub enum Database { - Primary(PrimaryDatabase), + Primary { + db: PrimaryDatabase, + compact_interval: Option>>, + }, Replica { db: ProxyDatabase, injector_handle: mpsc::Sender, @@ -55,6 +60,20 @@ pub enum Database { }, } +impl Database { + fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> { + if let Self::Primary { compact_interval: Some(ref mut interval), db } = self { 
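            // `poll_tick` registers the waker and yields `Pending` until the compaction
            // interval elapses; when it fires, the compaction is moved onto the blocking
            // thread pool because `compact_log` does synchronous file I/O. The method
            // always ends up returning `Pending`, so the corresponding `select!` arm in
            // `run` never completes and only drives this timer.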
+ ready!(interval.poll_tick(cx)); + let db = db.db.clone(); + tokio::task::spawn_blocking(move || { + db.compact_log(); + }); + } + + Poll::Pending + } +} + struct Compactor { max_log_size: usize, last_compacted_at: Instant, @@ -103,11 +122,14 @@ impl Database { ) .unwrap(); - Self::Primary(PrimaryDatabase { - db, - replica_streams: HashMap::new(), - frame_notifier: receiver, - }) + Self::Primary{ + db: PrimaryDatabase { + db: Arc::new(db), + replica_streams: HashMap::new(), + frame_notifier: receiver, + } , + compact_interval: None, + } } DbConfig::Replica { primary_node_id, @@ -145,7 +167,7 @@ impl Database { fn connect(&self, connection_id: u32, alloc: &Allocation) -> impl ConnectionHandler { match self { - Database::Primary(PrimaryDatabase { db, .. }) => Either::Right(PrimaryConnection { + Database::Primary { db: PrimaryDatabase { db, .. }, .. } => Either::Right(PrimaryConnection { conn: db.connect().unwrap(), }), Database::Replica { db, primary_id, .. } => Either::Left(ReplicaConnection { @@ -160,7 +182,7 @@ impl Database { } pub fn is_primary(&self) -> bool { - matches!(self, Self::Primary(..)) + matches!(self, Self::Primary { .. }) } } @@ -206,7 +228,9 @@ impl ConnectionHandle { impl Allocation { pub async fn run(mut self) { loop { + let fut = poll_fn(|cx| self.database.poll(cx)); tokio::select! { + _ = fut => (), Some(msg) = self.inbox.recv() => { match msg { AllocationMessage::HranaPipelineReq { req, ret } => { @@ -245,12 +269,14 @@ impl Allocation { req_no, next_frame_no, } => match &mut self.database { - Database::Primary(PrimaryDatabase { - db, - replica_streams, - frame_notifier, - .. - }) => { + Database::Primary{ + db: PrimaryDatabase { + db, + replica_streams, + frame_notifier, + .. + }, .. + } => { let streamer = FrameStreamer { logger: db.logger(), database_id: DatabaseId::from_name(&self.db_name), @@ -293,7 +319,7 @@ impl Allocation { *last_received_frame_ts = Some(Instant::now()); injector_handle.send(frames).await.unwrap(); } - Database::Primary(PrimaryDatabase { .. }) => todo!("handle primary receiving txn"), + Database::Primary { db: PrimaryDatabase { .. }, .. 
} => todo!("handle primary receiving txn"), }, Message::ProxyRequest { connection_id, diff --git a/libsqlx-server/src/allocation/primary.rs b/libsqlx-server/src/allocation/primary.rs index 15ac4dbd..f66ed4dd 100644 --- a/libsqlx-server/src/allocation/primary.rs +++ b/libsqlx-server/src/allocation/primary.rs @@ -19,7 +19,7 @@ use super::{ConnectionHandler, ExecFn, FRAMES_MESSAGE_MAX_COUNT}; const MAX_STEP_BATCH_SIZE: usize = 100_000_000; // ~100kb // pub struct PrimaryDatabase { - pub db: LibsqlDatabase, + pub db: Arc>, pub replica_streams: HashMap)>, pub frame_notifier: tokio::sync::watch::Receiver, } From 3d8c5e70b1a755bf5ef17accab5b9d92b6277c5a Mon Sep 17 00:00:00 2001 From: ad hoc Date: Fri, 21 Jul 2023 11:45:37 +0200 Subject: [PATCH 4/5] add log compaction config to primary config --- Cargo.lock | 4 + libsqlx-server/Cargo.toml | 1 + libsqlx-server/src/allocation/config.rs | 7 +- libsqlx-server/src/allocation/mod.rs | 54 ++++++---- libsqlx-server/src/allocation/replica.rs | 13 ++- libsqlx-server/src/http/admin.rs | 98 ++++++++++++------- .../database/libsql/replication_log/logger.rs | 14 ++- 7 files changed, 128 insertions(+), 63 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 23ee18ef..d369d037 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -900,6 +900,9 @@ name = "bytesize" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38fcc2979eff34a4b84e1cf9a1e3da42a7d44b3b690a40cdcb23e3d556cfb2e5" +dependencies = [ + "serde", +] [[package]] name = "camino" @@ -2549,6 +2552,7 @@ dependencies = [ "base64 0.21.2", "bincode", "bytes 1.4.0", + "bytesize", "clap", "color-eyre", "either", diff --git a/libsqlx-server/Cargo.toml b/libsqlx-server/Cargo.toml index 42a508c6..90b27680 100644 --- a/libsqlx-server/Cargo.toml +++ b/libsqlx-server/Cargo.toml @@ -12,6 +12,7 @@ axum = "0.6.18" base64 = "0.21.2" bincode = "1.3.3" bytes = { version = "1.4.0", features = ["serde"] } +bytesize = { version = "1.2.0", features = ["serde"] } clap = { version = "4.3.11", features = ["derive"] } color-eyre = "0.6.2" either = "1.8.1" diff --git a/libsqlx-server/src/allocation/config.rs b/libsqlx-server/src/allocation/config.rs index ac21efa7..f0c13870 100644 --- a/libsqlx-server/src/allocation/config.rs +++ b/libsqlx-server/src/allocation/config.rs @@ -20,7 +20,12 @@ pub struct AllocConfig { #[derive(Debug, Serialize, Deserialize)] pub enum DbConfig { - Primary {}, + Primary { + /// maximum size the replication log is allowed to grow, before it is compacted. 
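        // expressed in bytes; the admin API below fills this from a human-readable
        // `bytesize::ByteSize` (e.g. "100mb") via `as_u64()`.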
+ max_log_size: usize, + /// Interval at which to force compaction + replication_log_compact_interval: Option, + }, Replica { primary_node_id: NodeId, proxy_request_timeout_duration: Duration, diff --git a/libsqlx-server/src/allocation/mod.rs b/libsqlx-server/src/allocation/mod.rs index d71f128c..33bde27d 100644 --- a/libsqlx-server/src/allocation/mod.rs +++ b/libsqlx-server/src/allocation/mod.rs @@ -4,8 +4,8 @@ use std::future::poll_fn; use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll, ready}; -use std::time::{Instant, Duration}; +use std::task::{ready, Context, Poll}; +use std::time::{Duration, Instant}; use either::Either; use libsqlx::libsql::{LibsqlDatabase, LogCompactor, LogFile}; @@ -36,6 +36,7 @@ mod replica; /// the maximum number of frame a Frame messahe is allowed to contain const FRAMES_MESSAGE_MAX_COUNT: usize = 5; const MAX_INJECTOR_BUFFER_CAP: usize = 32; +const DEFAULT_MAX_LOG_SIZE: usize = 100 * 1024 * 1024; // 100Mb type ExecFn = Box; @@ -62,7 +63,11 @@ pub enum Database { impl Database { fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> { - if let Self::Primary { compact_interval: Some(ref mut interval), db } = self { + if let Self::Primary { + compact_interval: Some(ref mut interval), + db, + } = self + { ready!(interval.poll_tick(cx)); let db = db.db.clone(); tokio::task::spawn_blocking(move || { @@ -83,10 +88,10 @@ struct Compactor { impl LogCompactor for Compactor { fn should_compact(&self, log: &LogFile) -> bool { let mut should_compact = false; - if let Some(compact_interval)= self.compact_interval { + if let Some(compact_interval) = self.compact_interval { should_compact |= self.last_compacted_at.elapsed() >= compact_interval } - + should_compact |= log.size() >= self.max_log_size; should_compact @@ -106,14 +111,17 @@ impl LogCompactor for Compactor { impl Database { pub fn from_config(config: &AllocConfig, path: PathBuf, dispatcher: Arc) -> Self { match config.db_config { - DbConfig::Primary {} => { + DbConfig::Primary { + max_log_size, + replication_log_compact_interval, + } => { let (sender, receiver) = tokio::sync::watch::channel(0); let db = LibsqlDatabase::new_primary( path, Compactor { - max_log_size: usize::MAX, last_compacted_at: Instant::now(), - compact_interval: None, + max_log_size, + compact_interval: replication_log_compact_interval, }, false, Box::new(move |fno| { @@ -122,12 +130,12 @@ impl Database { ) .unwrap(); - Self::Primary{ + Self::Primary { db: PrimaryDatabase { db: Arc::new(db), replica_streams: HashMap::new(), frame_notifier: receiver, - } , + }, compact_interval: None, } } @@ -167,7 +175,10 @@ impl Database { fn connect(&self, connection_id: u32, alloc: &Allocation) -> impl ConnectionHandler { match self { - Database::Primary { db: PrimaryDatabase { db, .. }, .. } => Either::Right(PrimaryConnection { + Database::Primary { + db: PrimaryDatabase { db, .. }, + .. + } => Either::Right(PrimaryConnection { conn: db.connect().unwrap(), }), Database::Replica { db, primary_id, .. } => Either::Left(ReplicaConnection { @@ -269,13 +280,15 @@ impl Allocation { req_no, next_frame_no, } => match &mut self.database { - Database::Primary{ - db: PrimaryDatabase { - db, - replica_streams, - frame_notifier, - .. - }, .. + Database::Primary { + db: + PrimaryDatabase { + db, + replica_streams, + frame_notifier, + .. + }, + .. 
} => { let streamer = FrameStreamer { logger: db.logger(), @@ -319,7 +332,10 @@ impl Allocation { *last_received_frame_ts = Some(Instant::now()); injector_handle.send(frames).await.unwrap(); } - Database::Primary { db: PrimaryDatabase { .. }, .. } => todo!("handle primary receiving txn"), + Database::Primary { + db: PrimaryDatabase { .. }, + .. + } => todo!("handle primary receiving txn"), }, Message::ProxyRequest { connection_id, diff --git a/libsqlx-server/src/allocation/replica.rs b/libsqlx-server/src/allocation/replica.rs index 441a3d40..ee8008d6 100644 --- a/libsqlx-server/src/allocation/replica.rs +++ b/libsqlx-server/src/allocation/replica.rs @@ -1,7 +1,7 @@ use std::ops::Deref; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll, ready}; +use std::task::{ready, Context, Poll}; use std::time::Duration; use futures::Future; @@ -272,12 +272,11 @@ impl ConnectionHandler for ReplicaConnection { let should_abort_query = match &mut *req { Some(ref mut req) => { ready!(req.timeout.as_mut().poll(cx)); - // the request has timedout, we finalize the builder with a error, and clean the - // current request. - req.builder.finnalize_error("request timed out".to_string()); - true - - }, + // the request has timedout, we finalize the builder with a error, and clean the + // current request. + req.builder.finnalize_error("request timed out".to_string()); + true + } None => return Poll::Ready(()), }; diff --git a/libsqlx-server/src/http/admin.rs b/libsqlx-server/src/http/admin.rs index 0e263ddf..4dd27944 100644 --- a/libsqlx-server/src/http/admin.rs +++ b/libsqlx-server/src/http/admin.rs @@ -1,3 +1,4 @@ +use std::ops::Deref; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; @@ -57,46 +58,69 @@ struct AllocateReq { #[derive(Debug, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] -pub enum DbConfigReq { - Primary {}, - Replica { - primary_node_id: NodeId, - #[serde( - deserialize_with = "deserialize_duration", - default = "default_proxy_timeout" - )] - proxy_request_timeout_duration: Duration, - }, +pub struct Primary { + /// The maximum size the replication is allowed to grow. Expects a string like 200mb. 
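    // With the internally tagged `DbConfigReq` defined further down, a primary
    // allocation request body would presumably look something like this (field names
    // come from this struct; the surrounding endpoint payload is not shown in this
    // patch):
    //   { "type": "primary",
    //     "max_replication_log_size": "200mb",
    //     "replication_log_compact_interval": "1h" }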
+ #[serde(default = "default_max_log_size")] + pub max_replication_log_size: bytesize::ByteSize, + pub replication_log_compact_interval: Option, } -const fn default_proxy_timeout() -> Duration { - Duration::from_secs(5) +#[derive(Debug)] +pub struct HumanDuration(Duration); + +impl Deref for HumanDuration { + type Target = Duration; + + fn deref(&self) -> &Self::Target { + &self.0 + } } -fn deserialize_duration<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - struct Visitor; - impl serde::de::Visitor<'_> for Visitor { - type Value = Duration; - - fn visit_str(self, v: &str) -> std::result::Result - where - E: serde::de::Error, - { - match humantime::Duration::from_str(v) { - Ok(d) => Ok(*d), - Err(e) => Err(E::custom(e.to_string())), +impl<'de> Deserialize<'de> for HumanDuration { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + struct DurationVisitor; + impl serde::de::Visitor<'_> for DurationVisitor { + type Value = HumanDuration; + + fn visit_str(self, v: &str) -> std::result::Result + where + E: serde::de::Error, + { + match humantime::Duration::from_str(v) { + Ok(d) => Ok(HumanDuration(*d)), + Err(e) => Err(E::custom(e.to_string())), + } } - } - fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.write_str("a duration, in a string format") + fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.write_str("a duration, in a string format") + } } + + deserializer.deserialize_str(DurationVisitor) } +} - deserializer.deserialize_str(Visitor) +#[derive(Debug, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum DbConfigReq { + Primary(Primary), + Replica { + primary_node_id: NodeId, + #[serde(default = "default_proxy_timeout")] + proxy_request_timeout_duration: HumanDuration, + }, +} + +const fn default_max_log_size() -> bytesize::ByteSize { + bytesize::ByteSize::mb(100) +} + +const fn default_proxy_timeout() -> HumanDuration { + HumanDuration(Duration::from_secs(5)) } async fn allocate( @@ -107,13 +131,21 @@ async fn allocate( max_conccurent_connection: req.max_conccurent_connection.unwrap_or(16), db_name: req.database_name.clone(), db_config: match req.config { - DbConfigReq::Primary {} => DbConfig::Primary {}, + DbConfigReq::Primary(Primary { + max_replication_log_size, + replication_log_compact_interval, + }) => DbConfig::Primary { + max_log_size: max_replication_log_size.as_u64() as usize, + replication_log_compact_interval: replication_log_compact_interval + .as_deref() + .copied(), + }, DbConfigReq::Replica { primary_node_id, proxy_request_timeout_duration, } => DbConfig::Replica { primary_node_id, - proxy_request_timeout_duration, + proxy_request_timeout_duration: *proxy_request_timeout_duration, }, }, }; diff --git a/libsqlx/src/database/libsql/replication_log/logger.rs b/libsqlx/src/database/libsql/replication_log/logger.rs index cbb9acca..48d3916e 100644 --- a/libsqlx/src/database/libsql/replication_log/logger.rs +++ b/libsqlx/src/database/libsql/replication_log/logger.rs @@ -9,7 +9,7 @@ use std::sync::Arc; use anyhow::{bail, ensure}; use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable}; use bytes::{Bytes, BytesMut}; -use parking_lot::{RwLock, Mutex}; +use parking_lot::{Mutex, RwLock}; use rusqlite::ffi::{ libsql_wal as Wal, sqlite3, PgHdr, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR, SQLITE_OK, }; @@ -568,7 +568,11 @@ impl LogFile { Ok(frame) } - fn maybe_compact(&mut self, compactor: &mut dyn LogCompactor, path: &Path) -> anyhow::Result<()> { 
+ fn maybe_compact( + &mut self, + compactor: &mut dyn LogCompactor, + path: &Path, + ) -> anyhow::Result<()> { if self.can_compact() && compactor.should_compact(self) { return self.do_compaction(compactor, path); } @@ -576,7 +580,11 @@ impl LogFile { Ok(()) } - fn do_compaction(&mut self, compactor: &mut dyn LogCompactor, path: &Path) -> anyhow::Result<()> { + fn do_compaction( + &mut self, + compactor: &mut dyn LogCompactor, + path: &Path, + ) -> anyhow::Result<()> { tracing::info!("performing log compaction"); let temp_log_path = path.join("temp_log"); let last_frame = self From 773bd730601704e7d2c20c25b58f49ba8c97d11e Mon Sep 17 00:00:00 2001 From: ad hoc Date: Fri, 21 Jul 2023 13:11:52 +0200 Subject: [PATCH 5/5] move Compactor to own file --- libsqlx-server/src/allocation/mod.rs | 49 ++++--------------- .../src/allocation/primary/compactor.rs | 42 ++++++++++++++++ .../allocation/{primary.rs => primary/mod.rs} | 2 + 3 files changed, 53 insertions(+), 40 deletions(-) create mode 100644 libsqlx-server/src/allocation/primary/compactor.rs rename libsqlx-server/src/allocation/{primary.rs => primary/mod.rs} (99%) diff --git a/libsqlx-server/src/allocation/mod.rs b/libsqlx-server/src/allocation/mod.rs index 33bde27d..fbc4049f 100644 --- a/libsqlx-server/src/allocation/mod.rs +++ b/libsqlx-server/src/allocation/mod.rs @@ -5,10 +5,10 @@ use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; -use std::time::{Duration, Instant}; +use std::time::Instant; use either::Either; -use libsqlx::libsql::{LibsqlDatabase, LogCompactor, LogFile}; +use libsqlx::libsql::LibsqlDatabase; use libsqlx::program::Program; use libsqlx::proxy::WriteProxyDatabase; use libsqlx::{Database as _, InjectableDatabase}; @@ -26,6 +26,7 @@ use crate::linc::{Inbound, NodeId}; use crate::meta::DatabaseId; use self::config::{AllocConfig, DbConfig}; +use self::primary::compactor::Compactor; use self::primary::{PrimaryConnection, PrimaryDatabase, ProxyResponseBuilder}; use self::replica::{ProxyDatabase, RemoteDb, ReplicaConnection, Replicator}; @@ -33,10 +34,10 @@ pub mod config; mod primary; mod replica; -/// the maximum number of frame a Frame messahe is allowed to contain +/// Maximum number of frame a Frame message is allowed to contain const FRAMES_MESSAGE_MAX_COUNT: usize = 5; -const MAX_INJECTOR_BUFFER_CAP: usize = 32; -const DEFAULT_MAX_LOG_SIZE: usize = 100 * 1024 * 1024; // 100Mb +/// Maximum number of frames in the injector buffer +const MAX_INJECTOR_BUFFER_CAPACITY: usize = 32; type ExecFn = Box; @@ -79,35 +80,6 @@ impl Database { } } -struct Compactor { - max_log_size: usize, - last_compacted_at: Instant, - compact_interval: Option, -} - -impl LogCompactor for Compactor { - fn should_compact(&self, log: &LogFile) -> bool { - let mut should_compact = false; - if let Some(compact_interval) = self.compact_interval { - should_compact |= self.last_compacted_at.elapsed() >= compact_interval - } - - should_compact |= log.size() >= self.max_log_size; - - should_compact - } - - fn compact( - &mut self, - _log: LogFile, - _path: std::path::PathBuf, - _size_after: u32, - ) -> Result<(), Box> { - self.last_compacted_at = Instant::now(); - todo!() - } -} - impl Database { pub fn from_config(config: &AllocConfig, path: PathBuf, dispatcher: Arc) -> Self { match config.db_config { @@ -118,11 +90,7 @@ impl Database { let (sender, receiver) = tokio::sync::watch::channel(0); let db = LibsqlDatabase::new_primary( path, - Compactor { - last_compacted_at: Instant::now(), - max_log_size, - 
compact_interval: replication_log_compact_interval, - }, + Compactor::new(max_log_size, replication_log_compact_interval), false, Box::new(move |fno| { let _ = sender.send(fno); @@ -143,7 +111,8 @@ impl Database { primary_node_id, proxy_request_timeout_duration, } => { - let rdb = LibsqlDatabase::new_replica(path, MAX_INJECTOR_BUFFER_CAP, ()).unwrap(); + let rdb = + LibsqlDatabase::new_replica(path, MAX_INJECTOR_BUFFER_CAPACITY, ()).unwrap(); let wdb = RemoteDb { proxy_request_timeout_duration, }; diff --git a/libsqlx-server/src/allocation/primary/compactor.rs b/libsqlx-server/src/allocation/primary/compactor.rs new file mode 100644 index 00000000..62b9c0ed --- /dev/null +++ b/libsqlx-server/src/allocation/primary/compactor.rs @@ -0,0 +1,42 @@ +use std::time::{Duration, Instant}; + +use libsqlx::libsql::{LogCompactor, LogFile}; + +pub struct Compactor { + max_log_size: usize, + last_compacted_at: Instant, + compact_interval: Option, +} + +impl Compactor { + pub fn new(max_log_size: usize, compact_interval: Option) -> Self { + Self { + max_log_size, + last_compacted_at: Instant::now(), + compact_interval, + } + } +} + +impl LogCompactor for Compactor { + fn should_compact(&self, log: &LogFile) -> bool { + let mut should_compact = false; + if let Some(compact_interval) = self.compact_interval { + should_compact |= self.last_compacted_at.elapsed() >= compact_interval + } + + should_compact |= log.size() >= self.max_log_size; + + should_compact + } + + fn compact( + &mut self, + _log: LogFile, + _path: std::path::PathBuf, + _size_after: u32, + ) -> Result<(), Box> { + self.last_compacted_at = Instant::now(); + todo!() + } +} diff --git a/libsqlx-server/src/allocation/primary.rs b/libsqlx-server/src/allocation/primary/mod.rs similarity index 99% rename from libsqlx-server/src/allocation/primary.rs rename to libsqlx-server/src/allocation/primary/mod.rs index f66ed4dd..480a0e6a 100644 --- a/libsqlx-server/src/allocation/primary.rs +++ b/libsqlx-server/src/allocation/primary/mod.rs @@ -16,6 +16,8 @@ use crate::meta::DatabaseId; use super::{ConnectionHandler, ExecFn, FRAMES_MESSAGE_MAX_COUNT}; +pub mod compactor; + const MAX_STEP_BATCH_SIZE: usize = 100_000_000; // ~100kb // pub struct PrimaryDatabase {
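
Taken together, the series compacts the replication log when either of two triggers
fires: the log file outgrowing a configured byte threshold, or an optional wall-clock
interval elapsing since the last compaction. A minimal standalone sketch of that
trigger logic follows; it mirrors `should_compact` from `primary/compactor.rs`, with
the `LogFile` argument reduced to a plain byte count and purely illustrative
threshold values.

use std::time::{Duration, Instant};

fn should_compact(
    log_size: usize,
    max_log_size: usize,
    last_compacted_at: Instant,
    compact_interval: Option<Duration>,
) -> bool {
    let mut should_compact = false;
    if let Some(interval) = compact_interval {
        // periodic trigger: enough time has passed since the last compaction
        should_compact |= last_compacted_at.elapsed() >= interval;
    }
    // size trigger: the log grew past the configured threshold
    should_compact |= log_size >= max_log_size;
    should_compact
}

fn main() {
    let max = 100 * 1024 * 1024; // illustrative 100 MiB threshold
    let started = Instant::now();
    // over the size threshold: compact regardless of the interval setting
    assert!(should_compact(max + 1, max, started, None));
    // small log and the interval has not elapsed yet: no compaction
    assert!(!should_compact(0, max, started, Some(Duration::from_secs(3600))));
}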