Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit abace0d

Browse files
authored
Merge pull request #553 from libsql/mt-log-compactor
mt log compactor
2 parents 5aa463d + f86954c commit abace0d

File tree

17 files changed

+764
-459
lines changed

17 files changed

+764
-459
lines changed

Cargo.lock

Lines changed: 5 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libsqlx-server/Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ clap = { version = "4.3.11", features = ["derive"] }
1818
color-eyre = "0.6.2"
1919
either = "1.8.1"
2020
futures = "0.3.28"
21-
heed = { version = "0.20.0-alpha.3", features = ["serde-bincode"] }
22-
heed-types = "0.20.0-alpha.3"
21+
# heed = { version = "0.20.0-alpha.3", features = ["serde-bincode", "sync-read-txn"] }
22+
heed = { git = "https://github.com/MarinPostma/heed.git", rev = "2ae9a14", features = ["serde-bincode", "sync-read-txn"] }
23+
heed-types = { git = "https://github.com/MarinPostma/heed.git", rev = "2ae9a14" }
24+
# heed-types = "0.20.0-alpha.3"
2325
hmac = "0.12.1"
2426
humantime = "2.1.0"
2527
hyper = { version = "0.14.27", features = ["h2", "server"] }
@@ -46,3 +48,4 @@ uuid = { version = "1.4.0", features = ["v4", "serde"] }
4648

4749
[dev-dependencies]
4850
turmoil = "0.5.5"
51+
walkdir = "2.3.3"

libsqlx-server/src/allocation/mod.rs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use tokio::task::{block_in_place, JoinSet};
1717
use tokio::time::Interval;
1818

1919
use crate::allocation::primary::FrameStreamer;
20+
use crate::compactor::CompactionQueue;
2021
use crate::hrana;
2122
use crate::hrana::http::handle_pipeline;
2223
use crate::hrana::http::proto::{PipelineRequestBody, PipelineResponseBody};
@@ -70,18 +71,27 @@ impl Database {
7071
} = self
7172
{
7273
ready!(interval.poll_tick(cx));
74+
tracing::debug!("attempting periodic log compaction");
7375
let db = db.db.clone();
7476
tokio::task::spawn_blocking(move || {
7577
db.compact_log();
7678
});
79+
return Poll::Ready(());
7780
}
7881

7982
Poll::Pending
8083
}
8184
}
8285

8386
impl Database {
84-
pub fn from_config(config: &AllocConfig, path: PathBuf, dispatcher: Arc<dyn Dispatch>) -> Self {
87+
pub fn from_config(
88+
config: &AllocConfig,
89+
path: PathBuf,
90+
dispatcher: Arc<dyn Dispatch>,
91+
compaction_queue: Arc<CompactionQueue>,
92+
) -> Self {
93+
let database_id = DatabaseId::from_name(&config.db_name);
94+
8595
match config.db_config {
8696
DbConfig::Primary {
8797
max_log_size,
@@ -90,21 +100,32 @@ impl Database {
90100
let (sender, receiver) = tokio::sync::watch::channel(0);
91101
let db = LibsqlDatabase::new_primary(
92102
path,
93-
Compactor::new(max_log_size, replication_log_compact_interval),
103+
Compactor::new(
104+
max_log_size,
105+
replication_log_compact_interval,
106+
compaction_queue,
107+
database_id,
108+
),
94109
false,
95110
Box::new(move |fno| {
96111
let _ = sender.send(fno);
97112
}),
98113
)
99114
.unwrap();
100115

116+
let compact_interval = replication_log_compact_interval.map(|d| {
117+
let mut i = tokio::time::interval(d / 2);
118+
i.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
119+
Box::pin(i)
120+
});
121+
101122
Self::Primary {
102123
db: PrimaryDatabase {
103124
db: Arc::new(db),
104125
replica_streams: HashMap::new(),
105126
frame_notifier: receiver,
106127
},
107-
compact_interval: None,
128+
compact_interval,
108129
}
109130
}
110131
DbConfig::Replica {
@@ -119,7 +140,6 @@ impl Database {
119140
let mut db = WriteProxyDatabase::new(rdb, wdb, Arc::new(|_| ()));
120141
let injector = db.injector().unwrap();
121142
let (sender, receiver) = mpsc::channel(16);
122-
let database_id = DatabaseId::from_name(&config.db_name);
123143

124144
let replicator = Replicator::new(
125145
dispatcher,
@@ -225,9 +245,9 @@ impl Allocation {
225245
}
226246
}
227247
},
228-
maybe_id = self.connections_futs.join_next() => {
229-
if let Some(Ok(_id)) = maybe_id {
230-
// self.connections.remove_entry(&id);
248+
maybe_id = self.connections_futs.join_next(), if !self.connections_futs.is_empty() => {
249+
if let Some(Ok((node_id, conn_id))) = maybe_id {
250+
self.connections.get_mut(&node_id).map(|m| m.remove(&conn_id));
231251
}
232252
},
233253
else => break,
@@ -475,6 +495,7 @@ impl<C: ConnectionHandler> Connection<C> {
475495
mod test {
476496
use std::time::Duration;
477497

498+
use libsqlx::result_builder::ResultBuilder;
478499
use tokio::sync::Notify;
479500

480501
use crate::allocation::replica::ReplicaConnection;

libsqlx-server/src/allocation/primary/compactor.rs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,38 @@
1-
use std::time::{Duration, Instant};
1+
use std::{
2+
path::PathBuf,
3+
sync::Arc,
4+
time::{Duration, Instant},
5+
};
26

37
use libsqlx::libsql::{LogCompactor, LogFile};
8+
use uuid::Uuid;
9+
10+
use crate::{
11+
compactor::{CompactionJob, CompactionQueue},
12+
meta::DatabaseId,
13+
};
414

515
pub struct Compactor {
616
max_log_size: usize,
717
last_compacted_at: Instant,
818
compact_interval: Option<Duration>,
19+
queue: Arc<CompactionQueue>,
20+
database_id: DatabaseId,
921
}
1022

1123
impl Compactor {
12-
pub fn new(max_log_size: usize, compact_interval: Option<Duration>) -> Self {
24+
pub fn new(
25+
max_log_size: usize,
26+
compact_interval: Option<Duration>,
27+
queue: Arc<CompactionQueue>,
28+
database_id: DatabaseId,
29+
) -> Self {
1330
Self {
1431
max_log_size,
1532
last_compacted_at: Instant::now(),
1633
compact_interval,
34+
queue,
35+
database_id,
1736
}
1837
}
1938
}
@@ -32,11 +51,18 @@ impl LogCompactor for Compactor {
3251

3352
fn compact(
3453
&mut self,
35-
_log: LogFile,
36-
_path: std::path::PathBuf,
37-
_size_after: u32,
54+
log_id: Uuid,
3855
) -> Result<(), Box<dyn std::error::Error + Sync + Send + 'static>> {
3956
self.last_compacted_at = Instant::now();
40-
todo!()
57+
self.queue.push(&CompactionJob {
58+
database_id: self.database_id,
59+
log_id,
60+
});
61+
62+
Ok(())
63+
}
64+
65+
fn snapshot_dir(&self) -> PathBuf {
66+
self.queue.snapshot_queue_dir()
4167
}
4268
}

0 commit comments

Comments
 (0)