Skip to content

Commit f3d9fb2

Browse files
SOCKS5: support looking up names remotely
1 parent 512d0a0 commit f3d9fb2

File tree

2 files changed

+34
-12
lines changed

2 files changed

+34
-12
lines changed

kafka/conn.py

100644100755
Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,8 @@ def _dns_lookup(self):
326326
return True
327327

328328
def _next_afi_sockaddr(self):
329+
if self._socks5_proxy.use_remote_lookup():
330+
return (socket.AF_UNSPEC, (self.host, self.port))
329331
if not self._gai:
330332
if not self._dns_lookup():
331333
return
@@ -366,6 +368,9 @@ def connect_blocking(self, timeout=float('inf')):
366368

367369
def connect(self):
368370
"""Attempt to connect and return ConnectionState"""
371+
if self.config["socks5_proxy"] is not None and self._socks5_proxy is None:
372+
self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi)
373+
369374
if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out():
370375
self.state = ConnectionStates.CONNECTING
371376
self.last_attempt = time.time()
@@ -379,7 +384,6 @@ def connect(self):
379384
self._sock_afi, self._sock_addr = next_lookup
380385
try:
381386
if self.config["socks5_proxy"] is not None:
382-
self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi)
383387
self._sock = self._socks5_proxy.socket(self._sock_afi, socket.SOCK_STREAM)
384388
else:
385389
self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
@@ -862,7 +866,7 @@ def connection_delay(self):
862866
large number to handle slow/stalled connections.
863867
"""
864868
if self.disconnected() or self.connecting():
865-
if len(self._gai) > 0:
869+
if len(self._gai) > 0 or (self._socks5_proxy is not None and self._socks5_proxy.use_remote_lookup()):
866870
return 0
867871
else:
868872
time_waited = time.time() - self.last_attempt

kafka/socks5_wrapper.py

100644100755
Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ def dns_lookup(cls, host, port, afi=socket.AF_UNSPEC):
6464
log.warning("DNS lookup failed for proxy %s:%d, %r", host, port, ex)
6565
return []
6666

67+
def use_remote_lookup(self):
68+
return self._proxy_url.scheme == 'socks5h'
69+
6770
def socket(self, family, sock_type):
6871
"""Open and record a socket.
6972
@@ -187,7 +190,10 @@ def connect_ex(self, addr):
187190
return errno.ECONNREFUSED
188191

189192
if self._state == ProxyConnectionStates.REQUEST_SUBMIT:
190-
if self._target_afi == socket.AF_INET:
193+
if self.use_remote_lookup():
194+
addr_type = 3
195+
addr_len = len(addr[0])
196+
elif self._target_afi == socket.AF_INET:
191197
addr_type = 1
192198
addr_len = 4
193199
elif self._target_afi == socket.AF_INET6:
@@ -199,15 +205,27 @@ def connect_ex(self, addr):
199205
self._sock.close()
200206
return errno.ECONNREFUSED
201207

202-
self._buffer_out = struct.pack(
203-
"!bbbb{}sh".format(addr_len),
204-
5, # version
205-
1, # command: connect
206-
0, # reserved
207-
addr_type, # 1 for ipv4, 4 for ipv6 address
208-
socket.inet_pton(self._target_afi, addr[0]), # either 4 or 16 bytes of actual address
209-
addr[1], # port
210-
)
208+
if self.use_remote_lookup():
209+
self._buffer_out = struct.pack(
210+
"!bbbbb{}sh".format(addr_len),
211+
5, # version
212+
1, # command: connect
213+
0, # reserved
214+
addr_type, # 1 for ipv4, 4 for ipv6 address, 3 for domain name
215+
addr_len,
216+
addr[0].encode('ascii'), # host
217+
addr[1], # port
218+
)
219+
else:
220+
self._buffer_out = struct.pack(
221+
"!bbbb{}sh".format(addr_len),
222+
5, # version
223+
1, # command: connect
224+
0, # reserved
225+
addr_type, # 1 for ipv4, 4 for ipv6 address, 3 for domain name
226+
socket.inet_pton(self._target_afi, addr[0]), # either 4 or 16 bytes of actual address
227+
addr[1], # port
228+
)
211229
self._state = ProxyConnectionStates.REQUESTING
212230

213231
if self._state == ProxyConnectionStates.REQUESTING:

0 commit comments

Comments
 (0)