Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit f57b173

Browse files
committed
implement LINC protocol
1 parent 6cefe75 commit f57b173

File tree

13 files changed

+2422
-555
lines changed

13 files changed

+2422
-555
lines changed

Cargo.lock

Lines changed: 621 additions & 551 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libsqlx-server/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ edition = "2021"
66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
77

88
[dependencies]
9+
async-bincode = { version = "0.7.1", features = ["tokio"] }
910
axum = "0.6.18"
1011
base64 = "0.21.2"
1112
bincode = "1.3.3"
@@ -15,6 +16,7 @@ color-eyre = "0.6.2"
1516
futures = "0.3.28"
1617
hmac = "0.12.1"
1718
hyper = { version = "0.14.27", features = ["h2", "server"] }
19+
itertools = "0.11.0"
1820
libsqlx = { version = "0.1.0", path = "../libsqlx" }
1921
moka = { version = "0.11.2", features = ["future"] }
2022
parking_lot = "0.12.1"
@@ -27,6 +29,11 @@ sha2 = "0.10.7"
2729
sled = "0.34.7"
2830
thiserror = "1.0.43"
2931
tokio = { version = "1.29.1", features = ["full"] }
32+
tokio-stream = "0.1.14"
33+
tokio-util = "0.7.8"
3034
tracing = "0.1.37"
3135
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
3236
uuid = { version = "1.4.0", features = ["v4"] }
37+
38+
[dev-dependencies]
39+
turmoil = "0.5.5"

libsqlx-server/src/database.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
11
use tokio::sync::{mpsc, oneshot};
22

33
use crate::hrana::http::proto::{PipelineRequestBody, PipelineResponseBody};
4-
use crate::allocation::{AllocationMessage, ConnectionHandle};
4+
use crate::allocation::AllocationMessage;
55

66
pub struct Database {
77
pub sender: mpsc::Sender<AllocationMessage>,
88
}
99

1010
impl Database {
1111
pub async fn hrana_pipeline(&self, req: PipelineRequestBody) -> crate::Result<PipelineResponseBody> {
12-
dbg!();
1312
let (sender, ret) = oneshot::channel();
14-
dbg!();
1513
self.sender.send(AllocationMessage::HranaPipelineReq { req, ret: sender }).await.unwrap();
16-
dbg!();
1714
ret.await.unwrap()
1815
}
1916
}

