Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit b23ec73

Browse files
authored
Merge pull request #532 from libsql/full-async-builder
Async ResultBuilder
2 parents 0ba8ba1 + d1e9845 commit b23ec73

File tree

15 files changed

+442
-360
lines changed

15 files changed

+442
-360
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libsqlx-server/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@ bincode = "1.3.3"
1414
bytes = { version = "1.4.0", features = ["serde"] }
1515
clap = { version = "4.3.11", features = ["derive"] }
1616
color-eyre = "0.6.2"
17+
either = "1.8.1"
1718
futures = "0.3.28"
1819
hmac = "0.12.1"
1920
hyper = { version = "0.14.27", features = ["h2", "server"] }
2021
itertools = "0.11.0"
21-
libsqlx = { version = "0.1.0", path = "../libsqlx" }
22+
libsqlx = { version = "0.1.0", path = "../libsqlx", features = ["tokio"] }
2223
moka = { version = "0.11.2", features = ["future"] }
2324
parking_lot = "0.12.1"
2425
priority-queue = "1.3.2"

libsqlx-server/src/allocation/mod.rs

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
1-
use std::collections::HashMap;
21
use std::collections::hash_map::Entry;
2+
use std::collections::HashMap;
33
use std::path::PathBuf;
44
use std::sync::Arc;
55
use std::time::{Duration, Instant};
66

