Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Progress towards multi-tenant log compaction #541

Merged
merged 5 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libsqlx-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ axum = "0.6.18"
base64 = "0.21.2"
bincode = "1.3.3"
bytes = { version = "1.4.0", features = ["serde"] }
bytesize = { version = "1.2.0", features = ["serde"] }
clap = { version = "4.3.11", features = ["derive"] }
color-eyre = "0.6.2"
either = "1.8.1"
Expand Down
7 changes: 6 additions & 1 deletion libsqlx-server/src/allocation/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ pub struct AllocConfig {

#[derive(Debug, Serialize, Deserialize)]
pub enum DbConfig {
Primary {},
Primary {
/// Maximum size the replication log is allowed to grow to before it is compacted.
max_log_size: usize,
/// Interval at which to force compaction
replication_log_compact_interval: Option<Duration>,
},
Replica {
primary_node_id: NodeId,
proxy_request_timeout_duration: Duration,
Expand Down
95 changes: 61 additions & 34 deletions libsqlx-server/src/allocation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::future::poll_fn;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};
use std::time::Instant;

use either::Either;
use libsqlx::libsql::{LibsqlDatabase, LogCompactor, LogFile};
use libsqlx::libsql::LibsqlDatabase;
use libsqlx::program::Program;
use libsqlx::proxy::WriteProxyDatabase;
use libsqlx::{Database as _, InjectableDatabase};
use tokio::sync::{mpsc, oneshot};
use tokio::task::{block_in_place, JoinSet};
use tokio::time::Interval;

use crate::allocation::primary::FrameStreamer;
use crate::hrana;
Expand All @@ -24,16 +26,18 @@ use crate::linc::{Inbound, NodeId};
use crate::meta::DatabaseId;

use self::config::{AllocConfig, DbConfig};
use self::primary::compactor::Compactor;
use self::primary::{PrimaryConnection, PrimaryDatabase, ProxyResponseBuilder};
use self::replica::{ProxyDatabase, RemoteDb, ReplicaConnection, Replicator};

pub mod config;
mod primary;
mod replica;

/// the maximum number of frame a Frame messahe is allowed to contain
/// Maximum number of frames a `Frames` message is allowed to contain
const FRAMES_MESSAGE_MAX_COUNT: usize = 5;
const MAX_INJECTOR_BUFFER_CAP: usize = 32;
/// Maximum number of frames in the injector buffer
const MAX_INJECTOR_BUFFER_CAPACITY: usize = 32;

type ExecFn = Box<dyn FnOnce(&mut dyn libsqlx::Connection) + Send>;

Expand All @@ -46,7 +50,10 @@ pub enum AllocationMessage {
}

pub enum Database {
Primary(PrimaryDatabase),
Primary {
db: PrimaryDatabase,
compact_interval: Option<Pin<Box<Interval>>>,
},
Replica {
db: ProxyDatabase,
injector_handle: mpsc::Sender<Frames>,
Expand All @@ -55,49 +62,57 @@ pub enum Database {
},
}

struct Compactor;

impl LogCompactor for Compactor {
fn should_compact(&self, _log: &LogFile) -> bool {
false
}
impl Database {
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
if let Self::Primary {
compact_interval: Some(ref mut interval),
db,
} = self
{
ready!(interval.poll_tick(cx));
let db = db.db.clone();
tokio::task::spawn_blocking(move || {
db.compact_log();
});
}

fn compact(
&self,
_log: LogFile,
_path: std::path::PathBuf,
_size_after: u32,
) -> Result<(), Box<dyn std::error::Error + Sync + Send + 'static>> {
todo!()
Poll::Pending
}
}

impl Database {
pub fn from_config(config: &AllocConfig, path: PathBuf, dispatcher: Arc<dyn Dispatch>) -> Self {
match config.db_config {
DbConfig::Primary {} => {
DbConfig::Primary {
max_log_size,
replication_log_compact_interval,
} => {
let (sender, receiver) = tokio::sync::watch::channel(0);
let db = LibsqlDatabase::new_primary(
path,
Compactor,
Compactor::new(max_log_size, replication_log_compact_interval),
false,
Box::new(move |fno| {
let _ = sender.send(fno);
}),
)
.unwrap();

Self::Primary(PrimaryDatabase {
db,
replica_streams: HashMap::new(),
frame_notifier: receiver,
})
Self::Primary {
db: PrimaryDatabase {
db: Arc::new(db),
replica_streams: HashMap::new(),
frame_notifier: receiver,
},
compact_interval: None,
}
}
DbConfig::Replica {
primary_node_id,
proxy_request_timeout_duration,
} => {
let rdb = LibsqlDatabase::new_replica(path, MAX_INJECTOR_BUFFER_CAP, ()).unwrap();
let rdb =
LibsqlDatabase::new_replica(path, MAX_INJECTOR_BUFFER_CAPACITY, ()).unwrap();
let wdb = RemoteDb {
proxy_request_timeout_duration,
};
Expand Down Expand Up @@ -129,7 +144,10 @@ impl Database {

fn connect(&self, connection_id: u32, alloc: &Allocation) -> impl ConnectionHandler {
match self {
Database::Primary(PrimaryDatabase { db, .. }) => Either::Right(PrimaryConnection {
Database::Primary {
db: PrimaryDatabase { db, .. },
..
} => Either::Right(PrimaryConnection {
conn: db.connect().unwrap(),
}),
Database::Replica { db, primary_id, .. } => Either::Left(ReplicaConnection {
Expand All @@ -144,7 +162,7 @@ impl Database {
}

pub fn is_primary(&self) -> bool {
matches!(self, Self::Primary(..))
matches!(self, Self::Primary { .. })
}
}

Expand Down Expand Up @@ -190,7 +208,9 @@ impl ConnectionHandle {
impl Allocation {
pub async fn run(mut self) {
loop {
let fut = poll_fn(|cx| self.database.poll(cx));
tokio::select! {
_ = fut => (),
Some(msg) = self.inbox.recv() => {
match msg {
AllocationMessage::HranaPipelineReq { req, ret } => {
Expand Down Expand Up @@ -229,12 +249,16 @@ impl Allocation {
req_no,
next_frame_no,
} => match &mut self.database {
Database::Primary(PrimaryDatabase {
db,
replica_streams,
frame_notifier,
Database::Primary {
db:
PrimaryDatabase {
db,
replica_streams,
frame_notifier,
..
},
..
}) => {
} => {
let streamer = FrameStreamer {
logger: db.logger(),
database_id: DatabaseId::from_name(&self.db_name),
Expand Down Expand Up @@ -277,7 +301,10 @@ impl Allocation {
*last_received_frame_ts = Some(Instant::now());
injector_handle.send(frames).await.unwrap();
}
Database::Primary(PrimaryDatabase { .. }) => todo!("handle primary receiving txn"),
Database::Primary {
db: PrimaryDatabase { .. },
..
} => todo!("handle primary receiving txn"),
},
Message::ProxyRequest {
connection_id,
Expand Down
42 changes: 42 additions & 0 deletions libsqlx-server/src/allocation/primary/compactor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::time::{Duration, Instant};

use libsqlx::libsql::{LogCompactor, LogFile};

/// Compaction policy for a primary's replication log.
///
/// Compaction is triggered either when the log grows past `max_log_size`,
/// or (when configured) when `compact_interval` has elapsed since the last
/// compaction.
pub struct Compactor {
    /// Maximum size the replication log may reach before compaction is
    /// triggered. Compared against `LogFile::size()` — presumably bytes,
    /// TODO confirm the unit against the `LogFile` implementation.
    max_log_size: usize,
    /// When the last compaction happened; initialized to construction time.
    last_compacted_at: Instant,
    /// If set, compaction is forced whenever this much time has elapsed since
    /// `last_compacted_at`, regardless of the log's size.
    compact_interval: Option<Duration>,
}

impl Compactor {
    /// Creates a compactor that triggers on the given size threshold and,
    /// optionally, on a forced time interval. The last-compaction timestamp
    /// starts at "now", so the interval is measured from construction.
    pub fn new(max_log_size: usize, compact_interval: Option<Duration>) -> Self {
        let last_compacted_at = Instant::now();
        Self {
            max_log_size,
            compact_interval,
            last_compacted_at,
        }
    }
}

impl LogCompactor for Compactor {
    /// Returns `true` when the log is due for compaction: either the forced
    /// compaction interval (when configured) has elapsed since the last
    /// compaction, or the log has reached the maximum allowed size.
    fn should_compact(&self, log: &LogFile) -> bool {
        let interval_elapsed = self
            .compact_interval
            .map_or(false, |interval| self.last_compacted_at.elapsed() >= interval);

        interval_elapsed || log.size() >= self.max_log_size
    }

    /// Performs the compaction and records when it happened.
    ///
    /// NOTE(review): the compaction itself is not implemented yet (`todo!`),
    /// and `last_compacted_at` is updated *before* the actual work — confirm
    /// this ordering is still intended once the body is filled in (a failed
    /// compaction would still reset the interval timer).
    fn compact(
        &mut self,
        _log: LogFile,
        _path: std::path::PathBuf,
        _size_after: u32,
    ) -> Result<(), Box<dyn std::error::Error + Sync + Send + 'static>> {
        self.last_compacted_at = Instant::now();
        todo!()
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ use crate::meta::DatabaseId;

use super::{ConnectionHandler, ExecFn, FRAMES_MESSAGE_MAX_COUNT};

pub mod compactor;

const MAX_STEP_BATCH_SIZE: usize = 100_000_000; // ~100MB
//
pub struct PrimaryDatabase {
pub db: LibsqlDatabase<PrimaryType>,
pub db: Arc<LibsqlDatabase<PrimaryType>>,
pub replica_streams: HashMap<NodeId, (u32, tokio::task::JoinHandle<()>)>,
pub frame_notifier: tokio::sync::watch::Receiver<FrameNo>,
}
Expand Down
39 changes: 16 additions & 23 deletions libsqlx-server/src/allocation/replica.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};
use std::time::Duration;

use futures::Future;
Expand All @@ -11,22 +11,16 @@ use libsqlx::proxy::{WriteProxyConnection, WriteProxyDatabase};
use libsqlx::result_builder::{Column, QueryBuilderConfig, ResultBuilder};
use libsqlx::{DescribeResponse, Frame, FrameNo, Injector};
use parking_lot::Mutex;
use tokio::{
sync::mpsc,
task::block_in_place,
time::{timeout, Sleep},
};
use tokio::sync::mpsc;
use tokio::task::block_in_place;
use tokio::time::{timeout, Sleep};

use crate::linc::bus::Dispatch;
use crate::linc::proto::{BuilderStep, ProxyResponse};
use crate::linc::proto::{Enveloppe, Frames, Message};
use crate::linc::Inbound;
use crate::{
linc::{
bus::Dispatch,
proto::{Enveloppe, Frames, Message},
NodeId, Outbound,
},
meta::DatabaseId,
};
use crate::linc::{NodeId, Outbound};
use crate::meta::DatabaseId;

use super::{ConnectionHandler, ExecFn};

Expand Down Expand Up @@ -227,7 +221,7 @@ impl ReplicaConnection {
.builder
.finish_step(affected_row_count, last_insert_rowid)
.unwrap(),
BuilderStep::StepError(e) => req
BuilderStep::StepError(_e) => req
.builder
.step_error(todo!("handle proxy step error"))
.unwrap(),
Expand Down Expand Up @@ -274,16 +268,15 @@ impl ReplicaConnection {
impl ConnectionHandler for ReplicaConnection {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// we are currently handling a request on this connection
// self.conn.writer().current_req.timeout.poll()
let mut req = self.conn.writer().current_req.lock();
let should_abort_query = match &mut *req {
Some(ref mut req) => match req.timeout.as_mut().poll(cx) {
Poll::Ready(_) => {
req.builder.finnalize_error("request timed out".to_string());
true
}
Poll::Pending => return Poll::Pending,
},
Some(ref mut req) => {
ready!(req.timeout.as_mut().poll(cx));
// the request has timed out, we finalize the builder with an error, and clean the
// current request.
req.builder.finnalize_error("request timed out".to_string());
true
}
None => return Poll::Ready(()),
};

Expand Down
Loading