Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

multi-tenancy log compaction #553

Merged
merged 9 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions libsqlx-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ clap = { version = "4.3.11", features = ["derive"] }
color-eyre = "0.6.2"
either = "1.8.1"
futures = "0.3.28"
heed = { version = "0.20.0-alpha.3", features = ["serde-bincode"] }
heed-types = "0.20.0-alpha.3"
# heed = { version = "0.20.0-alpha.3", features = ["serde-bincode", "sync-read-txn"] }
heed = { git = "https://github.com/MarinPostma/heed.git", rev = "2ae9a14", features = ["serde-bincode", "sync-read-txn"] }
heed-types = { git = "https://github.com/MarinPostma/heed.git", rev = "2ae9a14" }
# heed-types = "0.20.0-alpha.3"
hmac = "0.12.1"
humantime = "2.1.0"
hyper = { version = "0.14.27", features = ["h2", "server"] }
Expand All @@ -46,3 +48,4 @@ uuid = { version = "1.4.0", features = ["v4", "serde"] }

[dev-dependencies]
turmoil = "0.5.5"
walkdir = "2.3.3"
35 changes: 28 additions & 7 deletions libsqlx-server/src/allocation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use tokio::task::{block_in_place, JoinSet};
use tokio::time::Interval;

use crate::allocation::primary::FrameStreamer;
use crate::compactor::CompactionQueue;
use crate::hrana;
use crate::hrana::http::handle_pipeline;
use crate::hrana::http::proto::{PipelineRequestBody, PipelineResponseBody};
Expand Down Expand Up @@ -70,18 +71,27 @@ impl Database {
} = self
{
ready!(interval.poll_tick(cx));
tracing::debug!("attempting periodic log compaction");
let db = db.db.clone();
tokio::task::spawn_blocking(move || {
db.compact_log();
});
return Poll::Ready(());
}

Poll::Pending
}
}

impl Database {
pub fn from_config(config: &AllocConfig, path: PathBuf, dispatcher: Arc<dyn Dispatch>) -> Self {
pub fn from_config(
config: &AllocConfig,
path: PathBuf,
dispatcher: Arc<dyn Dispatch>,
compaction_queue: Arc<CompactionQueue>,
) -> Self {
let database_id = DatabaseId::from_name(&config.db_name);

match config.db_config {
DbConfig::Primary {
max_log_size,
Expand All @@ -90,21 +100,32 @@ impl Database {
let (sender, receiver) = tokio::sync::watch::channel(0);
let db = LibsqlDatabase::new_primary(
path,
Compactor::new(max_log_size, replication_log_compact_interval),
Compactor::new(
max_log_size,
replication_log_compact_interval,
compaction_queue,
database_id,
),
false,
Box::new(move |fno| {
let _ = sender.send(fno);
}),
)
.unwrap();

let compact_interval = replication_log_compact_interval.map(|d| {
let mut i = tokio::time::interval(d / 2);
i.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
Box::pin(i)
});

Self::Primary {
db: PrimaryDatabase {
db: Arc::new(db),
replica_streams: HashMap::new(),
frame_notifier: receiver,
},
compact_interval: None,
compact_interval,
}
}
DbConfig::Replica {
Expand All @@ -119,7 +140,6 @@ impl Database {
let mut db = WriteProxyDatabase::new(rdb, wdb, Arc::new(|_| ()));
let injector = db.injector().unwrap();
let (sender, receiver) = mpsc::channel(16);
let database_id = DatabaseId::from_name(&config.db_name);

let replicator = Replicator::new(
dispatcher,
Expand Down Expand Up @@ -225,9 +245,9 @@ impl Allocation {
}
}
},
maybe_id = self.connections_futs.join_next() => {
if let Some(Ok(_id)) = maybe_id {
// self.connections.remove_entry(&id);
maybe_id = self.connections_futs.join_next(), if !self.connections_futs.is_empty() => {
if let Some(Ok((node_id, conn_id))) = maybe_id {
self.connections.get_mut(&node_id).map(|m| m.remove(&conn_id));
}
},
else => break,
Expand Down Expand Up @@ -475,6 +495,7 @@ impl<C: ConnectionHandler> Connection<C> {
mod test {
use std::time::Duration;

use libsqlx::result_builder::ResultBuilder;
use tokio::sync::Notify;

use crate::allocation::replica::ReplicaConnection;
Expand Down
38 changes: 32 additions & 6 deletions libsqlx-server/src/allocation/primary/compactor.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,38 @@
use std::time::{Duration, Instant};
use std::{
path::PathBuf,
sync::Arc,
time::{Duration, Instant},
};

use libsqlx::libsql::{LogCompactor, LogFile};
use uuid::Uuid;

use crate::{
compactor::{CompactionJob, CompactionQueue},
meta::DatabaseId,
};

pub struct Compactor {
max_log_size: usize,
last_compacted_at: Instant,
compact_interval: Option<Duration>,
queue: Arc<CompactionQueue>,
database_id: DatabaseId,
}

impl Compactor {
pub fn new(max_log_size: usize, compact_interval: Option<Duration>) -> Self {
pub fn new(
max_log_size: usize,
compact_interval: Option<Duration>,
queue: Arc<CompactionQueue>,
database_id: DatabaseId,
) -> Self {
Self {
max_log_size,
last_compacted_at: Instant::now(),
compact_interval,
queue,
database_id,
}
}
}
Expand All @@ -32,11 +51,18 @@ impl LogCompactor for Compactor {

fn compact(
&mut self,
_log: LogFile,
_path: std::path::PathBuf,
_size_after: u32,
log_id: Uuid,
) -> Result<(), Box<dyn std::error::Error + Sync + Send + 'static>> {
self.last_compacted_at = Instant::now();
todo!()
self.queue.push(&CompactionJob {
database_id: self.database_id,
log_id,
});

Ok(())
}

fn snapshot_dir(&self) -> PathBuf {
self.queue.snapshot_queue_dir()
}
}
Loading