diff --git a/libsqlx-server/src/allocation/mod.rs b/libsqlx-server/src/allocation/mod.rs index 7300feea..f5e19ba6 100644 --- a/libsqlx-server/src/allocation/mod.rs +++ b/libsqlx-server/src/allocation/mod.rs @@ -27,6 +27,7 @@ use crate::linc::bus::Dispatch; use crate::linc::proto::{Frames, Message}; use crate::linc::{Inbound, NodeId}; use crate::meta::DatabaseId; +use crate::replica_commit_store::ReplicaCommitStore; use self::config::{AllocConfig, DbConfig}; use self::primary::compactor::Compactor; @@ -97,8 +98,14 @@ impl Database { fn txn_timeout_duration(&self) -> Duration { match self { - Database::Primary { transaction_timeout_duration, .. } => *transaction_timeout_duration, - Database::Replica { transaction_timeout_duration, .. } => *transaction_timeout_duration, + Database::Primary { + transaction_timeout_duration, + .. + } => *transaction_timeout_duration, + Database::Replica { + transaction_timeout_duration, + .. + } => *transaction_timeout_duration, } } } @@ -109,6 +116,7 @@ impl Database { path: PathBuf, dispatcher: Arc, compaction_queue: Arc, + replica_commit_store: Arc, ) -> Self { let database_id = DatabaseId::from_name(&config.db_name); @@ -118,7 +126,7 @@ impl Database { replication_log_compact_interval, transaction_timeout_duration, } => { - let (sender, receiver) = tokio::sync::watch::channel(0); + let (sender, receiver) = tokio::sync::watch::channel(None); let db = LibsqlDatabase::new_primary( path, Compactor::new( @@ -129,7 +137,7 @@ impl Database { ), false, Box::new(move |fno| { - let _ = sender.send(fno); + let _ = sender.send(Some(fno)); }), ) .unwrap(); @@ -156,8 +164,22 @@ impl Database { proxy_request_timeout_duration, transaction_timeout_duration, } => { - let rdb = - LibsqlDatabase::new_replica(path, MAX_INJECTOR_BUFFER_CAPACITY, ()).unwrap(); + let next_frame_no = + block_in_place(|| replica_commit_store.get_commit_index(database_id)) + .map(|fno| fno + 1) + .unwrap_or(0); + + let commit_callback = Arc::new(move |fno| { + replica_commit_store.commit(database_id, fno); + }); + + let rdb = LibsqlDatabase::new_replica( + path, + MAX_INJECTOR_BUFFER_CAPACITY, + commit_callback, + ) + .unwrap(); + let wdb = RemoteDb { proxy_request_timeout_duration, }; @@ -167,7 +189,7 @@ impl Database { let replicator = Replicator::new( dispatcher, - 0, + next_frame_no, database_id, primary_node_id, injector, @@ -556,9 +578,10 @@ mod test { use tokio::sync::Notify; use crate::allocation::replica::ReplicaConnection; - use crate::init_dirs; use crate::linc::bus::Bus; + use crate::replica_commit_store::ReplicaCommitStore; use crate::snapshot_store::SnapshotStore; + use crate::{init_dirs, replica_commit_store}; use super::*; @@ -567,7 +590,8 @@ mod test { let bus = Arc::new(Bus::new(0, |_, _| async {})); let _queue = bus.connect(1); // pretend connection to node 1 let tmp = tempfile::TempDir::new().unwrap(); - let read_db = LibsqlDatabase::new_replica(tmp.path().to_path_buf(), 1, ()).unwrap(); + let read_db = + LibsqlDatabase::new_replica(tmp.path().to_path_buf(), 1, Arc::new(|_| ())).unwrap(); let write_db = RemoteDb { proxy_request_timeout_duration: Duration::from_millis(100), }; @@ -631,9 +655,15 @@ mod test { }, }; let (sender, inbox) = mpsc::channel(10); - let env = EnvOpenOptions::new().max_dbs(10).map_size(4096 * 100).open(tmp.path()).unwrap(); + let env = EnvOpenOptions::new() + .max_dbs(10) + .map_size(4096 * 100) + .open(tmp.path()) + .unwrap(); let store = Arc::new(SnapshotStore::new(tmp.path().to_path_buf(), env.clone()).unwrap()); - let queue = Arc::new(CompactionQueue::new(env, tmp.path().to_path_buf(), store).unwrap()); + let queue = + Arc::new(CompactionQueue::new(env.clone(), tmp.path().to_path_buf(), store).unwrap()); + let replica_commit_store = Arc::new(ReplicaCommitStore::new(env.clone())); let mut alloc = Allocation { inbox, database: Database::from_config( @@ -641,6 +671,7 @@ mod test { tmp.path().to_path_buf(), bus.clone(), queue, + replica_commit_store, ), connections_futs: JoinSet::new(), next_conn_id: 0, @@ -656,14 +687,18 @@ mod test { let (snd, rcv) = oneshot::channel(); let builder = StepResultsBuilder::new(snd); - conn.execute(Program::seq(&["begin"]), Box::new(builder)).await.unwrap(); + conn.execute(Program::seq(&["begin"]), Box::new(builder)) + .await + .unwrap(); rcv.await.unwrap().unwrap(); tokio::time::sleep(Duration::from_secs(1)).await; let (snd, rcv) = oneshot::channel(); let builder = StepResultsBuilder::new(snd); - conn.execute(Program::seq(&["create table test (x)"]), Box::new(builder)).await.unwrap(); + conn.execute(Program::seq(&["create table test (x)"]), Box::new(builder)) + .await + .unwrap(); assert!(rcv.await.unwrap().is_err()); } } diff --git a/libsqlx-server/src/allocation/primary/mod.rs b/libsqlx-server/src/allocation/primary/mod.rs index 505333cb..606171c9 100644 --- a/libsqlx-server/src/allocation/primary/mod.rs +++ b/libsqlx-server/src/allocation/primary/mod.rs @@ -25,7 +25,7 @@ const MAX_STEP_BATCH_SIZE: usize = 100_000_000; // ~100kb pub struct PrimaryDatabase { pub db: Arc>, pub replica_streams: HashMap)>, - pub frame_notifier: tokio::sync::watch::Receiver, + pub frame_notifier: tokio::sync::watch::Receiver>, pub snapshot_store: Arc, } @@ -207,7 +207,7 @@ pub struct FrameStreamer { pub req_no: u32, pub seq_no: u32, pub dipatcher: Arc, - pub notifier: tokio::sync::watch::Receiver, + pub notifier: tokio::sync::watch::Receiver>, pub buffer: Vec, pub snapshot_store: Arc, } @@ -230,7 +230,7 @@ impl FrameStreamer { } if self .notifier - .wait_for(|fno| *fno >= self.next_frame_no) + .wait_for(|fno| fno.map(|f| f >= self.next_frame_no).unwrap_or(false)) .await .is_err() { diff --git a/libsqlx-server/src/allocation/replica.rs b/libsqlx-server/src/allocation/replica.rs index 5ff7d897..a31c8116 100644 --- a/libsqlx-server/src/allocation/replica.rs +++ b/libsqlx-server/src/allocation/replica.rs @@ -16,10 +16,8 @@ use tokio::task::block_in_place; use tokio::time::{timeout, Sleep}; use crate::linc::bus::Dispatch; -use crate::linc::proto::{BuilderStep, ProxyResponse}; -use crate::linc::proto::{Enveloppe, Frames, Message}; -use crate::linc::Inbound; -use crate::linc::{NodeId, Outbound}; +use crate::linc::proto::{BuilderStep, Enveloppe, Frames, Message, ProxyResponse}; +use crate::linc::{Inbound, NodeId, Outbound}; use crate::meta::DatabaseId; use super::{ConnectionHandler, ConnectionMessage}; @@ -166,6 +164,7 @@ impl Replicator { }); } } + // no news from primary for the past 5 secs, send a request again Err(_) => self.query_replicate().await, Ok(None) => break, } diff --git a/libsqlx-server/src/hrana/batch.rs b/libsqlx-server/src/hrana/batch.rs index 6d41c8b4..14cfb1c3 100644 --- a/libsqlx-server/src/hrana/batch.rs +++ b/libsqlx-server/src/hrana/batch.rs @@ -110,12 +110,15 @@ pub async fn execute_sequence(conn: &ConnectionHandle, pgm: Program) -> color_ey let builder = StepResultsBuilder::new(snd); conn.execute(pgm, Box::new(builder)).await?; - rcv.await?.map_err(|e| anyhow!("{e}"))?.into_iter().try_for_each(|result| match result { - StepResult::Ok => Ok(()), - StepResult::Err(e) => match stmt_error_from_sqld_error(e) { - Ok(stmt_err) => Err(anyhow!(stmt_err)), - Err(sqld_err) => Err(anyhow!(sqld_err)), - }, - StepResult::Skipped => Err(anyhow!("Statement in sequence was not executed")), - }) + rcv.await? + .map_err(|e| anyhow!("{e}"))? + .into_iter() + .try_for_each(|result| match result { + StepResult::Ok => Ok(()), + StepResult::Err(e) => match stmt_error_from_sqld_error(e) { + Ok(stmt_err) => Err(anyhow!(stmt_err)), + Err(sqld_err) => Err(anyhow!(sqld_err)), + }, + StepResult::Skipped => Err(anyhow!("Statement in sequence was not executed")), + }) } diff --git a/libsqlx-server/src/main.rs b/libsqlx-server/src/main.rs index 1392e325..ff5c415a 100644 --- a/libsqlx-server/src/main.rs +++ b/libsqlx-server/src/main.rs @@ -12,6 +12,7 @@ use hyper::server::conn::AddrIncoming; use linc::bus::Bus; use manager::Manager; use meta::Store; +use replica_commit_store::ReplicaCommitStore; use snapshot_store::SnapshotStore; use tokio::fs::create_dir_all; use tokio::net::{TcpListener, TcpStream}; @@ -28,6 +29,7 @@ mod http; mod linc; mod manager; mod meta; +mod replica_commit_store; mod snapshot_store; #[derive(Debug, Parser)] @@ -123,11 +125,13 @@ async fn main() -> Result<()> { snapshot_store, )?); let store = Arc::new(Store::new(env.clone())); + let replica_commit_store = Arc::new(ReplicaCommitStore::new(env.clone())); let manager = Arc::new(Manager::new( config.db_path.clone(), store.clone(), 100, compaction_queue.clone(), + replica_commit_store, )); let bus = Arc::new(Bus::new(config.cluster.id, manager.clone())); diff --git a/libsqlx-server/src/manager.rs b/libsqlx-server/src/manager.rs index a3bb68d8..1b0ca7d1 100644 --- a/libsqlx-server/src/manager.rs +++ b/libsqlx-server/src/manager.rs @@ -14,12 +14,14 @@ use crate::linc::bus::Dispatch; use crate::linc::handler::Handler; use crate::linc::Inbound; use crate::meta::{DatabaseId, Store}; +use crate::replica_commit_store::ReplicaCommitStore; pub struct Manager { cache: Cache>, meta_store: Arc, db_path: PathBuf, compaction_queue: Arc, + replica_commit_store: Arc, } const MAX_ALLOC_MESSAGE_QUEUE_LEN: usize = 32; @@ -30,12 +32,14 @@ impl Manager { meta_store: Arc, max_conccurent_allocs: u64, compaction_queue: Arc, + replica_commit_store: Arc, ) -> Self { Self { cache: Cache::new(max_conccurent_allocs), meta_store, db_path, compaction_queue, + replica_commit_store, } } @@ -60,6 +64,7 @@ impl Manager { path, dispatcher.clone(), self.compaction_queue.clone(), + self.replica_commit_store.clone(), ), connections_futs: JoinSet::new(), next_conn_id: 0, diff --git a/libsqlx-server/src/meta.rs b/libsqlx-server/src/meta.rs index 38a1c30b..2436839b 100644 --- a/libsqlx-server/src/meta.rs +++ b/libsqlx-server/src/meta.rs @@ -10,8 +10,6 @@ use tokio::task::block_in_place; use crate::allocation::config::AllocConfig; -type ExecFn = Box)>; - pub struct Store { env: heed::Env, alloc_config_db: heed::Database, SerdeBincode>, diff --git a/libsqlx-server/src/replica_commit_store.rs b/libsqlx-server/src/replica_commit_store.rs new file mode 100644 index 00000000..18c0aeed --- /dev/null +++ b/libsqlx-server/src/replica_commit_store.rs @@ -0,0 +1,34 @@ +use heed_types::OwnedType; +use libsqlx::FrameNo; + +use crate::meta::DatabaseId; + +/// Stores replica last injected commit index +pub struct ReplicaCommitStore { + env: heed::Env, + database: heed::Database, OwnedType>, +} + +impl ReplicaCommitStore { + const DB_NAME: &str = "replica-commit-store"; + pub fn new(env: heed::Env) -> Self { + let mut txn = env.write_txn().unwrap(); + let database = env.create_database(&mut txn, Some(Self::DB_NAME)).unwrap(); + txn.commit().unwrap(); + + Self { env, database } + } + + pub fn commit(&self, database_id: DatabaseId, frame_no: FrameNo) { + let mut txn = self.env.write_txn().unwrap(); + self.database + .put(&mut txn, &database_id, &frame_no) + .unwrap(); + txn.commit().unwrap(); + } + + pub fn get_commit_index(&self, database_id: DatabaseId) -> Option { + let txn = self.env.read_txn().unwrap(); + self.database.get(&txn, &database_id).unwrap() + } +} diff --git a/libsqlx/src/database/libsql/injector/hook.rs b/libsqlx/src/database/libsql/injector/hook.rs index 2cb5348d..57c34fbc 100644 --- a/libsqlx/src/database/libsql/injector/hook.rs +++ b/libsqlx/src/database/libsql/injector/hook.rs @@ -9,7 +9,7 @@ use crate::database::frame::FrameBorrowed; use crate::database::libsql::replication_log::WAL_PAGE_SIZE; use super::headers::Headers; -use super::{FrameBuffer, InjectorCommitHandler}; +use super::{FrameBuffer, OnCommitCb}; // Those are custom error codes returned by the replicator hook. pub const LIBSQL_INJECT_FATAL: c_int = 200; @@ -23,15 +23,15 @@ pub struct InjectorHookCtx { buffer: FrameBuffer, /// currently in a txn is_txn: bool, - commit_handler: Box, + on_commit_cb: OnCommitCb, } impl InjectorHookCtx { - pub fn new(buffer: FrameBuffer, commit_handler: Box) -> Self { + pub fn new(buffer: FrameBuffer, commit_handler: OnCommitCb) -> Self { Self { buffer, is_txn: false, - commit_handler, + on_commit_cb: commit_handler, } } @@ -45,9 +45,6 @@ impl InjectorHookCtx { let buffer = self.buffer.lock(); let (mut headers, last_frame_no, size_after) = make_page_header(buffer.iter().map(|f| &**f)); - if size_after != 0 { - self.commit_handler.pre_commit(last_frame_no)?; - } let ret = unsafe { orig( @@ -64,7 +61,7 @@ impl InjectorHookCtx { debug_assert!(headers.all_applied()); drop(headers); if size_after != 0 { - self.commit_handler.post_commit(last_frame_no)?; + (self.on_commit_cb)(last_frame_no); self.is_txn = false; } tracing::trace!("applied frame batch"); diff --git a/libsqlx/src/database/libsql/injector/mod.rs b/libsqlx/src/database/libsql/injector/mod.rs index 7580bd5f..cbc9dc80 100644 --- a/libsqlx/src/database/libsql/injector/mod.rs +++ b/libsqlx/src/database/libsql/injector/mod.rs @@ -18,6 +18,7 @@ mod headers; mod hook; pub type FrameBuffer = Arc>>; +pub type OnCommitCb = Arc; pub struct Injector { /// The injector is in a transaction state @@ -48,39 +49,15 @@ impl crate::database::Injector for Injector { /// This trait trait is used to record the last committed frame_no to the log. /// The implementer can persist the pre and post commit frame no, and compare them in the event of /// a crash; if the pre and post commit frame_no don't match, then the log may be corrupted. -pub trait InjectorCommitHandler: Send + Sync + 'static { - fn pre_commit(&mut self, frame_no: FrameNo) -> anyhow::Result<()>; - fn post_commit(&mut self, frame_no: FrameNo) -> anyhow::Result<()>; -} - -impl InjectorCommitHandler for Box { - fn pre_commit(&mut self, frame_no: FrameNo) -> anyhow::Result<()> { - self.as_mut().pre_commit(frame_no) - } - - fn post_commit(&mut self, frame_no: FrameNo) -> anyhow::Result<()> { - self.as_mut().post_commit(frame_no) - } -} - -impl InjectorCommitHandler for () { - fn pre_commit(&mut self, _frame_no: FrameNo) -> anyhow::Result<()> { - Ok(()) - } - - fn post_commit(&mut self, _frame_no: FrameNo) -> anyhow::Result<()> { - Ok(()) - } -} impl Injector { pub fn new( path: &Path, - injector_commit_handler: Box, + on_commit_cb: OnCommitCb, buffer_capacity: usize, ) -> crate::Result { let buffer = FrameBuffer::default(); - let ctx = InjectorHookCtx::new(buffer.clone(), injector_commit_handler); + let ctx = InjectorHookCtx::new(buffer.clone(), on_commit_cb); let mut ctx = Box::new(ctx); let connection = sqld_libsql_bindings::Connection::open( path, @@ -172,6 +149,7 @@ impl Injector { #[cfg(test)] mod test { use std::path::PathBuf; + use std::sync::Arc; use crate::database::libsql::injector::Injector; use crate::database::libsql::replication_log::logger::LogFile; @@ -181,7 +159,7 @@ mod test { let log = LogFile::new(PathBuf::from("assets/test/simple_wallog")).unwrap(); let temp = tempfile::tempdir().unwrap(); - let mut injector = Injector::new(temp.path(), Box::new(()), 10).unwrap(); + let mut injector = Injector::new(temp.path(), Arc::new(|_| ()), 10).unwrap(); for frame in log.frames_iter().unwrap() { let frame = frame.unwrap(); injector.inject_frame(frame).unwrap(); @@ -202,7 +180,7 @@ mod test { let temp = tempfile::tempdir().unwrap(); // inject one frame at a time - let mut injector = Injector::new(temp.path(), Box::new(()), 1).unwrap(); + let mut injector = Injector::new(temp.path(), Arc::new(|_| ()), 1).unwrap(); for frame in log.frames_iter().unwrap() { let frame = frame.unwrap(); injector.inject_frame(frame).unwrap(); @@ -223,7 +201,7 @@ mod test { let temp = tempfile::tempdir().unwrap(); // inject one frame at a time - let mut injector = Injector::new(temp.path(), Box::new(()), 10).unwrap(); + let mut injector = Injector::new(temp.path(), Arc::new(|_| ()), 10).unwrap(); let mut iter = log.frames_iter().unwrap(); assert!(injector diff --git a/libsqlx/src/database/libsql/mod.rs b/libsqlx/src/database/libsql/mod.rs index a42bfdc7..9582cf2c 100644 --- a/libsqlx/src/database/libsql/mod.rs +++ b/libsqlx/src/database/libsql/mod.rs @@ -14,7 +14,7 @@ use replication_log::logger::{ ReplicationLogger, ReplicationLoggerHook, ReplicationLoggerHookCtx, REPLICATION_METHODS, }; -use self::injector::InjectorCommitHandler; +use self::injector::OnCommitCb; use self::replication_log::logger::FrameNotifierCb; pub use connection::LibsqlConnection; @@ -44,7 +44,7 @@ impl LibsqlDbType for PrimaryType { } pub struct ReplicaType { - commit_handler: Option>, + on_commit_cb: OnCommitCb, injector_buffer_capacity: usize, } @@ -101,10 +101,10 @@ impl LibsqlDatabase { pub fn new_replica( db_path: PathBuf, injector_buffer_capacity: usize, - injector_commit_handler: impl InjectorCommitHandler, + on_commit_cb: OnCommitCb, ) -> crate::Result { let ty = ReplicaType { - commit_handler: Some(Box::new(injector_commit_handler)), + on_commit_cb, injector_buffer_capacity, }; @@ -185,10 +185,9 @@ impl Database for LibsqlDatabase { impl InjectableDatabase for LibsqlDatabase { fn injector(&mut self) -> crate::Result> { - let Some(commit_handler) = self.ty.commit_handler.take() else { panic!("there can be only one injector") }; Ok(Box::new(Injector::new( &self.db_path, - commit_handler, + self.ty.on_commit_cb.clone(), self.ty.injector_buffer_capacity, )?)) } @@ -226,7 +225,7 @@ mod test { fn inject_libsql_db() { let temp = tempfile::tempdir().unwrap(); let replica = ReplicaType { - commit_handler: Some(Box::new(())), + on_commit_cb: Arc::new(|_| ()), injector_buffer_capacity: 10, }; let mut db = LibsqlDatabase::new(temp.path().to_path_buf(), replica); @@ -269,7 +268,7 @@ mod test { let mut replica = LibsqlDatabase::new( temp_replica.path().to_path_buf(), ReplicaType { - commit_handler: Some(Box::new(())), + on_commit_cb: Arc::new(|_| ()), injector_buffer_capacity: 10, }, ); diff --git a/libsqlx/src/database/libsql/replication_log/logger.rs b/libsqlx/src/database/libsql/replication_log/logger.rs index d12af50d..38112c37 100644 --- a/libsqlx/src/database/libsql/replication_log/logger.rs +++ b/libsqlx/src/database/libsql/replication_log/logger.rs @@ -759,7 +759,6 @@ impl LogCompactor for () { pub type FrameNotifierCb = Box; pub struct ReplicationLogger { - pub generation: Generation, pub log_file: RwLock, compactor: Box>, /// a notifier channel other tasks can subscribe to, and get notified when new frames become @@ -807,15 +806,17 @@ impl ReplicationLogger { compactor: impl LogCompactor, new_frame_notifier: FrameNotifierCb, ) -> crate::Result { - let header = log_file.header(); - let generation_start_frame_no = header.start_frame_no + header.frame_count; - - Ok(Self { - generation: Generation::new(generation_start_frame_no), + let this = Self { compactor: Box::new(Mutex::new(compactor)), log_file: RwLock::new(log_file), new_frame_notifier, - }) + }; + + if let Some(last_frame) = this.log_file.read().last_commited_frame_no() { + (this.new_frame_notifier)(last_frame); + } + + Ok(this) } fn recover(