Skip to content

Commit ff655ba

Browse files
committed
Always return a broker address, even in an error state
1 parent 7a8d1e1 commit ff655ba

File tree

4 files changed

+61
-29
lines changed

4 files changed

+61
-29
lines changed

src/kafka-util/src/client.rs

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -470,9 +470,21 @@ where
470470
// tunnel will ever be connected, and only one will be inserted into the
471471
// map.
472472
let ssh_tunnel = self.runtime.block_on(async {
473+
// Ensure the default tunnel host is resolved to an external address.
474+
let resolved_tunnel_addr = resolve_external_address(
475+
&default_tunnel.host,
476+
self.enforce_external_addresses,
477+
)
478+
.await?;
479+
let tunnel_config = SshTunnelConfig {
480+
host: resolved_tunnel_addr.to_string(),
481+
port: default_tunnel.port,
482+
user: default_tunnel.user.clone(),
483+
key_pair: default_tunnel.key_pair.clone(),
484+
};
473485
self.ssh_tunnel_manager
474486
.connect(
475-
default_tunnel.clone(),
487+
tunnel_config,
476488
&addr.host,
477489
addr.port.parse().unwrap(),
478490
self.ssh_timeout_config,
@@ -533,19 +545,37 @@ where
533545
TunnelConfig::None => {
534546
// If no rewrite is specified, we still should check that this potentially
535547
// new broker address is a global address.
536-
let rewrite = self.runtime.block_on(async {
537-
let resolved = resolve_external_address(
548+
self.runtime.block_on(async {
549+
match resolve_external_address(
538550
&addr.host,
539551
self.enforce_external_addresses,
540552
)
541553
.await
542-
.unwrap();
543-
BrokerRewriteHandle::Simple(BrokerRewrite {
544-
host: resolved.to_string(),
545-
port: addr.port.parse().ok(),
546-
})
547-
});
548-
return_rewrite(&rewrite)
554+
{
555+
Ok(resolved) => {
556+
let rewrite = BrokerRewriteHandle::Simple(BrokerRewrite {
557+
host: resolved.to_string(),
558+
port: addr.port.parse().ok(),
559+
});
560+
return_rewrite(&rewrite)
561+
}
562+
Err(e) => {
563+
warn!(
564+
"failed to resolve external address for {:?}: {}",
565+
addr,
566+
e.display_with_causes()
567+
);
568+
// We have to give rdkafka an address, as this callback can't fail,
569+
// we just give it a random one that will never resolve, for safety
570+
// in-case `addr.host` suddenly starts resolving again.
571+
BrokerAddr {
572+
host: "failed-broker-address.dev.materialize.com"
573+
.to_string(),
574+
port: 1337.to_string(),
575+
}
576+
}
577+
}
578+
})
549579
}
550580
}
551581
}

src/storage-types/src/connections.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -547,8 +547,14 @@ impl KafkaConnection {
547547
.await?;
548548
let key_pair = SshKeyPair::from_bytes(&secret)?;
549549

550+
// Ensure any ssh-bastion address we connect to is resolved to an external address.
551+
let resolved = resolve_external_address(
552+
&ssh_tunnel.connection.host,
553+
ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
554+
)
555+
.await?;
550556
context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig {
551-
host: ssh_tunnel.connection.host.clone(),
557+
host: resolved.to_string(),
552558
port: ssh_tunnel.connection.port,
553559
user: ssh_tunnel.connection.user.clone(),
554560
key_pair,
@@ -567,25 +573,21 @@ impl KafkaConnection {
567573
};
568574
match &broker.tunnel {
569575
Tunnel::Direct => {
576+
// By default, don't override broker address lookup.
577+
//
578+
// N.B.
579+
//
570580
// We _could_ pre-setup the default ssh tunnel for all known brokers here, but
571581
// we avoid doing because:
572582
// - Its not necessary.
573583
// - Not doing so makes it easier to test the `FailedDefaultSshTunnel` path
574584
// in the `TunnelingClientContext`.
575-
576-
// Ensure any broker address we connect to is resolved to an external address.
577-
let resolved = resolve_external_address(
578-
&addr.host,
579-
ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
580-
)
581-
.await?;
582-
context.add_broker_rewrite(
583-
addr,
584-
BrokerRewrite {
585-
host: resolved.to_string(),
586-
port: None,
587-
},
588-
)
585+
//
586+
// NOTE that we do not need to use the `resolve_external_address` method to
587+
// validate the broker address here since it will be validated when the
588+
// connection is established in `src/kafka-util/src/client.rs`, and we do not
589+
// want to specify any BrokerRewrite that would override any default-tunnel
590+
// settings.
589591
}
590592
Tunnel::AwsPrivatelink(aws_privatelink) => {
591593
let host = mz_cloud_resources::vpc_endpoint_host(

test/ssh-connection/kafka-source-after-ssh-failure.td

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
# Ensure they all are marked as broken for ssh reasons
1313
> SELECT status FROM mz_internal.mz_source_statuses st
1414
JOIN mz_sources s ON st.id = s.id
15-
WHERE
16-
s.name in ('fixed_text', 'dynamic_text', 'fixed_plus_csr', 'dynamic_plus_csr')
15+
WHERE error LIKE 'ssh:%'
16+
AND s.name in ('fixed_text', 'dynamic_text', 'fixed_plus_csr', 'dynamic_plus_csr')
1717
stalled
1818
stalled
1919
stalled

test/testdrive/connection-validation.td

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ ALTER SYSTEM SET enable_connection_validation_syntax = true
3737
> CREATE CONNECTION invalid_tunnel TO SSH TUNNEL (HOST 'invalid', USER 'invalid', PORT 22)
3838

3939
! CREATE CONNECTION invalid_kafka_conn TO KAFKA (BROKERS ('${testdrive.kafka-addr}' USING SSH TUNNEL invalid_tunnel), SECURITY PROTOCOL PLAINTEXT)
40-
contains:failed to lookup address information: Name or service not known
40+
contains:failed to lookup address information
4141

4242
# Create the connection without validation and validate later
4343
> CREATE CONNECTION invalid_kafka_conn TO KAFKA (BROKERS ('${testdrive.kafka-addr}' USING SSH TUNNEL invalid_tunnel), SECURITY PROTOCOL PLAINTEXT) WITH (VALIDATE = false)
4444

4545
! VALIDATE CONNECTION invalid_kafka_conn
46-
contains:failed to lookup address information: Name or service not known
46+
contains:failed to lookup address information

0 commit comments

Comments
 (0)