@@ -247,56 +247,20 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
247
247
}
248
248
249
249
boolean shouldStreamResponses = shouldStreamResponses (currentServerDescription );
250
+ lookupStartTimeNanos = System .nanoTime ();
251
+
252
+ // Handle connection setup
250
253
if (connection == null || connection .isClosed ()) {
251
- lookupStartTimeNanos = System .nanoTime ();
252
- return openNewConnectionAndGetInitialDescription (shouldStreamResponses );
254
+ return setupNewConnectionAndGetInitialDescription (shouldStreamResponses );
253
255
}
254
256
257
+ // Log heartbeat started if it hasn't been logged yet
255
258
if (!alreadyLoggedHeartBeatStarted ) {
256
- lookupStartTimeNanos = System .nanoTime ();
257
- logHeartbeatStarted (serverId , connection .getDescription (), shouldStreamResponses );
258
- serverMonitorListener .serverHearbeatStarted (new ServerHeartbeatStartedEvent (
259
- connection .getDescription ().getConnectionId (), shouldStreamResponses ));
259
+ logAndNotifyHeartbeatStarted (shouldStreamResponses );
260
260
}
261
- alreadyLoggedHeartBeatStarted = false ;
262
-
263
- try {
264
- OperationContext operationContext = operationContextFactory .create ();
265
- if (!connection .hasMoreToCome ()) {
266
- BsonDocument helloDocument = new BsonDocument (getHandshakeCommandName (currentServerDescription ), new BsonInt32 (1 ))
267
- .append ("helloOk" , BsonBoolean .TRUE );
268
- if (shouldStreamResponses ) {
269
- helloDocument .append ("topologyVersion" , assertNotNull (currentServerDescription .getTopologyVersion ()).asDocument ());
270
- helloDocument .append ("maxAwaitTimeMS" , new BsonInt64 (serverSettings .getHeartbeatFrequency (MILLISECONDS )));
271
- }
272
- connection .send (createCommandMessage (helloDocument , connection , currentServerDescription ), new BsonDocumentCodec (),
273
- operationContext );
274
- }
275
261
276
- BsonDocument helloResult ;
277
- if (shouldStreamResponses ) {
278
- helloResult = connection .receive (new BsonDocumentCodec (), operationContextWithAdditionalTimeout (operationContext ));
279
- } else {
280
- helloResult = connection .receive (new BsonDocumentCodec (), operationContext );
281
- }
282
- long elapsedTimeNanos = getElapsedTimeNanos ();
283
- if (!shouldStreamResponses ) {
284
- roundTripTimeSampler .addSample (elapsedTimeNanos );
285
- }
286
- logHeartbeatSucceeded (serverId , connection .getDescription (), shouldStreamResponses , elapsedTimeNanos , helloResult );
287
- serverMonitorListener .serverHeartbeatSucceeded (
288
- new ServerHeartbeatSucceededEvent (connection .getDescription ().getConnectionId (), helloResult ,
289
- elapsedTimeNanos , shouldStreamResponses ));
290
- return createServerDescription (serverId .getAddress (), helloResult , roundTripTimeSampler .getAverage (),
291
- roundTripTimeSampler .getMin ());
292
- } catch (Exception e ) {
293
- long elapsedTimeNanos = getElapsedTimeNanos ();
294
- logHeartbeatFailed (serverId , connection .getDescription (), shouldStreamResponses , elapsedTimeNanos , e );
295
- serverMonitorListener .serverHeartbeatFailed (
296
- new ServerHeartbeatFailedEvent (connection .getDescription ().getConnectionId (), elapsedTimeNanos ,
297
- shouldStreamResponses , e ));
298
- throw e ;
299
- }
262
+ // Get existing connection
263
+ return doHeartbeat (currentServerDescription , shouldStreamResponses );
300
264
} catch (Throwable t ) {
301
265
roundTripTimeSampler .reset ();
302
266
InternalConnection localConnection = withLock (lock , () -> {
@@ -311,30 +275,81 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
311
275
}
312
276
}
313
277
314
- private ServerDescription openNewConnectionAndGetInitialDescription (final boolean shouldStreamResponses ) {
315
- currentCheckCancelled = false ;
316
- alreadyLoggedHeartBeatStarted = true ;
317
- InternalConnection newConnection = internalConnectionFactory .create (serverId );
318
- logHeartbeatStarted (serverId , newConnection .getDescription (), shouldStreamResponses );
319
- serverMonitorListener .serverHearbeatStarted (new ServerHeartbeatStartedEvent (
320
- newConnection .getDescription ().getConnectionId (), shouldStreamResponses ));
278
+ private ServerDescription setupNewConnectionAndGetInitialDescription (final boolean shouldStreamResponses ) {
279
+ connection = internalConnectionFactory .create (serverId );
280
+ logAndNotifyHeartbeatStarted (shouldStreamResponses );
321
281
322
282
try {
323
- newConnection .open (operationContextFactory .create ());
324
- connection = newConnection ;
283
+ connection .open (operationContextFactory .create ());
325
284
roundTripTimeSampler .addSample (connection .getInitialServerDescription ().getRoundTripTimeNanos ());
326
285
return connection .getInitialServerDescription ();
327
286
} catch (Exception e ) {
328
- alreadyLoggedHeartBeatStarted = false ;
329
- long elapsedTimeNanos = getElapsedTimeNanos ();
330
- logHeartbeatFailed (serverId , newConnection .getDescription (), shouldStreamResponses , elapsedTimeNanos , e );
331
- serverMonitorListener .serverHeartbeatFailed (
332
- new ServerHeartbeatFailedEvent (newConnection .getDescription ().getConnectionId (), elapsedTimeNanos ,
333
- shouldStreamResponses , e ));
287
+ logAndNotifyHeartbeatFailed (shouldStreamResponses , e );
334
288
throw e ;
335
289
}
336
290
}
337
291
292
+ /**
293
+ * Run hello command to get the server description.
294
+ */
295
+ private ServerDescription doHeartbeat (final ServerDescription currentServerDescription ,
296
+ final boolean shouldStreamResponses ) {
297
+ try {
298
+ OperationContext operationContext = operationContextFactory .create ();
299
+ if (!connection .hasMoreToCome ()) {
300
+ BsonDocument helloDocument = new BsonDocument (getHandshakeCommandName (currentServerDescription ), new BsonInt32 (1 ))
301
+ .append ("helloOk" , BsonBoolean .TRUE );
302
+ if (shouldStreamResponses ) {
303
+ helloDocument .append ("topologyVersion" , assertNotNull (currentServerDescription .getTopologyVersion ()).asDocument ());
304
+ helloDocument .append ("maxAwaitTimeMS" , new BsonInt64 (serverSettings .getHeartbeatFrequency (MILLISECONDS )));
305
+ }
306
+ connection .send (createCommandMessage (helloDocument , connection , currentServerDescription ), new BsonDocumentCodec (),
307
+ operationContext );
308
+ }
309
+
310
+ BsonDocument helloResult ;
311
+ if (shouldStreamResponses ) {
312
+ helloResult = connection .receive (new BsonDocumentCodec (), operationContextWithAdditionalTimeout (operationContext ));
313
+ } else {
314
+ helloResult = connection .receive (new BsonDocumentCodec (), operationContext );
315
+ }
316
+ logAndNotifyHeartbeatSucceeded (shouldStreamResponses , helloResult );
317
+ return createServerDescription (serverId .getAddress (), helloResult , roundTripTimeSampler .getAverage (),
318
+ roundTripTimeSampler .getMin ());
319
+ } catch (Exception e ) {
320
+ logAndNotifyHeartbeatFailed (shouldStreamResponses , e );
321
+ throw e ;
322
+ }
323
+ }
324
+
325
+ private void logAndNotifyHeartbeatStarted (final boolean shouldStreamResponses ) {
326
+ alreadyLoggedHeartBeatStarted = true ;
327
+ logHeartbeatStarted (serverId , connection .getDescription (), shouldStreamResponses );
328
+ serverMonitorListener .serverHearbeatStarted (new ServerHeartbeatStartedEvent (
329
+ connection .getDescription ().getConnectionId (), shouldStreamResponses ));
330
+ }
331
+
332
+ private void logAndNotifyHeartbeatSucceeded (final boolean shouldStreamResponses , final BsonDocument helloResult ) {
333
+ alreadyLoggedHeartBeatStarted = false ;
334
+ long elapsedTimeNanos = getElapsedTimeNanos ();
335
+ if (!shouldStreamResponses ) {
336
+ roundTripTimeSampler .addSample (elapsedTimeNanos );
337
+ }
338
+ logHeartbeatSucceeded (serverId , connection .getDescription (), shouldStreamResponses , elapsedTimeNanos , helloResult );
339
+ serverMonitorListener .serverHeartbeatSucceeded (
340
+ new ServerHeartbeatSucceededEvent (connection .getDescription ().getConnectionId (), helloResult ,
341
+ elapsedTimeNanos , shouldStreamResponses ));
342
+ }
343
+
344
+ private void logAndNotifyHeartbeatFailed (final boolean shouldStreamResponses , final Exception e ) {
345
+ alreadyLoggedHeartBeatStarted = false ;
346
+ long elapsedTimeNanos = getElapsedTimeNanos ();
347
+ logHeartbeatFailed (serverId , connection .getDescription (), shouldStreamResponses , elapsedTimeNanos , e );
348
+ serverMonitorListener .serverHeartbeatFailed (
349
+ new ServerHeartbeatFailedEvent (connection .getDescription ().getConnectionId (), elapsedTimeNanos ,
350
+ shouldStreamResponses , e ));
351
+ }
352
+
338
353
private long getElapsedTimeNanos () {
339
354
return System .nanoTime () - lookupStartTimeNanos ;
340
355
}
0 commit comments