Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

mt store replica applied frameno #561

Merged
merged 3 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 48 additions & 13 deletions libsqlx-server/src/allocation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::linc::bus::Dispatch;
use crate::linc::proto::{Frames, Message};
use crate::linc::{Inbound, NodeId};
use crate::meta::DatabaseId;
use crate::replica_commit_store::ReplicaCommitStore;

use self::config::{AllocConfig, DbConfig};
use self::primary::compactor::Compactor;
Expand Down Expand Up @@ -97,8 +98,14 @@ impl Database {

fn txn_timeout_duration(&self) -> Duration {
match self {
Database::Primary { transaction_timeout_duration, .. } => *transaction_timeout_duration,
Database::Replica { transaction_timeout_duration, .. } => *transaction_timeout_duration,
Database::Primary {
transaction_timeout_duration,
..
} => *transaction_timeout_duration,
Database::Replica {
transaction_timeout_duration,
..
} => *transaction_timeout_duration,
}
}
}
Expand All @@ -109,6 +116,7 @@ impl Database {
path: PathBuf,
dispatcher: Arc<dyn Dispatch>,
compaction_queue: Arc<CompactionQueue>,
replica_commit_store: Arc<ReplicaCommitStore>,
) -> Self {
let database_id = DatabaseId::from_name(&config.db_name);

Expand All @@ -118,7 +126,7 @@ impl Database {
replication_log_compact_interval,
transaction_timeout_duration,
} => {
let (sender, receiver) = tokio::sync::watch::channel(0);
let (sender, receiver) = tokio::sync::watch::channel(None);
let db = LibsqlDatabase::new_primary(
path,
Compactor::new(
Expand All @@ -129,7 +137,7 @@ impl Database {
),
false,
Box::new(move |fno| {
let _ = sender.send(fno);
let _ = sender.send(Some(fno));
}),
)
.unwrap();
Expand All @@ -156,8 +164,22 @@ impl Database {
proxy_request_timeout_duration,
transaction_timeout_duration,
} => {
let rdb =
LibsqlDatabase::new_replica(path, MAX_INJECTOR_BUFFER_CAPACITY, ()).unwrap();
let next_frame_no =
block_in_place(|| replica_commit_store.get_commit_index(database_id))
.map(|fno| fno + 1)
.unwrap_or(0);

let commit_callback = Arc::new(move |fno| {
replica_commit_store.commit(database_id, fno);
});

let rdb = LibsqlDatabase::new_replica(
path,
MAX_INJECTOR_BUFFER_CAPACITY,
commit_callback,
)
.unwrap();

let wdb = RemoteDb {
proxy_request_timeout_duration,
};
Expand All @@ -167,7 +189,7 @@ impl Database {

let replicator = Replicator::new(
dispatcher,
0,
next_frame_no,
database_id,
primary_node_id,
injector,
Expand Down Expand Up @@ -556,9 +578,10 @@ mod test {
use tokio::sync::Notify;

use crate::allocation::replica::ReplicaConnection;
use crate::init_dirs;
use crate::linc::bus::Bus;
use crate::replica_commit_store::ReplicaCommitStore;
use crate::snapshot_store::SnapshotStore;
use crate::{init_dirs, replica_commit_store};

use super::*;

Expand All @@ -567,7 +590,8 @@ mod test {
let bus = Arc::new(Bus::new(0, |_, _| async {}));
let _queue = bus.connect(1); // pretend connection to node 1
let tmp = tempfile::TempDir::new().unwrap();
let read_db = LibsqlDatabase::new_replica(tmp.path().to_path_buf(), 1, ()).unwrap();
let read_db =
LibsqlDatabase::new_replica(tmp.path().to_path_buf(), 1, Arc::new(|_| ())).unwrap();
let write_db = RemoteDb {
proxy_request_timeout_duration: Duration::from_millis(100),
};
Expand Down Expand Up @@ -631,16 +655,23 @@ mod test {
},
};
let (sender, inbox) = mpsc::channel(10);
let env = EnvOpenOptions::new().max_dbs(10).map_size(4096 * 100).open(tmp.path()).unwrap();
let env = EnvOpenOptions::new()
.max_dbs(10)
.map_size(4096 * 100)
.open(tmp.path())
.unwrap();
let store = Arc::new(SnapshotStore::new(tmp.path().to_path_buf(), env.clone()).unwrap());
let queue = Arc::new(CompactionQueue::new(env, tmp.path().to_path_buf(), store).unwrap());
let queue =
Arc::new(CompactionQueue::new(env.clone(), tmp.path().to_path_buf(), store).unwrap());
let replica_commit_store = Arc::new(ReplicaCommitStore::new(env.clone()));
let mut alloc = Allocation {
inbox,
database: Database::from_config(
&config,
tmp.path().to_path_buf(),
bus.clone(),
queue,
replica_commit_store,
),
connections_futs: JoinSet::new(),
next_conn_id: 0,
Expand All @@ -656,14 +687,18 @@ mod test {

let (snd, rcv) = oneshot::channel();
let builder = StepResultsBuilder::new(snd);
conn.execute(Program::seq(&["begin"]), Box::new(builder)).await.unwrap();
conn.execute(Program::seq(&["begin"]), Box::new(builder))
.await
.unwrap();
rcv.await.unwrap().unwrap();

tokio::time::sleep(Duration::from_secs(1)).await;

let (snd, rcv) = oneshot::channel();
let builder = StepResultsBuilder::new(snd);
conn.execute(Program::seq(&["create table test (x)"]), Box::new(builder)).await.unwrap();
conn.execute(Program::seq(&["create table test (x)"]), Box::new(builder))
.await
.unwrap();
assert!(rcv.await.unwrap().is_err());
}
}
6 changes: 3 additions & 3 deletions libsqlx-server/src/allocation/primary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const MAX_STEP_BATCH_SIZE: usize = 100_000_000; // ~100kb
pub struct PrimaryDatabase {
pub db: Arc<LibsqlDatabase<PrimaryType>>,
pub replica_streams: HashMap<NodeId, (u32, tokio::task::JoinHandle<()>)>,
pub frame_notifier: tokio::sync::watch::Receiver<FrameNo>,
pub frame_notifier: tokio::sync::watch::Receiver<Option<FrameNo>>,
pub snapshot_store: Arc<SnapshotStore>,
}

Expand Down Expand Up @@ -207,7 +207,7 @@ pub struct FrameStreamer {
pub req_no: u32,
pub seq_no: u32,
pub dipatcher: Arc<dyn Dispatch>,
pub notifier: tokio::sync::watch::Receiver<FrameNo>,
pub notifier: tokio::sync::watch::Receiver<Option<FrameNo>>,
pub buffer: Vec<Bytes>,
pub snapshot_store: Arc<SnapshotStore>,
}
Expand All @@ -230,7 +230,7 @@ impl FrameStreamer {
}
if self
.notifier
.wait_for(|fno| *fno >= self.next_frame_no)
.wait_for(|fno| fno.map(|f| f >= self.next_frame_no).unwrap_or(false))
.await
.is_err()
{
Expand Down
7 changes: 3 additions & 4 deletions libsqlx-server/src/allocation/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ use tokio::task::block_in_place;
use tokio::time::{timeout, Sleep};

use crate::linc::bus::Dispatch;
use crate::linc::proto::{BuilderStep, ProxyResponse};
use crate::linc::proto::{Enveloppe, Frames, Message};
use crate::linc::Inbound;
use crate::linc::{NodeId, Outbound};
use crate::linc::proto::{BuilderStep, Enveloppe, Frames, Message, ProxyResponse};
use crate::linc::{Inbound, NodeId, Outbound};
use crate::meta::DatabaseId;

use super::{ConnectionHandler, ConnectionMessage};
Expand Down Expand Up @@ -166,6 +164,7 @@ impl Replicator {
});
}
}
// no news from primary for the past 5 secs, send a request again
Err(_) => self.query_replicate().await,
Ok(None) => break,
}
Expand Down
19 changes: 11 additions & 8 deletions libsqlx-server/src/hrana/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,15 @@ pub async fn execute_sequence(conn: &ConnectionHandle, pgm: Program) -> color_ey
let builder = StepResultsBuilder::new(snd);
conn.execute(pgm, Box::new(builder)).await?;

rcv.await?.map_err(|e| anyhow!("{e}"))?.into_iter().try_for_each(|result| match result {
StepResult::Ok => Ok(()),
StepResult::Err(e) => match stmt_error_from_sqld_error(e) {
Ok(stmt_err) => Err(anyhow!(stmt_err)),
Err(sqld_err) => Err(anyhow!(sqld_err)),
},
StepResult::Skipped => Err(anyhow!("Statement in sequence was not executed")),
})
rcv.await?
.map_err(|e| anyhow!("{e}"))?
.into_iter()
.try_for_each(|result| match result {
StepResult::Ok => Ok(()),
StepResult::Err(e) => match stmt_error_from_sqld_error(e) {
Ok(stmt_err) => Err(anyhow!(stmt_err)),
Err(sqld_err) => Err(anyhow!(sqld_err)),
},
StepResult::Skipped => Err(anyhow!("Statement in sequence was not executed")),
})
}
4 changes: 4 additions & 0 deletions libsqlx-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use hyper::server::conn::AddrIncoming;
use linc::bus::Bus;
use manager::Manager;
use meta::Store;
use replica_commit_store::ReplicaCommitStore;
use snapshot_store::SnapshotStore;
use tokio::fs::create_dir_all;
use tokio::net::{TcpListener, TcpStream};
Expand All @@ -28,6 +29,7 @@ mod http;
mod linc;
mod manager;
mod meta;
mod replica_commit_store;
mod snapshot_store;

#[derive(Debug, Parser)]
Expand Down Expand Up @@ -123,11 +125,13 @@ async fn main() -> Result<()> {
snapshot_store,
)?);
let store = Arc::new(Store::new(env.clone()));
let replica_commit_store = Arc::new(ReplicaCommitStore::new(env.clone()));
let manager = Arc::new(Manager::new(
config.db_path.clone(),
store.clone(),
100,
compaction_queue.clone(),
replica_commit_store,
));
let bus = Arc::new(Bus::new(config.cluster.id, manager.clone()));

Expand Down
5 changes: 5 additions & 0 deletions libsqlx-server/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ use crate::linc::bus::Dispatch;
use crate::linc::handler::Handler;
use crate::linc::Inbound;
use crate::meta::{DatabaseId, Store};
use crate::replica_commit_store::ReplicaCommitStore;

pub struct Manager {
cache: Cache<DatabaseId, mpsc::Sender<AllocationMessage>>,
meta_store: Arc<Store>,
db_path: PathBuf,
compaction_queue: Arc<CompactionQueue>,
replica_commit_store: Arc<ReplicaCommitStore>,
}

const MAX_ALLOC_MESSAGE_QUEUE_LEN: usize = 32;
Expand All @@ -30,12 +32,14 @@ impl Manager {
meta_store: Arc<Store>,
max_conccurent_allocs: u64,
compaction_queue: Arc<CompactionQueue>,
replica_commit_store: Arc<ReplicaCommitStore>,
) -> Self {
Self {
cache: Cache::new(max_conccurent_allocs),
meta_store,
db_path,
compaction_queue,
replica_commit_store,
}
}

Expand All @@ -60,6 +64,7 @@ impl Manager {
path,
dispatcher.clone(),
self.compaction_queue.clone(),
self.replica_commit_store.clone(),
),
connections_futs: JoinSet::new(),
next_conn_id: 0,
Expand Down
2 changes: 0 additions & 2 deletions libsqlx-server/src/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ use tokio::task::block_in_place;

use crate::allocation::config::AllocConfig;

type ExecFn = Box<dyn FnOnce(&mut libsqlx::libsql::LibsqlConnection<()>)>;

pub struct Store {
env: heed::Env,
alloc_config_db: heed::Database<OwnedType<DatabaseId>, SerdeBincode<AllocConfig>>,
Expand Down
34 changes: 34 additions & 0 deletions libsqlx-server/src/replica_commit_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use heed_types::OwnedType;
use libsqlx::FrameNo;

use crate::meta::DatabaseId;

/// Stores, for each replica database, the last injected commit index.
///
/// Entries are kept in a named database of the heed environment handed to
/// [`ReplicaCommitStore::new`], keyed by [`DatabaseId`] with a [`FrameNo`] value.
pub struct ReplicaCommitStore {
    /// Environment from which read and write transactions are opened.
    env: heed::Env,
    /// Maps a database id to the frame number last committed for it.
    database: heed::Database<OwnedType<DatabaseId>, OwnedType<FrameNo>>,
}

impl ReplicaCommitStore {
    /// Name of the database that holds the commit indexes inside the environment.
    // The lifetime is spelled out: eliding it in an associated constant is
    // flagged by rustc's `elided_lifetimes_in_associated_constant` lint.
    const DB_NAME: &'static str = "replica-commit-store";

    /// Opens the commit store in `env`, creating its database if necessary.
    ///
    /// # Panics
    ///
    /// Panics if the write transaction cannot be opened or committed, or if
    /// the database cannot be created.
    pub fn new(env: heed::Env) -> Self {
        let mut txn = env
            .write_txn()
            .expect("failed to open write txn for replica commit store");
        let database = env
            .create_database(&mut txn, Some(Self::DB_NAME))
            .expect("failed to create replica commit store database");
        txn.commit()
            .expect("failed to commit replica commit store creation");

        Self { env, database }
    }

    /// Records `frame_no` as the commit index of `database_id`, replacing any
    /// previously stored value.
    ///
    /// The write is performed and committed in its own transaction.
    ///
    /// # Panics
    ///
    /// Panics if the transaction cannot be opened, written to, or committed.
    pub fn commit(&self, database_id: DatabaseId, frame_no: FrameNo) {
        let mut txn = self
            .env
            .write_txn()
            .expect("failed to open write txn for replica commit");
        self.database
            .put(&mut txn, &database_id, &frame_no)
            .expect("failed to store replica commit index");
        txn.commit().expect("failed to commit replica commit index");
    }

    /// Returns the commit index stored for `database_id`, or `None` when no
    /// entry exists for that database.
    ///
    /// # Panics
    ///
    /// Panics if the read transaction cannot be opened or the lookup fails.
    pub fn get_commit_index(&self, database_id: DatabaseId) -> Option<FrameNo> {
        let txn = self
            .env
            .read_txn()
            .expect("failed to open read txn for replica commit store");
        self.database
            .get(&txn, &database_id)
            .expect("failed to read replica commit index")
    }
}
13 changes: 5 additions & 8 deletions libsqlx/src/database/libsql/injector/hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::database::frame::FrameBorrowed;
use crate::database::libsql::replication_log::WAL_PAGE_SIZE;

use super::headers::Headers;
use super::{FrameBuffer, InjectorCommitHandler};
use super::{FrameBuffer, OnCommitCb};

// These are custom error codes returned by the replicator hook.
pub const LIBSQL_INJECT_FATAL: c_int = 200;
Expand All @@ -23,15 +23,15 @@ pub struct InjectorHookCtx {
buffer: FrameBuffer,
/// currently in a txn
is_txn: bool,
commit_handler: Box<dyn InjectorCommitHandler>,
on_commit_cb: OnCommitCb,
}

impl InjectorHookCtx {
pub fn new(buffer: FrameBuffer, commit_handler: Box<dyn InjectorCommitHandler>) -> Self {
pub fn new(buffer: FrameBuffer, commit_handler: OnCommitCb) -> Self {
Self {
buffer,
is_txn: false,
commit_handler,
on_commit_cb: commit_handler,
}
}

Expand All @@ -45,9 +45,6 @@ impl InjectorHookCtx {
let buffer = self.buffer.lock();
let (mut headers, last_frame_no, size_after) =
make_page_header(buffer.iter().map(|f| &**f));
if size_after != 0 {
self.commit_handler.pre_commit(last_frame_no)?;
}

let ret = unsafe {
orig(
Expand All @@ -64,7 +61,7 @@ impl InjectorHookCtx {
debug_assert!(headers.all_applied());
drop(headers);
if size_after != 0 {
self.commit_handler.post_commit(last_frame_no)?;
(self.on_commit_cb)(last_frame_no);
self.is_txn = false;
}
tracing::trace!("applied frame batch");
Expand Down
Loading