Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

proxy response timeout #538

Merged
merged 2 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions libsqlx-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ color-eyre = "0.6.2"
either = "1.8.1"
futures = "0.3.28"
hmac = "0.12.1"
humantime = "2.1.0"
hyper = { version = "0.14.27", features = ["h2", "server"] }
itertools = "0.11.0"
libsqlx = { version = "0.1.0", path = "../libsqlx", features = ["tokio"] }
Expand All @@ -30,6 +31,7 @@ serde_json = "1.0.100"
sha2 = "0.10.7"
sha3 = "0.10.8"
sled = "0.34.7"
tempfile = "3.6.0"
thiserror = "1.0.43"
tokio = { version = "1.29.1", features = ["full"] }
tokio-stream = "0.1.14"
Expand Down
7 changes: 6 additions & 1 deletion libsqlx-server/src/allocation/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use serde::{Deserialize, Serialize};

use crate::linc::NodeId;
Expand All @@ -19,5 +21,8 @@ pub struct AllocConfig {
#[derive(Debug, Serialize, Deserialize)]
pub enum DbConfig {
Primary {},
Replica { primary_node_id: NodeId },
Replica {
primary_node_id: NodeId,
proxy_request_timeout_duration: Duration,
},
}
155 changes: 131 additions & 24 deletions libsqlx-server/src/allocation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::future::poll_fn;
use std::mem::size_of;
use std::ops::Deref;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use bytes::Bytes;
use either::Either;
use futures::Future;
use libsqlx::libsql::{LibsqlDatabase, LogCompactor, LogFile, PrimaryType, ReplicaType};
use libsqlx::program::Program;
use libsqlx::proxy::{WriteProxyConnection, WriteProxyDatabase};
Expand All @@ -19,12 +23,12 @@ use libsqlx::{
use parking_lot::Mutex;
use tokio::sync::{mpsc, oneshot};
use tokio::task::{block_in_place, JoinSet};
use tokio::time::timeout;
use tokio::time::{timeout, Sleep};

use crate::hrana;
use crate::hrana::http::handle_pipeline;
use crate::hrana::http::proto::{PipelineRequestBody, PipelineResponseBody};
use crate::linc::bus::{Dispatch};
use crate::linc::bus::Dispatch;
use crate::linc::proto::{
BuilderStep, Enveloppe, Frames, Message, ProxyResponse, StepError, Value,
};
Expand All @@ -50,7 +54,9 @@ pub enum AllocationMessage {
Inbound(Inbound),
}

pub struct RemoteDb;
pub struct RemoteDb {
proxy_request_timeout_duration: Duration,
}

#[derive(Clone)]
pub struct RemoteConn {
Expand All @@ -62,10 +68,12 @@ struct Request {
builder: Box<dyn ResultBuilder>,
pgm: Option<Program>,
next_seq_no: u32,
timeout: Pin<Box<Sleep>>,
}

pub struct RemoteConnInner {
current_req: Mutex<Option<Request>>,
request_timeout_duration: Duration,
}

impl Deref for RemoteConn {
Expand Down Expand Up @@ -93,6 +101,7 @@ impl libsqlx::Connection for RemoteConn {
builder,
pgm: Some(program.clone()),
next_seq_no: 0,
timeout: Box::pin(tokio::time::sleep(self.inner.request_timeout_duration)),
}),
};

Expand All @@ -111,6 +120,7 @@ impl libsqlx::Database for RemoteDb {
Ok(RemoteConn {
inner: Arc::new(RemoteConnInner {
current_req: Default::default(),
request_timeout_duration: self.proxy_request_timeout_duration,
}),
})
}
Expand Down Expand Up @@ -462,9 +472,14 @@ impl Database {
frame_notifier: receiver,
})
}
DbConfig::Replica { primary_node_id } => {
DbConfig::Replica {
primary_node_id,
proxy_request_timeout_duration,
} => {
let rdb = LibsqlDatabase::new_replica(path, MAX_INJECTOR_BUFFER_CAP, ()).unwrap();
let wdb = RemoteDb;
let wdb = RemoteDb {
proxy_request_timeout_duration,
};
let mut db = WriteProxyDatabase::new(rdb, wdb, Arc::new(|_| ()));
let injector = db.injector().unwrap();
let (sender, receiver) = mpsc::channel(16);
Expand Down Expand Up @@ -502,7 +517,7 @@ impl Database {
conn: db.connect().unwrap(),
connection_id,
next_req_id: 0,
primary_id: *primary_id,
primary_node_id: *primary_id,
database_id: DatabaseId::from_name(&alloc.db_name),
dispatcher: alloc.dispatcher.clone(),
}),
Expand All @@ -520,8 +535,8 @@ struct PrimaryConnection {

#[async_trait::async_trait]
impl ConnectionHandler for PrimaryConnection {
fn exec_ready(&self) -> bool {
true
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
}

async fn handle_exec(&mut self, exec: ExecFn) {
Expand All @@ -537,7 +552,7 @@ struct ReplicaConnection {
conn: ProxyConnection,
connection_id: u32,
next_req_id: u32,
primary_id: NodeId,
primary_node_id: NodeId,
database_id: DatabaseId,
dispatcher: Arc<dyn Dispatch>,
}
Expand All @@ -551,16 +566,21 @@ impl ReplicaConnection {
// TODO: pass actual config
let config = QueryBuilderConfig { max_size: None };
let mut finnalized = false;
for step in resp.row_steps.iter() {
if finnalized { break };
for step in resp.row_steps.into_iter() {
if finnalized {
break;
};
match step {
BuilderStep::Init => req.builder.init(&config).unwrap(),
BuilderStep::BeginStep => req.builder.begin_step().unwrap(),
BuilderStep::FinishStep(affected_row_count, last_insert_rowid) => req
.builder
.finish_step(*affected_row_count, *last_insert_rowid)
.finish_step(affected_row_count, last_insert_rowid)
.unwrap(),
BuilderStep::StepError(e) => req
.builder
.step_error(todo!("handle proxy step error"))
.unwrap(),
BuilderStep::StepError(e) => req.builder.step_error(todo!()).unwrap(),
BuilderStep::ColsDesc(cols) => req
.builder
.cols_description(&mut cols.iter().map(|c| Column {
Expand All @@ -570,11 +590,15 @@ impl ReplicaConnection {
.unwrap(),
BuilderStep::BeginRows => req.builder.begin_rows().unwrap(),
BuilderStep::BeginRow => req.builder.begin_row().unwrap(),
BuilderStep::AddRowValue(v) => req.builder.add_row_value(v.into()).unwrap(),
BuilderStep::AddRowValue(v) => req.builder.add_row_value((&v).into()).unwrap(),
BuilderStep::FinishRow => req.builder.finish_row().unwrap(),
BuilderStep::FinishRows => req.builder.finish_rows().unwrap(),
BuilderStep::Finnalize { is_txn, frame_no } => {
let _ = req.builder.finnalize(*is_txn, *frame_no).unwrap();
let _ = req.builder.finnalize(is_txn, frame_no).unwrap();
finnalized = true;
},
BuilderStep::FinnalizeError(e) => {
req.builder.finnalize_error(e);
finnalized = true;
}
}
Expand All @@ -596,9 +620,28 @@ impl ReplicaConnection {

#[async_trait::async_trait]
impl ConnectionHandler for ReplicaConnection {
fn exec_ready(&self) -> bool {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// we are currently handling a request on this connection
self.conn.writer().current_req.lock().is_none()
// self.conn.writer().current_req.timeout.poll()
let mut req = self.conn.writer().current_req.lock();
let should_abort_query = match &mut *req {
Some(ref mut req) => {
match req.timeout.as_mut().poll(cx) {
Poll::Ready(_) => {
req.builder.finnalize_error("request timed out".to_string());
true
}
Poll::Pending => return Poll::Pending,
}
}
None => return Poll::Ready(()),
};

if should_abort_query {
*req = None
}

Poll::Ready(())
}

async fn handle_exec(&mut self, exec: ExecFn) {
Expand All @@ -616,7 +659,7 @@ impl ConnectionHandler for ReplicaConnection {
req.id = Some(req_id);

let msg = Outbound {
to: self.primary_id,
to: self.primary_node_id,
enveloppe: Enveloppe {
database_id: Some(self.database_id),
message: Message::ProxyRequest {
Expand Down Expand Up @@ -654,10 +697,10 @@ where
L: ConnectionHandler,
R: ConnectionHandler,
{
fn exec_ready(&self) -> bool {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
match self {
Either::Left(l) => l.exec_ready(),
Either::Right(r) => r.exec_ready(),
Either::Left(l) => l.poll_ready(cx),
Either::Right(r) => r.poll_ready(cx),
}
}

Expand Down Expand Up @@ -852,7 +895,7 @@ impl Allocation {
};
conn.execute_program(&program, Box::new(builder)).unwrap();
})
.await;
.await;
};

if self.database.is_primary() {
Expand Down Expand Up @@ -921,19 +964,21 @@ struct Connection<C> {

#[async_trait::async_trait]
trait ConnectionHandler: Send {
fn exec_ready(&self) -> bool;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()>;
async fn handle_exec(&mut self, exec: ExecFn);
async fn handle_inbound(&mut self, msg: Inbound);
}

impl<C: ConnectionHandler> Connection<C> {
async fn run(mut self) -> (NodeId, u32) {
loop {
let fut =
futures::future::join(self.exec.recv(), poll_fn(|cx| self.conn.poll_ready(cx)));
tokio::select! {
Some(inbound) = self.inbound.recv() => {
self.conn.handle_inbound(inbound).await;
}
Some(exec) = self.exec.recv(), if self.conn.exec_ready() => {
(Some(exec), _) = fut => {
self.conn.handle_exec(exec).await;
},
else => break,
Expand All @@ -943,3 +988,65 @@ impl<C: ConnectionHandler> Connection<C> {
self.id
}
}

#[cfg(test)]
mod test {
use tokio::sync::Notify;

use crate::linc::bus::Bus;

use super::*;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn proxy_request_timeout() {
let bus = Arc::new(Bus::new(0, |_, _| async {}));
let _queue = bus.connect(1); // pretend connection to node 1
let tmp = tempfile::TempDir::new().unwrap();
let read_db = LibsqlDatabase::new_replica(tmp.path().to_path_buf(), 1, ()).unwrap();
let write_db = RemoteDb {
proxy_request_timeout_duration: Duration::from_millis(100),
};
let db = WriteProxyDatabase::new(read_db, write_db, Arc::new(|_| ()));
let conn = db.connect().unwrap();
let conn = ReplicaConnection {
conn,
connection_id: 0,
next_req_id: 0,
primary_node_id: 1,
database_id: DatabaseId::random(),
dispatcher: bus,
};

let (exec_sender, exec) = mpsc::channel(1);
let (_inbound_sender, inbound) = mpsc::channel(1);
let connection = Connection {
id: (0, 0),
conn,
exec,
inbound,
};

let handle = tokio::spawn(connection.run());

let notify = Arc::new(Notify::new());
struct Builder(Arc<Notify>);
impl ResultBuilder for Builder {
fn finnalize_error(&mut self, _e: String) {
self.0.notify_waiters()
}
}

let builder = Box::new(Builder(notify.clone()));
exec_sender
.send(Box::new(move |conn| {
conn.execute_program(&Program::seq(&["create table test (c)"]), builder)
.unwrap();
}))
.await
.unwrap();

notify.notified().await;

handle.abort();
}
}
8 changes: 8 additions & 0 deletions libsqlx-server/src/hrana/result_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ impl ResultBuilder for SingleStatementBuilder {
let _ = self.ret.take().unwrap().send(res);
Ok(true)
}

fn finnalize_error(&mut self, _e: String) {
todo!()
}
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -354,4 +358,8 @@ impl ResultBuilder for HranaBatchProtoBuilder {
fn add_row_value(&mut self, v: ValueRef) -> Result<(), QueryResultBuilderError> {
self.stmt_builder.add_row_value(v)
}

fn finnalize_error(&mut self, _e: String) {
todo!()
}
}
Loading