This repository was archived by the owner on Oct 18, 2023. It is now read-only.

multi-tenancy: replicate from snapshots #554

Merged · 3 commits · Jul 27, 2023
Binary file added libsqlx-server/assets/test/simple-log
Binary file not shown.
5 changes: 4 additions & 1 deletion libsqlx-server/src/allocation/mod.rs
@@ -103,7 +103,7 @@ impl Database {
Compactor::new(
max_log_size,
replication_log_compact_interval,
compaction_queue,
compaction_queue.clone(),
database_id,
),
false,
@@ -124,6 +124,7 @@ impl Database {
db: Arc::new(db),
replica_streams: HashMap::new(),
frame_notifier: receiver,
snapshot_store: compaction_queue.snapshot_store.clone(),
},
compact_interval,
}
@@ -275,6 +276,7 @@ impl Allocation {
db,
replica_streams,
frame_notifier,
snapshot_store,
..
},
..
@@ -289,6 +291,7 @@
dipatcher: self.dispatcher.clone() as _,
notifier: frame_notifier.clone(),
buffer: Vec::new(),
snapshot_store: snapshot_store.clone(),
};

match replica_streams.entry(msg.from) {
54 changes: 52 additions & 2 deletions libsqlx-server/src/allocation/primary/mod.rs
@@ -2,17 +2,19 @@ use std::collections::HashMap;
use std::mem::size_of;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use bytes::Bytes;
use libsqlx::libsql::{LibsqlDatabase, PrimaryType};
use libsqlx::result_builder::ResultBuilder;
use libsqlx::{FrameNo, LogReadError, ReplicationLogger};
use libsqlx::{Frame, FrameHeader, FrameNo, LogReadError, ReplicationLogger};
use tokio::task::block_in_place;

use crate::linc::bus::Dispatch;
use crate::linc::proto::{BuilderStep, Enveloppe, Frames, Message, StepError, Value};
use crate::linc::{Inbound, NodeId, Outbound};
use crate::meta::DatabaseId;
use crate::snapshot_store::SnapshotStore;

use super::{ConnectionHandler, ExecFn, FRAMES_MESSAGE_MAX_COUNT};

@@ -24,6 +26,7 @@ pub struct PrimaryDatabase {
pub db: Arc<LibsqlDatabase<PrimaryType>>,
pub replica_streams: HashMap<NodeId, (u32, tokio::task::JoinHandle<()>)>,
pub frame_notifier: tokio::sync::watch::Receiver<FrameNo>,
pub snapshot_store: Arc<SnapshotStore>,
}

pub struct ProxyResponseBuilder {
@@ -206,6 +209,7 @@ pub struct FrameStreamer {
pub dipatcher: Arc<dyn Dispatch>,
pub notifier: tokio::sync::watch::Receiver<FrameNo>,
pub buffer: Vec<Bytes>,
pub snapshot_store: Arc<SnapshotStore>,
}

impl FrameStreamer {
@@ -234,7 +238,53 @@ impl FrameStreamer {
}
}
Err(LogReadError::Error(_)) => todo!("handle log read error"),
Err(LogReadError::SnapshotRequired) => todo!("handle reading from snapshot"),
Err(LogReadError::SnapshotRequired) => self.send_snapshot().await,
}
}
}

async fn send_snapshot(&mut self) {
tracing::debug!("sending frames from snapshot");
loop {
match self
.snapshot_store
.locate_file(self.database_id, self.next_frame_no)
{
Some(file) => {
let mut iter = file.frames_iter_from(self.next_frame_no).peekable();

while let Some(frame) = block_in_place(|| iter.next()) {
let frame = frame.unwrap();
// TODO: factorize in maybe_send
if self.buffer.len() > FRAMES_MESSAGE_MAX_COUNT {
self.send_frames().await;
}
let size_after = iter
.peek()
.is_none()
.then_some(file.header.size_after)
.unwrap_or(0);
let frame = Frame::from_parts(
&FrameHeader {
frame_no: frame.header().frame_no,
page_no: frame.header().page_no,
size_after,
},
frame.page(),
);
self.next_frame_no = frame.header().frame_no + 1;
self.buffer.push(frame.bytes());

tokio::task::yield_now().await;
}

break;
}
None => {
// snapshot is not ready yet, wait a bit
// FIXME: notify when snapshot becomes ready instead of using loop
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
}
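
Review note: the `size_after` rewrite in `send_snapshot` is what keeps a streamed snapshot atomic on the replica side — every frame except the last is re-stamped with `size_after = 0`, so the replica only sees a commit boundary once the final frame arrives. A minimal sketch of that pattern, using a stand-in `PageFrame` type and a made-up `mark_commit_boundary` helper rather than the real libsqlx `Frame` API:

```rust
// Illustrative only: `PageFrame` and `mark_commit_boundary` are assumed names,
// not part of libsqlx.
struct PageFrame {
    frame_no: u64,
    size_after: u32, // non-zero marks a commit boundary in WAL-style logs
}

/// Re-stamp `size_after` so that only the final frame of a batch commits.
fn mark_commit_boundary(frames: Vec<PageFrame>, db_size_after: u32) -> Vec<PageFrame> {
    let mut iter = frames.into_iter().peekable();
    let mut out = Vec::new();
    while let Some(mut frame) = iter.next() {
        // If peek() sees nothing, this is the last frame: stamp the real size.
        frame.size_after = if iter.peek().is_none() { db_size_after } else { 0 };
        out.push(frame);
    }
    out
}
```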
133 changes: 94 additions & 39 deletions libsqlx-server/src/compactor.rs
@@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::mem::size_of;
@@ -8,7 +9,7 @@ use std::sync::{
Arc,
};

use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable};
use bytemuck::{bytes_of, pod_read_unaligned, try_from_bytes, Pod, Zeroable};
use bytes::{Bytes, BytesMut};
use heed::byteorder::BigEndian;
use heed_types::{SerdeBincode, U64};
@@ -37,7 +38,7 @@ pub struct CompactionQueue {
next_id: AtomicU64,
notify: watch::Sender<Option<u64>>,
db_path: PathBuf,
snapshot_store: Arc<SnapshotStore>,
pub snapshot_store: Arc<SnapshotStore>,
}

impl CompactionQueue {
@@ -109,9 +110,17 @@ impl CompactionQueue {
let to_compact_path = to_compact_path.clone();
let db_path = self.db_path.clone();
move || {
let mut builder = SnapshotBuilder::new(&db_path, job.database_id, job.log_id)?;
let log = LogFile::new(to_compact_path)?;
for frame in log.rev_deduped() {
let (start_fno, end_fno, iter) =
log.rev_deduped().expect("compaction job with no frames!");
let mut builder = SnapshotBuilder::new(
&db_path,
job.database_id,
job.log_id,
start_fno,
end_fno,
)?;
for frame in iter {
let frame = frame?;
builder.push_frame(frame)?;
}
@@ -168,8 +177,50 @@ pub struct SnapshotBuilder {
last_seen_frame_no: u64,
}

#[derive(Debug, Clone, Copy, Pod, Zeroable)]
#[repr(C)]
pub struct SnapshotFrameHeader {
pub frame_no: FrameNo,
pub page_no: u32,
_pad: u32,
}

#[derive(Clone)]
pub struct SnapshotFrame {
data: Bytes,
}

impl SnapshotFrame {
const SIZE: usize = size_of::<SnapshotFrameHeader>() + 4096;

pub fn try_from_bytes(data: Bytes) -> crate::Result<Self> {
if data.len() != Self::SIZE {
color_eyre::eyre::bail!("invalid snapshot frame")
}

Ok(Self { data })
}

pub fn header(&self) -> Cow<SnapshotFrameHeader> {
let data = &self.data[..size_of::<SnapshotFrameHeader>()];
try_from_bytes(data)
.map(Cow::Borrowed)
.unwrap_or_else(|_| Cow::Owned(pod_read_unaligned(data)))
}

pub(crate) fn page(&self) -> &[u8] {
&self.data[size_of::<SnapshotFrameHeader>()..]
}
}

impl SnapshotBuilder {
pub fn new(db_path: &Path, db_id: DatabaseId, snapshot_id: Uuid) -> color_eyre::Result<Self> {
pub fn new(
db_path: &Path,
db_id: DatabaseId,
snapshot_id: Uuid,
start_fno: FrameNo,
end_fno: FrameNo,
) -> color_eyre::Result<Self> {
let temp_dir = db_path.join("tmp");
let mut target = BufWriter::new(NamedTempFile::new_in(&temp_dir)?);
// reserve header space
@@ -178,8 +229,8 @@
Ok(Self {
header: SnapshotFileHeader {
db_id,
start_frame_no: u64::MAX,
end_frame_no: u64::MIN,
start_frame_no: start_fno,
end_frame_no: end_fno,
frame_count: 0,
size_after: 0,
_pad: 0,
@@ -194,16 +245,20 @@
pub fn push_frame(&mut self, frame: Frame) -> color_eyre::Result<()> {
assert!(frame.header().frame_no < self.last_seen_frame_no);
self.last_seen_frame_no = frame.header().frame_no;
if frame.header().frame_no < self.header.start_frame_no {
self.header.start_frame_no = frame.header().frame_no;
}

if frame.header().frame_no > self.header.end_frame_no {
self.header.end_frame_no = frame.header().frame_no;
if frame.header().frame_no == self.header.end_frame_no {
self.header.size_after = frame.header().size_after;
}

self.snapshot_file.write_all(frame.as_slice())?;
let header = SnapshotFrameHeader {
frame_no: frame.header().frame_no,
page_no: frame.header().page_no,
_pad: 0,
};

self.snapshot_file.write_all(bytes_of(&header))?;
self.snapshot_file.write_all(frame.page())?;

self.header.frame_count += 1;

Ok(())
@@ -241,18 +296,18 @@ impl SnapshotFile {
}

/// Iterator on the frames contained in the snapshot file, in reverse frame_no order.
pub fn frames_iter(&self) -> impl Iterator<Item = libsqlx::Result<Bytes>> + '_ {
pub fn frames_iter(&self) -> impl Iterator<Item = crate::Result<SnapshotFrame>> + '_ {
let mut current_offset = 0;
std::iter::from_fn(move || {
if current_offset >= self.header.frame_count {
return None;
}
let read_offset = size_of::<SnapshotFileHeader>() as u64
+ current_offset * LogFile::FRAME_SIZE as u64;
+ current_offset * SnapshotFrame::SIZE as u64;
current_offset += 1;
let mut buf = BytesMut::zeroed(LogFile::FRAME_SIZE);
let mut buf = BytesMut::zeroed(SnapshotFrame::SIZE);
match self.file.read_exact_at(&mut buf, read_offset as _) {
Ok(_) => Some(Ok(buf.freeze())),
Ok(_) => Some(Ok(SnapshotFrame { data: buf.freeze() })),
Err(e) => Some(Err(e.into())),
}
})
@@ -262,19 +317,16 @@
pub fn frames_iter_from(
&self,
frame_no: u64,
) -> impl Iterator<Item = libsqlx::Result<Bytes>> + '_ {
) -> impl Iterator<Item = crate::Result<SnapshotFrame>> + '_ {
let mut iter = self.frames_iter();
std::iter::from_fn(move || match iter.next() {
Some(Ok(bytes)) => match Frame::try_from_bytes(bytes.clone()) {
Ok(frame) => {
if frame.header().frame_no < frame_no {
None
} else {
Some(Ok(bytes))
}
Some(Ok(frame)) => {
if frame.header().frame_no < frame_no {
None
} else {
Some(Ok(frame))
}
Err(e) => Some(Err(e)),
},
}
other => other,
})
}
@@ -331,20 +383,23 @@ mod test {
let snapshot_file = SnapshotFile::open(&snapshot_path).unwrap();
assert_eq!(snapshot_file.header.start_frame_no, expected_start_frameno);
assert_eq!(snapshot_file.header.end_frame_no, expected_end_frameno);
assert!(snapshot_file.frames_iter().all(|f| expected_page_content
.remove(&Frame::try_from_bytes(f.unwrap()).unwrap().header().page_no)));
assert!(snapshot_file
.frames_iter()
.all(|f| expected_page_content.remove(&f.unwrap().header().page_no)));
assert!(expected_page_content.is_empty());

assert_eq!(snapshot_file
.frames_iter()
.map(Result::unwrap)
.map(Frame::try_from_bytes)
.map(Result::unwrap)
.map(|f| f.header().frame_no)
.reduce(|prev, new| {
assert!(new < prev);
new
}).unwrap(), 0);
assert_eq!(
snapshot_file
.frames_iter()
.map(Result::unwrap)
.map(|f| f.header().frame_no)
.reduce(|prev, new| {
assert!(new < prev);
new
})
.unwrap(),
0
);

assert_eq!(store.locate(database_id, 0).unwrap().snapshot_id, log_id);
}
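
Review note: `push_frame` can assert strictly decreasing frame numbers because `log.rev_deduped()` walks the log newest-first and yields at most one frame per page, so the snapshot ends up holding only the latest version of each page. A hand-rolled sketch of that dedup idea, with a simplified stand-in frame type (not the libsqlx API):

```rust
use std::collections::HashSet;

// Illustrative stand-in for a WAL frame; the real type lives in libsqlx.
struct LogFrame {
    frame_no: u64,
    page_no: u32,
}

/// Walk frames newest-first, keeping only the most recent copy of each page.
fn rev_dedup(mut frames: Vec<LogFrame>) -> Vec<LogFrame> {
    // Descending frame_no: the latest write of every page comes first.
    frames.sort_by(|a, b| b.frame_no.cmp(&a.frame_no));
    let mut seen = HashSet::new();
    frames
        .into_iter()
        // `insert` returns false once a newer copy of the page was kept.
        .filter(|frame| seen.insert(frame.page_no))
        .collect()
}
```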
23 changes: 20 additions & 3 deletions libsqlx-server/src/snapshot_store.rs
@@ -2,14 +2,13 @@ use std::mem::size_of;
use std::path::PathBuf;

use bytemuck::{Pod, Zeroable};
use heed::BytesDecode;
use heed_types::{ByteSlice, CowType, SerdeBincode};
use heed_types::{CowType, SerdeBincode};
use libsqlx::FrameNo;
use serde::{Deserialize, Serialize};
use tokio::task::block_in_place;
use uuid::Uuid;

use crate::meta::DatabaseId;
use crate::{compactor::SnapshotFile, meta::DatabaseId};

#[derive(Clone, Copy, Zeroable, Pod, Debug)]
#[repr(transparent)]
@@ -92,6 +91,10 @@ impl SnapshotStore {
end_frame_no: u64::MAX.into(),
};

for entry in self.database.lazily_decode_data().iter(&txn).unwrap() {
let (k, _) = entry.unwrap();
}

match self
.database
.get_lower_than_or_equal_to(&txn, &key)
@@ -103,6 +106,11 @@
} else if frame_no >= key.start_frame_no.into()
&& frame_no <= key.end_frame_no.into()
{
tracing::debug!(
"found snapshot for {frame_no}; {}-{}",
u64::from(key.start_frame_no),
u64::from(key.end_frame_no)
);
return Some(v);
} else {
None
Expand All @@ -111,6 +119,15 @@ impl SnapshotStore {
Err(_) => todo!(),
}
}

pub fn locate_file(&self, database_id: DatabaseId, frame_no: FrameNo) -> Option<SnapshotFile> {
let meta = self.locate(database_id, frame_no)?;
let path = self
.db_path
.join("snapshots")
.join(meta.snapshot_id.to_string());
Some(SnapshotFile::open(&path).unwrap())
}
}

#[cfg(test)]
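
Review note: `locate` leans on the ordered key layout of the heed/LMDB index — it builds a probe key with `end_frame_no = u64::MAX`, takes the greatest stored key at or below the probe via `get_lower_than_or_equal_to`, then checks that the requested frame falls inside the snapshot's range. The same shape of lookup over a plain `BTreeMap`, as a sketch with simplified key types (not the actual on-disk encoding):

```rust
use std::collections::BTreeMap;

// Simplified index: (database_id, start_frame_no) -> (end_frame_no, snapshot_id).
type Index = BTreeMap<(u128, u64), (u64, u128)>;

/// Find the snapshot covering `frame_no` for database `db`, if one exists.
fn locate(index: &Index, db: u128, frame_no: u64) -> Option<u128> {
    // Greatest entry whose key sorts at or below (db, frame_no).
    let (&(entry_db, start), &(end, snapshot_id)) =
        index.range(..=(db, frame_no)).next_back()?;
    // Match only if the entry belongs to `db` and its range covers `frame_no`.
    (entry_db == db && start <= frame_no && frame_no <= end).then_some(snapshot_id)
}
```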