diff --git a/Cargo.lock b/Cargo.lock
index d08d5034..23ee18ef 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2554,6 +2554,7 @@ dependencies = [
  "either",
  "futures",
  "hmac",
+ "humantime",
  "hyper",
  "itertools 0.11.0",
  "libsqlx",
@@ -2567,6 +2568,7 @@ dependencies = [
  "sha2",
  "sha3",
  "sled",
+ "tempfile",
  "thiserror",
  "tokio",
  "tokio-stream",
diff --git a/libsqlx-server/Cargo.toml b/libsqlx-server/Cargo.toml
index a5a11437..42a508c6 100644
--- a/libsqlx-server/Cargo.toml
+++ b/libsqlx-server/Cargo.toml
@@ -17,6 +17,7 @@ color-eyre = "0.6.2"
 either = "1.8.1"
 futures = "0.3.28"
 hmac = "0.12.1"
+humantime = "2.1.0"
 hyper = { version = "0.14.27", features = ["h2", "server"] }
 itertools = "0.11.0"
 libsqlx = { version = "0.1.0", path = "../libsqlx", features = ["tokio"] }
@@ -30,6 +31,7 @@ serde_json = "1.0.100"
 sha2 = "0.10.7"
 sha3 = "0.10.8"
 sled = "0.34.7"
+tempfile = "3.6.0"
 thiserror = "1.0.43"
 tokio = { version = "1.29.1", features = ["full"] }
 tokio-stream = "0.1.14"
diff --git a/libsqlx-server/src/allocation/config.rs b/libsqlx-server/src/allocation/config.rs
index 9d1bab34..ac21efa7 100644
--- a/libsqlx-server/src/allocation/config.rs
+++ b/libsqlx-server/src/allocation/config.rs
@@ -1,3 +1,5 @@
+use std::time::Duration;
+
 use serde::{Deserialize, Serialize};
 
 use crate::linc::NodeId;
@@ -19,5 +21,8 @@ pub struct AllocConfig {
 #[derive(Debug, Serialize, Deserialize)]
 pub enum DbConfig {
     Primary {},
-    Replica { primary_node_id: NodeId },
+    Replica {
+        primary_node_id: NodeId,
+        proxy_request_timeout_duration: Duration,
+    },
 }
diff --git a/libsqlx-server/src/allocation/mod.rs b/libsqlx-server/src/allocation/mod.rs
index fbe0d98e..a41f2cd7 100644
--- a/libsqlx-server/src/allocation/mod.rs
+++ b/libsqlx-server/src/allocation/mod.rs
@@ -1,13 +1,17 @@
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
+use std::future::poll_fn;
 use std::mem::size_of;
 use std::ops::Deref;
 use std::path::PathBuf;
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};
 use std::time::{Duration, Instant};
 
 use bytes::Bytes;
 use either::Either;
+use futures::Future;
 use libsqlx::libsql::{LibsqlDatabase, LogCompactor, LogFile, PrimaryType, ReplicaType};
 use libsqlx::program::Program;
 use libsqlx::proxy::{WriteProxyConnection, WriteProxyDatabase};
@@ -19,12 +23,12 @@ use libsqlx::{
 use parking_lot::Mutex;
 use tokio::sync::{mpsc, oneshot};
 use tokio::task::{block_in_place, JoinSet};
-use tokio::time::timeout;
+use tokio::time::{timeout, Sleep};
 
 use crate::hrana;
 use crate::hrana::http::handle_pipeline;
 use crate::hrana::http::proto::{PipelineRequestBody, PipelineResponseBody};
-use crate::linc::bus::{Dispatch};
+use crate::linc::bus::Dispatch;
 use crate::linc::proto::{
     BuilderStep, Enveloppe, Frames, Message, ProxyResponse, StepError, Value,
 };
@@ -50,7 +54,9 @@ pub enum AllocationMessage {
     Inbound(Inbound),
 }
 
-pub struct RemoteDb;
+pub struct RemoteDb {
+    proxy_request_timeout_duration: Duration,
+}
 
 #[derive(Clone)]
 pub struct RemoteConn {
@@ -62,10 +68,12 @@ struct Request {
     builder: Box<dyn ResultBuilder>,
     pgm: Option<Program>,
     next_seq_no: u32,
+    timeout: Pin<Box<Sleep>>,
 }
 
 pub struct RemoteConnInner {
     current_req: Mutex<Option<Request>>,
+    request_timeout_duration: Duration,
 }
 
 impl Deref for RemoteConn {
@@ -93,6 +101,7 @@ impl libsqlx::Connection for RemoteConn {
                 builder,
                 pgm: Some(program.clone()),
                 next_seq_no: 0,
+                timeout: Box::pin(tokio::time::sleep(self.inner.request_timeout_duration)),
             }),
         };
 
@@ -111,6 +120,7 @@ impl libsqlx::Database for RemoteDb {
         Ok(RemoteConn {
             inner: Arc::new(RemoteConnInner {
                 current_req: Default::default(),
+                request_timeout_duration: self.proxy_request_timeout_duration,
             }),
         })
     }
 }
@@ -462,9 +472,14 @@ impl Database {
                     frame_notifier: receiver,
                 })
             }
-            DbConfig::Replica { primary_node_id } => {
+            DbConfig::Replica {
+                primary_node_id,
+                proxy_request_timeout_duration,
+            } => {
                 let rdb = LibsqlDatabase::new_replica(path, MAX_INJECTOR_BUFFER_CAP, ()).unwrap();
-                let wdb = RemoteDb;
+                let wdb = RemoteDb {
+                    proxy_request_timeout_duration,
+                };
                 let mut db = WriteProxyDatabase::new(rdb, wdb, Arc::new(|_| ()));
                 let injector = db.injector().unwrap();
                 let (sender, receiver) = mpsc::channel(16);
@@ -502,7 +517,7 @@ impl Database {
                 conn: db.connect().unwrap(),
                 connection_id,
                 next_req_id: 0,
-                primary_id: *primary_id,
+                primary_node_id: *primary_id,
                 database_id: DatabaseId::from_name(&alloc.db_name),
                 dispatcher: alloc.dispatcher.clone(),
             }),
@@ -520,8 +535,8 @@ struct PrimaryConnection {
 
 #[async_trait::async_trait]
 impl ConnectionHandler for PrimaryConnection {
-    fn exec_ready(&self) -> bool {
-        true
+    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<()> {
+        Poll::Ready(())
     }
 
     async fn handle_exec(&mut self, exec: ExecFn) {
@@ -537,7 +552,7 @@ struct ReplicaConnection {
     conn: ProxyConnection,
     connection_id: u32,
     next_req_id: u32,
-    primary_id: NodeId,
+    primary_node_id: NodeId,
     database_id: DatabaseId,
     dispatcher: Arc<dyn Dispatch>,
 }
@@ -551,16 +566,21 @@ impl ReplicaConnection {
         // TODO: pass actual config
         let config = QueryBuilderConfig { max_size: None };
         let mut finnalized = false;
-        for step in resp.row_steps.iter() {
-            if finnalized { break };
+        for step in resp.row_steps.into_iter() {
+            if finnalized {
+                break;
+            };
             match step {
                 BuilderStep::Init => req.builder.init(&config).unwrap(),
                 BuilderStep::BeginStep => req.builder.begin_step().unwrap(),
                 BuilderStep::FinishStep(affected_row_count, last_insert_rowid) => req
                     .builder
-                    .finish_step(*affected_row_count, *last_insert_rowid)
+                    .finish_step(affected_row_count, last_insert_rowid)
+                    .unwrap(),
+                BuilderStep::StepError(e) => req
+                    .builder
+                    .step_error(todo!("handle proxy step error"))
                     .unwrap(),
-                BuilderStep::StepError(e) => req.builder.step_error(todo!()).unwrap(),
                 BuilderStep::ColsDesc(cols) => req
                     .builder
                     .cols_description(&mut cols.iter().map(|c| Column {
@@ -570,11 +590,15 @@ impl ReplicaConnection {
                     .unwrap(),
                 BuilderStep::BeginRows => req.builder.begin_rows().unwrap(),
                 BuilderStep::BeginRow => req.builder.begin_row().unwrap(),
-                BuilderStep::AddRowValue(v) => req.builder.add_row_value(v.into()).unwrap(),
+                BuilderStep::AddRowValue(v) => req.builder.add_row_value((&v).into()).unwrap(),
                 BuilderStep::FinishRow => req.builder.finish_row().unwrap(),
                 BuilderStep::FinishRows => req.builder.finish_rows().unwrap(),
                 BuilderStep::Finnalize { is_txn, frame_no } => {
-                    let _ = req.builder.finnalize(*is_txn, *frame_no).unwrap();
+                    let _ = req.builder.finnalize(is_txn, frame_no).unwrap();
+                    finnalized = true;
+                },
+                BuilderStep::FinnalizeError(e) => {
+                    req.builder.finnalize_error(e);
                     finnalized = true;
                 }
             }
@@ -596,9 +620,28 @@ impl ReplicaConnection {
 
 #[async_trait::async_trait]
 impl ConnectionHandler for ReplicaConnection {
-    fn exec_ready(&self) -> bool {
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
         // we are currently handling a request on this connection
-        self.conn.writer().current_req.lock().is_none()
+        // self.conn.writer().current_req.timeout.poll()
+        let mut req = self.conn.writer().current_req.lock();
+        let should_abort_query = match &mut *req {
+            Some(ref mut req) => {
+                match req.timeout.as_mut().poll(cx) {
+                    Poll::Ready(_) => {
+                        req.builder.finnalize_error("request timed out".to_string());
+                        true
+                    }
+                    Poll::Pending => return Poll::Pending,
+                }
+            }
+            None => return Poll::Ready(()),
+        };
+
+        if should_abort_query {
+            *req = None
+        }
+
+        Poll::Ready(())
     }
 
     async fn handle_exec(&mut self, exec: ExecFn) {
@@ -616,7 +659,7 @@ impl ConnectionHandler for ReplicaConnection {
         req.id = Some(req_id);
 
         let msg = Outbound {
-            to: self.primary_id,
+            to: self.primary_node_id,
             enveloppe: Enveloppe {
                 database_id: Some(self.database_id),
                 message: Message::ProxyRequest {
@@ -654,10 +697,10 @@ where
     L: ConnectionHandler,
     R: ConnectionHandler,
 {
-    fn exec_ready(&self) -> bool {
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
         match self {
-            Either::Left(l) => l.exec_ready(),
-            Either::Right(r) => r.exec_ready(),
+            Either::Left(l) => l.poll_ready(cx),
+            Either::Right(r) => r.poll_ready(cx),
         }
     }
 
@@ -852,7 +895,7 @@ impl Allocation {
                 };
                 conn.execute_program(&program, Box::new(builder)).unwrap();
             })
-            .await;
+                .await;
         };
 
         if self.database.is_primary() {
@@ -921,7 +964,7 @@ struct Connection {
 
 #[async_trait::async_trait]
 trait ConnectionHandler: Send {
-    fn exec_ready(&self) -> bool;
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()>;
     async fn handle_exec(&mut self, exec: ExecFn);
     async fn handle_inbound(&mut self, msg: Inbound);
 }
@@ -929,11 +972,13 @@ trait ConnectionHandler: Send {
 impl Connection {
     async fn run(mut self) -> (NodeId, u32) {
         loop {
+            let fut =
+                futures::future::join(self.exec.recv(), poll_fn(|cx| self.conn.poll_ready(cx)));
             tokio::select! {
                 Some(inbound) = self.inbound.recv() => {
                     self.conn.handle_inbound(inbound).await;
                 }
-                Some(exec) = self.exec.recv(), if self.conn.exec_ready() => {
+                (Some(exec), _) = fut => {
                     self.conn.handle_exec(exec).await;
                 },
                 else => break,
@@ -943,3 +988,65 @@ impl Connection {
         self.id
     }
 }
+
+#[cfg(test)]
+mod test {
+    use tokio::sync::Notify;
+
+    use crate::linc::bus::Bus;
+
+    use super::*;
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn proxy_request_timeout() {
+        let bus = Arc::new(Bus::new(0, |_, _| async {}));
+        let _queue = bus.connect(1); // pretend connection to node 1
+        let tmp = tempfile::TempDir::new().unwrap();
+        let read_db = LibsqlDatabase::new_replica(tmp.path().to_path_buf(), 1, ()).unwrap();
+        let write_db = RemoteDb {
+            proxy_request_timeout_duration: Duration::from_millis(100),
+        };
+        let db = WriteProxyDatabase::new(read_db, write_db, Arc::new(|_| ()));
+        let conn = db.connect().unwrap();
+        let conn = ReplicaConnection {
+            conn,
+            connection_id: 0,
+            next_req_id: 0,
+            primary_node_id: 1,
+            database_id: DatabaseId::random(),
+            dispatcher: bus,
+        };
+
+        let (exec_sender, exec) = mpsc::channel(1);
+        let (_inbound_sender, inbound) = mpsc::channel(1);
+        let connection = Connection {
+            id: (0, 0),
+            conn,
+            exec,
+            inbound,
+        };
+
+        let handle = tokio::spawn(connection.run());
+
+        let notify = Arc::new(Notify::new());
+        struct Builder(Arc<Notify>);
+        impl ResultBuilder for Builder {
+            fn finnalize_error(&mut self, _e: String) {
+                self.0.notify_waiters()
+            }
+        }
+
+        let builder = Box::new(Builder(notify.clone()));
+        exec_sender
+            .send(Box::new(move |conn| {
+                conn.execute_program(&Program::seq(&["create table test (c)"]), builder)
+                    .unwrap();
+            }))
+            .await
+            .unwrap();
+
+        notify.notified().await;
+
+        handle.abort();
+    }
+}
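The heart of this patch is the `ConnectionHandler::exec_ready` → `poll_ready` change above: a replica connection now arms a `tokio::time::Sleep` per proxied request and drops the request once the timer fires. A condensed sketch of just that polling pattern, with hypothetical `InFlight`/`Conn` types standing in for the crate's `Request`/`ReplicaConnection` (assumes tokio with the `rt`, `time`, and `macros` features):

```rust
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use tokio::time::Sleep;

// Hypothetical stand-ins for the crate's `Request` / `ReplicaConnection`.
struct InFlight {
    timeout: Pin<Box<Sleep>>,
}

struct Conn {
    current_req: Option<InFlight>,
}

impl Conn {
    fn start_request(&mut self, timeout: Duration) {
        self.current_req = Some(InFlight {
            timeout: Box::pin(tokio::time::sleep(timeout)),
        });
    }

    // Ready when nothing is in flight; a timed-out request is dropped so the
    // connection accepts new work instead of staying blocked forever.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        let timed_out = match &mut self.current_req {
            None => return Poll::Ready(()),
            Some(req) => match req.timeout.as_mut().poll(cx) {
                Poll::Ready(_) => true,
                Poll::Pending => return Poll::Pending,
            },
        };
        if timed_out {
            // the real implementation also reports the error to the pending
            // request's builder (`finnalize_error`) before dropping it
            self.current_req = None;
        }
        Poll::Ready(())
    }
}

#[tokio::main]
async fn main() {
    let mut conn = Conn { current_req: None };
    conn.start_request(Duration::from_millis(10));
    // resolves only once the in-flight request has timed out and been dropped
    poll_fn(|cx| conn.poll_ready(cx)).await;
    assert!(conn.current_req.is_none());
}
```

Wrapping the readiness check in a `poll_fn` is also what lets `Connection::run` join it with `exec.recv()` inside `tokio::select!`, instead of the old boolean guard.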
diff --git a/libsqlx-server/src/hrana/result_builder.rs b/libsqlx-server/src/hrana/result_builder.rs
index e91bca28..c0c597bf 100644
--- a/libsqlx-server/src/hrana/result_builder.rs
+++ b/libsqlx-server/src/hrana/result_builder.rs
@@ -76,6 +76,10 @@ impl ResultBuilder for SingleStatementBuilder {
         let _ = self.ret.take().unwrap().send(res);
         Ok(true)
     }
+
+    fn finnalize_error(&mut self, _e: String) {
+        todo!()
+    }
 }
 
 #[derive(Debug, Default)]
@@ -354,4 +358,8 @@ impl ResultBuilder for HranaBatchProtoBuilder {
     fn add_row_value(&mut self, v: ValueRef) -> Result<(), QueryResultBuilderError> {
         self.stmt_builder.add_row_value(v)
     }
+
+    fn finnalize_error(&mut self, _e: String) {
+        todo!()
+    }
 }
diff --git a/libsqlx-server/src/http/admin.rs b/libsqlx-server/src/http/admin.rs
index 8a08187e..9323bcdd 100644
--- a/libsqlx-server/src/http/admin.rs
+++ b/libsqlx-server/src/http/admin.rs
@@ -1,16 +1,18 @@
 use std::sync::Arc;
+use std::str::FromStr;
+use std::time::Duration;
 
-use axum::{extract::State, routing::post, Json, Router};
+use axum::{Json, Router};
+use axum::routing::post;
+use axum::extract::State;
 use color_eyre::eyre::Result;
 use hyper::server::accept::Accept;
-use serde::{Deserialize, Serialize};
+use serde::{Deserialize, Deserializer, Serialize};
 use tokio::io::{AsyncRead, AsyncWrite};
 
-use crate::{
-    allocation::config::{AllocConfig, DbConfig},
-    linc::NodeId,
-    meta::Store,
-};
+use crate::meta::Store;
+use crate::allocation::config::{AllocConfig, DbConfig};
+use crate::linc::NodeId;
 
 pub struct Config {
     pub meta_store: Arc<Store>,
@@ -52,11 +54,46 @@ struct AllocateReq {
     config: DbConfigReq,
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Deserialize)]
 #[serde(tag = "type", rename_all = "snake_case")]
 pub enum DbConfigReq {
     Primary {},
-    Replica { primary_node_id: NodeId },
+    Replica {
+        primary_node_id: NodeId,
+        #[serde(deserialize_with = "deserialize_duration", default = "default_proxy_timeout")]
+        proxy_request_timeout_duration: Duration,
+    },
+}
+
+const fn default_proxy_timeout() -> Duration {
+    Duration::from_secs(5)
+}
+
+fn deserialize_duration<'de, D>(deserializer: D) -> Result<Duration, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    struct Visitor;
+    impl serde::de::Visitor<'_> for Visitor {
+        type Value = Duration;
+
+        fn visit_str<E>(self, v: &str) -> std::result::Result<Self::Value, E>
+        where
+            E: serde::de::Error,
+        {
+            match humantime::Duration::from_str(v) {
+                Ok(d) => Ok(*d),
+                Err(e) => Err(E::custom(e.to_string())),
+            }
+        }
+
+        fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+            f.write_str("a duration, in a string format")
+        }
+    }
+
+    deserializer.deserialize_str(Visitor)
+}
 
 async fn allocate(
@@ -68,7 +105,13 @@ async fn allocate(
         db_name: req.alloc_id.clone(),
         db_config: match req.config {
             DbConfigReq::Primary {} => DbConfig::Primary {},
-            DbConfigReq::Replica { primary_node_id } => DbConfig::Replica { primary_node_id },
+            DbConfigReq::Replica {
+                primary_node_id,
+                proxy_request_timeout_duration,
+            } => DbConfig::Replica {
+                primary_node_id,
+                proxy_request_timeout_duration,
+            },
         },
     };
     state.meta_store.allocate(&req.alloc_id, &config).await;
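For reference, the new `proxy_request_timeout_duration` field accepts a humantime string such as `"5s"` or `"1m 30s"`, falling back to the 5 second default when omitted. A minimal, self-contained sketch of the same deserialization idea — the `Timeouts` struct and the `serde_json` round-trip are illustrative assumptions, and it takes the shorter `String::deserialize` route rather than the `Visitor` used in `http/admin.rs`:

```rust
use std::str::FromStr;
use std::time::Duration;

use serde::{Deserialize, Deserializer};

// Illustrative container; only the deserializer mirrors the one added to admin.rs.
#[derive(Debug, Deserialize)]
struct Timeouts {
    #[serde(deserialize_with = "deserialize_duration")]
    proxy_request_timeout_duration: Duration,
}

fn deserialize_duration<'de, D: Deserializer<'de>>(d: D) -> Result<Duration, D::Error> {
    // humantime accepts "500ms", "5s", "1m 30s", ... and derefs to std Duration
    let s = String::deserialize(d)?;
    humantime::Duration::from_str(&s)
        .map(|d| *d)
        .map_err(serde::de::Error::custom)
}

fn main() {
    let t: Timeouts =
        serde_json::from_str(r#"{ "proxy_request_timeout_duration": "1m 30s" }"#).unwrap();
    assert_eq!(t.proxy_request_timeout_duration, Duration::from_secs(90));
}
```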
diff --git a/libsqlx-server/src/linc/bus.rs b/libsqlx-server/src/linc/bus.rs
index a31c3368..7c7b70dd 100644
--- a/libsqlx-server/src/linc/bus.rs
+++ b/libsqlx-server/src/linc/bus.rs
@@ -2,9 +2,11 @@ use std::collections::HashSet;
 use std::sync::Arc;
 
 use parking_lot::RwLock;
+use tokio::sync::mpsc;
 
 use super::connection::SendQueue;
 use super::handler::Handler;
+use super::proto::Enveloppe;
 use super::{Inbound, NodeId, Outbound};
 
 pub struct Bus {
@@ -37,9 +39,10 @@ impl Bus {
         &self.send_queue
     }
 
-    pub fn connect(&self, node_id: NodeId) {
+    pub fn connect(&self, node_id: NodeId) -> mpsc::UnboundedReceiver<Enveloppe> {
         // TODO: handle peer already exists
         self.peers.write().insert(node_id);
+        self.send_queue.register(node_id)
     }
 
     pub fn disconnect(&self, node_id: NodeId) {
diff --git a/libsqlx-server/src/linc/connection.rs b/libsqlx-server/src/linc/connection.rs
index 5f5d9f24..b979c437 100644
--- a/libsqlx-server/src/linc/connection.rs
+++ b/libsqlx-server/src/linc/connection.rs
@@ -227,8 +227,7 @@ where
                 self.peer = Some(node_id);
                 self.state = ConnectionState::Connected;
-                self.send_queue = Some(self.bus.send_queue().register(node_id));
-                self.bus.connect(node_id);
+                self.send_queue = Some(self.bus.connect(node_id));
 
                 Ok(())
             }
@@ -321,9 +320,7 @@ mod test {
 
         assert!(matches!(
             m.message,
-            Message::Error(
-                ProtoError::HandshakeVersionMismatch { .. }
-            )
+            Message::Error(ProtoError::HandshakeVersionMismatch { .. })
         ));
 
         done.notify_waiters();
diff --git a/libsqlx-server/src/linc/handler.rs b/libsqlx-server/src/linc/handler.rs
index 828c8bb6..2d17ff96 100644
--- a/libsqlx-server/src/linc/handler.rs
+++ b/libsqlx-server/src/linc/handler.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use super::bus::{Dispatch};
+use super::bus::Dispatch;
 use super::Inbound;
 
 #[async_trait::async_trait]
@@ -11,9 +11,10 @@ pub trait Handler: Sized + Send + Sync + 'static {
 
 #[cfg(test)]
 #[async_trait::async_trait]
-impl<F, Fut> Handler for F
-where F: Fn(Arc, Inbound) -> Fut + Send + Sync + 'static,
-    Fut: std::future::Future<Output = ()> + Send,
+impl<F, Fut> Handler for F
+where
+    F: Fn(Arc, Inbound) -> Fut + Send + Sync + 'static,
+    Fut: std::future::Future<Output = ()> + Send,
 {
     async fn handle(&self, bus: Arc, msg: Inbound) {
         (self)(bus, msg).await
diff --git a/libsqlx-server/src/linc/proto.rs b/libsqlx-server/src/linc/proto.rs
index a9aa529d..7e3a583d 100644
--- a/libsqlx-server/src/linc/proto.rs
+++ b/libsqlx-server/src/linc/proto.rs
@@ -107,6 +107,7 @@ pub enum BuilderStep {
         is_txn: bool,
         frame_no: Option<FrameNo>,
     },
+    FinnalizeError(String),
 }
 
 #[derive(Debug, Serialize, Deserialize, Clone)]
diff --git a/libsqlx/src/database/libsql/mod.rs b/libsqlx/src/database/libsql/mod.rs
index 5c060e4d..382a4177 100644
--- a/libsqlx/src/database/libsql/mod.rs
+++ b/libsqlx/src/database/libsql/mod.rs
@@ -237,11 +237,9 @@ mod test {
         let file = File::open("assets/test/simple_wallog").unwrap();
         let log = LogFile::new(file).unwrap();
         let mut injector = db.injector().unwrap();
-        log.frames_iter()
-            .unwrap()
-            .for_each(|f| {
-                injector.inject(f.unwrap()).unwrap();
-            });
+        log.frames_iter().unwrap().for_each(|f| {
+            injector.inject(f.unwrap()).unwrap();
+        });
 
         let row: Arc>> = Default::default();
         let builder = Box::new(ReadRowBuilder(row.clone()));
@@ -258,7 +256,10 @@ mod test {
         let primary = LibsqlDatabase::new(
             temp_primary.path().to_path_buf(),
             PrimaryType {
-                logger: Arc::new(ReplicationLogger::open(temp_primary.path(), false, (), Box::new(|_| ())).unwrap()),
+                logger: Arc::new(
+                    ReplicationLogger::open(temp_primary.path(), false, (), Box::new(|_| ()))
+                        .unwrap(),
+                ),
             },
         );
 
@@ -363,7 +364,7 @@ mod test {
             temp.path().to_path_buf(),
             Compactor(compactor_called.clone()),
             false,
-            Box::new(|_| ())
+            Box::new(|_| ()),
         )
         .unwrap();
 
@@ -374,7 +375,7 @@ mod test {
                 "create table test (x)",
                 "insert into test values (12)",
             ]),
-            Box::new(())
+            Box::new(()),
         )
         .unwrap();
         conn.inner_connection().cache_flush().unwrap();
diff --git a/libsqlx/src/database/proxy/connection.rs b/libsqlx/src/database/proxy/connection.rs
index 2d576387..a7638e56 100644
--- a/libsqlx/src/database/proxy/connection.rs
+++ b/libsqlx/src/database/proxy/connection.rs
@@ -10,9 +10,27 @@ use crate::Result;
 
 use super::WaitFrameNoCb;
 
+#[derive(Debug, Default)]
+enum State {
+    Txn,
+    #[default]
+    Idle,
+    Unknown,
+}
+
+impl State {
+    /// Returns `true` if the state is [`Idle`].
+    ///
+    /// [`Idle`]: State::Idle
+    #[must_use]
+    fn is_idle(&self) -> bool {
+        matches!(self, Self::Idle)
+    }
+}
+
 #[derive(Debug, Default)]
 pub(crate) struct ConnState {
-    is_txn: bool,
+    state: State,
     last_frame_no: Option<FrameNo>,
 }
@@ -123,6 +141,9 @@ where
             state: self.state.clone(),
         };
 
+        // set the connection state to unknown before executing on the remote
+        self.state.lock().state = State::Unknown;
+
         self.conn
             .execute_program(&self.pgm, Box::new(builder))
             .unwrap();
@@ -132,6 +153,10 @@ where
             self.builder.as_mut().unwrap().finnalize(is_txn, frame_no)
         }
     }
+
+    fn finnalize_error(&mut self, e: String) {
+        self.builder.take().unwrap().finnalize_error(e)
+    }
 }
 
 impl Connection for WriteProxyConnection
 where
@@ -144,7 +169,7 @@ where
     fn execute_program(
         &mut self,
         pgm: &Program,
         builder: Box<dyn ResultBuilder>,
     ) -> crate::Result<()> {
-        if !self.state.lock().is_txn && pgm.is_read_only() {
+        if self.state.lock().state.is_idle() && pgm.is_read_only() {
             if let Some(frame_no) = self.state.lock().last_frame_no {
                 (self.wait_frame_no_cb)(frame_no);
             }
@@ -162,6 +187,9 @@ where
             //     rollback(&mut self.conn.read_db);
             Ok(())
         } else {
+            // we set the state to unknown until we have received the actual
+            // connection state back from the primary.
+            self.state.lock().state = State::Unknown;
             let builder = ExtractFrameNoBuilder {
                 builder,
                 state: self.state.clone(),
@@ -243,9 +271,17 @@ impl ResultBuilder for ExtractFrameNoBuilder {
     ) -> Result<bool, QueryResultBuilderError> {
         let mut state = self.state.lock();
         state.last_frame_no = frame_no;
-        state.is_txn = is_txn;
+        if is_txn {
+            state.state = State::Txn;
+        } else {
+            state.state = State::Idle;
+        }
         self.builder.finnalize(is_txn, frame_no)
     }
+
+    fn finnalize_error(&mut self, e: String) {
+        self.builder.finnalize_error(e)
+    }
 }
 
 #[cfg(test)]
@@ -254,10 +290,10 @@ mod test {
 
     use parking_lot::Mutex;
 
-    use crate::Connection;
     use crate::database::test_utils::MockDatabase;
     use crate::database::{proxy::database::WriteProxyDatabase, Database};
     use crate::program::Program;
+    use crate::Connection;
 
     #[test]
     fn simple_write_proxied() {
@@ -266,7 +302,7 @@ mod test {
             let write_called = write_called.clone();
             move |_, mut b| {
                 b.finnalize(false, Some(42)).unwrap();
-                *write_called.lock() =true;
+                *write_called.lock() = true;
                 Ok(())
             }
         });
@@ -294,8 +330,11 @@ mod test {
         );
         let mut conn = db.connect().unwrap();
 
-        conn.execute_program(&Program::seq(&["insert into test values (12)"]), Box::new(()))
-            .unwrap();
+        conn.execute_program(
+            &Program::seq(&["insert into test values (12)"]),
+            Box::new(()),
+        )
+        .unwrap();
 
         assert!(!*wait_called.lock());
         assert!(!*read_called.lock());
diff --git a/libsqlx/src/database/test_utils.rs b/libsqlx/src/database/test_utils.rs
index 93bf3b1d..3034ca93 100644
--- a/libsqlx/src/database/test_utils.rs
+++ b/libsqlx/src/database/test_utils.rs
@@ -10,17 +10,17 @@ use super::Database;
 
 pub struct MockDatabase {
     #[allow(clippy::type_complexity)]
-    describe_fn: Arc crate::Result +Send +Sync>,
+    describe_fn: Arc crate::Result + Send + Sync>,
     #[allow(clippy::type_complexity)]
-    execute_fn: Arc<dyn Fn(&Program, Box<dyn ResultBuilder>) -> crate::Result<()> +Send +Sync>,
+    execute_fn: Arc<dyn Fn(&Program, Box<dyn ResultBuilder>) -> crate::Result<()> + Send + Sync>,
 }
 
 #[derive(Clone)]
 pub struct MockConnection {
     #[allow(clippy::type_complexity)]
-    describe_fn: Arc crate::Result + Send +Sync>,
+    describe_fn: Arc crate::Result + Send + Sync>,
     #[allow(clippy::type_complexity)]
-    execute_fn: Arc<dyn Fn(&Program, Box<dyn ResultBuilder>) -> crate::Result<()> + Send +Sync>,
+    execute_fn: Arc<dyn Fn(&Program, Box<dyn ResultBuilder>) -> crate::Result<()> + Send + Sync>,
 }
 
 impl MockDatabase {
@@ -33,7 +33,7 @@ impl MockDatabase {
 
     pub fn with_execute(
         mut self,
-        f: impl Fn(&Program, Box<dyn ResultBuilder>) -> crate::Result<()> + Send + Sync +'static,
+        f: impl Fn(&Program, Box<dyn ResultBuilder>) -> crate::Result<()> + Send + Sync + 'static,
     ) -> Self {
         self.execute_fn = Arc::new(f);
         self
diff --git a/libsqlx/src/program.rs b/libsqlx/src/program.rs
index b2b627af..fc30a4bf 100644
--- a/libsqlx/src/program.rs
+++ b/libsqlx/src/program.rs
@@ -39,7 +39,6 @@ impl Program {
         Self { steps }
     }
 
-    #[cfg(test)]
     pub fn seq(stmts: &[&str]) -> Self {
         use crate::{analysis::Statement, query::Params};
diff --git a/libsqlx/src/result_builder.rs b/libsqlx/src/result_builder.rs
index 458b50cc..d69ac35b 100644
--- a/libsqlx/src/result_builder.rs
+++ b/libsqlx/src/result_builder.rs
@@ -138,6 +138,9 @@ pub trait ResultBuilder: Send + 'static {
     ) -> Result<bool, QueryResultBuilderError> {
         Ok(true)
     }
+
+    /// There was a fatal error and the request was aborted
+    fn finnalize_error(&mut self, _e: String) {}
 }
 
 pub trait ResultBuilderExt: ResultBuilder {
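Because `finnalize_error` ships with an empty default body, existing `ResultBuilder` implementations keep compiling and only override it when they care about aborted requests, exactly as the test's `Builder(Arc<Notify>)` does. A minimal sketch of such an implementor — the `libsqlx::result_builder::ResultBuilder` import path is an assumption and may differ from the crate's actual re-exports:

```rust
// Assumed path; the trait may instead be re-exported at the crate root.
use libsqlx::result_builder::ResultBuilder;

// Reacts only to aborted requests; every other callback keeps its default no-op body.
struct AbortLogger;

impl ResultBuilder for AbortLogger {
    fn finnalize_error(&mut self, e: String) {
        eprintln!("query aborted: {e}");
    }
}

fn main() {
    let mut builder = AbortLogger;
    // this is what ReplicaConnection::poll_ready invokes when the proxy timeout fires
    builder.finnalize_error("request timed out".to_string());
}
```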