77
use bytes::Bytes;
8+
use either::Either;
89
use libsqlx::libsql::{LibsqlDatabase, LogCompactor, LogFile, PrimaryType, ReplicaType};
9-
use libsqlx::proxy::WriteProxyDatabase;
10+
use libsqlx::proxy::{WriteProxyConnection, WriteProxyDatabase};
11+
use libsqlx::result_builder::ResultBuilder;
1012
use libsqlx::{
1113
Database as _, DescribeResponse, Frame, FrameNo, InjectableDatabase, Injector, LogReadError,
1214
ReplicationLogger,
@@ -27,7 +29,11 @@ use self::config::{AllocConfig, DbConfig};
2729

2830
pub mod config;
2931

30-
type ExecFn = Box<dyn FnOnce(&mut dyn libsqlx::Connection) + Send>;
32+
type LibsqlConnection = Either<
33+
libsqlx::libsql::LibsqlConnection<PrimaryType>,
34+
WriteProxyConnection<libsqlx::libsql::LibsqlConnection<ReplicaType>, DummyConn>,
35+
>;
36+
type ExecFn = Box<dyn FnOnce(&mut LibsqlConnection) + Send>;
3137

3238
#[derive(Clone)]
3339
pub struct ConnectionId {
@@ -47,10 +53,10 @@ pub struct DummyDb;
4753
pub struct DummyConn;
4854

4955
impl libsqlx::Connection for DummyConn {
50-
fn execute_program(
56+
fn execute_program<B: ResultBuilder>(
5157
&mut self,
52-
_pgm: libsqlx::program::Program,
53-
_result_builder: &mut dyn libsqlx::result_builder::ResultBuilder,
58+
_pgm: &libsqlx::program::Program,
59+
_result_builder: B,
5460
) -> libsqlx::Result<()> {
5561
todo!()
5662
}
@@ -207,7 +213,12 @@ impl FrameStreamer {
207213
if !self.buffer.is_empty() {
208214
self.send_frames().await;
209215
}
210-
if self.notifier.wait_for(|fno| dbg!(*fno) >= self.next_frame_no).await.is_err() {
216+
if self
217+
.notifier
218+
.wait_for(|fno| *fno >= self.next_frame_no)
219+
.await
220+
.is_err()
221+
{
211222
break;
212223
}
213224
}
@@ -244,7 +255,9 @@ impl Database {
244255
path,
245256
Compactor,
246257
false,
247-
Box::new(move |fno| { let _ = sender.send(fno); } ),
258+
Box::new(move |fno| {
259+
let _ = sender.send(fno);
260+
}),
248261
)
249262
.unwrap();
250263

@@ -253,7 +266,7 @@ impl Database {
253266
replica_streams: HashMap::new(),
254267
frame_notifier: receiver,
255268
}
256-
},
269+
}
257270
DbConfig::Replica { primary_node_id } => {
258271
let rdb = LibsqlDatabase::new_replica(path, MAX_INJECTOR_BUFFER_CAP, ()).unwrap();
259272
let wdb = DummyDb;
@@ -285,10 +298,10 @@ impl Database {
285298
}
286299
}
287300

288-
fn connect(&self) -> Box<dyn libsqlx::Connection + Send> {
301+
fn connect(&self) -> LibsqlConnection {
289302
match self {
290-
Database::Primary { db, .. } => Box::new(db.connect().unwrap()),
291-
Database::Replica { db, .. } => Box::new(db.connect().unwrap()),
303+
Database::Primary { db, .. } => Either::Left(db.connect().unwrap()),
304+
Database::Replica { db, .. } => Either::Right(db.connect().unwrap()),
292305
}
293306
}
294307
}
@@ -315,11 +328,11 @@ pub struct ConnectionHandle {
315328
impl ConnectionHandle {
316329
pub async fn exec<F, R>(&self, f: F) -> crate::Result<R>
317330
where
318-
F: for<'a> FnOnce(&'a mut (dyn libsqlx::Connection + 'a)) -> R + Send + 'static,
331+
F: for<'a> FnOnce(&'a mut LibsqlConnection) -> R + Send + 'static,
319332
R: Send + 'static,
320333
{
321334
let (sender, ret) = oneshot::channel();
322-
let cb = move |conn: &mut dyn libsqlx::Connection| {
335+
let cb = move |conn: &mut LibsqlConnection| {
323336
let res = f(conn);
324337
let _ = sender.send(res);
325338
};
@@ -371,9 +384,15 @@ impl Allocation {
371384
Message::Handshake { .. } => unreachable!("handshake should have been caught earlier"),
372385
Message::ReplicationHandshake { .. } => todo!(),
373386
Message::ReplicationHandshakeResponse { .. } => todo!(),
374-
Message::Replicate { req_no, next_frame_no } => match &mut self.database {
375-
Database::Primary { db, replica_streams, frame_notifier } => {
376-
dbg!(next_frame_no);
387+
Message::Replicate {
388+
req_no,
389+
next_frame_no,
390+
} => match &mut self.database {
391+
Database::Primary {
392+
db,
393+
replica_streams,
394+
frame_notifier,
395+
} => {
377396
let streamer = FrameStreamer {
378397
logger: db.logger(),
379398
database_id: DatabaseId::from_name(&self.db_name),
@@ -396,15 +415,15 @@ impl Allocation {
396415
*old_req_no = req_no;
397416
old_handle.abort();
398417
}
399-
},
418+
}
400419
Entry::Vacant(e) => {
401420
let handle = tokio::spawn(streamer.run());
402421
// For some reason, not yielding causes the task not to be spawned
403422
tokio::task::yield_now().await;
404423
e.insert((req_no, handle));
405-
},
424+
}
406425
}
407-
},
426+
}
408427
Database::Replica { .. } => todo!("not a primary!"),
409428
},
410429
Message::Frames(frames) => match &mut self.database {
@@ -459,7 +478,7 @@ impl Allocation {
459478

460479
struct Connection {
461480
id: u32,
462-
conn: Box<dyn libsqlx::Connection + Send>,
481+
conn: LibsqlConnection,
463482
exit: oneshot::Receiver<()>,
464483
exec: mpsc::Receiver<ExecFn>,
465484
}
@@ -470,7 +489,7 @@ impl Connection {
470489
tokio::select! {
471490
_ = &mut self.exit => break,
472491
Some(exec) = self.exec.recv() => {
473-
tokio::task::block_in_place(|| exec(&mut *self.conn));
492+
tokio::task::block_in_place(|| exec(&mut self.conn));
474493
}
475494
}
476495
}

libsqlx-server/src/hrana/batch.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ use super::stmt::{proto_stmt_to_query, stmt_error_from_sqld_error};
88
use super::{proto, ProtocolError, Version};
99

1010
use color_eyre::eyre::anyhow;
11+
use libsqlx::Connection;
1112
use libsqlx::analysis::Statement;
1213
use libsqlx::program::{Cond, Program, Step};
1314
use libsqlx::query::{Params, Query};
1415
use libsqlx::result_builder::{StepResult, StepResultsBuilder};
16+
use tokio::sync::oneshot;
1517

1618
fn proto_cond_to_cond(cond: &proto::BatchCond, max_step_i: usize) -> color_eyre::Result<Cond> {
1719
let try_convert_step = |step: i32| -> Result<usize, ProtocolError> {
@@ -73,15 +75,15 @@ pub async fn execute_batch(
7375
db: &ConnectionHandle,
7476
pgm: Program,
7577
) -> color_eyre::Result<proto::BatchResult> {
76-
let builder = db
78+
let fut = db
7779
.exec(move |conn| -> color_eyre::Result<_> {
78-
let mut builder = HranaBatchProtoBuilder::default();
79-
conn.execute_program(pgm, &mut builder)?;
80-
Ok(builder)
80+
let (builder, ret) = HranaBatchProtoBuilder::new();
81+
conn.execute_program(&pgm, builder)?;
82+
Ok(ret)
8183
})
8284
.await??;
8385

84-
Ok(builder.into_ret())
86+
Ok(fut.await?)
8587
}
8688

8789
pub fn proto_sequence_to_program(sql: &str) -> color_eyre::Result<Program> {
@@ -110,17 +112,17 @@ pub fn proto_sequence_to_program(sql: &str) -> color_eyre::Result<Program> {
110112
}
111113

112114
pub async fn execute_sequence(conn: &ConnectionHandle, pgm: Program) -> color_eyre::Result<()> {
113-
let builder = conn
115+
let fut = conn
114116
.exec(move |conn| -> color_eyre::Result<_> {
115-
let mut builder = StepResultsBuilder::default();
116-
conn.execute_program(pgm, &mut builder)?;
117+
let (snd, rcv) = oneshot::channel();
118+
let builder = StepResultsBuilder::new(snd);
119+
conn.execute_program(&pgm, builder)?;
117120

118-
Ok(builder)
121+
Ok(rcv)
119122
})
120123
.await??;
121124

122-
builder
123-
.into_ret()
125+
fut.await?
124126
.into_iter()
125127
.try_for_each(|result| match result {
126128
StepResult::Ok => Ok(()),

0 commit comments

Comments
 (0)