Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 26f781d

Browse files
authored
Merge pull request #535 from libsql/proxy-writes
proxy writes
2 parents b23ec73 + 95a2858 commit 26f781d

File tree

18 files changed

+808
-337
lines changed

18 files changed

+808
-337
lines changed

libsqlx-server/src/allocation/mod.rs

Lines changed: 531 additions & 84 deletions
Large diffs are not rendered by default.

libsqlx-server/src/hrana/batch.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use super::stmt::{proto_stmt_to_query, stmt_error_from_sqld_error};
88
use super::{proto, ProtocolError, Version};
99

1010
use color_eyre::eyre::anyhow;
11-
use libsqlx::Connection;
1211
use libsqlx::analysis::Statement;
1312
use libsqlx::program::{Cond, Program, Step};
1413
use libsqlx::query::{Params, Query};
@@ -78,7 +77,7 @@ pub async fn execute_batch(
7877
let fut = db
7978
.exec(move |conn| -> color_eyre::Result<_> {
8079
let (builder, ret) = HranaBatchProtoBuilder::new();
81-
conn.execute_program(&pgm, builder)?;
80+
conn.execute_program(&pgm, Box::new(builder))?;
8281
Ok(ret)
8382
})
8483
.await??;
@@ -116,20 +115,18 @@ pub async fn execute_sequence(conn: &ConnectionHandle, pgm: Program) -> color_ey
116115
.exec(move |conn| -> color_eyre::Result<_> {
117116
let (snd, rcv) = oneshot::channel();
118117
let builder = StepResultsBuilder::new(snd);
119-
conn.execute_program(&pgm, builder)?;
118+
conn.execute_program(&pgm, Box::new(builder))?;
120119

121120
Ok(rcv)
122121
})
123122
.await??;
124123

125-
fut.await?
126-
.into_iter()
127-
.try_for_each(|result| match result {
128-
StepResult::Ok => Ok(()),
129-
StepResult::Err(e) => match stmt_error_from_sqld_error(e) {
130-
Ok(stmt_err) => Err(anyhow!(stmt_err)),
131-
Err(sqld_err) => Err(anyhow!(sqld_err)),
132-
},
133-
StepResult::Skipped => Err(anyhow!("Statement in sequence was not executed")),
134-
})
124+
fut.await?.into_iter().try_for_each(|result| match result {
125+
StepResult::Ok => Ok(()),
126+
StepResult::Err(e) => match stmt_error_from_sqld_error(e) {
127+
Ok(stmt_err) => Err(anyhow!(stmt_err)),
128+
Err(sqld_err) => Err(anyhow!(sqld_err)),
129+
},
130+
StepResult::Skipped => Err(anyhow!("Statement in sequence was not executed")),
131+
})
135132
}

libsqlx-server/src/hrana/http/mod.rs

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
use std::sync::Arc;
2+
13
use color_eyre::eyre::Context;
24
use futures::Future;
35
use parking_lot::Mutex;
46
use serde::{de::DeserializeOwned, Serialize};
7+
use tokio::sync::oneshot;
58

69
use crate::allocation::ConnectionHandle;
710

@@ -47,31 +50,38 @@ fn handle_index() -> color_eyre::Result<hyper::Response<hyper::Body>> {
4750
}
4851

4952
pub async fn handle_pipeline<F, Fut>(
50-
server: &Server,
53+
server: Arc<Server>,
5154
req: PipelineRequestBody,
55+
ret: oneshot::Sender<color_eyre::Result<PipelineResponseBody>>,
5256
mk_conn: F,
53-
) -> color_eyre::Result<PipelineResponseBody>
57+
) -> color_eyre::Result<()>
5458
where
5559
F: FnOnce() -> Fut,
5660
Fut: Future<Output = crate::Result<ConnectionHandle>>,
5761
{
58-
let mut stream_guard = stream::acquire(server, req.baton.as_deref(), mk_conn).await?;
59-
60-
let mut results = Vec::with_capacity(req.requests.len());
61-
for request in req.requests.into_iter() {
62-
let result = request::handle(&mut stream_guard, request)
63-
.await
64-
.context("Could not execute a request in pipeline")?;
65-
results.push(result);
66-
}
67-
68-
let resp_body = proto::PipelineResponseBody {
69-
baton: stream_guard.release(),
70-
base_url: server.self_url.clone(),
71-
results,
72-
};
73-
74-
Ok(resp_body)
62+
let mut stream_guard = stream::acquire(server.clone(), req.baton.as_deref(), mk_conn).await?;
63+
64+
tokio::spawn(async move {
65+
let f = async move {
66+
let mut results = Vec::with_capacity(req.requests.len());
67+
for request in req.requests.into_iter() {
68+
let result = request::handle(&mut stream_guard, request)
69+
.await
70+
.context("Could not execute a request in pipeline")?;
71+
results.push(result);
72+
}
73+
74+
Ok(proto::PipelineResponseBody {
75+
baton: stream_guard.release(),
76+
base_url: server.self_url.clone(),
77+
results,
78+
})
79+
};
80+
81+
let _ = ret.send(f.await);
82+
});
83+
84+
Ok(())
7585
}
7686

