From 06ae4ac55b675eb8b9512670a155d59b04438acd Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Wed, 31 May 2023 15:52:18 +0200 Subject: [PATCH] sqld: deduplicate data stored in wallog **!!! early draft, full of debug prints, barely works !!!** This draft contains experiments around deduplicating our own `wallog` format with libSQL WAL. The potential win here is reducing write and space amplification from 2x to around 1.08x. The main idea is as follows: `wallog` is only used to store frame metadata, and frame data is only stored either in the main database file, or in WAL. That's very simple to implement in a single-node system, but it gets complicated with replicas, because a replica is allowed to ask the primary for any arbitrary wallog frame. The rough idea for dealing with replicas is to: 1. Make sure that we control checkpoints. autocheckpoint is off, and we only issue a checkpoint operation on the primary ourselves, explicitly, and periodically. 2. All streaming of frames to replicas must finish before we issue a checkpoint operation. 3. We only checkpoint in TRUNCATE mode, i.e. a write lock is taken and the whole WAL log is rewritten to the main db file. That simplifies lots of edge (sic!) cases. 4. Once we checkpoint, we drop the previous `wallog`, and instead only store the following information. Let's assume that the main db file has N pages. Pages 1..N are now available as frames X..X+N in the `wallog`, and X is the oldest frame a replica should ever ask for -> anything before X is out-of-date anyway. If any replica asks for an earlier page, it gets an error message saying "please drop whatever you're doing and start asking for frames X or greater instead." 
--- sqld/src/replication/frame.rs | 60 +++++++++++ sqld/src/replication/primary/logger.rs | 142 +++++++++++++++---------- sqld/src/replication/snapshot.rs | 20 +++- 3 files changed, 159 insertions(+), 63 deletions(-) diff --git a/sqld/src/replication/frame.rs b/sqld/src/replication/frame.rs index 17deb60f..4a990eb3 100644 --- a/sqld/src/replication/frame.rs +++ b/sqld/src/replication/frame.rs @@ -34,6 +34,66 @@ pub struct Frame { data: Bytes, } +#[repr(transparent)] +#[derive(Clone, Copy, Debug, Zeroable, Pod)] +// NOTICE: frame number 0 indicates that the frame is in the main db file. +// Any other number indicates that it's in the WAL file. +// We do not use an enum here in order to make this struct transparently +// serializable for C code and on-disk representation. +pub struct FrameLocation { + pub frame_no: u32, +} + +impl FrameLocation { + pub const IN_MAIN_DB_FILE: u32 = 0; + + pub fn new(frame_no: u32) -> Self { + Self { frame_no } + } + + pub fn in_wal_file(frame_no: u32) -> Self { + assert_ne!(frame_no, FrameLocation::IN_MAIN_DB_FILE); + Self { frame_no } + } + + pub fn in_main_db_file() -> Self { + Self { + frame_no: Self::IN_MAIN_DB_FILE, + } + } +} + +#[repr(C)] +#[derive(Clone, Copy, Debug, Zeroable, Pod)] +pub struct FrameRef { + pub header: FrameHeader, + pub location: FrameLocation, + _pad: u32, +} + +impl FrameRef { + pub const SIZE: usize = size_of::(); + + pub fn new(header: FrameHeader, location: FrameLocation) -> Self { + Self { + header, + location, + _pad: 0, + } + } + + pub fn as_bytes(&self) -> Bytes { + Bytes::copy_from_slice(bytes_of(self)) + } + + pub fn try_from_bytes(data: Bytes) -> anyhow::Result { + anyhow::ensure!(data.len() == Self::SIZE, "invalid frame size"); + try_from_bytes(&data) + .copied() + .map_err(|e| anyhow::anyhow!(e)) + } +} + impl fmt::Debug for Frame { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Frame") diff --git a/sqld/src/replication/primary/logger.rs 
b/sqld/src/replication/primary/logger.rs index 0a4a44d6..3eed1532 100644 --- a/sqld/src/replication/primary/logger.rs +++ b/sqld/src/replication/primary/logger.rs @@ -1,3 +1,4 @@ +use bytes::BytesMut; use std::ffi::{c_int, c_void, CStr}; use std::fs::{remove_dir_all, File, OpenOptions}; use std::io::Write; @@ -8,21 +9,18 @@ use std::sync::Arc; use anyhow::{bail, ensure}; use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable}; -use bytes::{Bytes, BytesMut}; use parking_lot::RwLock; use sqld_libsql_bindings::init_static_wal_method; use tokio::sync::watch; use uuid::Uuid; -#[cfg(feature = "bottomless")] -use crate::libsql::ffi::SQLITE_IOERR_WRITE; use crate::libsql::ffi::{ sqlite3, types::{XWalCheckpointFn, XWalFrameFn, XWalSavePointUndoFn, XWalUndoFn}, - PageHdrIter, PgHdr, Wal, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR, SQLITE_OK, + PageHdrIter, PgHdr, Wal, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR_WRITE, SQLITE_OK, }; use crate::libsql::wal_hook::WalHook; -use crate::replication::frame::{Frame, FrameHeader}; +use crate::replication::frame::{Frame, FrameHeader, FrameLocation, FrameRef}; use crate::replication::snapshot::{find_snapshot_file, LogCompactor, SnapshotFile}; use crate::replication::{FrameNo, CRC_64_GO_ISO, WAL_MAGIC, WAL_PAGE_SIZE}; @@ -75,6 +73,7 @@ unsafe impl WalHook for ReplicationLoggerHook { orig: XWalFrameFn, ) -> c_int { assert_eq!(page_size, 4096); + let mut frame_no = wal.hdr.mxFrame + 1; let wal_ptr = wal as *mut _; #[cfg(feature = "bottomless")] let last_valid_frame = wal.hdr.mxFrame; @@ -82,13 +81,14 @@ unsafe impl WalHook for ReplicationLoggerHook { let frame_checksum = wal.hdr.aFrameCksum; let ctx = Self::wal_extract_ctx(wal); - for (page_no, data) in PageHdrIter::new(page_headers, page_size as _) { - ctx.write_frame(page_no, data) + for (page_no, _data) in PageHdrIter::new(page_headers, page_size as _) { + ctx.write_frame(page_no, FrameLocation::in_wal_file(frame_no)); + frame_no += 1; } if let Err(e) = ctx.flush(ntruncate) { 
tracing::error!("error writing to replication log: {e}"); // returning IO_ERR ensure that xUndo will be called by sqlite. - return SQLITE_IOERR; + return SQLITE_IOERR_WRITE; } // FIXME: instead of block_on, we should consider replicating asynchronously in the background, @@ -294,7 +294,7 @@ pub struct WalPage { pub page_no: u32, /// 0 for non-commit frames pub size_after: u32, - pub data: Bytes, + pub data: FrameLocation, } impl ReplicationLoggerHookCtx { @@ -314,11 +314,11 @@ impl ReplicationLoggerHookCtx { } } - fn write_frame(&mut self, page_no: u32, data: &[u8]) { + fn write_frame(&mut self, page_no: u32, data: FrameLocation) { let entry = WalPage { page_no, size_after: 0, - data: Bytes::copy_from_slice(data), + data, }; self.buffer.push(entry); } @@ -352,6 +352,7 @@ impl ReplicationLoggerHookCtx { #[derive(Debug)] pub struct LogFile { file: File, + db_path: PathBuf, // actual data of the logged frames resides either in the main db file or the wal file pub header: LogFileHeader, /// the maximum number of frames this log is allowed to contain before it should be compacted. max_log_frame_count: u64, @@ -379,9 +380,12 @@ pub enum LogReadError { impl LogFile { /// size of a single frame + /// FIXME: LogFile should only ever use references -> what to do with snapshots? pub const FRAME_SIZE: usize = size_of::() + WAL_PAGE_SIZE as usize; + /// size of a single frame reference + pub const FRAME_REF_SIZE: usize = size_of::(); - pub fn new(file: File, max_log_frame_count: u64) -> anyhow::Result { + pub fn new(file: File, db_path: PathBuf, max_log_frame_count: u64) -> anyhow::Result { // FIXME: we should probably take a lock on this file, to prevent anybody else to write to // it. 
let file_end = file.metadata()?.len(); @@ -401,6 +405,7 @@ impl LogFile { let mut this = Self { file, + db_path, header, max_log_frame_count, uncommitted_frame_count: 0, @@ -415,6 +420,7 @@ impl LogFile { let header = Self::read_header(&file)?; let mut this = Self { file, + db_path, header, max_log_frame_count, uncommitted_frame_count: 0, @@ -504,30 +510,33 @@ impl LogFile { })) } - fn compute_checksum(&self, page: &WalPage) -> u64 { - let mut digest = CRC_64_GO_ISO.digest_with_initial(self.uncommitted_checksum); - digest.update(&page.data); + fn compute_checksum(&self, _page: &WalPage) -> u64 { + let digest = CRC_64_GO_ISO.digest_with_initial(self.uncommitted_checksum); + // FIXME: we should either read the page from its location and compute checksum, + // or just rely on the fact that the page is already checksummed by WAL or the main db file. + //digest.update(&page.data); digest.finalize() } pub fn push_page(&mut self, page: &WalPage) -> anyhow::Result<()> { let checksum = self.compute_checksum(page); - let frame = Frame::from_parts( - &FrameHeader { - frame_no: self.next_frame_no(), - checksum, - page_no: page.page_no, - size_after: page.size_after, - }, - &page.data, - ); + let header = FrameHeader { + frame_no: self.next_frame_no(), + checksum, + page_no: page.page_no, + size_after: page.size_after, + }; + + let frame_ref = FrameRef::new(header, page.data); let byte_offset = self.next_byte_offset(); - tracing::trace!( - "writing frame {} at offset {byte_offset}", - frame.header().frame_no + let data = frame_ref.as_bytes(); + tracing::warn!( + "writing frame {} at offset {byte_offset}, size {}", + frame_ref.header.frame_no, + data.len() ); - self.file.write_all_at(frame.as_slice(), byte_offset)?; + self.file.write_all_at(&data, byte_offset)?; self.uncommitted_frame_count += 1; self.uncommitted_checksum = checksum; @@ -546,7 +555,7 @@ impl LogFile { /// Returns the bytes position of the `nth` entry in the log fn absolute_byte_offset(nth: u64) -> u64 { - 
std::mem::size_of::() as u64 + nth * Self::FRAME_SIZE as u64 + std::mem::size_of::() as u64 + nth * Self::FRAME_REF_SIZE as u64 } fn byte_offset(&self, id: FrameNo) -> anyhow::Result> { @@ -602,7 +611,7 @@ impl LogFile { .write(true) .create(true) .open(&temp_log_path)?; - let mut new_log_file = LogFile::new(file, self.max_log_frame_count)?; + let mut new_log_file = LogFile::new(file, self.db_path.clone(), self.max_log_frame_count)?; let new_header = LogFileHeader { start_frame_no: self.header.start_frame_no + self.header.frame_count, frame_count: 0, @@ -620,11 +629,39 @@ impl LogFile { } fn read_frame_byte_offset(&self, offset: u64) -> anyhow::Result { - let mut buffer = BytesMut::zeroed(LogFile::FRAME_SIZE); + let mut buffer = BytesMut::zeroed(LogFile::FRAME_REF_SIZE); self.file.read_exact_at(&mut buffer, offset)?; - let buffer = buffer.freeze(); + tracing::trace!("Buffer size {}", buffer.len()); + let frame_ref = FrameRef::try_from_bytes(buffer.freeze())?; + + tracing::trace!("Frame reference: {frame_ref:?}"); + + let mut frame_data = [0; WAL_PAGE_SIZE as usize]; + match frame_ref.location { + FrameLocation { + frame_no: FrameLocation::IN_MAIN_DB_FILE, + } => { + let main_db_file = std::fs::File::open(self.db_path.join("data"))?; + main_db_file.read_exact_at( + &mut frame_data, + frame_ref.header.page_no as u64 * WAL_PAGE_SIZE as u64, + )?; + } + FrameLocation { frame_no } => { + let wal_file = std::fs::File::open(self.db_path.join("data-wal"))?; + // FIXME: this is *not* the correct way to read a frame from the wal file. + // It needs to take into account the wal file header, and the wal page headers. 
+ wal_file.read_exact_at(&mut frame_data, Self::offset_in_wal(frame_no))?; + } + } + + // FIXME: memory copy, easy enough to avoid + Ok(Frame::from_parts(&frame_ref.header, &frame_data)) + } - Frame::try_from_bytes(buffer) + // The offset of frame `frame_no` in the libSQL WAL file + fn offset_in_wal(frame_no: u32) -> u64 { + 32 + ((frame_no - 1) as u64) * (WAL_PAGE_SIZE as u64 + 24) } fn last_commited_frame_no(&self) -> Option { @@ -639,7 +676,7 @@ impl LogFile { let max_log_frame_count = self.max_log_frame_count; // truncate file self.file.set_len(0)?; - Self::new(self.file, max_log_frame_count) + Self::new(self.file, self.db_path, max_log_frame_count) } } @@ -754,7 +791,7 @@ impl ReplicationLogger { .open(log_path)?; let max_log_frame_count = max_log_size * 1_000_000 / LogFile::FRAME_SIZE as u64; - let log_file = LogFile::new(file, max_log_frame_count)?; + let log_file = LogFile::new(file, db_path.to_owned(), max_log_frame_count)?; let header = log_file.header(); let should_recover = if header.version < 2 || header.sqld_version() != Version::current() { @@ -798,21 +835,18 @@ impl ReplicationLogger { // best effort, there may be no snapshots let _ = remove_dir_all(snapshot_path); - let data_file = File::open(&data_path)?; let size = data_path.metadata()?.len(); assert!( size % WAL_PAGE_SIZE as u64 == 0, "database file size is not a multiple of page size" ); let num_page = size / WAL_PAGE_SIZE as u64; - let mut buf = [0; WAL_PAGE_SIZE as usize]; let mut page_no = 1; // page numbering starts at 1 for i in 0..num_page { - data_file.read_exact_at(&mut buf, i * WAL_PAGE_SIZE as u64)?; log_file.push_page(&WalPage { page_no, size_after: if i == num_page - 1 { num_page as _ } else { 0 }, - data: Bytes::copy_from_slice(&buf), + data: FrameLocation::in_main_db_file(), // log recovery is performed from the main db file })?; log_file.commit()?; @@ -918,10 +952,14 @@ mod test { .map(|i| WalPage { page_no: i, size_after: 0, - data: Bytes::from(vec![i as _; 4096]), + data: 
FrameLocation::in_main_db_file(), }) .collect::>(); logger.write_pages(&frames).unwrap(); + let main_db_file = std::fs::File::create(dir.path().join("data")).unwrap(); + for i in 0..10 { + main_db_file.write_at(&[i as u8; 4096], i * 4096).unwrap(); + } logger.commit().unwrap(); let log_file = logger.log_file.write(); @@ -945,30 +983,16 @@ mod test { assert!(matches!(log_file.frame(1), Err(LogReadError::Ahead))); } - #[test] - #[should_panic] - fn incorrect_frame_size() { - let dir = tempfile::tempdir().unwrap(); - let logger = ReplicationLogger::open(dir.path(), 0).unwrap(); - let entry = WalPage { - page_no: 0, - size_after: 0, - data: vec![0; 3].into(), - }; - - logger.write_pages(&[entry]).unwrap(); - logger.commit().unwrap(); - } - #[test] fn log_file_test_rollback() { + let db = tempfile::tempdir().unwrap(); let f = tempfile::tempfile().unwrap(); - let mut log_file = LogFile::new(f, 100).unwrap(); + let mut log_file = LogFile::new(f, db.path().to_owned(), 100).unwrap(); (0..5) .map(|i| WalPage { page_no: i, size_after: 5, - data: Bytes::from_static(&[1; 4096]), + data: FrameLocation::in_main_db_file(), // FIXME: actually fill the fake main db file with data }) .for_each(|p| { log_file.push_page(&p).unwrap(); @@ -982,7 +1006,7 @@ mod test { .map(|i| WalPage { page_no: i, size_after: 5, - data: Bytes::from_static(&[1; 4096]), + data: FrameLocation::in_main_db_file(), }) .for_each(|p| { log_file.push_page(&p).unwrap(); @@ -995,7 +1019,7 @@ mod test { .push_page(&WalPage { page_no: 42, size_after: 5, - data: Bytes::from_static(&[1; 4096]), + data: FrameLocation::in_main_db_file(), }) .unwrap(); diff --git a/sqld/src/replication/snapshot.rs b/sqld/src/replication/snapshot.rs index 01da387d..9b7ef4c0 100644 --- a/sqld/src/replication/snapshot.rs +++ b/sqld/src/replication/snapshot.rs @@ -455,7 +455,7 @@ mod test { use bytes::Bytes; use tempfile::tempdir; - use crate::replication::frame::FrameHeader; + use crate::replication::frame::{FrameHeader, FrameLocation}; use 
crate::replication::primary::logger::WalPage; use crate::replication::snapshot::SnapshotFile; @@ -464,7 +464,13 @@ mod test { #[test] fn compact_file_create_snapshot() { let temp = tempfile::NamedTempFile::new().unwrap(); - let mut log_file = LogFile::new(temp.as_file().try_clone().unwrap(), 0).unwrap(); + let temp_db = tempfile::tempdir().unwrap(); + let mut log_file = LogFile::new( + temp.as_file().try_clone().unwrap(), + temp_db.path().to_owned(), + 0, + ) + .unwrap(); let db_id = Uuid::new_v4(); log_file.header.db_id = db_id.as_u128(); log_file.write_header().unwrap(); @@ -472,16 +478,22 @@ mod test { // add 50 pages, each one in two versions for _ in 0..2 { for i in 0..25 { - let data = std::iter::repeat(0).take(4096).collect::(); + let _data = std::iter::repeat(0).take(4096).collect::(); let page = WalPage { page_no: i, size_after: i + 1, - data, + data: FrameLocation::in_main_db_file(), }; log_file.push_page(&page).unwrap(); } } + // Fill the fake main db file with the aforementioned 50 pages + std::fs::File::create(temp_db.path().join("data")) + .unwrap() + .write_all(&[0; 4096 * 50]) + .unwrap(); + log_file.commit().unwrap(); let dump_dir = tempdir().unwrap();