libsqlx-server/src/linc/bus.rs

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
use std::collections::{hash_map::Entry, HashMap};
2+
use std::sync::Arc;
3+
4+
use color_eyre::eyre::{bail, anyhow};
5+
use parking_lot::Mutex;
6+
use tokio::sync::{mpsc, Notify};
7+
use uuid::Uuid;
8+
9+
use super::connection::{ConnectionHandle, Stream};
10+
11+
type NodeId = Uuid;
12+
type DatabaseId = Uuid;
13+
14+
#[must_use]
15+
pub struct Subscription {
16+
receiver: mpsc::Receiver<Stream>,
17+
bus: Bus,
18+
database_id: DatabaseId,
19+
}
20+
21+
impl Drop for Subscription {
22+
fn drop(&mut self) {
23+
self.bus
24+
.inner
25+
.lock()
26+
.subscriptions
27+
.remove(&self.database_id);
28+
}
29+
}
30+
31+
impl futures::Stream for Subscription {
32+
type Item = Stream;
33+
34+
fn poll_next(
35+
mut self: std::pin::Pin<&mut Self>,
36+
cx: &mut std::task::Context<'_>,
37+
) -> std::task::Poll<Option<Self::Item>> {
38+
self.receiver.poll_recv(cx)
39+
}
40+
}
41+
42+
#[derive(Clone)]
43+
pub struct Bus {
44+
inner: Arc<Mutex<BusInner>>,
45+
pub node_id: NodeId,
46+
}
47+
48+
enum ConnectionSlot {
49+
Handle(ConnectionHandle),
50+
// Interest in the connection when it becomes available
51+
Interest(Arc<Notify>),
52+
}
53+
54+
struct BusInner {
55+
connections: HashMap<NodeId, ConnectionSlot>,
56+
subscriptions: HashMap<DatabaseId, mpsc::Sender<Stream>>,
57+
}
58+
59+
impl Bus {
60+
pub fn new(node_id: NodeId) -> Self {
61+
Self {
62+
node_id,
63+
inner: Arc::new(Mutex::new(BusInner {
64+
connections: HashMap::new(),
65+
subscriptions: HashMap::new(),
66+
})),
67+
}
68+
}
69+
70+
/// open a new stream to the database at `database_id` on the node `node_id`
71+
pub async fn new_stream(
72+
&self,
73+
node_id: NodeId,
74+
database_id: DatabaseId,
75+
) -> color_eyre::Result<Stream> {
76+
let get_conn = || {
77+
let mut lock = self.inner.lock();
78+
match lock.connections.entry(node_id) {
79+
Entry::Occupied(mut e) => match e.get_mut() {
80+
ConnectionSlot::Handle(h) => Ok(h.clone()),
81+
ConnectionSlot::Interest(notify) => Err(notify.clone()),
82+
},
83+
Entry::Vacant(e) => {
84+
let notify = Arc::new(Notify::new());
85+
e.insert(ConnectionSlot::Interest(notify.clone()));
86+
Err(notify)
87+
}
88+
}
89+
};
90+
91+
let conn = match get_conn() {
92+
Ok(conn) => conn,
93+
Err(notify) => {
94+
notify.notified().await;
95+
get_conn().map_err(|_| anyhow!("failed to create stream"))?
96+
}
97+
};
98+
99+
conn.new_stream(database_id).await
100+
}
101+
102+
/// Notify a subscription that new stream was openned
103+
pub async fn notify_subscription(
104+
&mut self,
105+
database_id: DatabaseId,
106+
stream: Stream,
107+
) -> color_eyre::Result<()> {
108+
let maybe_sender = self.inner.lock().subscriptions.get(&database_id).cloned();
109+
110+
match maybe_sender {
111+
Some(sender) => {
112+
if sender.send(stream).await.is_err() {
113+
bail!("subscription for {database_id} closed");
114+
}
115+
116+
Ok(())
117+
}
118+
None => {
119+
bail!("no subscription for {database_id}")
120+
}
121+
}
122+
}
123+
124+
#[cfg(test)]
125+
pub fn is_empty(&self) -> bool {
126+
self.inner.lock().connections.is_empty()
127+
}
128+
129+
#[must_use]
130+
pub fn register_connection(&self, node_id: NodeId, conn: ConnectionHandle) -> Registration {
131+
let mut lock = self.inner.lock();
132+
match lock.connections.entry(node_id) {
133+
Entry::Occupied(mut e) => {
134+
if let ConnectionSlot::Interest(ref notify) = e.get() {
135+
notify.notify_waiters();
136+
}
137+
138+
*e.get_mut() = ConnectionSlot::Handle(conn);
139+
}
140+
Entry::Vacant(e) => {
141+
e.insert(ConnectionSlot::Handle(conn));
142+
}
143+
}
144+
145+
Registration {
146+
bus: self.clone(),
147+
node_id,
148+
}
149+
}
150+
151+
pub fn subscribe(&self, database_id: DatabaseId) -> color_eyre::Result<Subscription> {
152+
let (sender, receiver) = mpsc::channel(1);
153+
{
154+
let mut inner = self.inner.lock();
155+
156+
if inner.subscriptions.contains_key(&database_id) {
157+
bail!("a subscription already exist for that database");
158+
}
159+
160+
inner.subscriptions.insert(database_id, sender);
161+
}
162+
163+
Ok(Subscription {
164+
receiver,
165+
bus: self.clone(),
166+
database_id,
167+
})
168+
}
169+
}
170+
171+
pub struct Registration {
172+
bus: Bus,
173+
node_id: NodeId,
174+
}
175+
176+
impl Drop for Registration {
177+
fn drop(&mut self) {
178+
assert!(self
179+
.bus
180+
.inner
181+
.lock()
182+
.connections
183+
.remove(&self.node_id)
184+
.is_some());
185+
}
186+
}

0 commit comments

Comments
 (0)