7787
async fn read_request_json<T: DeserializeOwned>(

libsqlx-server/src/hrana/http/request.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ pub enum StreamResponseError {
1313
}
1414

1515
pub async fn handle(
16-
stream_guard: &mut stream::Guard<'_>,
16+
stream_guard: &mut stream::Guard,
1717
request: proto::StreamRequest,
1818
) -> color_eyre::Result<proto::StreamResult> {
1919
let result = match try_handle(stream_guard, request).await {
@@ -31,7 +31,7 @@ pub async fn handle(
3131
}
3232

3333
async fn try_handle(
34-
stream_guard: &mut stream::Guard<'_>,
34+
stream_guard: &mut stream::Guard,
3535
request: proto::StreamRequest,
3636
) -> color_eyre::Result<proto::StreamResponse> {
3737
Ok(match request {

libsqlx-server/src/hrana/http/stream.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::cmp::Reverse;
22
use std::collections::{HashMap, VecDeque};
33
use std::pin::Pin;
4+
use std::sync::Arc;
45
use std::{future, mem, task};
56

67
use base64::prelude::{Engine as _, BASE64_STANDARD_NO_PAD};
@@ -67,8 +68,8 @@ struct Stream {
6768
/// Guard object that is used to access a stream from the outside. The guard makes sure that the
6869
/// stream's entry in [`ServerStreamState::handles`] is either removed or replaced with
6970
/// [`Handle::Available`] after the guard goes out of scope.
70-
pub struct Guard<'srv> {
71-
server: &'srv Server,
71+
pub struct Guard {
72+
server: Arc<Server>,
7273
/// The guarded stream. This is only set to `None` in the destructor.
7374
stream: Option<Box<Stream>>,
7475
/// If set to `true`, the destructor will release the stream for further use (saving it as
@@ -101,18 +102,18 @@ impl ServerStreamState {
101102

102103
/// Acquire a guard to a new or existing stream. If baton is `Some`, we try to look up the stream,
103104
/// otherwise we create a new stream.
104-
pub async fn acquire<'srv, F, Fut>(
105-
server: &'srv Server,
105+
pub async fn acquire<F, Fut>(
106+
server: Arc<Server>,
106107
baton: Option<&str>,
107108
mk_conn: F,
108-
) -> color_eyre::Result<Guard<'srv>>
109+
) -> color_eyre::Result<Guard>
109110
where
110111
F: FnOnce() -> Fut,
111112
Fut: Future<Output = crate::Result<ConnectionHandle>>,
112113
{
113114
let stream = match baton {
114115
Some(baton) => {
115-
let (stream_id, baton_seq) = decode_baton(server, baton)?;
116+
let (stream_id, baton_seq) = decode_baton(&server, baton)?;
116117

117118
let mut state = server.stream_state.lock();
118119
let handle = state.handles.get_mut(&stream_id);
@@ -182,7 +183,7 @@ where
182183
})
183184
}
184185

185-
impl<'srv> Guard<'srv> {
186+
impl Guard {
186187
pub fn get_db(&self) -> Result<&ConnectionHandle, ProtocolError> {
187188
let stream = self.stream.as_ref().unwrap();
188189
stream.conn.as_ref().ok_or(ProtocolError::BatonStreamClosed)
@@ -211,7 +212,7 @@ impl<'srv> Guard<'srv> {
211212
if stream.conn.is_some() {
212213
self.release = true; // tell destructor to make the stream available again
213214
Some(encode_baton(
214-
self.server,
215+
&self.server,
215216
stream.stream_id,
216217
stream.baton_seq,
217218
))
@@ -221,7 +222,7 @@ impl<'srv> Guard<'srv> {
221222
}
222223
}
223224

224-
impl<'srv> Drop for Guard<'srv> {
225+
impl Drop for Guard {
225226
fn drop(&mut self) {
226227
let stream = self.stream.take().unwrap();
227228
let stream_id = stream.stream_id;

libsqlx-server/src/hrana/result_builder.rs

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,22 @@ use super::proto;
1111

1212
pub struct SingleStatementBuilder {
1313
builder: StatementBuilder,
14-
ret: oneshot::Sender<Result<proto::StmtResult, libsqlx::error::Error>>,
14+
ret: Option<oneshot::Sender<Result<proto::StmtResult, libsqlx::error::Error>>>,
1515
}
1616

1717
impl SingleStatementBuilder {
18-
pub fn new() -> (Self, oneshot::Receiver<Result<proto::StmtResult, libsqlx::error::Error>>) {
18+
pub fn new() -> (
19+
Self,
20+
oneshot::Receiver<Result<proto::StmtResult, libsqlx::error::Error>>,
21+
) {
1922
let (ret, rcv) = oneshot::channel();
20-
(Self {
21-
builder: StatementBuilder::default(),
22-
ret,
23-
}, rcv)
23+
(
24+
Self {
25+
builder: StatementBuilder::default(),
26+
ret: Some(ret),
27+
},
28+
rcv,
29+
)
2430
}
2531
}
2632

@@ -38,7 +44,8 @@ impl ResultBuilder for SingleStatementBuilder {
3844
affected_row_count: u64,
3945
last_insert_rowid: Option<i64>,
4046
) -> Result<(), QueryResultBuilderError> {
41-
self.builder.finish_step(affected_row_count, last_insert_rowid)
47+
self.builder
48+
.finish_step(affected_row_count, last_insert_rowid)
4249
}
4350

4451
fn step_error(&mut self, error: libsqlx::error::Error) -> Result<(), QueryResultBuilderError> {
@@ -61,19 +68,16 @@ impl ResultBuilder for SingleStatementBuilder {
6168
}
6269

6370
fn finnalize(
64-
self,
71+
&mut self,
6572
_is_txn: bool,
6673
_frame_no: Option<FrameNo>,
67-
) -> Result<bool, QueryResultBuilderError>
68-
where Self: Sized
69-
{
70-
let res = self.builder.into_ret();
71-
let _ = self.ret.send(res);
74+
) -> Result<bool, QueryResultBuilderError> {
75+
let res = self.builder.take_ret();
76+
let _ = self.ret.take().unwrap().send(res);
7277
Ok(true)
7378
}
7479
}
7580

76-
7781
#[derive(Debug, Default)]
7882
struct StatementBuilder {
7983
has_step: bool,
@@ -191,12 +195,12 @@ impl StatementBuilder {
191195
Ok(())
192196
}
193197

194-
pub fn into_ret(self) -> Result<proto::StmtResult, libsqlx::error::Error> {
195-
match self.err {
198+
pub fn take_ret(&mut self) -> Result<proto::StmtResult, libsqlx::error::Error> {
199+
match self.err.take() {
196200
Some(err) => Err(err),
197201
None => Ok(proto::StmtResult {
198-
cols: self.cols,
199-
rows: self.rows,
202+
cols: std::mem::take(&mut self.cols),
203+
rows: std::mem::take(&mut self.rows),
200204
affected_row_count: self.affected_row_count,
201205
last_insert_rowid: self.last_insert_rowid,
202206
}),
@@ -262,23 +266,24 @@ pub struct HranaBatchProtoBuilder {
262266
current_size: u64,
263267
max_response_size: u64,
264268
step_empty: bool,
265-
ret: oneshot::Sender<proto::BatchResult>
269+
ret: oneshot::Sender<proto::BatchResult>,
266270
}
267271

268272
impl HranaBatchProtoBuilder {
269273
pub fn new() -> (Self, oneshot::Receiver<proto::BatchResult>) {
270274
let (ret, rcv) = oneshot::channel();
271-
(Self {
272-
step_results: Vec::new(),
273-
step_errors: Vec::new(),
274-
stmt_builder: StatementBuilder::default(),
275-
current_size: 0,
276-
max_response_size: u64::MAX,
277-
step_empty: false,
278-
ret,
279-
},
280-
rcv)
281-
275+
(
276+
Self {
277+
step_results: Vec::new(),
278+
step_errors: Vec::new(),
279+
stmt_builder: StatementBuilder::default(),
280+
current_size: 0,
281+
max_response_size: u64::MAX,
282+
step_empty: false,
283+
ret,
284+
},
285+
rcv,
286+
)
282287
}
283288
pub fn into_ret(self) -> proto::BatchResult {
284289
proto::BatchResult {
@@ -314,7 +319,7 @@ impl ResultBuilder for HranaBatchProtoBuilder {
314319
max_response_size: self.max_response_size - self.current_size,
315320
..Default::default()
316321
};
317-
match std::mem::replace(&mut self.stmt_builder, new_builder).into_ret() {
322+
match std::mem::replace(&mut self.stmt_builder, new_builder).take_ret() {
318323
Ok(res) => {
319324
self.step_results.push((!self.step_empty).then_some(res));
320325
self.step_errors.push(None);

libsqlx-server/src/hrana/stmt.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::collections::HashMap;
33
use color_eyre::eyre::{anyhow, bail};
44
use libsqlx::analysis::Statement;
55
use libsqlx::query::{Params, Query, Value};
6-
use libsqlx::Connection;
76

87
use super::result_builder::SingleStatementBuilder;
98
use super::{proto, ProtocolError, Version};
@@ -52,7 +51,7 @@ pub async fn execute_stmt(
5251
.exec(move |conn| -> color_eyre::Result<_> {
5352
let (builder, ret) = SingleStatementBuilder::new();
5453
let pgm = libsqlx::program::Program::from_queries(std::iter::once(query));
55-
conn.execute_program(&pgm, builder)?;
54+
conn.execute_program(&pgm, Box::new(builder))?;
5655

5756
Ok(ret)
5857
})

libsqlx-server/src/linc/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ where
163163
self.conn.feed(m).await.unwrap();
164164
}
165165
self.conn.flush().await.unwrap();
166-
}
166+
},
167167
else => {
168168
self.state = ConnectionState::Close;
169169
}

0 commit comments

Comments
 (0)