From c815ea82e545903caf946fe8cf20ce750f9d9ba5 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Wed, 26 Jul 2023 15:58:44 +0200 Subject: [PATCH 1/3] intoduce SnapshotFrame --- libsqlx-server/src/compactor.rs | 72 +++++++++++++++++++++------- libsqlx-server/src/snapshot_store.rs | 3 +- 2 files changed, 55 insertions(+), 20 deletions(-) diff --git a/libsqlx-server/src/compactor.rs b/libsqlx-server/src/compactor.rs index ac88a455..517fca77 100644 --- a/libsqlx-server/src/compactor.rs +++ b/libsqlx-server/src/compactor.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::fs::File; use std::io::{BufWriter, Write}; use std::mem::size_of; @@ -8,7 +9,7 @@ use std::sync::{ Arc, }; -use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable}; +use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable, try_from_bytes}; use bytes::{Bytes, BytesMut}; use heed::byteorder::BigEndian; use heed_types::{SerdeBincode, U64}; @@ -168,6 +169,39 @@ pub struct SnapshotBuilder { last_seen_frame_no: u64, } +#[derive(Debug, Clone, Copy, Pod, Zeroable)] +#[repr(C)] +pub struct SnapshotFrameHeader { + frame_no: FrameNo, + page_no: u32, + _pad: u32, +} + +#[derive(Clone)] +pub struct SnapshotFrame { + data: Bytes +} + +impl SnapshotFrame { + const SIZE: usize = size_of::() + 4096; + + pub fn try_from_bytes(data: Bytes) -> crate::Result { + if data.len() != Self::SIZE { + color_eyre::eyre::bail!("invalid snapshot frame") + } + + Ok(Self { data }) + + } + + pub fn header(&self) -> Cow { + let data = &self.data[..size_of::()]; + try_from_bytes(data) + .map(Cow::Borrowed) + .unwrap_or_else(|_| Cow::Owned(pod_read_unaligned(data))) + } +} + impl SnapshotBuilder { pub fn new(db_path: &Path, db_id: DatabaseId, snapshot_id: Uuid) -> color_eyre::Result { let temp_dir = db_path.join("tmp"); @@ -202,8 +236,15 @@ impl SnapshotBuilder { self.header.end_frame_no = frame.header().frame_no; self.header.size_after = frame.header().size_after; } + let header = SnapshotFrameHeader { + frame_no: frame.header().frame_no, + page_no: frame.header().page_no, + _pad: 0, + }; + + self.snapshot_file.write_all(bytes_of(&header))?; + self.snapshot_file.write_all(frame.page())?; - self.snapshot_file.write_all(frame.as_slice())?; self.header.frame_count += 1; Ok(()) @@ -241,18 +282,18 @@ impl SnapshotFile { } /// Iterator on the frames contained in the snapshot file, in reverse frame_no order. - pub fn frames_iter(&self) -> impl Iterator> + '_ { + pub fn frames_iter(&self) -> impl Iterator> + '_ { let mut current_offset = 0; std::iter::from_fn(move || { if current_offset >= self.header.frame_count { return None; } let read_offset = size_of::() as u64 - + current_offset * LogFile::FRAME_SIZE as u64; + + current_offset * SnapshotFrame::SIZE as u64; current_offset += 1; - let mut buf = BytesMut::zeroed(LogFile::FRAME_SIZE); + let mut buf = BytesMut::zeroed(SnapshotFrame::SIZE); match self.file.read_exact_at(&mut buf, read_offset as _) { - Ok(_) => Some(Ok(buf.freeze())), + Ok(_) => Some(Ok(SnapshotFrame { data: buf.freeze() })), Err(e) => Some(Err(e.into())), } }) @@ -262,18 +303,15 @@ impl SnapshotFile { pub fn frames_iter_from( &self, frame_no: u64, - ) -> impl Iterator> + '_ { + ) -> impl Iterator> + '_ { let mut iter = self.frames_iter(); std::iter::from_fn(move || match iter.next() { - Some(Ok(bytes)) => match Frame::try_from_bytes(bytes.clone()) { - Ok(frame) => { - if frame.header().frame_no < frame_no { - None - } else { - Some(Ok(bytes)) - } + Some(Ok(frame)) => { + if frame.header().frame_no < frame_no { + None + } else { + Some(Ok(frame)) } - Err(e) => Some(Err(e)), }, other => other, }) @@ -332,14 +370,12 @@ mod test { assert_eq!(snapshot_file.header.start_frame_no, expected_start_frameno); assert_eq!(snapshot_file.header.end_frame_no, expected_end_frameno); assert!(snapshot_file.frames_iter().all(|f| expected_page_content - .remove(&Frame::try_from_bytes(f.unwrap()).unwrap().header().page_no))); + .remove(&f.unwrap().header().page_no))); assert!(expected_page_content.is_empty()); assert_eq!(snapshot_file .frames_iter() .map(Result::unwrap) - .map(Frame::try_from_bytes) - .map(Result::unwrap) .map(|f| f.header().frame_no) .reduce(|prev, new| { assert!(new < prev); diff --git a/libsqlx-server/src/snapshot_store.rs b/libsqlx-server/src/snapshot_store.rs index 68610c10..72977966 100644 --- a/libsqlx-server/src/snapshot_store.rs +++ b/libsqlx-server/src/snapshot_store.rs @@ -2,8 +2,7 @@ use std::mem::size_of; use std::path::PathBuf; use bytemuck::{Pod, Zeroable}; -use heed::BytesDecode; -use heed_types::{ByteSlice, CowType, SerdeBincode}; +use heed_types::{CowType, SerdeBincode}; use libsqlx::FrameNo; use serde::{Deserialize, Serialize}; use tokio::task::block_in_place; From e10d2f341f34de7b9d14c76626e23e61f1b36d94 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Wed, 26 Jul 2023 15:58:59 +0200 Subject: [PATCH 2/3] add missing test assets --- libsqlx-server/assets/test/simple-log | Bin 0 -> 20664 bytes 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 libsqlx-server/assets/test/simple-log diff --git a/libsqlx-server/assets/test/simple-log b/libsqlx-server/assets/test/simple-log new file mode 100644 index 0000000000000000000000000000000000000000..0cf5b0539c802b0e595616afe0e8cdf18e936254 GIT binary patch literal 20664 zcmeI)ze@sP9LMo{p6kbuaV-*I!y`dtLk$fR1kv-TPaL#5guK6;_dI^_9`0V-^Eoh^@s~D|zGLfVZvNxpw0;~} zy&SJxtUTI&Z+jxmIMn(g+C4Lr8u{95efr(@vKfCXD5_FJEb8GVVA&Vo| Date: Thu, 27 Jul 2023 10:40:47 +0200 Subject: [PATCH 3/3] replicate from snapshot --- libsqlx-server/src/allocation/mod.rs | 5 +- libsqlx-server/src/allocation/primary/mod.rs | 54 +++++++++++- libsqlx-server/src/compactor.rs | 75 ++++++++++------ libsqlx-server/src/snapshot_store.rs | 20 ++++- libsqlx/src/database/frame.rs | 2 - .../database/libsql/replication_log/logger.rs | 85 +++++-------------- .../database/libsql/replication_log/mod.rs | 3 - libsqlx/src/database/mod.rs | 2 +- libsqlx/src/lib.rs | 2 +- 9 files changed, 146 insertions(+), 102 deletions(-) diff --git a/libsqlx-server/src/allocation/mod.rs b/libsqlx-server/src/allocation/mod.rs index f6d87b24..0978b549 100644 --- a/libsqlx-server/src/allocation/mod.rs +++ b/libsqlx-server/src/allocation/mod.rs @@ -103,7 +103,7 @@ impl Database { Compactor::new( max_log_size, replication_log_compact_interval, - compaction_queue, + compaction_queue.clone(), database_id, ), false, @@ -124,6 +124,7 @@ impl Database { db: Arc::new(db), replica_streams: HashMap::new(), frame_notifier: receiver, + snapshot_store: compaction_queue.snapshot_store.clone(), }, compact_interval, } @@ -275,6 +276,7 @@ impl Allocation { db, replica_streams, frame_notifier, + snapshot_store, .. }, .. @@ -289,6 +291,7 @@ impl Allocation { dipatcher: self.dispatcher.clone() as _, notifier: frame_notifier.clone(), buffer: Vec::new(), + snapshot_store: snapshot_store.clone(), }; match replica_streams.entry(msg.from) { diff --git a/libsqlx-server/src/allocation/primary/mod.rs b/libsqlx-server/src/allocation/primary/mod.rs index 480a0e6a..ccd67c55 100644 --- a/libsqlx-server/src/allocation/primary/mod.rs +++ b/libsqlx-server/src/allocation/primary/mod.rs @@ -2,17 +2,19 @@ use std::collections::HashMap; use std::mem::size_of; use std::sync::Arc; use std::task::{Context, Poll}; +use std::time::Duration; use bytes::Bytes; use libsqlx::libsql::{LibsqlDatabase, PrimaryType}; use libsqlx::result_builder::ResultBuilder; -use libsqlx::{FrameNo, LogReadError, ReplicationLogger}; +use libsqlx::{Frame, FrameHeader, FrameNo, LogReadError, ReplicationLogger}; use tokio::task::block_in_place; use crate::linc::bus::Dispatch; use crate::linc::proto::{BuilderStep, Enveloppe, Frames, Message, StepError, Value}; use crate::linc::{Inbound, NodeId, Outbound}; use crate::meta::DatabaseId; +use crate::snapshot_store::SnapshotStore; use super::{ConnectionHandler, ExecFn, FRAMES_MESSAGE_MAX_COUNT}; @@ -24,6 +26,7 @@ pub struct PrimaryDatabase { pub db: Arc>, pub replica_streams: HashMap)>, pub frame_notifier: tokio::sync::watch::Receiver, + pub snapshot_store: Arc, } pub struct ProxyResponseBuilder { @@ -206,6 +209,7 @@ pub struct FrameStreamer { pub dipatcher: Arc, pub notifier: tokio::sync::watch::Receiver, pub buffer: Vec, + pub snapshot_store: Arc, } impl FrameStreamer { @@ -234,7 +238,53 @@ impl FrameStreamer { } } Err(LogReadError::Error(_)) => todo!("handle log read error"), - Err(LogReadError::SnapshotRequired) => todo!("handle reading from snapshot"), + Err(LogReadError::SnapshotRequired) => self.send_snapshot().await, + } + } + } + + async fn send_snapshot(&mut self) { + tracing::debug!("sending frames from snapshot"); + loop { + match self + .snapshot_store + .locate_file(self.database_id, self.next_frame_no) + { + Some(file) => { + let mut iter = file.frames_iter_from(self.next_frame_no).peekable(); + + while let Some(frame) = block_in_place(|| iter.next()) { + let frame = frame.unwrap(); + // TODO: factorize in maybe_send + if self.buffer.len() > FRAMES_MESSAGE_MAX_COUNT { + self.send_frames().await; + } + let size_after = iter + .peek() + .is_none() + .then_some(file.header.size_after) + .unwrap_or(0); + let frame = Frame::from_parts( + &FrameHeader { + frame_no: frame.header().frame_no, + page_no: frame.header().page_no, + size_after, + }, + frame.page(), + ); + self.next_frame_no = frame.header().frame_no + 1; + self.buffer.push(frame.bytes()); + + tokio::task::yield_now().await; + } + + break; + } + None => { + // snapshot is not ready yet, wait a bit + // FIXME: notify when snapshot becomes ready instead of using loop + tokio::time::sleep(Duration::from_millis(100)).await; + } } } } diff --git a/libsqlx-server/src/compactor.rs b/libsqlx-server/src/compactor.rs index 517fca77..2039343a 100644 --- a/libsqlx-server/src/compactor.rs +++ b/libsqlx-server/src/compactor.rs @@ -9,7 +9,7 @@ use std::sync::{ Arc, }; -use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable, try_from_bytes}; +use bytemuck::{bytes_of, pod_read_unaligned, try_from_bytes, Pod, Zeroable}; use bytes::{Bytes, BytesMut}; use heed::byteorder::BigEndian; use heed_types::{SerdeBincode, U64}; @@ -38,7 +38,7 @@ pub struct CompactionQueue { next_id: AtomicU64, notify: watch::Sender>, db_path: PathBuf, - snapshot_store: Arc, + pub snapshot_store: Arc, } impl CompactionQueue { @@ -110,9 +110,17 @@ impl CompactionQueue { let to_compact_path = to_compact_path.clone(); let db_path = self.db_path.clone(); move || { - let mut builder = SnapshotBuilder::new(&db_path, job.database_id, job.log_id)?; let log = LogFile::new(to_compact_path)?; - for frame in log.rev_deduped() { + let (start_fno, end_fno, iter) = + log.rev_deduped().expect("compaction job with no frames!"); + let mut builder = SnapshotBuilder::new( + &db_path, + job.database_id, + job.log_id, + start_fno, + end_fno, + )?; + for frame in iter { let frame = frame?; builder.push_frame(frame)?; } @@ -172,14 +180,14 @@ pub struct SnapshotBuilder { #[derive(Debug, Clone, Copy, Pod, Zeroable)] #[repr(C)] pub struct SnapshotFrameHeader { - frame_no: FrameNo, - page_no: u32, + pub frame_no: FrameNo, + pub page_no: u32, _pad: u32, } #[derive(Clone)] pub struct SnapshotFrame { - data: Bytes + data: Bytes, } impl SnapshotFrame { @@ -191,7 +199,6 @@ impl SnapshotFrame { } Ok(Self { data }) - } pub fn header(&self) -> Cow { @@ -200,10 +207,20 @@ impl SnapshotFrame { .map(Cow::Borrowed) .unwrap_or_else(|_| Cow::Owned(pod_read_unaligned(data))) } + + pub(crate) fn page(&self) -> &[u8] { + &self.data[size_of::()..] + } } impl SnapshotBuilder { - pub fn new(db_path: &Path, db_id: DatabaseId, snapshot_id: Uuid) -> color_eyre::Result { + pub fn new( + db_path: &Path, + db_id: DatabaseId, + snapshot_id: Uuid, + start_fno: FrameNo, + end_fno: FrameNo, + ) -> color_eyre::Result { let temp_dir = db_path.join("tmp"); let mut target = BufWriter::new(NamedTempFile::new_in(&temp_dir)?); // reserve header space @@ -212,8 +229,8 @@ impl SnapshotBuilder { Ok(Self { header: SnapshotFileHeader { db_id, - start_frame_no: u64::MAX, - end_frame_no: u64::MIN, + start_frame_no: start_fno, + end_frame_no: end_fno, frame_count: 0, size_after: 0, _pad: 0, @@ -228,14 +245,11 @@ impl SnapshotBuilder { pub fn push_frame(&mut self, frame: Frame) -> color_eyre::Result<()> { assert!(frame.header().frame_no < self.last_seen_frame_no); self.last_seen_frame_no = frame.header().frame_no; - if frame.header().frame_no < self.header.start_frame_no { - self.header.start_frame_no = frame.header().frame_no; - } - if frame.header().frame_no > self.header.end_frame_no { - self.header.end_frame_no = frame.header().frame_no; + if frame.header().frame_no == self.header.end_frame_no { self.header.size_after = frame.header().size_after; } + let header = SnapshotFrameHeader { frame_no: frame.header().frame_no, page_no: frame.header().page_no, @@ -306,13 +320,13 @@ impl SnapshotFile { ) -> impl Iterator> + '_ { let mut iter = self.frames_iter(); std::iter::from_fn(move || match iter.next() { - Some(Ok(frame)) => { + Some(Ok(frame)) => { if frame.header().frame_no < frame_no { None } else { Some(Ok(frame)) } - }, + } other => other, }) } @@ -369,18 +383,23 @@ mod test { let snapshot_file = SnapshotFile::open(&snapshot_path).unwrap(); assert_eq!(snapshot_file.header.start_frame_no, expected_start_frameno); assert_eq!(snapshot_file.header.end_frame_no, expected_end_frameno); - assert!(snapshot_file.frames_iter().all(|f| expected_page_content - .remove(&f.unwrap().header().page_no))); + assert!(snapshot_file + .frames_iter() + .all(|f| expected_page_content.remove(&f.unwrap().header().page_no))); assert!(expected_page_content.is_empty()); - assert_eq!(snapshot_file - .frames_iter() - .map(Result::unwrap) - .map(|f| f.header().frame_no) - .reduce(|prev, new| { - assert!(new < prev); - new - }).unwrap(), 0); + assert_eq!( + snapshot_file + .frames_iter() + .map(Result::unwrap) + .map(|f| f.header().frame_no) + .reduce(|prev, new| { + assert!(new < prev); + new + }) + .unwrap(), + 0 + ); assert_eq!(store.locate(database_id, 0).unwrap().snapshot_id, log_id); } diff --git a/libsqlx-server/src/snapshot_store.rs b/libsqlx-server/src/snapshot_store.rs index 72977966..965c65a9 100644 --- a/libsqlx-server/src/snapshot_store.rs +++ b/libsqlx-server/src/snapshot_store.rs @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize}; use tokio::task::block_in_place; use uuid::Uuid; -use crate::meta::DatabaseId; +use crate::{compactor::SnapshotFile, meta::DatabaseId}; #[derive(Clone, Copy, Zeroable, Pod, Debug)] #[repr(transparent)] @@ -91,6 +91,10 @@ impl SnapshotStore { end_frame_no: u64::MAX.into(), }; + for entry in self.database.lazily_decode_data().iter(&txn).unwrap() { + let (k, _) = entry.unwrap(); + } + match self .database .get_lower_than_or_equal_to(&txn, &key) @@ -102,6 +106,11 @@ impl SnapshotStore { } else if frame_no >= key.start_frame_no.into() && frame_no <= key.end_frame_no.into() { + tracing::debug!( + "found snapshot for {frame_no}; {}-{}", + u64::from(key.start_frame_no), + u64::from(key.end_frame_no) + ); return Some(v); } else { None @@ -110,6 +119,15 @@ impl SnapshotStore { Err(_) => todo!(), } } + + pub fn locate_file(&self, database_id: DatabaseId, frame_no: FrameNo) -> Option { + let meta = self.locate(database_id, frame_no)?; + let path = self + .db_path + .join("snapshots") + .join(meta.snapshot_id.to_string()); + Some(SnapshotFile::open(&path).unwrap()) + } } #[cfg(test)] diff --git a/libsqlx/src/database/frame.rs b/libsqlx/src/database/frame.rs index ba2d638d..d3dd4a44 100644 --- a/libsqlx/src/database/frame.rs +++ b/libsqlx/src/database/frame.rs @@ -17,8 +17,6 @@ use super::FrameNo; pub struct FrameHeader { /// Incremental frame number pub frame_no: FrameNo, - /// Rolling checksum of all the previous frames, including this one. - pub checksum: u64, /// page number, if frame_type is FrameType::Page pub page_no: u32, /// Size of the database (in page) after commiting the transaction. This is passed from sqlite, diff --git a/libsqlx/src/database/libsql/replication_log/logger.rs b/libsqlx/src/database/libsql/replication_log/logger.rs index 914153b5..d12af50d 100644 --- a/libsqlx/src/database/libsql/replication_log/logger.rs +++ b/libsqlx/src/database/libsql/replication_log/logger.rs @@ -7,7 +7,7 @@ use std::os::unix::prelude::FileExt; use std::path::{Path, PathBuf}; use std::sync::Arc; -use anyhow::{bail, ensure}; +use anyhow::bail; use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable}; use bytes::{Bytes, BytesMut}; use parking_lot::{Mutex, RwLock}; @@ -26,7 +26,7 @@ use crate::database::frame::{Frame, FrameHeader}; #[cfg(feature = "bottomless")] use crate::libsql::ffi::SQLITE_IOERR_WRITE; -use super::{FrameNo, CRC_64_GO_ISO, WAL_MAGIC, WAL_PAGE_SIZE}; +use super::{FrameNo, WAL_MAGIC, WAL_PAGE_SIZE}; init_static_wal_method!(REPLICATION_METHODS, ReplicationLoggerHook); @@ -354,10 +354,6 @@ pub struct LogFile { /// On rollback, this is reset to 0, so that everything that was written after the previous /// header.frame_count is ignored and can be overwritten pub(crate) uncommitted_frame_count: u64, - uncommitted_checksum: u64, - - /// checksum of the last commited frame - commited_checksum: u64, } #[derive(thiserror::Error, Debug)] @@ -392,10 +388,10 @@ impl LogFile { start_frame_no: 0, magic: WAL_MAGIC, page_size: WAL_PAGE_SIZE, - start_checksum: 0, db_id: db_id.as_u128(), frame_count: 0, sqld_version: Version::current().0, + _pad: 0, }; let mut this = Self { @@ -403,8 +399,6 @@ impl LogFile { file, header, uncommitted_frame_count: 0, - uncommitted_checksum: 0, - commited_checksum: 0, }; this.write_header()?; @@ -412,27 +406,12 @@ impl LogFile { Ok(this) } else { let header = Self::read_header(&file)?; - let mut this = Self { + Ok(Self { file, header, uncommitted_frame_count: 0, - uncommitted_checksum: 0, - commited_checksum: 0, path, - }; - - if let Some(last_commited) = this.last_commited_frame_no() { - // file is not empty, the starting checksum is the checksum from the last entry - let last_frame = this.frame(last_commited).unwrap(); - this.commited_checksum = last_frame.header().checksum; - this.uncommitted_checksum = last_frame.header().checksum; - } else { - // file contains no entry, start with the initial checksum from the file header. - this.commited_checksum = this.header.start_checksum; - this.uncommitted_checksum = this.header.start_checksum; - } - - Ok(this) + }) } } @@ -458,7 +437,6 @@ impl LogFile { pub fn commit(&mut self) -> crate::Result<()> { self.header.frame_count += self.uncommitted_frame_count; self.uncommitted_frame_count = 0; - self.commited_checksum = self.uncommitted_checksum; self.write_header()?; Ok(()) @@ -466,7 +444,6 @@ impl LogFile { fn rollback(&mut self) { self.uncommitted_frame_count = 0; - self.uncommitted_checksum = self.commited_checksum; } pub fn write_header(&mut self) -> crate::Result<()> { @@ -504,11 +481,20 @@ impl LogFile { }) } - /// Return a reversed iterator over the deduplicated frames in the log file. - pub fn rev_deduped(&self) -> impl Iterator> + '_ { + /// If the log contains any frames, returns (start_frameno, end_frameno, iter), where iter, is + /// a deduplicated reversed iterator over the frames in the log + pub fn rev_deduped( + &self, + ) -> Option<( + FrameNo, + FrameNo, + impl Iterator> + '_, + )> { let mut iter = self.rev_frames_iter(); let mut seen = HashSet::new(); - std::iter::from_fn(move || loop { + let start_fno = self.header().start_frame_no; + let end_fno = self.header().last_frame_no()?; + let iter = std::iter::from_fn(move || loop { match iter.next()? { Ok(frame) => { if !seen.contains(&frame.header().page_no) { @@ -518,21 +504,15 @@ impl LogFile { } Err(e) => return Some(Err(e)), } - }) - } + }); - fn compute_checksum(&self, page: &WalPage) -> u64 { - let mut digest = CRC_64_GO_ISO.digest_with_initial(self.uncommitted_checksum); - digest.update(&page.data); - digest.finalize() + Some((start_fno, end_fno, iter)) } pub fn push_page(&mut self, page: &WalPage) -> crate::Result<()> { - let checksum = self.compute_checksum(page); let frame = Frame::from_parts( &FrameHeader { frame_no: self.next_frame_no(), - checksum, page_no: page.page_no, size_after: page.size_after, }, @@ -547,7 +527,6 @@ impl LogFile { self.file.write_all_at(frame.as_slice(), byte_offset)?; self.uncommitted_frame_count += 1; - self.uncommitted_checksum = checksum; Ok(()) } @@ -616,13 +595,12 @@ impl LogFile { let new_header = LogFileHeader { start_frame_no: self.header.start_frame_no + self.header.frame_count, frame_count: 0, - start_checksum: self.commited_checksum, ..self.header }; new_log_file.header = new_header; new_log_file.write_header().unwrap(); - // swap old and new snapshot - atomic_rename(dbg!(&temp_log_path), dbg!(&self.path)).unwrap(); + // swap old and new log + atomic_rename(&temp_log_path, &self.path).unwrap(); std::mem::swap(&mut new_log_file.path, &mut self.path); let _ = std::mem::replace(self, new_log_file); compactor.compact(log_id).unwrap(); @@ -704,9 +682,7 @@ fn atomic_rename(p1: impl AsRef, p2: impl AsRef) -> anyhow::Result<( pub struct LogFileHeader { /// magic number: b"SQLDWAL\0" as u64 pub magic: u64, - /// Initial checksum value for the rolling CRC checksum - /// computed with the 64 bits CRC_64_GO_ISO - pub start_checksum: u64, + _pad: u64, /// Uuid of the database associated with this log. pub db_id: u128, /// Frame_no of the first frame in the log @@ -897,23 +873,6 @@ impl ReplicationLogger { Ok(()) } - #[allow(dead_code)] - fn compute_checksum(wal_header: &LogFileHeader, log_file: &LogFile) -> anyhow::Result { - tracing::debug!("computing WAL log running checksum..."); - let mut iter = log_file.frames_iter()?; - iter.try_fold(wal_header.start_checksum, |sum, frame| { - let frame = frame?; - let mut digest = CRC_64_GO_ISO.digest_with_initial(sum); - digest.update(frame.page()); - let cs = digest.finalize(); - ensure!( - cs == frame.header().checksum, - "invalid WAL file: invalid checksum" - ); - Ok(cs) - }) - } - /// commit the current transaction and returns the new top frame number fn commit(&self) -> anyhow::Result { let mut log_file = self.log_file.write(); diff --git a/libsqlx/src/database/libsql/replication_log/mod.rs b/libsqlx/src/database/libsql/replication_log/mod.rs index 32120285..a7f006ae 100644 --- a/libsqlx/src/database/libsql/replication_log/mod.rs +++ b/libsqlx/src/database/libsql/replication_log/mod.rs @@ -1,11 +1,8 @@ -use crc::Crc; - pub mod logger; // pub mod merger; pub const WAL_PAGE_SIZE: i32 = 4096; pub const WAL_MAGIC: u64 = u64::from_le_bytes(*b"SQLDWAL\0"); -const CRC_64_GO_ISO: Crc = Crc::::new(&crc::CRC_64_GO_ISO); /// The frame uniquely identifying, monotonically increasing number pub type FrameNo = u64; diff --git a/libsqlx/src/database/mod.rs b/libsqlx/src/database/mod.rs index 368ac5ac..61c39c64 100644 --- a/libsqlx/src/database/mod.rs +++ b/libsqlx/src/database/mod.rs @@ -9,7 +9,7 @@ pub mod proxy; #[cfg(test)] mod test_utils; -pub use frame::Frame; +pub use frame::{Frame, FrameHeader}; pub type FrameNo = u64; diff --git a/libsqlx/src/lib.rs b/libsqlx/src/lib.rs index 899a7912..24441571 100644 --- a/libsqlx/src/lib.rs +++ b/libsqlx/src/lib.rs @@ -15,8 +15,8 @@ pub use database::libsql; pub use database::libsql::replication_log::logger::{LogReadError, ReplicationLogger}; pub use database::libsql::replication_log::FrameNo; pub use database::proxy; -pub use database::Frame; pub use database::{Database, InjectableDatabase, Injector}; +pub use database::{Frame, FrameHeader}; pub use sqld_libsql_bindings::wal_hook::WalHook;