Skip to content

Commit 7bbd7a4

Browse files
author
Mateusz Rzeszutek
authored
Fix NPE happening when .headersWhen() is used (reactor-netty) (#9511)
1 parent edb0db3 commit 7bbd7a4

File tree

5 files changed

+151
-2
lines changed

5 files changed

+151
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.named;
9+
import static net.bytebuddy.matcher.ElementMatchers.returns;
10+
11+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
12+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
13+
import net.bytebuddy.asm.Advice;
14+
import net.bytebuddy.description.type.TypeDescription;
15+
import net.bytebuddy.matcher.ElementMatcher;
16+
import reactor.core.publisher.Mono;
17+
import reactor.netty.Connection;
18+
import reactor.netty.http.client.HttpClient;
19+
import reactor.netty.http.client.HttpClientConfig;
20+
import reactor.netty.http.client.HttpClientConfigBuddy;
21+
22+
public class HttpClientConnectInstrumentation implements TypeInstrumentation {
23+
@Override
24+
public ElementMatcher<TypeDescription> typeMatcher() {
25+
return named("reactor.netty.http.client.HttpClientConnect");
26+
}
27+
28+
@Override
29+
public void transform(TypeTransformer transformer) {
30+
transformer.applyAdviceToMethod(
31+
named("connect").and(returns(named("reactor.core.publisher.Mono"))),
32+
this.getClass().getName() + "$ConnectAdvice");
33+
}
34+
35+
@SuppressWarnings("unused")
36+
public static class ConnectAdvice {
37+
38+
@Advice.OnMethodExit(suppress = Throwable.class)
39+
public static void onExit(
40+
@Advice.Return(readOnly = false) Mono<? extends Connection> connection,
41+
@Advice.This HttpClient httpClient) {
42+
43+
HttpClientConfig config = httpClient.configuration();
44+
// reactor-netty 1.0.x has a bug: the .mapConnect() function is not applied when deferred
45+
// configuration is used
46+
// we're fixing this bug here, so that our instrumentation can safely add its own
47+
// .mapConnect() listener
48+
if (HttpClientConfigBuddy.hasDeferredConfig(config)) {
49+
connection = HttpClientConfigBuddy.getConnector(config).apply(connection);
50+
}
51+
}
52+
}
53+
}

instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/InstrumentationContexts.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,18 @@
1313
import io.opentelemetry.instrumentation.api.internal.Timer;
1414
import java.util.Queue;
1515
import java.util.concurrent.LinkedBlockingQueue;
16+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
1617
import javax.annotation.Nullable;
1718
import reactor.netty.http.client.HttpClientRequest;
1819
import reactor.netty.http.client.HttpClientResponse;
1920

2021
final class InstrumentationContexts {
2122

23+
private static final AtomicReferenceFieldUpdater<InstrumentationContexts, Context>
24+
parentContextUpdater =
25+
AtomicReferenceFieldUpdater.newUpdater(
26+
InstrumentationContexts.class, Context.class, "parentContext");
27+
2228
private volatile Context parentContext;
2329
private volatile Timer timer;
2430
// on retries, reactor-netty starts the next resend attempt before it ends the previous one (i.e.
@@ -27,8 +33,11 @@ final class InstrumentationContexts {
2733
private final Queue<RequestAndContext> clientContexts = new LinkedBlockingQueue<>();
2834

2935
void initialize(Context parentContext) {
30-
this.parentContext = HttpClientResendCount.initialize(parentContext);
31-
timer = Timer.start();
36+
Context parentContextWithResends = HttpClientResendCount.initialize(parentContext);
37+
// make sure initialization happens only once
38+
if (parentContextUpdater.compareAndSet(this, null, parentContextWithResends)) {
39+
timer = Timer.start();
40+
}
3241
}
3342

3443
Context getParentContext() {

instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,16 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
3636
return hasClassesNamed("reactor.netty.transport.AddressUtils");
3737
}
3838

39+
@Override
40+
public boolean isHelperClass(String className) {
41+
return className.startsWith("reactor.netty.http.client.HttpClientConfigBuddy");
42+
}
43+
3944
@Override
4045
public List<TypeInstrumentation> typeInstrumentations() {
4146
return asList(
4247
new HttpClientInstrumentation(),
48+
new HttpClientConnectInstrumentation(),
4349
new ResponseReceiverInstrumentation(),
4450
new TransportConnectorInstrumentation());
4551
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package reactor.netty.http.client;
7+
8+
import java.util.function.Function;
9+
import reactor.core.publisher.Mono;
10+
import reactor.netty.Connection;
11+
12+
// class in reactor package to access package-private code
13+
public final class HttpClientConfigBuddy {
14+
15+
public static boolean hasDeferredConfig(HttpClientConfig config) {
16+
return config.deferredConf != null;
17+
}
18+
19+
public static Function<? super Mono<? extends Connection>, ? extends Mono<? extends Connection>>
20+
getConnector(HttpClientConfig config) {
21+
return config.connector == null ? Function.identity() : config.connector;
22+
}
23+
24+
private HttpClientConfigBuddy() {}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
7+
8+
import io.netty.channel.ChannelOption;
9+
import io.netty.handler.codec.http.HttpMethod;
10+
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
11+
import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames;
12+
import java.net.URI;
13+
import java.util.Map;
14+
import reactor.core.publisher.Mono;
15+
import reactor.netty.http.client.HttpClient;
16+
17+
class ReactorNettyHttpClientDeferredHeadersTest extends AbstractReactorNettyHttpClientTest {
18+
19+
@Override
20+
protected HttpClient createHttpClient() {
21+
int connectionTimeoutMillis = (int) CONNECTION_TIMEOUT.toMillis();
22+
return HttpClient.create()
23+
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutMillis)
24+
.resolver(getAddressResolverGroup())
25+
.headers(headers -> headers.set(HttpHeaderNames.USER_AGENT, USER_AGENT));
26+
}
27+
28+
@Override
29+
public HttpClient.ResponseReceiver<?> buildRequest(
30+
String method, URI uri, Map<String, String> headers) {
31+
HttpClient client =
32+
createHttpClient()
33+
.followRedirect(true)
34+
.headersWhen(
35+
h -> {
36+
headers.forEach(h::add);
37+
return Mono.just(h);
38+
})
39+
.baseUrl(resolveAddress("").toString());
40+
if (uri.toString().contains("/read-timeout")) {
41+
client = client.responseTimeout(READ_TIMEOUT);
42+
}
43+
return client.request(HttpMethod.valueOf(method)).uri(uri.toString());
44+
}
45+
46+
@Override
47+
protected void configure(HttpClientTestOptions.Builder optionsBuilder) {
48+
super.configure(optionsBuilder);
49+
50+
// these scenarios don't work because deferred config does not apply the doOnRequestError()
51+
// callback
52+
optionsBuilder.disableTestReadTimeout();
53+
optionsBuilder.disableTestConnectionFailure();
54+
optionsBuilder.disableTestRemoteConnection();
55+
}
56+
}

0 commit comments

Comments
 (0)