From 6977417945355a2f49b88b4803eb9d390a6e52b5 Mon Sep 17 00:00:00 2001 From: Pankaj Kumar Date: Fri, 20 Sep 2024 17:28:18 +0530 Subject: [PATCH] Adding channel acquire time metric for http client --- .../java/util/emitter/core/NoopEmitter.java | 9 +++- .../java/util/http/client/HttpClientInit.java | 10 +++-- .../pool/MetricsEmittingResourcePool.java | 45 +++++++++++++++++++ .../util/http/client/FriendlyServersTest.java | 13 +++--- .../util/http/client/JankyServersTest.java | 19 ++++---- 5 files changed, 76 insertions(+), 20 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/java/util/http/client/pool/MetricsEmittingResourcePool.java diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/core/NoopEmitter.java b/processing/src/main/java/org/apache/druid/java/util/emitter/core/NoopEmitter.java index 072f168e970..ecbdb98416c 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/core/NoopEmitter.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/core/NoopEmitter.java @@ -19,10 +19,17 @@ package org.apache.druid.java.util.emitter.core; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; + /** */ -public class NoopEmitter implements Emitter +public class NoopEmitter extends ServiceEmitter implements Emitter { + public NoopEmitter() + { + super("", "", null); + } + @Override public void start() { diff --git a/processing/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java b/processing/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java index 602a8a6e261..5b9b10f9bcc 100644 --- a/processing/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java +++ b/processing/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java @@ -22,9 +22,10 @@ package org.apache.druid.java.util.http.client; import com.google.common.base.Throwables; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.druid.java.util.common.lifecycle.Lifecycle; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.netty.HttpClientPipelineFactory; import org.apache.druid.java.util.http.client.pool.ChannelResourceFactory; -import org.apache.druid.java.util.http.client.pool.ResourcePool; +import org.apache.druid.java.util.http.client.pool.MetricsEmittingResourcePool; import org.apache.druid.java.util.http.client.pool.ResourcePoolConfig; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.socket.nio.NioClientBossPool; @@ -48,7 +49,7 @@ import java.util.concurrent.TimeUnit; */ public class HttpClientInit { - public static HttpClient createClient(HttpClientConfig config, Lifecycle lifecycle) + public static HttpClient createClient(HttpClientConfig config, Lifecycle lifecycle, ServiceEmitter emitter) { try { // We need to use the full constructor in order to set a ThreadNameDeterminer. The other parameters are taken @@ -80,7 +81,7 @@ public class HttpClientInit ); return lifecycle.addMaybeStartManagedInstance( new NettyHttpClient( - new ResourcePool<>( + new MetricsEmittingResourcePool<>( new ChannelResourceFactory( createBootstrap(lifecycle, timer, config.getBossPoolSize(), config.getWorkerPoolSize()), config.getSslContext(), @@ -92,7 +93,8 @@ public class HttpClientInit config.getNumConnections(), config.getUnusedConnectionTimeoutDuration().getMillis() ), - config.isEagerInitialization() + config.isEagerInitialization(), + emitter ), config.getReadTimeout(), config.getCompressionCodec(), diff --git a/processing/src/main/java/org/apache/druid/java/util/http/client/pool/MetricsEmittingResourcePool.java b/processing/src/main/java/org/apache/druid/java/util/http/client/pool/MetricsEmittingResourcePool.java new file mode 100644 index 00000000000..1f5a3fa5086 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/http/client/pool/MetricsEmittingResourcePool.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.http.client.pool; + +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; + +public class MetricsEmittingResourcePool extends ResourcePool +{ + private final ServiceEmitter emitter; + + public MetricsEmittingResourcePool(ResourceFactory factory, ResourcePoolConfig config, boolean eagerInitialization, ServiceEmitter emitter) + { + super(factory, config, eagerInitialization); + this.emitter = emitter; + } + + @Override + public ResourceContainer take(final K key) + { + long startTime = System.nanoTime(); + ResourceContainer retVal = super.take(key); + long totalduration = System.nanoTime() - startTime; + emitter.emit(ServiceMetricEvent.builder().setDimension("destination", key.toString()).setMetric("httpClient/channelAcquire/time", totalduration)); + return retVal; + } + +} diff --git a/processing/src/test/java/org/apache/druid/java/util/http/client/FriendlyServersTest.java b/processing/src/test/java/org/apache/druid/java/util/http/client/FriendlyServersTest.java index 4df73564389..f0526532081 100644 --- a/processing/src/test/java/org/apache/druid/java/util/http/client/FriendlyServersTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/http/client/FriendlyServersTest.java @@ -22,6 +22,7 @@ package org.apache.druid.java.util.http.client; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.Lifecycle; +import org.apache.druid.java.util.emitter.core.NoopEmitter; import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.eclipse.jetty.server.Connector; @@ -95,7 +96,7 @@ public class FriendlyServersTest final Lifecycle lifecycle = new Lifecycle(); try { final HttpClientConfig config = HttpClientConfig.builder().build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final StatusResponseHolder response = client .go( new Request( @@ -165,7 +166,7 @@ public class FriendlyServersTest new HttpClientProxyConfig("localhost", serverSocket.getLocalPort(), "bob", "sally") ) .build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final StatusResponseHolder response = client .go( new Request( @@ -232,7 +233,7 @@ public class FriendlyServersTest final HttpClientConfig config = HttpClientConfig.builder() .withCompressionCodec(HttpClientConfig.CompressionCodec.IDENTITY) .build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final StatusResponseHolder response = client .go( new Request( @@ -283,12 +284,12 @@ public class FriendlyServersTest try { final SSLContext mySsl = HttpClientInit.sslContextWithTrustedKeyStore(keyStorePath, "abc123"); final HttpClientConfig trustingConfig = HttpClientConfig.builder().withSslContext(mySsl).build(); - final HttpClient trustingClient = HttpClientInit.createClient(trustingConfig, lifecycle); + final HttpClient trustingClient = HttpClientInit.createClient(trustingConfig, lifecycle, new NoopEmitter()); final HttpClientConfig skepticalConfig = HttpClientConfig.builder() .withSslContext(SSLContext.getDefault()) .build(); - final HttpClient skepticalClient = HttpClientInit.createClient(skepticalConfig, lifecycle); + final HttpClient skepticalClient = HttpClientInit.createClient(skepticalConfig, lifecycle, new NoopEmitter()); // Correct name ("localhost") { @@ -364,7 +365,7 @@ public class FriendlyServersTest final Lifecycle lifecycle = new Lifecycle(); try { final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); { final HttpResponseStatus status = client diff --git a/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java b/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java index ec54bd13500..3cce8fb6bec 100644 --- a/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java @@ -22,6 +22,7 @@ package org.apache.druid.java.util.http.client; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.Lifecycle; +import org.apache.druid.java.util.emitter.core.NoopEmitter; import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.jboss.netty.channel.ChannelException; @@ -155,7 +156,7 @@ public class JankyServersTest final Lifecycle lifecycle = new Lifecycle(); try { final HttpClientConfig config = HttpClientConfig.builder().withReadTimeout(new Duration(100)).build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final ListenableFuture future = client .go( new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", silentServerSocket.getLocalPort()))), @@ -183,7 +184,7 @@ public class JankyServersTest final Lifecycle lifecycle = new Lifecycle(); try { final HttpClientConfig config = HttpClientConfig.builder().withReadTimeout(new Duration(86400L * 365)).build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final ListenableFuture future = client .go( new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", silentServerSocket.getLocalPort()))), @@ -215,7 +216,7 @@ public class JankyServersTest .withSslContext(SSLContext.getDefault()) .withSslHandshakeTimeout(new Duration(100)) .build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final ListenableFuture response = client .go( @@ -244,7 +245,7 @@ public class JankyServersTest final Lifecycle lifecycle = new Lifecycle(); try { final HttpClientConfig config = HttpClientConfig.builder().build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final ListenableFuture response = client .go( new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", closingServerSocket.getLocalPort()))), @@ -272,7 +273,7 @@ public class JankyServersTest final Lifecycle lifecycle = new Lifecycle(); try { final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final ListenableFuture response = client .go( @@ -302,7 +303,7 @@ public class JankyServersTest final Lifecycle lifecycle = new Lifecycle(); try { final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); // Need to select a port that isn't being listened to. This approach finds an unused port in a racey way. // Hopefully it works most of the time. @@ -338,7 +339,7 @@ public class JankyServersTest final Lifecycle lifecycle = new Lifecycle(); try { final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); // Need to select a port that isn't being listened to. This approach finds an unused port in a racey way. // Hopefully it works most of the time. @@ -381,7 +382,7 @@ public class JankyServersTest final Lifecycle lifecycle = new Lifecycle(); try { final HttpClientConfig config = HttpClientConfig.builder().build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final ListenableFuture response = client .go( new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", echoServerSocket.getLocalPort()))), @@ -404,7 +405,7 @@ public class JankyServersTest final Lifecycle lifecycle = new Lifecycle(); try { final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final ListenableFuture response = client .go(