
Commit 5c14c9e

retry quorum
Summary:
- We currently don't retry quorum requests from the manager to the lighthouse, so if the lighthouse crashes, all replicas can crash with it.
- Add retries, configurable through an env var (TORCHFT_QUORUM_RETRIES; a usage sketch follows below).
- Remove the timeout on the request made to the manager server.
1 parent ca01921 commit 5c14c9e
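Since the retry count is wired through an environment variable, opting in is a one-line change on the Python side. A minimal sketch (the variable name and default come from the torchft/manager.py diff below; the value 3 is just an example):

    import os

    # Allow up to 3 retried quorum requests to the lighthouse before giving up.
    # Defaults to 0 (the previous fail-fast behavior) when unset.
    os.environ["TORCHFT_QUORUM_RETRIES"] = "3"

    # ...then construct the torchft Manager as usual; it reads the variable via
    # int(os.environ.get(QUORUM_RETRIES_ENV, 0)).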

File tree

src/lib.rs
src/manager.rs
torchft/_torchft.pyi
torchft/manager.py

4 files changed: +67 −16 lines

src/lib.rs (8 additions, 3 deletions)

@@ -71,6 +71,7 @@ fn num_threads() -> usize {
 /// world_size (int): The world size of the replica group.
 /// heartbeat_interval (timedelta): The interval at which heartbeats are sent.
 /// connect_timeout (timedelta): The timeout for connecting to the lighthouse server.
+/// quorum_retries (int): The number of retries for quorum requests to lighthouse server.
 #[pyclass]
 struct ManagerServer {
     handle: JoinHandle<Result<()>>,
@@ -91,6 +92,7 @@ impl ManagerServer {
         world_size: u64,
         heartbeat_interval: Duration,
         connect_timeout: Duration,
+        quorum_retries: i64,
     ) -> PyResult<Self> {
         py.allow_threads(move || {
             let runtime = tokio::runtime::Builder::new_multi_thread()
@@ -108,6 +110,7 @@ impl ManagerServer {
                 world_size,
                 heartbeat_interval,
                 connect_timeout,
+                quorum_retries,
             ))
             .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
             let handle = runtime.spawn(manager.clone().run());
@@ -190,9 +193,11 @@ impl ManagerClient {
             commit_failures: commit_failures,
         });
 
-        // This timeout is processed on the server side so we also enable
-        // keep alives to detect server health.
-        request.set_timeout(timeout);
+        // TODO: Have some mechanism to detect server health.
+        //
+        // We can't use timeouts here e.g. using `request.set_timeout(timeout)`
+        // because the server might need to retry quorum requests to the
+        // lighthouse server.
 
         let response = self.runtime.block_on(self.client.clone().quorum(request))?;
         let resp = response.into_inner();

src/manager.rs (52 additions, 13 deletions)

@@ -10,6 +10,7 @@ use std::collections::HashSet;
 use std::sync::Arc;
 use std::time::Duration;
 
+use crate::torchftpb::LighthouseQuorumResponse;
 use anyhow::Result;
 use tokio::sync::broadcast;
 use tokio::sync::Mutex;
@@ -72,6 +73,7 @@ pub struct Manager {
     local_addr: SocketAddr,
     heartbeat_interval: Duration,
     lighthouse_client: LighthouseServiceClient<Channel>,
+    quorum_retries: i64,
 }
 
 pub async fn manager_client_new(
@@ -108,6 +110,7 @@ impl Manager {
         world_size: u64,
         heartbeat_interval: Duration,
         connect_timeout: Duration,
+        quorum_retries: i64,
     ) -> Result<Arc<Self>> {
         let listener = tokio::net::TcpListener::bind(&bind).await?;
         let local_addr = listener.local_addr()?;
@@ -135,6 +138,7 @@ impl Manager {
             }),
             local_addr: local_addr,
             listener: Mutex::new(Some(listener)),
+            quorum_retries,
         }))
     }
 
@@ -197,21 +201,13 @@ impl Manager {
 
         // TODO: don't hold the lock during quorum
 
-        let mut client = self.lighthouse_client.clone();
-
-        let mut lighthouse_request = tonic::Request::new(LighthouseQuorumRequest {
+        let lighthouse_request = LighthouseQuorumRequest {
             requester: Some(requester),
-        });
-        lighthouse_request.set_timeout(timeout);
+        };
 
-        let response = tokio::time::timeout(timeout, client.quorum(lighthouse_request))
-            .await
-            .unwrap_or_else(|e| {
-                Err(Status::cancelled(format!(
-                    "lighthouse quorum timed out: {}",
-                    e.to_string()
-                )))
-            })?;
+        let response = self
+            ._quorum_with_retries(timeout, lighthouse_request)
+            .await?;
         let resp = response.into_inner();
 
         info_with_replica!(self.replica_id, "got lighthouse quorum {:?}", resp);
@@ -226,6 +222,45 @@ impl Manager {
 
         Ok(())
     }
+
+    async fn _quorum_with_retries(
+        &self,
+        timeout: Duration,
+        lighthouse_request: LighthouseQuorumRequest,
+    ) -> Result<tonic::Response<LighthouseQuorumResponse>, Status> {
+        let mut client = self.lighthouse_client.clone();
+
+        let mut retry_count = 0;
+        loop {
+            let mut request = tonic::Request::new(lighthouse_request.clone());
+            request.set_timeout(timeout);
+
+            let result = tokio::time::timeout(timeout, client.quorum(request)).await;
+
+            match result {
+                Ok(response) => {
+                    return response;
+                }
+                Err(e) => {
+                    info_with_replica!(
+                        self.replica_id,
+                        "lighthouse quorum timed out: {}",
+                        e.to_string()
+                    );
+
+                    if retry_count == self.quorum_retries {
+                        return Err(Status::internal(format!(
+                            "lighthouse quorum timed out after {} retries",
+                            retry_count
+                        )));
+                    }
+
+                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+                    retry_count += 1;
+                }
+            }
+        }
+    }
 }
 
 #[tonic::async_trait]
@@ -563,6 +598,7 @@ mod tests {
             2, // world size
             Duration::from_millis(100), // heartbeat interval
             Duration::from_secs(10), // connect timeout
+            0, // quorum retries
         )
         .await?;
         let manager_fut = tokio::spawn(manager._run_grpc());
@@ -610,6 +646,7 @@ mod tests {
             1, // world size
             Duration::from_millis(100), // heartbeat interval
             Duration::from_secs(10), // connect timeout
+            0, // quorum retries
         )
         .await?;
         let manager_fut = tokio::spawn(manager.clone().run());
@@ -671,6 +708,7 @@ mod tests {
             1, // world size
             Duration::from_millis(100), // heartbeat interval
             Duration::from_secs(10), // connect timeout
+            0, // quorum retries
         )
         .await?;
         let manager_fut = tokio::spawn(manager.clone().run());
@@ -737,6 +775,7 @@ mod tests {
             1, // world size
             Duration::from_millis(100), // heartbeat interval
             Duration::from_secs(10), // connect timeout
+            0, // quorum retries
         )
         .await?;
         let manager_fut = tokio::spawn(manager.clone().run());
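The heart of the change is _quorum_with_retries above: each attempt is bounded by the same timeout as before, but instead of failing the replica on the first elapsed timeout, the manager sleeps 100 ms and tries again, up to quorum_retries times. Note that only timeouts are retried; an RPC error from the lighthouse is returned immediately. A minimal Python rendering of that control flow, for illustration only (the real implementation is the Rust/tokio code above):

    import asyncio
    from typing import Awaitable, Callable, TypeVar

    T = TypeVar("T")

    async def quorum_with_retries(
        op: Callable[[], Awaitable[T]],
        timeout: float,
        quorum_retries: int,
    ) -> T:
        retry_count = 0
        while True:
            try:
                # Bound each attempt, like tokio::time::timeout around client.quorum().
                return await asyncio.wait_for(op(), timeout)
            except asyncio.TimeoutError:
                if retry_count == quorum_retries:
                    raise  # out of retries; the Rust code returns Status::internal here
                await asyncio.sleep(0.1)  # fixed 100 ms pause between attempts
                retry_count += 1

With quorum_retries=0 (the default, as used in the tests above) the loop makes exactly one attempt, preserving the old fail-fast behavior.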

torchft/_torchft.pyi (1 addition, 0 deletions)

@@ -48,6 +48,7 @@ class ManagerServer:
         world_size: int,
         heartbeat_interval: timedelta,
         connect_timeout: timedelta,
+        quorum_retries: int,
     ) -> None: ...
     def address(self) -> str: ...
     def shutdown(self) -> None: ...

torchft/manager.py (6 additions, 0 deletions)

@@ -67,6 +67,11 @@
 QUORUM_TIMEOUT_SEC_ENV: str = "TORCHFT_QUORUM_TIMEOUT_SEC"
 CONNECT_TIMEOUT_SEC_ENV: str = "TORCHFT_CONNECT_TIMEOUT_SEC"
 
+# Environment variable for the number of retries to use for the quorum.
+# We need to retry quorum in case the lighthouse fails; otherwise, if we
+# crash whenever a call to quorum fails, all replicas will crash.
+QUORUM_RETRIES_ENV: str = "TORCHFT_QUORUM_RETRIES"
+
 T = TypeVar("T")
 
 
@@ -271,6 +276,7 @@ def __init__(
             world_size=group_world_size,
             heartbeat_interval=heartbeat_interval,
             connect_timeout=connect_timeout,
+            quorum_retries=int(os.environ.get(QUORUM_RETRIES_ENV, 0)),
         )
 
         self._store.set(MANAGER_ADDR_KEY, self._manager.address())
