Adding channel acquire time metric for http client

This commit is contained in:
Pankaj Kumar 2024-09-20 17:28:18 +05:30
parent 36a40c63f7
commit 6977417945
5 changed files with 76 additions and 20 deletions

View File

@ -19,10 +19,17 @@
package org.apache.druid.java.util.emitter.core;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
/**
*/
public class NoopEmitter implements Emitter
public class NoopEmitter extends ServiceEmitter implements Emitter
{
public NoopEmitter()
{
super("", "", null);
}
@Override
public void start()
{

View File

@ -22,9 +22,10 @@ package org.apache.druid.java.util.http.client;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.netty.HttpClientPipelineFactory;
import org.apache.druid.java.util.http.client.pool.ChannelResourceFactory;
import org.apache.druid.java.util.http.client.pool.ResourcePool;
import org.apache.druid.java.util.http.client.pool.MetricsEmittingResourcePool;
import org.apache.druid.java.util.http.client.pool.ResourcePoolConfig;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.socket.nio.NioClientBossPool;
@ -48,7 +49,7 @@ import java.util.concurrent.TimeUnit;
*/
public class HttpClientInit
{
public static HttpClient createClient(HttpClientConfig config, Lifecycle lifecycle)
public static HttpClient createClient(HttpClientConfig config, Lifecycle lifecycle, ServiceEmitter emitter)
{
try {
// We need to use the full constructor in order to set a ThreadNameDeterminer. The other parameters are taken
@ -80,7 +81,7 @@ public class HttpClientInit
);
return lifecycle.addMaybeStartManagedInstance(
new NettyHttpClient(
new ResourcePool<>(
new MetricsEmittingResourcePool<>(
new ChannelResourceFactory(
createBootstrap(lifecycle, timer, config.getBossPoolSize(), config.getWorkerPoolSize()),
config.getSslContext(),
@ -92,7 +93,8 @@ public class HttpClientInit
config.getNumConnections(),
config.getUnusedConnectionTimeoutDuration().getMillis()
),
config.isEagerInitialization()
config.isEagerInitialization(),
emitter
),
config.getReadTimeout(),
config.getCompressionCodec(),

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.http.client.pool;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
public class MetricsEmittingResourcePool<K, V> extends ResourcePool<K, V>
{
private final ServiceEmitter emitter;
public MetricsEmittingResourcePool(ResourceFactory factory, ResourcePoolConfig config, boolean eagerInitialization, ServiceEmitter emitter)
{
super(factory, config, eagerInitialization);
this.emitter = emitter;
}
@Override
public ResourceContainer<V> take(final K key)
{
long startTime = System.nanoTime();
ResourceContainer<V> retVal = super.take(key);
long totalduration = System.nanoTime() - startTime;
emitter.emit(ServiceMetricEvent.builder().setDimension("destination", key.toString()).setMetric("httpClient/channelAcquire/time", totalduration));
return retVal;
}
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.java.util.http.client;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.eclipse.jetty.server.Connector;
@ -95,7 +96,7 @@ public class FriendlyServersTest
final Lifecycle lifecycle = new Lifecycle();
try {
final HttpClientConfig config = HttpClientConfig.builder().build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle);
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter());
final StatusResponseHolder response = client
.go(
new Request(
@ -165,7 +166,7 @@ public class FriendlyServersTest
new HttpClientProxyConfig("localhost", serverSocket.getLocalPort(), "bob", "sally")
)
.build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle);
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter());
final StatusResponseHolder response = client
.go(
new Request(
@ -232,7 +233,7 @@ public class FriendlyServersTest
final HttpClientConfig config = HttpClientConfig.builder()
.withCompressionCodec(HttpClientConfig.CompressionCodec.IDENTITY)
.build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle);
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter());
final StatusResponseHolder response = client
.go(
new Request(
@ -283,12 +284,12 @@ public class FriendlyServersTest
try {
final SSLContext mySsl = HttpClientInit.sslContextWithTrustedKeyStore(keyStorePath, "abc123");
final HttpClientConfig trustingConfig = HttpClientConfig.builder().withSslContext(mySsl).build();
final HttpClient trustingClient = HttpClientInit.createClient(trustingConfig, lifecycle);
final HttpClient trustingClient = HttpClientInit.createClient(trustingConfig, lifecycle, new NoopEmitter());
final HttpClientConfig skepticalConfig = HttpClientConfig.builder()
.withSslContext(SSLContext.getDefault())
.build();
final HttpClient skepticalClient = HttpClientInit.createClient(skepticalConfig, lifecycle);
final HttpClient skepticalClient = HttpClientInit.createClient(skepticalConfig, lifecycle, new NoopEmitter());
// Correct name ("localhost")
{
@ -364,7 +365,7 @@ public class FriendlyServersTest
final Lifecycle lifecycle = new Lifecycle();
try {
final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle);
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter());
{
final HttpResponseStatus status = client

View File

@ -22,6 +22,7 @@ package org.apache.druid.java.util.http.client;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.jboss.netty.channel.ChannelException;
@ -155,7 +156,7 @@ public class JankyServersTest
final Lifecycle lifecycle = new Lifecycle();
try {
final HttpClientConfig config = HttpClientConfig.builder().withReadTimeout(new Duration(100)).build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle);
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter());
final ListenableFuture<StatusResponseHolder> future = client
.go(
new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", silentServerSocket.getLocalPort()))),
@ -183,7 +184,7 @@ public class JankyServersTest
final Lifecycle lifecycle = new Lifecycle();
try {
final HttpClientConfig config = HttpClientConfig.builder().withReadTimeout(new Duration(86400L * 365)).build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle);
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter());
final ListenableFuture<StatusResponseHolder> future = client
.go(
new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", silentServerSocket.getLocalPort()))),
@ -215,7 +216,7 @@ public class JankyServersTest
.withSslContext(SSLContext.getDefault())
.withSslHandshakeTimeout(new Duration(100))
.build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle);
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter());
final ListenableFuture<StatusResponseHolder> response = client
.go(
@ -244,7 +245,7 @@ public class JankyServersTest
final Lifecycle lifecycle = new Lifecycle();
try {
final HttpClientConfig config = HttpClientConfig.builder().build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle);
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter());
final ListenableFuture<StatusResponseHolder> response = client
.go(
new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", closingServerSocket.getLocalPort()))),
@ -272,7 +273,7 @@ public class JankyServersTest
final Lifecycle lifecycle = new Lifecycle();
try {
final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle);
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter());
final ListenableFuture<StatusResponseHolder> response = client
.go(
@ -302,7 +303,7 @@ public class JankyServersTest
final Lifecycle lifecycle = new Lifecycle();
try {
final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle);
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter());
// Need to select a port that isn't being listened to. This approach finds an unused port in a racey way.
// Hopefully it works most of the time.
@ -338,7 +339,7 @@ public class JankyServersTest
final Lifecycle lifecycle = new Lifecycle();
try {
final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle);
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter());
// Need to select a port that isn't being listened to. This approach finds an unused port in a racey way.
// Hopefully it works most of the time.
@ -381,7 +382,7 @@ public class JankyServersTest
final Lifecycle lifecycle = new Lifecycle();
try {
final HttpClientConfig config = HttpClientConfig.builder().build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle);
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter());
final ListenableFuture<StatusResponseHolder> response = client
.go(
new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", echoServerSocket.getLocalPort()))),
@ -404,7 +405,7 @@ public class JankyServersTest
final Lifecycle lifecycle = new Lifecycle();
try {
final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle);
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter());
final ListenableFuture<StatusResponseHolder> response = client
.go(