From 0f11e308fea8626d7c9bb85c3953be85d34d721f Mon Sep 17 00:00:00 2001 From: ad hoc Date: Wed, 26 Jul 2023 10:04:42 +0200 Subject: [PATCH 1/9] compaction queue --- Cargo.lock | 12 +- libsqlx-server/Cargo.toml | 6 +- libsqlx-server/src/allocation/mod.rs | 39 ++- .../src/allocation/primary/compactor.rs | 38 ++- libsqlx-server/src/compactor.rs | 224 ++++++++++++++++++ libsqlx-server/src/http/admin.rs | 2 +- libsqlx-server/src/main.rs | 38 ++- libsqlx-server/src/manager.rs | 17 +- libsqlx-server/src/meta.rs | 8 +- libsqlx-server/src/snapshot_store.rs | 75 ++++++ libsqlx/src/database/frame.rs | 7 +- .../database/libsql/replication_log/logger.rs | 110 ++++----- .../libsql/replication_log/snapshot.rs | 6 +- libsqlx/src/error.rs | 2 + 14 files changed, 494 insertions(+), 90 deletions(-) create mode 100644 libsqlx-server/src/compactor.rs create mode 100644 libsqlx-server/src/snapshot_store.rs diff --git a/Cargo.lock b/Cargo.lock index 94c80f4a..a46f82e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2030,8 +2030,7 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "heed" version = "0.20.0-alpha.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ae92e0d788e4b5608cddd89bec1128ad9f424e365025bd2454915aab450d7a2" +source = "git+https://github.com/MarinPostma/heed.git?rev=2ae9a14#2ae9a14ce2270118e23f069ba6999212353d94aa" dependencies = [ "bytemuck", "byteorder", @@ -2049,14 +2048,12 @@ dependencies = [ [[package]] name = "heed-traits" version = "0.20.0-alpha.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44055e6d049fb62b58671059045fe4a8a083d78ef04347818cc8a87a62d6fa1f" +source = "git+https://github.com/MarinPostma/heed.git?rev=2ae9a14#2ae9a14ce2270118e23f069ba6999212353d94aa" [[package]] name = "heed-types" version = "0.20.0-alpha.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d419e64e429f0bbe6d8ef3507bf137105c8cebee0e5fba59cf556de93c8ab57" +source = "git+https://github.com/MarinPostma/heed.git?rev=2ae9a14#2ae9a14ce2270118e23f069ba6999212353d94aa" dependencies = [ "bincode", "bytemuck", @@ -2645,8 +2642,7 @@ checksum = "09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0" [[package]] name = "lmdb-master-sys" version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "629c123f5321b48fa4f8f4d3b868165b748d9ba79c7103fb58e3a94f736bcedd" +source = "git+https://github.com/MarinPostma/heed.git?rev=2ae9a14#2ae9a14ce2270118e23f069ba6999212353d94aa" dependencies = [ "cc", "doxygen-rs", diff --git a/libsqlx-server/Cargo.toml b/libsqlx-server/Cargo.toml index 89b34c74..b925732a 100644 --- a/libsqlx-server/Cargo.toml +++ b/libsqlx-server/Cargo.toml @@ -18,8 +18,10 @@ clap = { version = "4.3.11", features = ["derive"] } color-eyre = "0.6.2" either = "1.8.1" futures = "0.3.28" -heed = { version = "0.20.0-alpha.3", features = ["serde-bincode"] } -heed-types = "0.20.0-alpha.3" +# heed = { version = "0.20.0-alpha.3", features = ["serde-bincode", "sync-read-txn"] } +heed = { git = "https://github.com/MarinPostma/heed.git", rev = "2ae9a14", features = ["serde-bincode", "sync-read-txn"] } +heed-types = { git = "https://github.com/MarinPostma/heed.git", rev = "2ae9a14" } +# heed-types = "0.20.0-alpha.3" hmac = "0.12.1" humantime = "2.1.0" hyper = { version = 
"0.14.27", features = ["h2", "server"] } diff --git a/libsqlx-server/src/allocation/mod.rs b/libsqlx-server/src/allocation/mod.rs index fbc4049f..6b3e15d7 100644 --- a/libsqlx-server/src/allocation/mod.rs +++ b/libsqlx-server/src/allocation/mod.rs @@ -17,6 +17,7 @@ use tokio::task::{block_in_place, JoinSet}; use tokio::time::Interval; use crate::allocation::primary::FrameStreamer; +use crate::compactor::CompactionQueue; use crate::hrana; use crate::hrana::http::handle_pipeline; use crate::hrana::http::proto::{PipelineRequestBody, PipelineResponseBody}; @@ -70,10 +71,12 @@ impl Database { } = self { ready!(interval.poll_tick(cx)); + tracing::debug!("attempting periodic log compaction"); let db = db.db.clone(); tokio::task::spawn_blocking(move || { db.compact_log(); }); + return Poll::Ready(()) } Poll::Pending @@ -81,7 +84,14 @@ impl Database { } impl Database { - pub fn from_config(config: &AllocConfig, path: PathBuf, dispatcher: Arc) -> Self { + pub fn from_config( + config: &AllocConfig, + path: PathBuf, + dispatcher: Arc, + compaction_queue: Arc, + ) -> Self { + let database_id = DatabaseId::from_name(&config.db_name); + match config.db_config { DbConfig::Primary { max_log_size, @@ -90,7 +100,12 @@ impl Database { let (sender, receiver) = tokio::sync::watch::channel(0); let db = LibsqlDatabase::new_primary( path, - Compactor::new(max_log_size, replication_log_compact_interval), + Compactor::new( + max_log_size, + replication_log_compact_interval, + compaction_queue, + database_id, + ), false, Box::new(move |fno| { let _ = sender.send(fno); @@ -98,13 +113,19 @@ impl Database { ) .unwrap(); + let compact_interval = replication_log_compact_interval.map(|d| { + let mut i = tokio::time::interval(d / 2); + i.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + Box::pin(i) + }); + Self::Primary { db: PrimaryDatabase { db: Arc::new(db), replica_streams: HashMap::new(), frame_notifier: receiver, }, - compact_interval: None, + compact_interval, } } DbConfig::Replica { @@ -119,7 +140,6 @@ impl Database { let mut db = WriteProxyDatabase::new(rdb, wdb, Arc::new(|_| ())); let injector = db.injector().unwrap(); let (sender, receiver) = mpsc::channel(16); - let database_id = DatabaseId::from_name(&config.db_name); let replicator = Replicator::new( dispatcher, @@ -208,9 +228,10 @@ impl ConnectionHandle { impl Allocation { pub async fn run(mut self) { loop { + dbg!(); let fut = poll_fn(|cx| self.database.poll(cx)); tokio::select! 
{ - _ = fut => (), + _ = fut => dbg!(), Some(msg) = self.inbox.recv() => { match msg { AllocationMessage::HranaPipelineReq { req, ret } => { @@ -225,12 +246,16 @@ impl Allocation { } } }, - maybe_id = self.connections_futs.join_next() => { + maybe_id = self.connections_futs.join_next(), if !self.connections_futs.is_empty() => { + dbg!(); if let Some(Ok(_id)) = maybe_id { // self.connections.remove_entry(&id); } }, - else => break, + else => { + dbg!(); + break + }, } } } diff --git a/libsqlx-server/src/allocation/primary/compactor.rs b/libsqlx-server/src/allocation/primary/compactor.rs index 62b9c0ed..5bc4c9a3 100644 --- a/libsqlx-server/src/allocation/primary/compactor.rs +++ b/libsqlx-server/src/allocation/primary/compactor.rs @@ -1,19 +1,38 @@ -use std::time::{Duration, Instant}; +use std::{ + path::PathBuf, + sync::Arc, + time::{Duration, Instant}, +}; use libsqlx::libsql::{LogCompactor, LogFile}; +use uuid::Uuid; + +use crate::{ + compactor::{CompactionJob, CompactionQueue}, + meta::DatabaseId, +}; pub struct Compactor { max_log_size: usize, last_compacted_at: Instant, compact_interval: Option, + queue: Arc, + database_id: DatabaseId, } impl Compactor { - pub fn new(max_log_size: usize, compact_interval: Option) -> Self { + pub fn new( + max_log_size: usize, + compact_interval: Option, + queue: Arc, + database_id: DatabaseId, + ) -> Self { Self { max_log_size, last_compacted_at: Instant::now(), compact_interval, + queue, + database_id, } } } @@ -32,11 +51,18 @@ impl LogCompactor for Compactor { fn compact( &mut self, - _log: LogFile, - _path: std::path::PathBuf, - _size_after: u32, + log_id: Uuid, ) -> Result<(), Box> { self.last_compacted_at = Instant::now(); - todo!() + self.queue.push(&CompactionJob { + database_id: self.database_id, + log_id, + }); + + Ok(()) + } + + fn snapshot_dir(&self) -> PathBuf { + self.queue.snapshot_queue_dir() } } diff --git a/libsqlx-server/src/compactor.rs b/libsqlx-server/src/compactor.rs new file mode 100644 index 00000000..22f31c6a --- /dev/null +++ b/libsqlx-server/src/compactor.rs @@ -0,0 +1,224 @@ +use std::io::{BufWriter, Write}; +use std::mem::size_of; +use std::os::unix::prelude::FileExt; +use std::path::{Path, PathBuf}; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; + +use bytemuck::{bytes_of, Pod, Zeroable}; +use heed::byteorder::BigEndian; +use heed_types::{SerdeBincode, U64}; +use libsqlx::libsql::LogFile; +use libsqlx::{Frame, FrameNo}; +use serde::{Deserialize, Serialize}; +use tempfile::NamedTempFile; +use tokio::sync::watch; +use tokio::task::block_in_place; +use uuid::Uuid; + +use crate::meta::DatabaseId; +use crate::snapshot_store::SnapshotStore; + +#[derive(Debug, Serialize, Deserialize)] +pub struct CompactionJob { + /// Id of the database whose log needs to be compacted + pub database_id: DatabaseId, + /// path to the log to compact + pub log_id: Uuid, +} + +pub struct CompactionQueue { + env: heed::Env, + queue: heed::Database, SerdeBincode>, + next_id: AtomicU64, + notify: watch::Sender>, + db_path: PathBuf, + snapshot_store: Arc, +} + +impl CompactionQueue { + const COMPACTION_QUEUE_DB_NAME: &str = "compaction_queue_db"; + pub fn new( + env: heed::Env, + db_path: PathBuf, + snapshot_store: Arc, + ) -> color_eyre::Result { + let mut txn = env.write_txn()?; + let queue = env.create_database(&mut txn, Some(Self::COMPACTION_QUEUE_DB_NAME))?; + let next_id = match queue.last(&mut txn)? 
{ + Some((id, _)) => id + 1, + None => 0, + }; + txn.commit()?; + + let (notify, _) = watch::channel((next_id > 0).then(|| next_id - 1)); + Ok(Self { + env, + queue, + next_id: next_id.into(), + notify, + db_path, + snapshot_store, + }) + } + + pub fn push(&self, job: &CompactionJob) { + tracing::debug!("new compaction job available: {job:?}"); + let mut txn = self.env.write_txn().unwrap(); + let id = self.next_id.fetch_add(1, Ordering::Relaxed); + self.queue.put(&mut txn, &id, job).unwrap(); + txn.commit().unwrap(); + self.notify.send_replace(Some(id)); + } + + pub async fn peek(&self) -> (u64, CompactionJob) { + let id = self.next_id.load(Ordering::Relaxed); + let txn = block_in_place(|| self.env.read_txn().unwrap()); + match block_in_place(|| self.queue.first(&txn).unwrap()) { + Some(job) => job, + None => { + drop(txn); + self.notify + .subscribe() + .wait_for(|x| x.map(|x| x >= id).unwrap_or_default()) + .await + .unwrap(); + block_in_place(|| { + let txn = self.env.read_txn().unwrap(); + self.queue.first(&txn).unwrap().unwrap() + }) + } + } + } + + fn complete(&self, txn: &mut heed::RwTxn, job_id: u64) { + block_in_place(|| { + self.queue.delete(txn, &job_id).unwrap(); + }); + } + + async fn compact(&self) -> color_eyre::Result<()> { + let (job_id, job) = self.peek().await; + tracing::debug!("starting new compaction job: {job:?}"); + let to_compact_path = self.snapshot_queue_dir().join(job.log_id.to_string()); + let (snapshot_id, start_fno, end_fno) = tokio::task::spawn_blocking({ + let to_compact_path = to_compact_path.clone(); + let db_path = self.db_path.clone(); + move || { + let mut builder = SnapshotBuilder::new(&db_path, job.database_id)?; + let log = LogFile::new(to_compact_path)?; + for frame in log.rev_deduped() { + let frame = frame?; + builder.push_frame(frame)?; + } + builder.finish() + } + }) + .await??; + + let mut txn = self.env.write_txn()?; + self.complete(&mut txn, job_id); + self.snapshot_store + .register(&mut txn, job.database_id, start_fno, end_fno, snapshot_id); + txn.commit()?; + + std::fs::remove_file(to_compact_path)?; + + Ok(()) + } + + pub fn snapshot_queue_dir(&self) -> PathBuf { + self.db_path.join("snapshot_queue") + } +} + +pub async fn run_compactor_loop(compactor: Arc) -> color_eyre::Result<()> { + loop { + compactor.compact().await?; + } +} + +#[derive(Debug, Copy, Clone, Zeroable, Pod, PartialEq, Eq)] +#[repr(C)] +/// header of a snapshot file +pub struct SnapshotFileHeader { + /// id of the database + pub db_id: DatabaseId, + /// first frame in the snapshot + pub start_frame_no: u64, + /// end frame in the snapshot + pub end_frame_no: u64, + /// number of frames in the snapshot + pub frame_count: u64, + /// safe of the database after applying the snapshot + pub size_after: u32, + pub _pad: u32, +} + +/// An utility to build a snapshots from log frames +pub struct SnapshotBuilder { + pub header: SnapshotFileHeader, + snapshot_file: BufWriter, + db_path: PathBuf, + last_seen_frame_no: u64, +} + +impl SnapshotBuilder { + pub fn new(db_path: &Path, db_id: DatabaseId) -> color_eyre::Result { + let temp_dir = db_path.join("tmp"); + let mut target = BufWriter::new(NamedTempFile::new_in(&temp_dir)?); + // reserve header space + target.write_all(&[0; size_of::()])?; + + Ok(Self { + header: SnapshotFileHeader { + db_id, + start_frame_no: u64::MAX, + end_frame_no: u64::MIN, + frame_count: 0, + size_after: 0, + _pad: 0, + }, + snapshot_file: target, + db_path: db_path.to_path_buf(), + last_seen_frame_no: u64::MAX, + }) + } + + pub fn push_frame(&mut self, 
frame: Frame) -> color_eyre::Result<()> { + assert!(frame.header().frame_no < self.last_seen_frame_no); + self.last_seen_frame_no = frame.header().frame_no; + if frame.header().frame_no < self.header.start_frame_no { + self.header.start_frame_no = frame.header().frame_no; + } + + if frame.header().frame_no > self.header.end_frame_no { + self.header.end_frame_no = frame.header().frame_no; + self.header.size_after = frame.header().size_after; + } + + self.snapshot_file.write_all(frame.as_slice())?; + self.header.frame_count += 1; + + Ok(()) + } + + /// Persist the snapshot, and returns the name and size is frame on the snapshot. + pub fn finish(mut self) -> color_eyre::Result<(Uuid, FrameNo, FrameNo)> { + self.snapshot_file.flush()?; + let file = self.snapshot_file.into_inner()?; + file.as_file().write_all_at(bytes_of(&self.header), 0)?; + let snapshot_id = Uuid::new_v4(); + + let path = self.db_path.join("snapshots").join(snapshot_id.to_string()); + file.persist(path)?; + + Ok(( + snapshot_id, + self.header.start_frame_no, + self.header.end_frame_no, + )) + } +} diff --git a/libsqlx-server/src/http/admin.rs b/libsqlx-server/src/http/admin.rs index ac5c9ede..9b51b7ed 100644 --- a/libsqlx-server/src/http/admin.rs +++ b/libsqlx-server/src/http/admin.rs @@ -57,7 +57,7 @@ struct AllocateReq { } #[derive(Debug, Deserialize)] -#[serde(tag = "type", rename_all = "snake_case")] +#[serde(tag = "type", rename_all = "snake_case", deny_unknown_fields)] pub struct Primary { /// The maximum size the replication is allowed to grow. Expects a string like 200mb. #[serde(default = "default_max_log_size")] diff --git a/libsqlx-server/src/main.rs b/libsqlx-server/src/main.rs index 5a9e26da..b1856e9e 100644 --- a/libsqlx-server/src/main.rs +++ b/libsqlx-server/src/main.rs @@ -1,9 +1,10 @@ use std::fs::read_to_string; -use std::path::PathBuf; +use std::path::{PathBuf, Path}; use std::sync::Arc; use clap::Parser; use color_eyre::eyre::Result; +use compactor::{CompactionQueue, run_compactor_loop}; use config::{AdminApiConfig, ClusterConfig, UserApiConfig}; use http::admin::run_admin_api; use http::user::run_user_api; @@ -11,12 +12,15 @@ use hyper::server::conn::AddrIncoming; use linc::bus::Bus; use manager::Manager; use meta::Store; +use snapshot_store::SnapshotStore; +use tokio::fs::create_dir_all; use tokio::net::{TcpListener, TcpStream}; use tokio::task::JoinSet; use tracing::metadata::LevelFilter; use tracing_subscriber::prelude::*; mod allocation; +mod compactor; mod config; mod database; mod hrana; @@ -24,6 +28,7 @@ mod http; mod linc; mod manager; mod meta; +mod snapshot_store; #[derive(Debug, Parser)] struct Args { @@ -83,6 +88,17 @@ async fn spawn_cluster_networking( Ok(()) } +async fn init_dirs(db_path: &Path) -> color_eyre::Result<()> { + create_dir_all(&db_path).await?; + create_dir_all(db_path.join("tmp")).await?; + create_dir_all(db_path.join("snapshot_queue")).await?; + create_dir_all(db_path.join("snapshots")).await?; + create_dir_all(db_path.join("dbs")).await?; + create_dir_all(db_path.join("meta")).await?; + + Ok(()) +} + #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> Result<()> { init(); @@ -93,17 +109,29 @@ async fn main() -> Result<()> { let mut join_set = JoinSet::new(); + init_dirs(&config.db_path).await?; - let meta_path = config.db_path.join("meta"); - tokio::fs::create_dir_all(&meta_path).await?; let env = heed::EnvOpenOptions::new() .max_dbs(1000) .map_size(100 * 1024 * 1024) - .open(meta_path)?; + .open(config.db_path.join("meta"))?; + + let 
snapshot_store = Arc::new(SnapshotStore::new(config.db_path.clone(), &env)?); + let compaction_queue = Arc::new(CompactionQueue::new( + env.clone(), + config.db_path.clone(), + snapshot_store, + )?); let store = Arc::new(Store::new(env.clone())); - let manager = Arc::new(Manager::new(config.db_path.clone(), store.clone(), 100)); + let manager = Arc::new(Manager::new( + config.db_path.clone(), + store.clone(), + 100, + compaction_queue.clone(), + )); let bus = Arc::new(Bus::new(config.cluster.id, manager.clone())); + join_set.spawn(run_compactor_loop(compaction_queue)); spawn_cluster_networking(&mut join_set, &config.cluster, bus.clone()).await?; spawn_admin_api(&mut join_set, &config.admin_api, bus.clone()).await?; spawn_user_api(&mut join_set, &config.user_api, manager, bus).await?; diff --git a/libsqlx-server/src/manager.rs b/libsqlx-server/src/manager.rs index fb44414d..a3bb68d8 100644 --- a/libsqlx-server/src/manager.rs +++ b/libsqlx-server/src/manager.rs @@ -8,6 +8,7 @@ use tokio::task::JoinSet; use crate::allocation::config::AllocConfig; use crate::allocation::{Allocation, AllocationMessage, Database}; +use crate::compactor::CompactionQueue; use crate::hrana; use crate::linc::bus::Dispatch; use crate::linc::handler::Handler; @@ -18,16 +19,23 @@ pub struct Manager { cache: Cache>, meta_store: Arc, db_path: PathBuf, + compaction_queue: Arc, } const MAX_ALLOC_MESSAGE_QUEUE_LEN: usize = 32; impl Manager { - pub fn new(db_path: PathBuf, meta_store: Arc, max_conccurent_allocs: u64) -> Self { + pub fn new( + db_path: PathBuf, + meta_store: Arc, + max_conccurent_allocs: u64, + compaction_queue: Arc, + ) -> Self { Self { cache: Cache::new(max_conccurent_allocs), meta_store, db_path, + compaction_queue, } } @@ -47,7 +55,12 @@ impl Manager { let (alloc_sender, inbox) = mpsc::channel(MAX_ALLOC_MESSAGE_QUEUE_LEN); let alloc = Allocation { inbox, - database: Database::from_config(&config, path, dispatcher.clone()), + database: Database::from_config( + &config, + path, + dispatcher.clone(), + self.compaction_queue.clone(), + ), connections_futs: JoinSet::new(), next_conn_id: 0, max_concurrent_connections: config.max_conccurent_connection, diff --git a/libsqlx-server/src/meta.rs b/libsqlx-server/src/meta.rs index 84a8856c..38a1c30b 100644 --- a/libsqlx-server/src/meta.rs +++ b/libsqlx-server/src/meta.rs @@ -1,7 +1,8 @@ use std::fmt; +use std::mem::size_of; use heed::bytemuck::{Pod, Zeroable}; -use heed::types::{SerdeBincode, OwnedType}; +use heed_types::{OwnedType, SerdeBincode}; use serde::{Deserialize, Serialize}; use sha3::digest::{ExtendableOutput, Update, XofReader}; use sha3::Shake128; @@ -30,6 +31,11 @@ impl DatabaseId { Self(out) } + pub fn from_bytes(bytes: &[u8]) -> Self { + assert_eq!(bytes.len(), size_of::()); + Self(bytes.try_into().unwrap()) + } + #[cfg(test)] pub fn random() -> Self { Self(uuid::Uuid::new_v4().into_bytes()) diff --git a/libsqlx-server/src/snapshot_store.rs b/libsqlx-server/src/snapshot_store.rs new file mode 100644 index 00000000..a8803711 --- /dev/null +++ b/libsqlx-server/src/snapshot_store.rs @@ -0,0 +1,75 @@ +use std::mem::size_of; +use std::path::PathBuf; + +use bytemuck::{Pod, Zeroable}; +use heed_types::{CowType, SerdeBincode}; +use libsqlx::FrameNo; +use serde::Serialize; +use uuid::Uuid; + +use crate::meta::DatabaseId; + +#[derive(Clone, Copy, Zeroable, Pod, Debug)] +#[repr(transparent)] +struct BEU64([u8; size_of::()]); + +impl From for BEU64 { + fn from(value: u64) -> Self { + Self(value.to_be_bytes()) + } +} + +impl From for u64 { + fn from(value: BEU64) 
-> Self { + u64::from_be_bytes(value.0) + } +} + +#[derive(Clone, Copy, Zeroable, Pod, Debug)] +#[repr(C)] +struct SnapshotKey { + database_id: DatabaseId, + start_frame_no: BEU64, + end_frame_no: FrameNo, +} + +#[derive(Debug, Serialize)] +struct SnapshotMeta { + snapshot_id: Uuid, +} + +pub struct SnapshotStore { + database: heed::Database, SerdeBincode>, + db_path: PathBuf, +} + +impl SnapshotStore { + const SNAPSHOT_STORE_NAME: &str = "snapshot-store-db"; + + pub fn new(db_path: PathBuf, env: &heed::Env) -> color_eyre::Result { + let mut txn = env.write_txn().unwrap(); + let database = env.create_database(&mut txn, Some(Self::SNAPSHOT_STORE_NAME))?; + txn.commit()?; + + Ok(Self { database, db_path }) + } + + pub fn register( + &self, + txn: &mut heed::RwTxn, + database_id: DatabaseId, + start_frame_no: FrameNo, + end_frame_no: FrameNo, + snapshot_id: Uuid, + ) { + let key = SnapshotKey { + database_id, + start_frame_no: start_frame_no.into(), + end_frame_no: end_frame_no.into(), + }; + + let data = SnapshotMeta { snapshot_id }; + + self.database.put(txn, &key, &data).unwrap(); + } +} diff --git a/libsqlx/src/database/frame.rs b/libsqlx/src/database/frame.rs index 337853fb..ba2d638d 100644 --- a/libsqlx/src/database/frame.rs +++ b/libsqlx/src/database/frame.rs @@ -55,8 +55,11 @@ impl Frame { Self { data: buf.freeze() } } - pub fn try_from_bytes(data: Bytes) -> anyhow::Result { - anyhow::ensure!(data.len() == Self::SIZE, "invalid frame size"); + pub fn try_from_bytes(data: Bytes) -> crate::Result { + if data.len() != Self::SIZE { + return Err(crate::error::Error::InvalidFrame); + } + Ok(Self { data }) } diff --git a/libsqlx/src/database/libsql/replication_log/logger.rs b/libsqlx/src/database/libsql/replication_log/logger.rs index 48d3916e..96520ef8 100644 --- a/libsqlx/src/database/libsql/replication_log/logger.rs +++ b/libsqlx/src/database/libsql/replication_log/logger.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::ffi::{c_int, c_void, CStr}; use std::fs::{remove_dir_all, File, OpenOptions}; use std::io::Write; @@ -147,7 +148,7 @@ unsafe impl WalHook for ReplicationLoggerHook { .logger .log_file .write() - .maybe_compact(&mut *ctx.logger.compactor.lock(), &ctx.logger.db_path) + .maybe_compact(&mut *ctx.logger.compactor.lock()) { tracing::error!("fatal error: {e}, exiting"); std::process::abort() @@ -345,6 +346,8 @@ impl ReplicationLoggerHookCtx { #[derive(Debug)] pub struct LogFile { file: File, + /// Path of the LogFile + path: PathBuf, pub header: LogFileHeader, /// number of frames in the log that have not been commited yet. On commit the header's frame /// count is incremented by that ammount. New pages are written after the last @@ -365,16 +368,22 @@ pub enum LogReadError { #[error("requested entry is ahead of log")] Ahead, #[error(transparent)] - Error(#[from] anyhow::Error), + Error(#[from] crate::error::Error), } impl LogFile { /// size of a single frame pub const FRAME_SIZE: usize = size_of::() + WAL_PAGE_SIZE as usize; - pub fn new(file: File) -> crate::Result { + pub fn new(path: PathBuf) -> crate::Result { // FIXME: we should probably take a lock on this file, to prevent anybody else to write to // it. 
+ let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&path)?; + let file_end = file.metadata()?.len(); if file_end == 0 { @@ -391,6 +400,7 @@ impl LogFile { }; let mut this = Self { + path, file, header, uncommitted_frame_count: 0, @@ -409,6 +419,7 @@ impl LogFile { uncommitted_frame_count: 0, uncommitted_checksum: 0, commited_checksum: 0, + path, }; if let Some(last_commited) = this.last_commited_frame_no() { @@ -467,7 +478,7 @@ impl LogFile { } /// Returns an iterator over the WAL frame headers - pub fn frames_iter(&self) -> anyhow::Result> + '_> { + pub fn frames_iter(&self) -> anyhow::Result> + '_> { let mut current_frame_offset = 0; Ok(std::iter::from_fn(move || { if current_frame_offset >= self.header.frame_count { @@ -480,12 +491,10 @@ impl LogFile { } /// Returns an iterator over the WAL frame headers - pub fn rev_frames_iter( - &self, - ) -> anyhow::Result> + '_> { + pub fn rev_frames_iter(&self) -> impl Iterator> + '_ { let mut current_frame_offset = self.header.frame_count; - Ok(std::iter::from_fn(move || { + std::iter::from_fn(move || { if current_frame_offset == 0 { return None; } @@ -493,7 +502,24 @@ impl LogFile { let read_byte_offset = Self::absolute_byte_offset(current_frame_offset); let frame = self.read_frame_byte_offset(read_byte_offset); Some(frame) - })) + }) + } + + /// Return a reversed iterator over the deduplicated frames in the log file. + pub fn rev_deduped(&self) -> impl Iterator> + '_ { + let mut iter = self.rev_frames_iter(); + let mut seen = HashSet::new(); + std::iter::from_fn(move || loop { + match iter.next()? { + Ok(frame) => { + if !seen.contains(&frame.header().page_no) { + seen.insert(frame.header().page_no); + return Some(Ok(frame)); + } + } + Err(e) => return Some(Err(e)), + } + }) } fn compute_checksum(&self, page: &WalPage) -> u64 { @@ -541,7 +567,7 @@ impl LogFile { std::mem::size_of::() as u64 + nth * Self::FRAME_SIZE as u64 } - fn byte_offset(&self, id: FrameNo) -> anyhow::Result> { + fn byte_offset(&self, id: FrameNo) -> crate::Result> { if id < self.header.start_frame_no || id > self.header.start_frame_no + self.header.frame_count { @@ -550,7 +576,7 @@ impl LogFile { Ok(Self::absolute_byte_offset(id - self.header.start_frame_no).into()) } - /// Returns bytes represening a WalFrame for frame `frame_no` + /// Returns bytes representing a WalFrame for frame `frame_no` /// /// If the requested frame is before the first frame in the log, or after the last frame, /// Ok(None) is returned. @@ -568,38 +594,26 @@ impl LogFile { Ok(frame) } - fn maybe_compact( - &mut self, - compactor: &mut dyn LogCompactor, - path: &Path, - ) -> anyhow::Result<()> { + fn maybe_compact(&mut self, compactor: &mut dyn LogCompactor) -> anyhow::Result<()> { if self.can_compact() && compactor.should_compact(self) { - return self.do_compaction(compactor, path); + return self.do_compaction(compactor); } Ok(()) } - fn do_compaction( - &mut self, - compactor: &mut dyn LogCompactor, - path: &Path, - ) -> anyhow::Result<()> { + fn do_compaction(&mut self, compactor: &mut dyn LogCompactor) -> anyhow::Result<()> { tracing::info!("performing log compaction"); - let temp_log_path = path.join("temp_log"); + let log_id = Uuid::new_v4(); + let temp_log_path = compactor.snapshot_dir().join(log_id.to_string()); let last_frame = self - .rev_frames_iter()? 
+ .rev_frames_iter() .next() .expect("there should be at least one frame to perform compaction")?; let size_after = last_frame.header().size_after; assert!(size_after != 0); - let file = OpenOptions::new() - .read(true) - .write(true) - .create(true) - .open(&temp_log_path)?; - let mut new_log_file = LogFile::new(file)?; + let mut new_log_file = LogFile::new(temp_log_path.clone())?; let new_header = LogFileHeader { start_frame_no: self.header.start_frame_no + self.header.frame_count, frame_count: 0, @@ -609,16 +623,15 @@ impl LogFile { new_log_file.header = new_header; new_log_file.write_header().unwrap(); // swap old and new snapshot - atomic_rename(&temp_log_path, path.join("wallog")).unwrap(); - let old_log_file = std::mem::replace(self, new_log_file); - compactor - .compact(old_log_file, temp_log_path, size_after) - .unwrap(); + atomic_rename(dbg!(&temp_log_path), dbg!(&self.path)).unwrap(); + std::mem::swap(&mut new_log_file.path, &mut self.path); + let _ = std::mem::replace(self, new_log_file); + compactor.compact(log_id).unwrap(); Ok(()) } - fn read_frame_byte_offset(&self, offset: u64) -> anyhow::Result { + fn read_frame_byte_offset(&self, offset: u64) -> crate::Result { let mut buffer = BytesMut::zeroed(LogFile::FRAME_SIZE); self.file.read_exact_at(&mut buffer, offset)?; let buffer = buffer.freeze(); @@ -637,7 +650,7 @@ impl LogFile { fn reset(self) -> crate::Result { // truncate file self.file.set_len(0)?; - Self::new(self.file) + Self::new(self.path) } /// return the size in bytes of the log @@ -744,19 +757,18 @@ pub trait LogCompactor: Sync + Send + 'static { /// Compact the given snapshot fn compact( &mut self, - log: LogFile, - path: PathBuf, - size_after: u32, + log_id: Uuid, ) -> Result<(), Box>; + + fn snapshot_dir(&self) -> PathBuf; } #[cfg(test)] impl LogCompactor for () { fn compact( &mut self, - _file: LogFile, - _path: PathBuf, - _size_after: u32, + log_name: String, + path: PathBuf, ) -> Result<(), Box> { Ok(()) } @@ -790,13 +802,7 @@ impl ReplicationLogger { let fresh = !log_path.exists(); - let file = OpenOptions::new() - .create(true) - .write(true) - .read(true) - .open(log_path)?; - - let log_file = LogFile::new(file)?; + let log_file = LogFile::new(log_path)?; let header = log_file.header(); let should_recover = if dirty { @@ -935,9 +941,7 @@ impl ReplicationLogger { pub fn compact(&self) { let mut log_file = self.log_file.write(); if log_file.can_compact() { - log_file - .do_compaction(&mut *self.compactor.lock(), &self.db_path) - .unwrap(); + log_file.do_compaction(&mut *self.compactor.lock()).unwrap(); } } } diff --git a/libsqlx/src/database/libsql/replication_log/snapshot.rs b/libsqlx/src/database/libsql/replication_log/snapshot.rs index c5f58ea3..b6bac186 100644 --- a/libsqlx/src/database/libsql/replication_log/snapshot.rs +++ b/libsqlx/src/database/libsql/replication_log/snapshot.rs @@ -118,7 +118,7 @@ impl SnapshotFile { } /// Iterator on the frames contained in the snapshot file, in reverse frame_no order. 
- pub fn frames_iter(&self) -> impl Iterator> + '_ { + pub fn frames_iter(&self) -> impl Iterator> + '_ { let mut current_offset = 0; std::iter::from_fn(move || { if current_offset >= self.header.frame_count { @@ -139,7 +139,7 @@ impl SnapshotFile { pub fn frames_iter_from( &self, frame_no: u64, - ) -> impl Iterator> + '_ { + ) -> impl Iterator> + '_ { let mut iter = self.frames_iter(); std::iter::from_fn(move || match iter.next() { Some(Ok(bytes)) => match Frame::try_from_bytes(bytes.clone()) { @@ -197,7 +197,7 @@ impl SnapshotBuilder { /// append frames to the snapshot. Frames must be in decreasing frame_no order. pub fn append_frames( &mut self, - frames: impl Iterator>, + frames: impl Iterator>, ) -> anyhow::Result<()> { // We iterate on the frames starting from the end of the log and working our way backward. We // make sure that only the most recent version of each file is present in the resulting diff --git a/libsqlx/src/error.rs b/libsqlx/src/error.rs index fd7828c1..091152b9 100644 --- a/libsqlx/src/error.rs +++ b/libsqlx/src/error.rs @@ -45,4 +45,6 @@ pub enum Error { }, #[error(transparent)] LexerError(#[from] sqlite3_parser::lexer::sql::Error), + #[error("invalid frame")] + InvalidFrame, } From 4617dc58b656a2add910d58e0c65929501fcd4fe Mon Sep 17 00:00:00 2001 From: ad hoc Date: Wed, 26 Jul 2023 14:53:12 +0200 Subject: [PATCH 2/9] add locate method to SnapshotStore --- libsqlx-server/src/snapshot_store.rs | 59 +++++++++++++++++++++++----- 1 file changed, 50 insertions(+), 9 deletions(-) diff --git a/libsqlx-server/src/snapshot_store.rs b/libsqlx-server/src/snapshot_store.rs index a8803711..f4a008f2 100644 --- a/libsqlx-server/src/snapshot_store.rs +++ b/libsqlx-server/src/snapshot_store.rs @@ -2,9 +2,11 @@ use std::mem::size_of; use std::path::PathBuf; use bytemuck::{Pod, Zeroable}; -use heed_types::{CowType, SerdeBincode}; +use heed::BytesDecode; +use heed_types::{ByteSlice, CowType, SerdeBincode}; use libsqlx::FrameNo; -use serde::Serialize; +use serde::{Deserialize, Serialize}; +use tokio::task::block_in_place; use uuid::Uuid; use crate::meta::DatabaseId; @@ -30,15 +32,16 @@ impl From for u64 { struct SnapshotKey { database_id: DatabaseId, start_frame_no: BEU64, - end_frame_no: FrameNo, + end_frame_no: BEU64, } -#[derive(Debug, Serialize)] -struct SnapshotMeta { - snapshot_id: Uuid, +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct SnapshotMeta { + pub snapshot_id: Uuid, } pub struct SnapshotStore { + env: heed::Env, database: heed::Database, SerdeBincode>, db_path: PathBuf, } @@ -46,12 +49,16 @@ pub struct SnapshotStore { impl SnapshotStore { const SNAPSHOT_STORE_NAME: &str = "snapshot-store-db"; - pub fn new(db_path: PathBuf, env: &heed::Env) -> color_eyre::Result { + pub fn new(db_path: PathBuf, env: heed::Env) -> color_eyre::Result { let mut txn = env.write_txn().unwrap(); let database = env.create_database(&mut txn, Some(Self::SNAPSHOT_STORE_NAME))?; txn.commit()?; - Ok(Self { database, db_path }) + Ok(Self { + database, + db_path, + env, + }) } pub fn register( @@ -70,6 +77,40 @@ impl SnapshotStore { let data = SnapshotMeta { snapshot_id }; - self.database.put(txn, &key, &data).unwrap(); + block_in_place(|| self.database.put(txn, &key, &data).unwrap()); + } + + /// Locate a snapshot for `database_id` that contains `frame_no` + pub fn locate(&self, database_id: DatabaseId, frame_no: FrameNo) -> Option { + let txn = self.env.read_txn().unwrap(); + // Snapshot keys being lexicographically ordered, looking for the first key less than of + // equal to 
(db_id, frame_no, FrameNo::MAX) will always return the entry we're looking for + // if it exists. + let key = SnapshotKey { + database_id, + start_frame_no: frame_no.into(), + end_frame_no: u64::MAX.into(), + }; + + match self + .database + .get_lower_than_or_equal_to(&txn, &key) + .transpose()? + { + Ok((key, v)) => { + if key.database_id != database_id { + return None; + } else if frame_no >= key.start_frame_no.into() + && frame_no <= key.end_frame_no.into() + { + return Some(v); + } else { + None + } + } + Err(_) => todo!(), + } + } +} } } From 898974ea5140e472cbcbd8928fb85b393413abee Mon Sep 17 00:00:00 2001 From: ad hoc Date: Wed, 26 Jul 2023 14:53:33 +0200 Subject: [PATCH 3/9] add test for locate method --- libsqlx-server/src/snapshot_store.rs | 81 ++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/libsqlx-server/src/snapshot_store.rs b/libsqlx-server/src/snapshot_store.rs index f4a008f2..68610c10 100644 --- a/libsqlx-server/src/snapshot_store.rs +++ b/libsqlx-server/src/snapshot_store.rs @@ -112,5 +112,86 @@ impl SnapshotStore { } } } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn insert_and_locate() { + let temp = tempfile::tempdir().unwrap(); + let env = heed::EnvOpenOptions::new() + .max_dbs(10) + .map_size(1000 * 4096) + .open(temp.path()) + .unwrap(); + let store = SnapshotStore::new(temp.path().to_path_buf(), env).unwrap(); + let mut txn = store.env.write_txn().unwrap(); + let db_id = DatabaseId::random(); + let snapshot_id = Uuid::new_v4(); + store.register(&mut txn, db_id, 0, 51, snapshot_id); + txn.commit().unwrap(); + + assert!(store.locate(db_id, 0).is_some()); + assert!(store.locate(db_id, 17).is_some()); + assert!(store.locate(db_id, 51).is_some()); + assert!(store.locate(db_id, 52).is_none()); + } + + #[test] + fn multiple_snapshots() { + let temp = tempfile::tempdir().unwrap(); + let env = heed::EnvOpenOptions::new() + .max_dbs(10) + .map_size(1000 * 4096) + .open(temp.path()) + .unwrap(); + let store = SnapshotStore::new(temp.path().to_path_buf(), env).unwrap(); + let mut txn = store.env.write_txn().unwrap(); + let db_id = DatabaseId::random(); + let snapshot_1_id = Uuid::new_v4(); + store.register(&mut txn, db_id, 0, 51, snapshot_1_id); + let snapshot_2_id = Uuid::new_v4(); + store.register(&mut txn, db_id, 52, 112, snapshot_2_id); + txn.commit().unwrap(); + + assert_eq!(store.locate(db_id, 0).unwrap().snapshot_id, snapshot_1_id); + assert_eq!(store.locate(db_id, 17).unwrap().snapshot_id, snapshot_1_id); + assert_eq!(store.locate(db_id, 51).unwrap().snapshot_id, snapshot_1_id); + assert_eq!(store.locate(db_id, 52).unwrap().snapshot_id, snapshot_2_id); + assert_eq!(store.locate(db_id, 100).unwrap().snapshot_id, snapshot_2_id); + assert_eq!(store.locate(db_id, 112).unwrap().snapshot_id, snapshot_2_id); + assert!(store.locate(db_id, 12345).is_none()); + } + + #[test] + fn multiple_databases() { + let temp = tempfile::tempdir().unwrap(); + let env = heed::EnvOpenOptions::new() + .max_dbs(10) + .map_size(1000 * 4096) + .open(temp.path()) + .unwrap(); + let store = SnapshotStore::new(temp.path().to_path_buf(), env).unwrap(); + let mut txn = store.env.write_txn().unwrap(); + let db_id1 = DatabaseId::random(); + let db_id2 = DatabaseId::random(); + let snapshot_id1 = Uuid::new_v4(); + let snapshot_id2 = Uuid::new_v4(); + store.register(&mut txn, db_id1, 0, 51, snapshot_id1); + store.register(&mut txn, db_id2, 0, 51, snapshot_id2); + txn.commit().unwrap(); + + assert_eq!(store.locate(db_id1, 0).unwrap().snapshot_id, snapshot_id1); + 
assert_eq!(store.locate(db_id2, 0).unwrap().snapshot_id, snapshot_id2); + + assert_eq!(store.locate(db_id1, 12).unwrap().snapshot_id, snapshot_id1); + assert_eq!(store.locate(db_id2, 18).unwrap().snapshot_id, snapshot_id2); + + assert_eq!(store.locate(db_id1, 51).unwrap().snapshot_id, snapshot_id1); + assert_eq!(store.locate(db_id2, 51).unwrap().snapshot_id, snapshot_id2); + + assert!(store.locate(db_id1, 52).is_none()); + assert!(store.locate(db_id2, 52).is_none()); } } From 992256838b4d1d2e2c424913a06dc3634652c5c6 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Wed, 26 Jul 2023 14:54:09 +0200 Subject: [PATCH 4/9] add compactor test --- libsqlx-server/src/compactor.rs | 155 +++++++++++++++++++++++++++++--- 1 file changed, 141 insertions(+), 14 deletions(-) diff --git a/libsqlx-server/src/compactor.rs b/libsqlx-server/src/compactor.rs index 22f31c6a..ac88a455 100644 --- a/libsqlx-server/src/compactor.rs +++ b/libsqlx-server/src/compactor.rs @@ -1,3 +1,4 @@ +use std::fs::File; use std::io::{BufWriter, Write}; use std::mem::size_of; use std::os::unix::prelude::FileExt; @@ -7,7 +8,8 @@ use std::sync::{ Arc, }; -use bytemuck::{bytes_of, Pod, Zeroable}; +use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable}; +use bytes::{Bytes, BytesMut}; use heed::byteorder::BigEndian; use heed_types::{SerdeBincode, U64}; use libsqlx::libsql::LogFile; @@ -85,7 +87,7 @@ impl CompactionQueue { .wait_for(|x| x.map(|x| x >= id).unwrap_or_default()) .await .unwrap(); - block_in_place(|| { + block_in_place(|| { let txn = self.env.read_txn().unwrap(); self.queue.first(&txn).unwrap().unwrap() }) @@ -103,11 +105,11 @@ impl CompactionQueue { let (job_id, job) = self.peek().await; tracing::debug!("starting new compaction job: {job:?}"); let to_compact_path = self.snapshot_queue_dir().join(job.log_id.to_string()); - let (snapshot_id, start_fno, end_fno) = tokio::task::spawn_blocking({ + let (start_fno, end_fno) = tokio::task::spawn_blocking({ let to_compact_path = to_compact_path.clone(); let db_path = self.db_path.clone(); move || { - let mut builder = SnapshotBuilder::new(&db_path, job.database_id)?; + let mut builder = SnapshotBuilder::new(&db_path, job.database_id, job.log_id)?; let log = LogFile::new(to_compact_path)?; for frame in log.rev_deduped() { let frame = frame?; @@ -121,7 +123,7 @@ impl CompactionQueue { let mut txn = self.env.write_txn()?; self.complete(&mut txn, job_id); self.snapshot_store - .register(&mut txn, job.database_id, start_fno, end_fno, snapshot_id); + .register(&mut txn, job.database_id, start_fno, end_fno, job.log_id); txn.commit()?; std::fs::remove_file(to_compact_path)?; @@ -160,13 +162,14 @@ pub struct SnapshotFileHeader { /// An utility to build a snapshots from log frames pub struct SnapshotBuilder { pub header: SnapshotFileHeader, + snapshot_id: Uuid, snapshot_file: BufWriter, db_path: PathBuf, last_seen_frame_no: u64, } impl SnapshotBuilder { - pub fn new(db_path: &Path, db_id: DatabaseId) -> color_eyre::Result { + pub fn new(db_path: &Path, db_id: DatabaseId, snapshot_id: Uuid) -> color_eyre::Result { let temp_dir = db_path.join("tmp"); let mut target = BufWriter::new(NamedTempFile::new_in(&temp_dir)?); // reserve header space @@ -184,6 +187,7 @@ impl SnapshotBuilder { snapshot_file: target, db_path: db_path.to_path_buf(), last_seen_frame_no: u64::MAX, + snapshot_id, }) } @@ -206,19 +210,142 @@ impl SnapshotBuilder { } /// Persist the snapshot, and returns the name and size is frame on the snapshot. 
- pub fn finish(mut self) -> color_eyre::Result<(Uuid, FrameNo, FrameNo)> { + pub fn finish(mut self) -> color_eyre::Result<(FrameNo, FrameNo)> { self.snapshot_file.flush()?; let file = self.snapshot_file.into_inner()?; file.as_file().write_all_at(bytes_of(&self.header), 0)?; - let snapshot_id = Uuid::new_v4(); - let path = self.db_path.join("snapshots").join(snapshot_id.to_string()); + let path = self + .db_path + .join("snapshots") + .join(self.snapshot_id.to_string()); file.persist(path)?; - Ok(( - snapshot_id, - self.header.start_frame_no, - self.header.end_frame_no, - )) + Ok((self.header.start_frame_no, self.header.end_frame_no)) + } +} + +pub struct SnapshotFile { + pub file: File, + pub header: SnapshotFileHeader, +} + +impl SnapshotFile { + pub fn open(path: &Path) -> color_eyre::Result { + let file = File::open(path)?; + let mut header_buf = [0; size_of::()]; + file.read_exact_at(&mut header_buf, 0)?; + let header: SnapshotFileHeader = pod_read_unaligned(&header_buf); + + Ok(Self { file, header }) + } + + /// Iterator on the frames contained in the snapshot file, in reverse frame_no order. + pub fn frames_iter(&self) -> impl Iterator> + '_ { + let mut current_offset = 0; + std::iter::from_fn(move || { + if current_offset >= self.header.frame_count { + return None; + } + let read_offset = size_of::() as u64 + + current_offset * LogFile::FRAME_SIZE as u64; + current_offset += 1; + let mut buf = BytesMut::zeroed(LogFile::FRAME_SIZE); + match self.file.read_exact_at(&mut buf, read_offset as _) { + Ok(_) => Some(Ok(buf.freeze())), + Err(e) => Some(Err(e.into())), + } + }) + } + + /// Like `frames_iter`, but stops as soon as a frame with frame_no <= `frame_no` is reached + pub fn frames_iter_from( + &self, + frame_no: u64, + ) -> impl Iterator> + '_ { + let mut iter = self.frames_iter(); + std::iter::from_fn(move || match iter.next() { + Some(Ok(bytes)) => match Frame::try_from_bytes(bytes.clone()) { + Ok(frame) => { + if frame.header().frame_no < frame_no { + None + } else { + Some(Ok(bytes)) + } + } + Err(e) => Some(Err(e)), + }, + other => other, + }) + } +} + +#[cfg(test)] +mod test { + use std::collections::HashSet; + + use crate::init_dirs; + + use super::*; + + #[tokio::test(flavor = "multi_thread")] + async fn create_snapshot() { + let temp = tempfile::tempdir().unwrap(); + init_dirs(temp.path()).await.unwrap(); + let env = heed::EnvOpenOptions::new() + .max_dbs(100) + .map_size(1000 * 4096) + .open(temp.path().join("meta")) + .unwrap(); + let snapshot_store = SnapshotStore::new(temp.path().to_path_buf(), env.clone()).unwrap(); + let store = Arc::new(snapshot_store); + let queue = CompactionQueue::new(env, temp.path().to_path_buf(), store.clone()).unwrap(); + let log_id = Uuid::new_v4(); + let database_id = DatabaseId::random(); + + let log_path = temp.path().join("snapshot_queue").join(log_id.to_string()); + tokio::fs::copy("assets/test/simple-log", &log_path) + .await + .unwrap(); + + let log_file = LogFile::new(log_path).unwrap(); + let expected_start_frameno = log_file.header().start_frame_no; + let expected_end_frameno = + log_file.header().start_frame_no + log_file.header().frame_count - 1; + let mut expected_page_content = log_file + .frames_iter() + .unwrap() + .map(|f| f.unwrap().header().page_no) + .collect::>(); + + queue.push(&CompactionJob { + database_id, + log_id, + }); + + queue.compact().await.unwrap(); + + let snapshot_path = temp.path().join("snapshots").join(log_id.to_string()); + assert!(snapshot_path.exists()); + + let snapshot_file = 
SnapshotFile::open(&snapshot_path).unwrap(); + assert_eq!(snapshot_file.header.start_frame_no, expected_start_frameno); + assert_eq!(snapshot_file.header.end_frame_no, expected_end_frameno); + assert!(snapshot_file.frames_iter().all(|f| expected_page_content + .remove(&Frame::try_from_bytes(f.unwrap()).unwrap().header().page_no))); + assert!(expected_page_content.is_empty()); + + assert_eq!(snapshot_file + .frames_iter() + .map(Result::unwrap) + .map(Frame::try_from_bytes) + .map(Result::unwrap) + .map(|f| f.header().frame_no) + .reduce(|prev, new| { + assert!(new < prev); + new + }).unwrap(), 0); + + assert_eq!(store.locate(database_id, 0).unwrap().snapshot_id, log_id); } } From 884b2c84d37a8a8cb581485b9c998ff459e91584 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Wed, 26 Jul 2023 14:54:24 +0200 Subject: [PATCH 5/9] remove snapshot for libsqlx --- .../libsql/replication_log/snapshot.rs | 334 ------------------ 1 file changed, 334 deletions(-) delete mode 100644 libsqlx/src/database/libsql/replication_log/snapshot.rs diff --git a/libsqlx/src/database/libsql/replication_log/snapshot.rs b/libsqlx/src/database/libsql/replication_log/snapshot.rs deleted file mode 100644 index b6bac186..00000000 --- a/libsqlx/src/database/libsql/replication_log/snapshot.rs +++ /dev/null @@ -1,334 +0,0 @@ -use std::collections::HashSet; -use std::fs::File; -use std::io::BufWriter; -use std::io::Write; -use std::mem::size_of; -use std::os::unix::prelude::FileExt; -use std::path::{Path, PathBuf}; -use std::str::FromStr; - -use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable}; -use bytes::{Bytes, BytesMut}; -use once_cell::sync::Lazy; -use regex::Regex; -use tempfile::NamedTempFile; -use uuid::Uuid; - -use crate::database::frame::Frame; - -use super::logger::LogFile; -use super::FrameNo; - -/// This is the ratio of the space required to store snapshot vs size of the actual database. -/// When this ratio is exceeded, compaction is triggered. 
-pub const SNAPHOT_SPACE_AMPLIFICATION_FACTOR: u64 = 2; -/// The maximum amount of snapshot allowed before a compaction is required -pub const MAX_SNAPSHOT_NUMBER: usize = 32; - -#[derive(Debug, Copy, Clone, Zeroable, Pod, PartialEq, Eq)] -#[repr(C)] -pub struct SnapshotFileHeader { - /// id of the database - pub db_id: u128, - /// first frame in the snapshot - pub start_frame_no: u64, - /// end frame in the snapshot - pub end_frame_no: u64, - /// number of frames in the snapshot - pub frame_count: u64, - /// safe of the database after applying the snapshot - pub size_after: u32, - pub _pad: u32, -} - -pub struct SnapshotFile { - pub file: File, - pub header: SnapshotFileHeader, -} - -/// returns (db_id, start_frame_no, end_frame_no) for the given snapshot name -pub fn parse_snapshot_name(name: &str) -> Option<(Uuid, u64, u64)> { - static SNAPSHOT_FILE_MATCHER: Lazy = Lazy::new(|| { - Regex::new( - r#"(?x) - # match database id - (\w{8}-\w{4}-\w{4}-\w{4}-\w{12})- - # match start frame_no - (\d*)- - # match end frame_no - (\d*).snap"#, - ) - .unwrap() - }); - let Some(captures) = SNAPSHOT_FILE_MATCHER.captures(name) else { return None}; - let db_id = captures.get(1).unwrap(); - let start_index: u64 = captures.get(2).unwrap().as_str().parse().unwrap(); - let end_index: u64 = captures.get(3).unwrap().as_str().parse().unwrap(); - - Some(( - Uuid::from_str(db_id.as_str()).unwrap(), - start_index, - end_index, - )) -} - -pub fn snapshot_list(db_path: &Path) -> anyhow::Result> { - let mut entries = std::fs::read_dir(snapshot_dir_path(db_path))?; - Ok(std::iter::from_fn(move || { - for entry in entries.by_ref() { - let Ok(entry) = entry else { continue; }; - let path = entry.path(); - let Some(name) = path.file_name() else {continue;}; - let Some(name_str) = name.to_str() else { continue;}; - - return Some(name_str.to_string()); - } - None - })) -} - -/// Return snapshot file containing "logically" frame_no -pub fn find_snapshot_file( - db_path: &Path, - frame_no: FrameNo, -) -> anyhow::Result> { - let snapshot_dir_path = snapshot_dir_path(db_path); - for name in snapshot_list(db_path)? { - let Some((_, start_frame_no, end_frame_no)) = parse_snapshot_name(&name) else { continue; }; - // we're looking for the frame right after the last applied frame on the replica - if (start_frame_no..=end_frame_no).contains(&frame_no) { - let snapshot_path = snapshot_dir_path.join(&name); - tracing::debug!("found snapshot for frame {frame_no} at {snapshot_path:?}"); - let snapshot_file = SnapshotFile::open(&snapshot_path)?; - return Ok(Some(snapshot_file)); - } - } - - Ok(None) -} - -impl SnapshotFile { - pub fn open(path: &Path) -> anyhow::Result { - let file = File::open(path)?; - let mut header_buf = [0; size_of::()]; - file.read_exact_at(&mut header_buf, 0)?; - let header: SnapshotFileHeader = pod_read_unaligned(&header_buf); - - Ok(Self { file, header }) - } - - /// Iterator on the frames contained in the snapshot file, in reverse frame_no order. 
- pub fn frames_iter(&self) -> impl Iterator> + '_ { - let mut current_offset = 0; - std::iter::from_fn(move || { - if current_offset >= self.header.frame_count { - return None; - } - let read_offset = size_of::() as u64 - + current_offset * LogFile::FRAME_SIZE as u64; - current_offset += 1; - let mut buf = BytesMut::zeroed(LogFile::FRAME_SIZE); - match self.file.read_exact_at(&mut buf, read_offset as _) { - Ok(_) => Some(Ok(buf.freeze())), - Err(e) => Some(Err(e.into())), - } - }) - } - - /// Like `frames_iter`, but stops as soon as a frame with frame_no <= `frame_no` is reached - pub fn frames_iter_from( - &self, - frame_no: u64, - ) -> impl Iterator> + '_ { - let mut iter = self.frames_iter(); - std::iter::from_fn(move || match iter.next() { - Some(Ok(bytes)) => match Frame::try_from_bytes(bytes.clone()) { - Ok(frame) => { - if frame.header().frame_no < frame_no { - None - } else { - Some(Ok(bytes)) - } - } - Err(e) => Some(Err(e)), - }, - other => other, - }) - } -} - -/// An utility to build a snapshots from log frames -pub struct SnapshotBuilder { - seen_pages: HashSet, - pub header: SnapshotFileHeader, - snapshot_file: BufWriter, - db_path: PathBuf, - last_seen_frame_no: u64, -} - -pub fn snapshot_dir_path(db_path: &Path) -> PathBuf { - db_path.join("snapshots") -} - -impl SnapshotBuilder { - pub fn new(db_path: &Path, db_id: u128) -> anyhow::Result { - let snapshot_dir_path = snapshot_dir_path(db_path); - std::fs::create_dir_all(&snapshot_dir_path)?; - let mut target = BufWriter::new(NamedTempFile::new_in(&snapshot_dir_path)?); - // reserve header space - target.write_all(&[0; size_of::()])?; - - Ok(Self { - seen_pages: HashSet::new(), - header: SnapshotFileHeader { - db_id, - start_frame_no: u64::MAX, - end_frame_no: u64::MIN, - frame_count: 0, - size_after: 0, - _pad: 0, - }, - snapshot_file: target, - db_path: db_path.to_path_buf(), - last_seen_frame_no: u64::MAX, - }) - } - - /// append frames to the snapshot. Frames must be in decreasing frame_no order. - pub fn append_frames( - &mut self, - frames: impl Iterator>, - ) -> anyhow::Result<()> { - // We iterate on the frames starting from the end of the log and working our way backward. We - // make sure that only the most recent version of each file is present in the resulting - // snapshot. - // - // The snapshot file contains the most recent version of each page, in descending frame - // number order. That last part is important for when we read it later on. - for frame in frames { - let frame = frame?; - assert!(frame.header().frame_no < self.last_seen_frame_no); - self.last_seen_frame_no = frame.header().frame_no; - if frame.header().frame_no < self.header.start_frame_no { - self.header.start_frame_no = frame.header().frame_no; - } - - if frame.header().frame_no > self.header.end_frame_no { - self.header.end_frame_no = frame.header().frame_no; - self.header.size_after = frame.header().size_after; - } - - if !self.seen_pages.contains(&frame.header().page_no) { - self.seen_pages.insert(frame.header().page_no); - self.snapshot_file.write_all(frame.as_slice())?; - self.header.frame_count += 1; - } - } - - Ok(()) - } - - /// Persist the snapshot, and returns the name and size is frame on the snapshot. 
- pub fn finish(mut self) -> anyhow::Result<(String, u64)> { - self.snapshot_file.flush()?; - let file = self.snapshot_file.into_inner()?; - file.as_file().write_all_at(bytes_of(&self.header), 0)?; - let snapshot_name = format!( - "{}-{}-{}.snap", - Uuid::from_u128(self.header.db_id), - self.header.start_frame_no, - self.header.end_frame_no, - ); - - file.persist(snapshot_dir_path(&self.db_path).join(&snapshot_name))?; - - Ok((snapshot_name, self.header.frame_count)) - } -} - -// #[cfg(test)] -// mod test { -// use std::fs::read; -// use std::{thread, time::Duration}; -// -// use bytemuck::pod_read_unaligned; -// use bytes::Bytes; -// use tempfile::tempdir; -// -// use crate::database::frame::FrameHeader; -// use crate::database::libsql::replication_log::logger::WalPage; -// -// use super::*; -// -// #[test] -// fn compact_file_create_snapshot() { -// let temp = tempfile::NamedTempFile::new().unwrap(); -// let mut log_file = LogFile::new(temp.as_file().try_clone().unwrap(), 0).unwrap(); -// let db_id = Uuid::new_v4(); -// log_file.header.db_id = db_id.as_u128(); -// log_file.write_header().unwrap(); -// -// // add 50 pages, each one in two versions -// for _ in 0..2 { -// for i in 0..25 { -// let data = std::iter::repeat(0).take(4096).collect::(); -// let page = WalPage { -// page_no: i, -// size_after: i + 1, -// data, -// }; -// log_file.push_page(&page).unwrap(); -// } -// } -// -// log_file.commit().unwrap(); -// -// let dump_dir = tempdir().unwrap(); -// let compactor = LogCompactor::new(dump_dir.path(), db_id.as_u128()).unwrap(); -// compactor -// .compact(log_file, temp.path().to_path_buf(), 25) -// .unwrap(); -// -// thread::sleep(Duration::from_secs(1)); -// -// let snapshot_path = -// snapshot_dir_path(dump_dir.path()).join(format!("{}-{}-{}.snap", db_id, 0, 49)); -// let snapshot = read(&snapshot_path).unwrap(); -// let header: SnapshotFileHeader = -// pod_read_unaligned(&snapshot[..std::mem::size_of::()]); -// -// assert_eq!(header.start_frame_no, 0); -// assert_eq!(header.end_frame_no, 49); -// assert_eq!(header.frame_count, 25); -// assert_eq!(header.db_id, db_id.as_u128()); -// assert_eq!(header.size_after, 25); -// -// let mut seen_frames = HashSet::new(); -// let mut seen_page_no = HashSet::new(); -// let data = &snapshot[std::mem::size_of::()..]; -// data.chunks(LogFile::FRAME_SIZE).for_each(|f| { -// let frame = Frame::try_from_bytes(Bytes::copy_from_slice(f)).unwrap(); -// assert!(!seen_frames.contains(&frame.header().frame_no)); -// assert!(!seen_page_no.contains(&frame.header().page_no)); -// seen_page_no.insert(frame.header().page_no); -// seen_frames.insert(frame.header().frame_no); -// assert!(frame.header().frame_no >= 25); -// }); -// -// assert_eq!(seen_frames.len(), 25); -// assert_eq!(seen_page_no.len(), 25); -// -// let snapshot_file = SnapshotFile::open(&snapshot_path).unwrap(); -// -// let frames = snapshot_file.frames_iter_from(0); -// let mut expected_frame_no = 49; -// for frame in frames { -// let frame = frame.unwrap(); -// let header: FrameHeader = pod_read_unaligned(&frame[..size_of::()]); -// assert_eq!(header.frame_no, expected_frame_no); -// expected_frame_no -= 1; -// } -// -// assert_eq!(expected_frame_no, 24); -// } -// } From c987f5cca4783bfd721e9010dc953333ec88853a Mon Sep 17 00:00:00 2001 From: ad hoc Date: Wed, 26 Jul 2023 14:54:53 +0200 Subject: [PATCH 6/9] add missing Poll::Ready for Database::Poll --- libsqlx-server/src/allocation/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/libsqlx-server/src/allocation/mod.rs b/libsqlx-server/src/allocation/mod.rs index 6b3e15d7..69a3a3a1 100644 --- a/libsqlx-server/src/allocation/mod.rs +++ b/libsqlx-server/src/allocation/mod.rs @@ -76,7 +76,7 @@ impl Database { tokio::task::spawn_blocking(move || { db.compact_log(); }); - return Poll::Ready(()) + return Poll::Ready(()); } Poll::Pending From 6337d9cc3c30bc3d6c6417ac4e043757106a8bbc Mon Sep 17 00:00:00 2001 From: ad hoc Date: Wed, 26 Jul 2023 14:57:16 +0200 Subject: [PATCH 7/9] fix periodic compaction --- libsqlx-server/src/allocation/mod.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/libsqlx-server/src/allocation/mod.rs b/libsqlx-server/src/allocation/mod.rs index 69a3a3a1..f6d87b24 100644 --- a/libsqlx-server/src/allocation/mod.rs +++ b/libsqlx-server/src/allocation/mod.rs @@ -113,7 +113,7 @@ impl Database { ) .unwrap(); - let compact_interval = replication_log_compact_interval.map(|d| { + let compact_interval = replication_log_compact_interval.map(|d| { let mut i = tokio::time::interval(d / 2); i.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); Box::pin(i) @@ -228,10 +228,9 @@ impl ConnectionHandle { impl Allocation { pub async fn run(mut self) { loop { - dbg!(); let fut = poll_fn(|cx| self.database.poll(cx)); tokio::select! { - _ = fut => dbg!(), + _ = fut => (), Some(msg) = self.inbox.recv() => { match msg { AllocationMessage::HranaPipelineReq { req, ret } => { @@ -247,15 +246,11 @@ impl Allocation { } }, maybe_id = self.connections_futs.join_next(), if !self.connections_futs.is_empty() => { - dbg!(); - if let Some(Ok(_id)) = maybe_id { - // self.connections.remove_entry(&id); + if let Some(Ok((node_id, conn_id))) = maybe_id { + self.connections.get_mut(&node_id).map(|m| m.remove(&conn_id)); } }, - else => { - dbg!(); - break - }, + else => break, } } } @@ -500,6 +495,7 @@ impl Connection { mod test { use std::time::Duration; + use libsqlx::result_builder::ResultBuilder; use tokio::sync::Notify; use crate::allocation::replica::ReplicaConnection; From c05dc09e2b3a43ab86f7d81825afa5dfd4c0c181 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Wed, 26 Jul 2023 14:57:32 +0200 Subject: [PATCH 8/9] spawn compactor loop --- libsqlx-server/src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libsqlx-server/src/main.rs b/libsqlx-server/src/main.rs index b1856e9e..1392e325 100644 --- a/libsqlx-server/src/main.rs +++ b/libsqlx-server/src/main.rs @@ -1,10 +1,10 @@ use std::fs::read_to_string; -use std::path::{PathBuf, Path}; +use std::path::{Path, PathBuf}; use std::sync::Arc; use clap::Parser; use color_eyre::eyre::Result; -use compactor::{CompactionQueue, run_compactor_loop}; +use compactor::{run_compactor_loop, CompactionQueue}; use config::{AdminApiConfig, ClusterConfig, UserApiConfig}; use http::admin::run_admin_api; use http::user::run_user_api; @@ -116,7 +116,7 @@ async fn main() -> Result<()> { .map_size(100 * 1024 * 1024) .open(config.db_path.join("meta"))?; - let snapshot_store = Arc::new(SnapshotStore::new(config.db_path.clone(), &env)?); + let snapshot_store = Arc::new(SnapshotStore::new(config.db_path.clone(), env.clone())?); let compaction_queue = Arc::new(CompactionQueue::new( env.clone(), config.db_path.clone(), From f86954c32f9f2ad48e6daa07b5a5363bd184bc68 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Wed, 26 Jul 2023 14:57:47 +0200 Subject: [PATCH 9/9] changes to compactor trait --- Cargo.lock | 1 + libsqlx-server/Cargo.toml | 1 + libsqlx/src/database/libsql/injector/mod.rs | 11 
From f86954c32f9f2ad48e6daa07b5a5363bd184bc68 Mon Sep 17 00:00:00 2001
From: ad hoc
Date: Wed, 26 Jul 2023 14:57:47 +0200
Subject: [PATCH 9/9] changes to compactor trait

---
 Cargo.lock                                    |  1 +
 libsqlx-server/Cargo.toml                     |  1 +
 libsqlx/src/database/libsql/injector/mod.rs   | 11 +++-----
 libsqlx/src/database/libsql/mod.rs            | 26 +++++++++--------
 .../database/libsql/replication_log/logger.rs | 28 ++++++-------------
 .../database/libsql/replication_log/mod.rs    |  3 +-
 6 files changed, 30 insertions(+), 40 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index a46f82e5..53a34d6f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2619,6 +2619,7 @@ dependencies = [
  "tracing-subscriber",
  "turmoil",
  "uuid",
+ "walkdir",
 ]
 
 [[package]]
diff --git a/libsqlx-server/Cargo.toml b/libsqlx-server/Cargo.toml
index b925732a..1ce8c6c8 100644
--- a/libsqlx-server/Cargo.toml
+++ b/libsqlx-server/Cargo.toml
@@ -48,3 +48,4 @@ uuid = { version = "1.4.0", features = ["v4", "serde"] }
 
 [dev-dependencies]
 turmoil = "0.5.5"
+walkdir = "2.3.3"
diff --git a/libsqlx/src/database/libsql/injector/mod.rs b/libsqlx/src/database/libsql/injector/mod.rs
index 0c2c2207..7580bd5f 100644
--- a/libsqlx/src/database/libsql/injector/mod.rs
+++ b/libsqlx/src/database/libsql/injector/mod.rs
@@ -171,15 +171,14 @@ impl Injector {
 
 #[cfg(test)]
 mod test {
-    use std::fs::File;
+    use std::path::PathBuf;
 
     use crate::database::libsql::injector::Injector;
     use crate::database::libsql::replication_log::logger::LogFile;
 
     #[test]
     fn test_simple_inject_frames() {
-        let file = File::open("assets/test/simple_wallog").unwrap();
-        let log = LogFile::new(file).unwrap();
+        let log = LogFile::new(PathBuf::from("assets/test/simple_wallog")).unwrap();
         let temp = tempfile::tempdir().unwrap();
 
         let mut injector = Injector::new(temp.path(), Box::new(()), 10).unwrap();
@@ -199,8 +198,7 @@ mod test {
 
     #[test]
     fn test_inject_frames_split_txn() {
-        let file = File::open("assets/test/simple_wallog").unwrap();
-        let log = LogFile::new(file).unwrap();
+        let log = LogFile::new(PathBuf::from("assets/test/simple_wallog")).unwrap();
         let temp = tempfile::tempdir().unwrap();
 
         // inject one frame at a time
@@ -221,8 +219,7 @@ mod test {
 
     #[test]
     fn test_inject_partial_txn_isolated() {
-        let file = File::open("assets/test/simple_wallog").unwrap();
-        let log = LogFile::new(file).unwrap();
+        let log = LogFile::new(PathBuf::from("assets/test/simple_wallog")).unwrap();
         let temp = tempfile::tempdir().unwrap();
 
         // inject one frame at a time
diff --git a/libsqlx/src/database/libsql/mod.rs b/libsqlx/src/database/libsql/mod.rs
index a6fdceb1..1cc884a8 100644
--- a/libsqlx/src/database/libsql/mod.rs
+++ b/libsqlx/src/database/libsql/mod.rs
@@ -19,7 +19,6 @@ use self::replication_log::logger::FrameNotifierCb;
 
 pub use connection::LibsqlConnection;
 pub use replication_log::logger::{LogCompactor, LogFile};
-pub use replication_log::merger::SnapshotMerger;
 
 mod connection;
 mod injector;
@@ -196,12 +195,12 @@ impl InjectableDatabase for LibsqlDatabase {
 
 #[cfg(test)]
 mod test {
-    use std::fs::File;
     use std::sync::atomic::AtomicBool;
     use std::sync::atomic::Ordering::Relaxed;
 
     use parking_lot::Mutex;
     use rusqlite::types::Value;
+    use uuid::Uuid;
 
     use crate::connection::Connection;
     use crate::database::libsql::replication_log::logger::LogFile;
@@ -238,8 +237,7 @@ mod test {
             .unwrap();
         assert!(row.lock().is_empty());
 
-        let file = File::open("assets/test/simple_wallog").unwrap();
-        let log = LogFile::new(file).unwrap();
+        let log = LogFile::new(PathBuf::from("assets/test/simple_wallog")).unwrap();
         let mut injector = db.injector().unwrap();
         log.frames_iter().unwrap().for_each(|f| {
             injector.inject(f.unwrap()).unwrap();
@@ -312,14 +310,16 @@ mod test {
             }
 
             fn compact(
-                &self,
-                _file: LogFile,
-                _path: PathBuf,
-                _size_after: u32,
+                &mut self,
+                _id: Uuid,
             ) -> Result<(), Box<dyn std::error::Error + Sync + Send + 'static>> {
                 self.0.store(true, Relaxed);
                 Ok(())
             }
+
+            fn snapshot_dir(&self) -> PathBuf {
+                todo!();
+            }
         }
 
         let temp = tempfile::tempdir().unwrap();
@@ -353,13 +353,15 @@ mod test {
             }
 
             fn compact(
-                &self,
-                _file: LogFile,
-                _path: PathBuf,
-                _size_after: u32,
+                &mut self,
+                _id: Uuid,
             ) -> Result<(), Box<dyn std::error::Error + Sync + Send + 'static>> {
                 unreachable!()
             }
+
+            fn snapshot_dir(&self) -> PathBuf {
+                todo!()
+            }
         }
 
         let temp = tempfile::tempdir().unwrap();
diff --git a/libsqlx/src/database/libsql/replication_log/logger.rs b/libsqlx/src/database/libsql/replication_log/logger.rs
index 96520ef8..914153b5 100644
--- a/libsqlx/src/database/libsql/replication_log/logger.rs
+++ b/libsqlx/src/database/libsql/replication_log/logger.rs
@@ -26,7 +26,6 @@ use crate::database::frame::{Frame, FrameHeader};
 #[cfg(feature = "bottomless")]
 use crate::libsql::ffi::SQLITE_IOERR_WRITE;
 
-use super::snapshot::{find_snapshot_file, SnapshotFile};
 use super::{FrameNo, CRC_64_GO_ISO, WAL_MAGIC, WAL_PAGE_SIZE};
 
 init_static_wal_method!(REPLICATION_METHODS, ReplicationLoggerHook);
@@ -767,8 +766,7 @@ pub trait LogCompactor: Sync + Send + 'static {
 impl LogCompactor for () {
     fn compact(
         &mut self,
-        log_name: String,
-        path: PathBuf,
+        _log_id: Uuid,
     ) -> Result<(), Box<dyn std::error::Error + Sync + Send + 'static>> {
         Ok(())
     }
@@ -776,6 +774,10 @@ impl LogCompactor for () {
     fn should_compact(&self, _file: &LogFile) -> bool {
         false
     }
+
+    fn snapshot_dir(&self) -> PathBuf {
+        todo!()
+    }
 }
 
 pub type FrameNotifierCb = Box<dyn Fn(FrameNo) + Sync + Send + 'static>;
@@ -784,7 +786,6 @@ pub struct ReplicationLogger {
     pub generation: Generation,
     pub log_file: RwLock<LogFile>,
     compactor: Box<Mutex<dyn LogCompactor>>,
-    db_path: PathBuf,
     /// a notifier channel other tasks can subscribe to, and get notified when new frames become
     /// available.
     pub new_frame_notifier: FrameNotifierCb,
@@ -821,17 +822,11 @@ impl ReplicationLogger {
         if should_recover {
             Self::recover(log_file, data_path, compactor, new_frame_notifier)
         } else {
-            Self::from_log_file(
-                db_path.to_path_buf(),
-                log_file,
-                compactor,
-                new_frame_notifier,
-            )
+            Self::from_log_file(log_file, compactor, new_frame_notifier)
         }
     }
 
     fn from_log_file(
-        db_path: PathBuf,
         log_file: LogFile,
         compactor: impl LogCompactor,
         new_frame_notifier: FrameNotifierCb,
@@ -843,7 +838,6 @@ impl ReplicationLogger {
             generation: Generation::new(generation_start_frame_no),
             compactor: Box::new(Mutex::new(compactor)),
             log_file: RwLock::new(log_file),
-            db_path,
             new_frame_notifier,
         })
     }
@@ -885,7 +879,7 @@ impl ReplicationLogger {
 
         assert!(data_path.pop());
 
-        Self::from_log_file(data_path, log_file, compactor, new_frame_notifier)
+        Self::from_log_file(log_file, compactor, new_frame_notifier)
     }
 
     pub fn database_id(&self) -> anyhow::Result<Uuid> {
@@ -930,10 +924,6 @@ impl ReplicationLogger {
             .expect("there should be at least one frame after commit"))
     }
 
-    pub fn get_snapshot_file(&self, from: FrameNo) -> anyhow::Result<Option<SnapshotFile>> {
-        find_snapshot_file(&self.db_path, from)
-    }
-
     pub fn get_frame(&self, frame_no: FrameNo) -> Result<Frame, LogReadError> {
         self.log_file.read().frame(frame_no)
     }
@@ -1036,8 +1026,8 @@ mod test {
 
     #[test]
     fn log_file_test_rollback() {
-        let f = tempfile::tempfile().unwrap();
-        let mut log_file = LogFile::new(f).unwrap();
+        let f = tempfile::NamedTempFile::new().unwrap();
+        let mut log_file = LogFile::new(f.path().to_path_buf()).unwrap();
         (0..5)
             .map(|i| WalPage {
                 page_no: i,
diff --git a/libsqlx/src/database/libsql/replication_log/mod.rs b/libsqlx/src/database/libsql/replication_log/mod.rs
index 42b2a03f..32120285 100644
--- a/libsqlx/src/database/libsql/replication_log/mod.rs
+++ b/libsqlx/src/database/libsql/replication_log/mod.rs
@@ -1,8 +1,7 @@
 use crc::Crc;
 
 pub mod logger;
-pub mod merger;
-pub mod snapshot;
+// pub mod merger;
 
 pub const WAL_PAGE_SIZE: i32 = 4096;
 pub const WAL_MAGIC: u64 = u64::from_le_bytes(*b"SQLDWAL\0");
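Note: taken together, patch 9 reshapes `LogCompactor` so that implementors are handed the id of the sealed log and advertise where snapshots should land, instead of receiving the log file, destination path, and post-compaction size themselves. A sketch of the trait as it reads after this series, reconstructed from the `impl LogCompactor for ()` and the test impls above; the boxed error bound is inferred rather than quoted verbatim, and `LogFile` here is a stand-in for the crate's own type:

    use std::path::PathBuf;
    use uuid::Uuid;

    pub struct LogFile; // stand-in for libsqlx's replication log file type

    pub trait LogCompactor: Sync + Send + 'static {
        /// Decide whether the current log has grown enough to be compacted.
        fn should_compact(&self, file: &LogFile) -> bool;

        /// Queue compaction of the sealed log identified by `id`; the log file
        /// itself is no longer passed in.
        fn compact(
            &mut self,
            id: Uuid,
        ) -> Result<(), Box<dyn std::error::Error + Sync + Send + 'static>>;

        /// Directory where the snapshots produced by this compactor are expected.
        fn snapshot_dir(&self) -> PathBuf;
    }

    /// Mirrors the no-op implementation for `()` in logger.rs.
    impl LogCompactor for () {
        fn should_compact(&self, _file: &LogFile) -> bool {
            false
        }

        fn compact(
            &mut self,
            _id: Uuid,
        ) -> Result<(), Box<dyn std::error::Error + Sync + Send + 'static>> {
            Ok(())
        }

        fn snapshot_dir(&self) -> PathBuf {
            todo!()
        }
    }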