@@ -366,35 +366,46 @@ def connect_blocking(self, timeout=float('inf')):
366
366
367
367
def connect (self ):
368
368
"""Attempt to connect and return ConnectionState"""
369
+ if self .config ["socks5_proxy" ] is not None and self ._socks5_proxy is None :
370
+ self ._socks5_proxy = Socks5Wrapper (self .config ["socks5_proxy" ], self .afi )
371
+
369
372
if self .state is ConnectionStates .DISCONNECTED and not self .blacked_out ():
370
373
self .state = ConnectionStates .CONNECTING
371
374
self .last_attempt = time .time ()
372
- next_lookup = self ._next_afi_sockaddr ()
373
- if not next_lookup :
374
- self .close (Errors .KafkaConnectionError ('DNS failure' ))
375
- return self .state
376
- else :
375
+ if self .config ["socks5_proxy" ] is None or not self ._socks5_proxy .use_remote_lookup ():
376
+ next_lookup = self ._next_afi_sockaddr ()
377
+ if not next_lookup :
378
+ self .close (Errors .KafkaConnectionError ('DNS failure' ))
379
+ return self .state
380
+
377
381
log .debug ('%s: creating new socket' , self )
378
382
assert self ._sock is None
379
383
self ._sock_afi , self ._sock_addr = next_lookup
380
384
try :
381
385
if self .config ["socks5_proxy" ] is not None :
382
- self ._socks5_proxy = Socks5Wrapper (self .config ["socks5_proxy" ], self .afi )
383
386
self ._sock = self ._socks5_proxy .socket (self ._sock_afi , socket .SOCK_STREAM )
384
387
else :
385
388
self ._sock = socket .socket (self ._sock_afi , socket .SOCK_STREAM )
386
389
except (socket .error , OSError ) as e :
387
390
self .close (e )
388
391
return self .state
392
+ else :
393
+ self ._sock = self ._socks5_proxy .socket (None , socket .SOCK_STREAM )
394
+ self ._sock_afi = None
395
+ self ._sock_addr = [self .host .encode ('ascii' ), self .port ]
389
396
390
397
for option in self .config ['socket_options' ]:
391
398
log .debug ('%s: setting socket option %s' , self , option )
392
399
self ._sock .setsockopt (* option )
393
400
394
401
self ._sock .setblocking (False )
395
402
self .config ['state_change_callback' ](self .node_id , self ._sock , self )
403
+ if self ._sock_afi != None :
404
+ family_str = AFI_NAMES [self ._sock_afi ]
405
+ else :
406
+ family_str = "n.a."
396
407
log .info ('%s: connecting to %s:%d [%s %s]' , self , self .host ,
397
- self .port , self ._sock_addr , AFI_NAMES [ self . _sock_afi ] )
408
+ self .port , self ._sock_addr , family_str )
398
409
399
410
if self .state is ConnectionStates .CONNECTING :
400
411
# in non-blocking mode, use repeated calls to socket.connect_ex
@@ -862,7 +873,7 @@ def connection_delay(self):
862
873
large number to handle slow/stalled connections.
863
874
"""
864
875
if self .disconnected () or self .connecting ():
865
- if len (self ._gai ) > 0 :
876
+ if len (self ._gai ) > 0 or self . _socks5_proxy . use_remote_lookup () :
866
877
return 0
867
878
else :
868
879
time_waited = time .time () - self .last_attempt
@@ -1291,9 +1302,13 @@ def check_version(self, timeout=2, **kwargs):
1291
1302
return self ._api_version
1292
1303
1293
1304
def __str__ (self ):
1305
+ if self ._sock_afi != None :
1306
+ family_str = AFI_NAMES [self ._sock_afi ]
1307
+ else :
1308
+ family_str = "n.a."
1294
1309
return "<BrokerConnection client_id=%s, node_id=%s host=%s:%d %s [%s %s]>" % (
1295
1310
self .config ['client_id' ], self .node_id , self .host , self .port , self .state ,
1296
- AFI_NAMES [ self . _sock_afi ] , self ._sock_addr )
1311
+ family_str , self ._sock_addr )
1297
1312
1298
1313
1299
1314
class BrokerConnectionMetrics (object ):
0 commit comments