Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit ebb8829

Browse files
authored
Merge pull request #541 from libsql/mt-log-compaction
Progress towards multi-tenant log compaction
2 parents eb729b7 + 773bd73 commit ebb8829

File tree

10 files changed

+242
-108
lines changed

10 files changed

+242
-108
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libsqlx-server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ axum = "0.6.18"
1212
base64 = "0.21.2"
1313
bincode = "1.3.3"
1414
bytes = { version = "1.4.0", features = ["serde"] }
15+
bytesize = { version = "1.2.0", features = ["serde"] }
1516
clap = { version = "4.3.11", features = ["derive"] }
1617
color-eyre = "0.6.2"
1718
either = "1.8.1"

libsqlx-server/src/allocation/config.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@ pub struct AllocConfig {
2020

2121
#[derive(Debug, Serialize, Deserialize)]
2222
pub enum DbConfig {
23-
Primary {},
23+
Primary {
24+
/// maximum size the replication log is allowed to grow, before it is compacted.
25+
max_log_size: usize,
26+
/// Interval at which to force compaction
27+
replication_log_compact_interval: Option<Duration>,
28+
},
2429
Replica {
2530
primary_node_id: NodeId,
2631
proxy_request_timeout_duration: Duration,

libsqlx-server/src/allocation/mod.rs

Lines changed: 61 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,19 @@ use std::collections::hash_map::Entry;
22
use std::collections::HashMap;
33
use std::future::poll_fn;
44
use std::path::PathBuf;
5+
use std::pin::Pin;
56
use std::sync::Arc;
6-
use std::task::{Context, Poll};
7+
use std::task::{ready, Context, Poll};
78
use std::time::Instant;
89

910
use either::Either;
10-
use libsqlx::libsql::{LibsqlDatabase, LogCompactor, LogFile};
11+
use libsqlx::libsql::LibsqlDatabase;
1112
use libsqlx::program::Program;
1213
use libsqlx::proxy::WriteProxyDatabase;
1314
use libsqlx::{Database as _, InjectableDatabase};
1415
use tokio::sync::{mpsc, oneshot};
1516
use tokio::task::{block_in_place, JoinSet};
17+
use tokio::time::Interval;
1618

1719
use crate::allocation::primary::FrameStreamer;
1820
use crate::hrana;
@@ -24,16 +26,18 @@ use crate::linc::{Inbound, NodeId};
2426
use crate::meta::DatabaseId;
2527

2628
use self::config::{AllocConfig, DbConfig};
29+
use self::primary::compactor::Compactor;
2730
use self::primary::{PrimaryConnection, PrimaryDatabase, ProxyResponseBuilder};
2831
use self::replica::{ProxyDatabase, RemoteDb, ReplicaConnection, Replicator};
2932

3033
pub mod config;
3134
mod primary;
3235
mod replica;
3336

34-
/// the maximum number of frame a Frame messahe is allowed to contain
37+
/// Maximum number of frame a Frame message is allowed to contain
3538
const FRAMES_MESSAGE_MAX_COUNT: usize = 5;
36-
const MAX_INJECTOR_BUFFER_CAP: usize = 32;
39+
/// Maximum number of frames in the injector buffer
40+
const MAX_INJECTOR_BUFFER_CAPACITY: usize = 32;
3741

3842
type ExecFn = Box<dyn FnOnce(&mut dyn libsqlx::Connection) + Send>;
3943

@@ -46,7 +50,10 @@ pub enum AllocationMessage {
4650
}
4751

4852
pub enum Database {
49-
Primary(PrimaryDatabase),
53+
Primary {
54+
db: PrimaryDatabase,
55+
compact_interval: Option<Pin<Box<Interval>>>,
56+
},
5057
Replica {
5158
db: ProxyDatabase,
5259
injector_handle: mpsc::Sender<Frames>,
@@ -55,49 +62,57 @@ pub enum Database {
5562
},
5663
}
5764

58-
struct Compactor;
59-
60-
impl LogCompactor for Compactor {
61-
fn should_compact(&self, _log: &LogFile) -> bool {
62-
false
63-
}
65+
impl Database {
66+
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
67+
if let Self::Primary {
68+
compact_interval: Some(ref mut interval),
69+
db,
70+
} = self
71+
{
72+
ready!(interval.poll_tick(cx));
73+
let db = db.db.clone();
74+
tokio::task::spawn_blocking(move || {
75+
db.compact_log();
76+
});
77+
}
6478

65-
fn compact(
66-
&self,
67-
_log: LogFile,
68-
_path: std::path::PathBuf,
69-
_size_after: u32,
70-
) -> Result<(), Box<dyn std::error::Error + Sync + Send + 'static>> {
71-
todo!()
79+
Poll::Pending
7280
}
7381
}
7482

7583
impl Database {
7684
pub fn from_config(config: &AllocConfig, path: PathBuf, dispatcher: Arc<dyn Dispatch>) -> Self {
7785
match config.db_config {
78-
DbConfig::Primary {} => {
86+
DbConfig::Primary {
87+
max_log_size,
88+
replication_log_compact_interval,
89+
} => {
7990
let (sender, receiver) = tokio::sync::watch::channel(0);
8091
let db = LibsqlDatabase::new_primary(
8192
path,
82-
Compactor,
93+
Compactor::new(max_log_size, replication_log_compact_interval),
8394
false,
8495
Box::new(move |fno| {
8596
let _ = sender.send(fno);
8697
}),
8798
)
8899
.unwrap();
89100

90-
Self::Primary(PrimaryDatabase {
91-
db,
92-
replica_streams: HashMap::new(),
93-
frame_notifier: receiver,
94-
})
101+
Self::Primary {
102+
db: PrimaryDatabase {
103+
db: Arc::new(db),
104+
replica_streams: HashMap::new(),
105+
frame_notifier: receiver,
106+
},
107+
compact_interval: None,
108+
}
95109
}
96110
DbConfig::Replica {
97111
primary_node_id,
98112
proxy_request_timeout_duration,
99113
} => {
100-
let rdb = LibsqlDatabase::new_replica(path, MAX_INJECTOR_BUFFER_CAP, ()).unwrap();
114+
let rdb =
115+
LibsqlDatabase::new_replica(path, MAX_INJECTOR_BUFFER_CAPACITY, ()).unwrap();
101116
let wdb = RemoteDb {
102117
proxy_request_timeout_duration,
103118
};
@@ -129,7 +144,10 @@ impl Database {
129144

130145
fn connect(&self, connection_id: u32, alloc: &Allocation) -> impl ConnectionHandler {
131146
match self {
132-
Database::Primary(PrimaryDatabase { db, .. }) => Either::Right(PrimaryConnection {
147+
Database::Primary {
148+
db: PrimaryDatabase { db, .. },
149+
..
150+
} => Either::Right(PrimaryConnection {
133151
conn: db.connect().unwrap(),
134152
}),
135153
Database::Replica { db, primary_id, .. } => Either::Left(ReplicaConnection {
@@ -144,7 +162,7 @@ impl Database {
144162
}
145163

146164
pub fn is_primary(&self) -> bool {
147-
matches!(self, Self::Primary(..))
165+
matches!(self, Self::Primary { .. })
148166
}
149167
}
150168

@@ -190,7 +208,9 @@ impl ConnectionHandle {
190208
impl Allocation {
191209
pub async fn run(mut self) {
192210
loop {
211+
let fut = poll_fn(|cx| self.database.poll(cx));
193212
tokio::select! {
213+
_ = fut => (),
194214
Some(msg) = self.inbox.recv() => {
195215
match msg {
196216
AllocationMessage::HranaPipelineReq { req, ret } => {
@@ -229,12 +249,16 @@ impl Allocation {
229249
req_no,
230250
next_frame_no,
231251
} => match &mut self.database {
232-
Database::Primary(PrimaryDatabase {
233-
db,
234-
replica_streams,
235-
frame_notifier,
252+
Database::Primary {
253+
db:
254+
PrimaryDatabase {
255+
db,
256+
replica_streams,
257+
frame_notifier,
258+
..
259+
},
236260
..
237-
}) => {
261+
} => {
238262
let streamer = FrameStreamer {
239263
logger: db.logger(),
240264
database_id: DatabaseId::from_name(&self.db_name),
@@ -277,7 +301,10 @@ impl Allocation {
277301
*last_received_frame_ts = Some(Instant::now());
278302
injector_handle.send(frames).await.unwrap();
279303
}
280-
Database::Primary(PrimaryDatabase { .. }) => todo!("handle primary receiving txn"),
304+
Database::Primary {
305+
db: PrimaryDatabase { .. },
306+
..
307+
} => todo!("handle primary receiving txn"),
281308
},
282309
Message::ProxyRequest {
283310
connection_id,
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use std::time::{Duration, Instant};
2+
3+
use libsqlx::libsql::{LogCompactor, LogFile};
4+
5+
pub struct Compactor {
6+
max_log_size: usize,
7+
last_compacted_at: Instant,
8+
compact_interval: Option<Duration>,
9+
}
10+
11+
impl Compactor {
12+
pub fn new(max_log_size: usize, compact_interval: Option<Duration>) -> Self {
13+
Self {
14+
max_log_size,
15+
last_compacted_at: Instant::now(),
16+
compact_interval,
17+
}
18+
}
19+
}
20+
21+
impl LogCompactor for Compactor {
22+
fn should_compact(&self, log: &LogFile) -> bool {
23+
let mut should_compact = false;
24+
if let Some(compact_interval) = self.compact_interval {
25+
should_compact |= self.last_compacted_at.elapsed() >= compact_interval
26+
}
27+
28+
should_compact |= log.size() >= self.max_log_size;
29+
30+
should_compact
31+
}
32+
33+
fn compact(
34+
&mut self,
35+
_log: LogFile,
36+
_path: std::path::PathBuf,
37+
_size_after: u32,
38+
) -> Result<(), Box<dyn std::error::Error + Sync + Send + 'static>> {
39+
self.last_compacted_at = Instant::now();
40+
todo!()
41+
}
42+
}

libsqlx-server/src/allocation/primary.rs renamed to libsqlx-server/src/allocation/primary/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ use crate::meta::DatabaseId;
1616

1717
use super::{ConnectionHandler, ExecFn, FRAMES_MESSAGE_MAX_COUNT};
1818

19+
pub mod compactor;
20+
1921
const MAX_STEP_BATCH_SIZE: usize = 100_000_000; // ~100kb
2022
//
2123
pub struct PrimaryDatabase {
22-
pub db: LibsqlDatabase<PrimaryType>,
24+
pub db: Arc<LibsqlDatabase<PrimaryType>>,
2325
pub replica_streams: HashMap<NodeId, (u32, tokio::task::JoinHandle<()>)>,
2426
pub frame_notifier: tokio::sync::watch::Receiver<FrameNo>,
2527
}

libsqlx-server/src/allocation/replica.rs

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::ops::Deref;
22
use std::pin::Pin;
33
use std::sync::Arc;
4-
use std::task::{Context, Poll};
4+
use std::task::{ready, Context, Poll};
55
use std::time::Duration;
66

77
use futures::Future;
@@ -11,22 +11,16 @@ use libsqlx::proxy::{WriteProxyConnection, WriteProxyDatabase};
1111
use libsqlx::result_builder::{Column, QueryBuilderConfig, ResultBuilder};
1212
use libsqlx::{DescribeResponse, Frame, FrameNo, Injector};
1313
use parking_lot::Mutex;
14-
use tokio::{
15-
sync::mpsc,
16-
task::block_in_place,
17-
time::{timeout, Sleep},
18-
};
14+
use tokio::sync::mpsc;
15+
use tokio::task::block_in_place;
16+
use tokio::time::{timeout, Sleep};
1917

18+
use crate::linc::bus::Dispatch;
2019
use crate::linc::proto::{BuilderStep, ProxyResponse};
20+
use crate::linc::proto::{Enveloppe, Frames, Message};
2121
use crate::linc::Inbound;
22-
use crate::{
23-
linc::{
24-
bus::Dispatch,
25-
proto::{Enveloppe, Frames, Message},
26-
NodeId, Outbound,
27-
},
28-
meta::DatabaseId,
29-
};
22+
use crate::linc::{NodeId, Outbound};
23+
use crate::meta::DatabaseId;
3024

3125
use super::{ConnectionHandler, ExecFn};
3226

@@ -227,7 +221,7 @@ impl ReplicaConnection {
227221
.builder
228222
.finish_step(affected_row_count, last_insert_rowid)
229223
.unwrap(),
230-
BuilderStep::StepError(e) => req
224+
BuilderStep::StepError(_e) => req
231225
.builder
232226
.step_error(todo!("handle proxy step error"))
233227
.unwrap(),
@@ -274,16 +268,15 @@ impl ReplicaConnection {
274268
impl ConnectionHandler for ReplicaConnection {
275269
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
276270
// we are currently handling a request on this connection
277-
// self.conn.writer().current_req.timeout.poll()
278271
let mut req = self.conn.writer().current_req.lock();
279272
let should_abort_query = match &mut *req {
280-
Some(ref mut req) => match req.timeout.as_mut().poll(cx) {
281-
Poll::Ready(_) => {
282-
req.builder.finnalize_error("request timed out".to_string());
283-
true
284-
}
285-
Poll::Pending => return Poll::Pending,
286-
},
273+
Some(ref mut req) => {
274+
ready!(req.timeout.as_mut().poll(cx));
275+
// the request has timedout, we finalize the builder with a error, and clean the
276+
// current request.
277+
req.builder.finnalize_error("request timed out".to_string());
278+
true
279+
}
287280
None => return Poll::Ready(()),
288281
};
289282

0 commit comments

Comments
 (0)