Skip to content

SOCKS5: support looking up names remotely #2666

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

jschwartzenberg
Copy link

@jschwartzenberg jschwartzenberg commented Jul 23, 2025

Hi all, I'm dealing with a Kafka cluster behind a SOCKS proxy where the broker names cannot locally be resolved.

This PR implements resolving the name remotely. It is probably rather hacky though, so I appreciate any feedback to improve it and will make changes accordingly.

@jschwartzenberg jschwartzenberg force-pushed the feature/socks5h branch 2 times, most recently from 5e712e4 to 8771bee Compare July 25, 2025 13:46
@jschwartzenberg jschwartzenberg changed the title WIP: support looking up remote name resolving with SOCKS5 support looking up remote name resolving with SOCKS5 Jul 25, 2025
@jschwartzenberg jschwartzenberg changed the title support looking up remote name resolving with SOCKS5 SOCKS5: support looking up names remotely Jul 25, 2025
kafka/conn.py Outdated
else:
self._sock = self._socks5_proxy.socket(None, socket.SOCK_STREAM)
self._sock_afi = None
self._sock_addr = [self.host.encode('ascii'), self.port]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recommend you defer str.encode() until needed by socks code.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this as you suggested. Let me know what you think!

kafka/conn.py Outdated
return self.state
else:
if self.config["socks5_proxy"] is None or not self._socks5_proxy.use_remote_lookup():
next_lookup = self._next_afi_sockaddr()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be simpler to move the self._socks5_proxy.use_remote_lookup() inside of _next_afi_sockaddr() ? For socks proxy w/ remote_lookup, it should return (socket.AF_UNSPEC, (self.host, self.port)). With that change I think most of these other changes would be unnecessary.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that indeed reduces the changes quite a bit! Let me know what you think of the current version!

kafka/conn.py Outdated
@@ -862,7 +873,7 @@ def connection_delay(self):
large number to handle slow/stalled connections.
"""
if self.disconnected() or self.connecting():
if len(self._gai) > 0:
if len(self._gai) > 0 or self._socks5_proxy.use_remote_lookup():
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

won't this crash if/when self._socks5_proxy is None and self._gai is empty?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're completely right. I added some extra logic now to check whether self._socks5_proxy is set.

kafka/conn.py Outdated

for option in self.config['socket_options']:
log.debug('%s: setting socket option %s', self, option)
self._sock.setsockopt(*option)

self._sock.setblocking(False)
self.config['state_change_callback'](self.node_id, self._sock, self)
if self._sock_afi != None:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally should check is or is not for None, not equality.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got rid of the None variant for the socket family now.

kafka/conn.py Outdated
@@ -1291,9 +1302,13 @@ def check_version(self, timeout=2, **kwargs):
return self._api_version

def __str__(self):
if self._sock_afi != None:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implied suggestion above is to use socket.AF_UNSPEC instead of None

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, makes sense :)

@jschwartzenberg
Copy link
Author

Many thanks for your review! I'll rework and push a better commit!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants