From 3bc73e0f435e9bd19ebcb8e440ccb9ce1afb43f1 Mon Sep 17 00:00:00 2001 From: PANKAJ KUMAR <87029331+Pankaj260100@users.noreply.github.com> Date: Mon, 7 Oct 2024 13:21:00 +0530 Subject: [PATCH] [OBSDATA-1786]: Adding Channel acquire time metric for netty HTTP client (#237) --- docs/operations/metrics.md | 2 +- .../druid/tests/security/ITTLSTest.java | 8 +- .../java/util/http/client/HttpClientInit.java | 17 +- .../util/http/client/NettyHttpClient.java | 3 +- .../client/pool/DefaultResourcePoolImpl.java | 381 ++++++++++++++++++ .../pool/MetricsEmittingResourcePoolImpl.java | 55 +++ .../util/http/client/pool/ResourcePool.java | 355 +--------------- .../util/http/client/FriendlyServersTest.java | 13 +- .../util/http/client/JankyServersTest.java | 18 +- .../http/client/pool/ResourcePoolTest.java | 6 +- .../server/initialization/BaseJettyTest.java | 3 +- .../initialization/JettyCertRenewTest.java | 3 +- .../server/initialization/JettyTest.java | 3 +- 13 files changed, 481 insertions(+), 386 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/java/util/http/client/pool/DefaultResourcePoolImpl.java create mode 100644 processing/src/main/java/org/apache/druid/java/util/http/client/pool/MetricsEmittingResourcePoolImpl.java diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index d9bebf12e7d..9ebfd7e6054 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -86,7 +86,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`subquery/fallback/unknownReason/count`|Number of subqueries which cannot be materialized as frames due other reasons.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | |`query/rowLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given row limit|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | |`query/byteLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given byte limit|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | -|`httpClient/channelAcquire/time`|Time in nannoseconds spent by the httpclient to acquire the channel.| | +|`httpClient/channelAcquire/timeNs`|Time in nanoseconds spent by the httpclient to acquire the channel.|`server`| few milliseconds| ### Historical diff --git a/integration-tests/src/test/java/org/apache/druid/tests/security/ITTLSTest.java b/integration-tests/src/test/java/org/apache/druid/tests/security/ITTLSTest.java index e8983b474cc..2016489a470 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/security/ITTLSTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/security/ITTLSTest.java @@ -29,6 +29,8 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.core.NoopEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.CredentialedHttpClient; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.HttpClientConfig; @@ -392,7 +394,8 @@ public class ITTLSTest HttpClient client = HttpClientInit.createClient( builder.build(), - lifecycle + lifecycle, + new ServiceEmitter("", "", new NoopEmitter()) ); HttpClient adminClient = new CredentialedHttpClient( @@ -419,7 +422,8 @@ public class ITTLSTest HttpClient client = HttpClientInit.createClient( builder.build(), - lifecycle + lifecycle, + new ServiceEmitter("", "", new NoopEmitter()) ); HttpClient adminClient = new CredentialedHttpClient( diff --git a/processing/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java b/processing/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java index 5b9b10f9bcc..2ed2e950e15 100644 --- a/processing/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java +++ b/processing/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java @@ -25,7 +25,8 @@ import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.netty.HttpClientPipelineFactory; import org.apache.druid.java.util.http.client.pool.ChannelResourceFactory; -import org.apache.druid.java.util.http.client.pool.MetricsEmittingResourcePool; +import org.apache.druid.java.util.http.client.pool.DefaultResourcePoolImpl; +import org.apache.druid.java.util.http.client.pool.MetricsEmittingResourcePoolImpl; import org.apache.druid.java.util.http.client.pool.ResourcePoolConfig; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.socket.nio.NioClientBossPool; @@ -81,19 +82,21 @@ public class HttpClientInit ); return lifecycle.addMaybeStartManagedInstance( new NettyHttpClient( - new MetricsEmittingResourcePool<>( - new ChannelResourceFactory( - createBootstrap(lifecycle, timer, config.getBossPoolSize(), config.getWorkerPoolSize()), + new MetricsEmittingResourcePoolImpl<>( + new DefaultResourcePoolImpl<>( + new ChannelResourceFactory( + createBootstrap(lifecycle, timer, config.getBossPoolSize(), config.getWorkerPoolSize()), config.getSslContext(), config.getProxyConfig(), timer, config.getSslHandshakeTimeout() == null ? -1 : config.getSslHandshakeTimeout().getMillis() - ), - new ResourcePoolConfig( + ), + new ResourcePoolConfig( config.getNumConnections(), config.getUnusedConnectionTimeoutDuration().getMillis() + ), + config.isEagerInitialization() ), - config.isEagerInitialization(), emitter ), config.getReadTimeout(), diff --git a/processing/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java b/processing/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java index 3ab3719180f..b5f05bad02e 100644 --- a/processing/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java +++ b/processing/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java @@ -55,6 +55,7 @@ import org.jboss.netty.handler.timeout.ReadTimeoutHandler; import org.jboss.netty.util.Timer; import org.joda.time.Duration; +import java.io.IOException; import java.net.URL; import java.util.Collection; import java.util.Map; @@ -100,7 +101,7 @@ public class NettyHttpClient extends AbstractHttpClient } @LifecycleStop - public void stop() + public void stop() throws IOException { pool.close(); } diff --git a/processing/src/main/java/org/apache/druid/java/util/http/client/pool/DefaultResourcePoolImpl.java b/processing/src/main/java/org/apache/druid/java/util/http/client/pool/DefaultResourcePoolImpl.java new file mode 100644 index 00000000000..5cf38bb2dbb --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/http/client/pool/DefaultResourcePoolImpl.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.http.client.pool; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A resource pool based on {@link LoadingCache}. When a resource is first requested for a new key, + * If the flag: eagerInitialization is true: use {@link EagerCreationResourceHolder} + * {@link ResourcePoolConfig#getMaxPerKey()} resources are initialized and cached in the {@link #pool}. + * Else: + * Initialize a single resource and further lazily using {@link LazyCreationResourceHolder} + * The individual resource in {@link ResourceHolderPerKey} is valid while (current time - last access time) + * <= {@link ResourcePoolConfig#getUnusedConnectionTimeoutMillis()}. + * + * A resource is closed and reinitialized if {@link ResourceFactory#isGood} returns false or it's expired based on + * {@link ResourcePoolConfig#getUnusedConnectionTimeoutMillis()}. + * + * {@link ResourcePoolConfig#getMaxPerKey() is a hard limit for the max number of resources per cache entry. The total + * number of resources in {@link ResourceHolderPerKey} cannot be larger than the limit in any case. + */ +public class DefaultResourcePoolImpl implements ResourcePool +{ + private static final Logger log = new Logger(DefaultResourcePoolImpl.class); + private final LoadingCache> pool; + private final AtomicBoolean closed = new AtomicBoolean(false); + + public DefaultResourcePoolImpl(final ResourceFactory factory, final ResourcePoolConfig config, + final boolean eagerInitialization) + { + this.pool = CacheBuilder.newBuilder().build( + new CacheLoader>() + { + @Override + public ResourceHolderPerKey load(K input) + { + if (eagerInitialization) { + return new EagerCreationResourceHolder<>( + config.getMaxPerKey(), + config.getUnusedConnectionTimeoutMillis(), + input, + factory + ); + } else { + return new LazyCreationResourceHolder<>( + config.getMaxPerKey(), + config.getUnusedConnectionTimeoutMillis(), + input, + factory + ); + } + } + } + ); + } + + /** + * Returns a {@link ResourceContainer} for the given key or null if this pool is already closed. + */ + @Override + public ResourceContainer take(final K key) + { + if (closed.get()) { + log.error(StringUtils.format("take(%s) called even though I'm closed.", key)); + return null; + } + + final ResourceHolderPerKey holder; + try { + holder = pool.get(key); + } + catch (ExecutionException e) { + throw new RuntimeException(e); + } + final V value = holder.get(); + + return new ResourceContainer() + { + private final AtomicBoolean returned = new AtomicBoolean(false); + + @Override + public V get() + { + Preconditions.checkState(!returned.get(), "Resource for key[%s] has been returned, cannot get().", key); + return value; + } + + @Override + public void returnResource() + { + if (returned.getAndSet(true)) { + log.warn("Resource at key[%s] was returned multiple times?", key); + } else { + holder.giveBack(value); + } + } + + @Override + protected void finalize() throws Throwable + { + if (!returned.get()) { + log.warn( + StringUtils.format( + "Resource[%s] at key[%s] was not returned before Container was finalized, potential resource leak.", + value, + key + ) + ); + returnResource(); + } + super.finalize(); + } + }; + } + + @Override + public void close() + { + closed.set(true); + final ConcurrentMap> mapView = pool.asMap(); + Closer closer = Closer.create(); + for (Iterator>> iterator = + mapView.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry> e = iterator.next(); + iterator.remove(); + closer.register(e.getValue()); + } + try { + closer.close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static class EagerCreationResourceHolder extends LazyCreationResourceHolder + { + private EagerCreationResourceHolder( + int maxSize, + long unusedResourceTimeoutMillis, + K key, + ResourceFactory factory + ) + { + super(maxSize, unusedResourceTimeoutMillis, key, factory); + // Eagerly Instantiate + for (int i = 0; i < maxSize; i++) { + resourceHolderList.add( + new ResourceHolder<>( + System.currentTimeMillis(), + Preconditions.checkNotNull( + factory.generate(key), + "factory.generate(key)" + ) + ) + ); + } + } + } + + private static class LazyCreationResourceHolder extends ResourceHolderPerKey + { + private LazyCreationResourceHolder( + int maxSize, + long unusedResourceTimeoutMillis, + K key, + ResourceFactory factory + ) + { + super(maxSize, unusedResourceTimeoutMillis, key, factory); + } + } + + private static class ResourceHolderPerKey implements Closeable + { + protected final int maxSize; + private final K key; + private final ResourceFactory factory; + private final long unusedResourceTimeoutMillis; + // Hold previously created / returned resources + protected final ArrayDeque> resourceHolderList; + // To keep track of resources that have been successfully returned to caller. + private int numLentResources = 0; + private boolean closed = false; + + protected ResourceHolderPerKey( + int maxSize, + long unusedResourceTimeoutMillis, + K key, + ResourceFactory factory + ) + { + this.maxSize = maxSize; + this.key = key; + this.factory = factory; + this.unusedResourceTimeoutMillis = unusedResourceTimeoutMillis; + this.resourceHolderList = new ArrayDeque<>(); + } + + /** + * Returns a resource or null if this holder is already closed or the current thread is interrupted. + * + * Try to return a previously created resource if it isGood(). Else, generate a new resource + */ + @Nullable + V get() + { + final V poolVal; + // resourceHolderList can't have nulls, so we'll use a null to signal that we need to create a new resource. + boolean expired = false; + synchronized (this) { + while (!closed && (numLentResources == maxSize)) { + try { + log.debug("Thread [%s] is blocked waiting for resource for key [%s]", Thread.currentThread().getName(), key); + this.wait(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + } + + if (closed) { + log.info(StringUtils.format("get() called even though I'm closed. key[%s]", key)); + return null; + } else if (numLentResources < maxSize) { + // Attempt to take an existing resource or create one if list is empty, and increment numLentResources + if (resourceHolderList.isEmpty()) { + poolVal = factory.generate(key); + } else { + ResourceHolder holder = resourceHolderList.removeFirst(); + poolVal = holder.getResource(); + if (System.currentTimeMillis() - holder.getLastAccessedTime() > unusedResourceTimeoutMillis) { + expired = true; + } + } + numLentResources++; + } else { + throw new IllegalStateException("Unexpected state: More objects lent than permissible"); + } + } + + final V retVal; + // At this point, we must either return a valid resource. Or throw and exception decrement "numLentResources" + try { + if (poolVal != null && !expired && factory.isGood(poolVal)) { + retVal = poolVal; + } else { + if (poolVal != null) { + factory.close(poolVal); + } + retVal = factory.generate(key); + } + } + catch (Throwable e) { + synchronized (this) { + numLentResources--; + this.notifyAll(); + } + Throwables.propagateIfPossible(e); + throw new RuntimeException(e); + } + + return retVal; + } + + void giveBack(V object) + { + Preconditions.checkNotNull(object, "object"); + + synchronized (this) { + if (closed) { + log.info(StringUtils.format("giveBack called after being closed. key[%s]", key)); + factory.close(object); + return; + } + + if (resourceHolderList.size() >= maxSize) { + if (holderListContains(object)) { + log.warn( + new Exception("Exception for stacktrace"), + StringUtils.format( + "Returning object[%s] at key[%s] that has already been returned!? Skipping", + object, + key + ) + ); + } else { + log.warn( + new Exception("Exception for stacktrace"), + StringUtils.format( + "Returning object[%s] at key[%s] even though we already have all that we can hold[%s]!? Skipping", + object, + key, + resourceHolderList + ) + ); + } + return; + } + + resourceHolderList.addLast(new ResourceHolder<>(System.currentTimeMillis(), object)); + numLentResources--; + this.notifyAll(); + } + } + + private boolean holderListContains(V object) + { + return resourceHolderList.stream().anyMatch(a -> a.getResource().equals(object)); + } + + @Override + public void close() + { + synchronized (this) { + closed = true; + resourceHolderList.forEach(v -> factory.close(v.getResource())); + resourceHolderList.clear(); + this.notifyAll(); + } + } + } + + private static class ResourceHolder + { + private final long lastAccessedTime; + private final V resource; + + private ResourceHolder(long lastAccessedTime, V resource) + { + this.resource = resource; + this.lastAccessedTime = lastAccessedTime; + } + + private long getLastAccessedTime() + { + return lastAccessedTime; + } + + public V getResource() + { + return resource; + } + + } +} diff --git a/processing/src/main/java/org/apache/druid/java/util/http/client/pool/MetricsEmittingResourcePoolImpl.java b/processing/src/main/java/org/apache/druid/java/util/http/client/pool/MetricsEmittingResourcePoolImpl.java new file mode 100644 index 00000000000..a02a9ed4a90 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/http/client/pool/MetricsEmittingResourcePoolImpl.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.http.client.pool; + +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; + +import java.io.IOException; + +public class MetricsEmittingResourcePoolImpl implements ResourcePool +{ + private final ServiceEmitter emitter; + private final ResourcePool resourcePool; + + public MetricsEmittingResourcePoolImpl(ResourcePool resourcePool, ServiceEmitter emitter) + { + this.resourcePool = resourcePool; + Preconditions.checkNotNull(emitter, "emitter cannot be null"); + this.emitter = emitter; + } + + @Override + public ResourceContainer take(final K key) + { + long startTime = System.nanoTime(); + ResourceContainer retVal = resourcePool.take(key); + long totalduration = System.nanoTime() - startTime; + emitter.emit(ServiceMetricEvent.builder().setDimension("server", key.toString()).setMetric("httpClient/channelAcquire/timeNs", totalduration)); + return retVal; + } + + @Override + public void close() throws IOException + { + this.resourcePool.close(); + } +} diff --git a/processing/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java b/processing/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java index 2476e4c9ebf..dd44e4fb9c1 100644 --- a/processing/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java +++ b/processing/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java @@ -19,362 +19,11 @@ package org.apache.druid.java.util.http.client.pool; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.java.util.common.logger.Logger; - -import javax.annotation.Nullable; import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; -/** - * A resource pool based on {@link LoadingCache}. When a resource is first requested for a new key, - * If the flag: eagerInitialization is true: use {@link EagerCreationResourceHolder} - * {@link ResourcePoolConfig#getMaxPerKey()} resources are initialized and cached in the {@link #pool}. - * Else: - * Initialize a single resource and further lazily using {@link LazyCreationResourceHolder} - * The individual resource in {@link ResourceHolderPerKey} is valid while (current time - last access time) - * <= {@link ResourcePoolConfig#getUnusedConnectionTimeoutMillis()}. - * - * A resource is closed and reinitialized if {@link ResourceFactory#isGood} returns false or it's expired based on - * {@link ResourcePoolConfig#getUnusedConnectionTimeoutMillis()}. - * - * {@link ResourcePoolConfig#getMaxPerKey() is a hard limit for the max number of resources per cache entry. The total - * number of resources in {@link ResourceHolderPerKey} cannot be larger than the limit in any case. - */ -public class ResourcePool implements Closeable +public interface ResourcePool extends Closeable { - private static final Logger log = new Logger(ResourcePool.class); - private final LoadingCache> pool; - private final AtomicBoolean closed = new AtomicBoolean(false); - public ResourcePool(final ResourceFactory factory, final ResourcePoolConfig config, - final boolean eagerInitialization) - { - this.pool = CacheBuilder.newBuilder().build( - new CacheLoader>() - { - @Override - public ResourceHolderPerKey load(K input) - { - if (eagerInitialization) { - return new EagerCreationResourceHolder<>( - config.getMaxPerKey(), - config.getUnusedConnectionTimeoutMillis(), - input, - factory - ); - } else { - return new LazyCreationResourceHolder<>( - config.getMaxPerKey(), - config.getUnusedConnectionTimeoutMillis(), - input, - factory - ); - } - } - } - ); - } + ResourceContainer take(K key); - /** - * Returns a {@link ResourceContainer} for the given key or null if this pool is already closed. - */ - @Nullable - public ResourceContainer take(final K key) - { - if (closed.get()) { - log.error(StringUtils.format("take(%s) called even though I'm closed.", key)); - return null; - } - - final ResourceHolderPerKey holder; - try { - holder = pool.get(key); - } - catch (ExecutionException e) { - throw new RuntimeException(e); - } - final V value = holder.get(); - - return new ResourceContainer() - { - private final AtomicBoolean returned = new AtomicBoolean(false); - - @Override - public V get() - { - Preconditions.checkState(!returned.get(), "Resource for key[%s] has been returned, cannot get().", key); - return value; - } - - @Override - public void returnResource() - { - if (returned.getAndSet(true)) { - log.warn("Resource at key[%s] was returned multiple times?", key); - } else { - holder.giveBack(value); - } - } - - @Override - protected void finalize() throws Throwable - { - if (!returned.get()) { - log.warn( - StringUtils.format( - "Resource[%s] at key[%s] was not returned before Container was finalized, potential resource leak.", - value, - key - ) - ); - returnResource(); - } - super.finalize(); - } - }; - } - - @Override - public void close() - { - closed.set(true); - final ConcurrentMap> mapView = pool.asMap(); - Closer closer = Closer.create(); - for (Iterator>> iterator = - mapView.entrySet().iterator(); iterator.hasNext(); ) { - Map.Entry> e = iterator.next(); - iterator.remove(); - closer.register(e.getValue()); - } - try { - closer.close(); - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - - private static class EagerCreationResourceHolder extends LazyCreationResourceHolder - { - private EagerCreationResourceHolder( - int maxSize, - long unusedResourceTimeoutMillis, - K key, - ResourceFactory factory - ) - { - super(maxSize, unusedResourceTimeoutMillis, key, factory); - // Eagerly Instantiate - for (int i = 0; i < maxSize; i++) { - resourceHolderList.add( - new ResourceHolder<>( - System.currentTimeMillis(), - Preconditions.checkNotNull( - factory.generate(key), - "factory.generate(key)" - ) - ) - ); - } - } - } - - private static class LazyCreationResourceHolder extends ResourceHolderPerKey - { - private LazyCreationResourceHolder( - int maxSize, - long unusedResourceTimeoutMillis, - K key, - ResourceFactory factory - ) - { - super(maxSize, unusedResourceTimeoutMillis, key, factory); - } - } - - private static class ResourceHolderPerKey implements Closeable - { - protected final int maxSize; - private final K key; - private final ResourceFactory factory; - private final long unusedResourceTimeoutMillis; - // Hold previously created / returned resources - protected final ArrayDeque> resourceHolderList; - // To keep track of resources that have been successfully returned to caller. - private int numLentResources = 0; - private boolean closed = false; - - protected ResourceHolderPerKey( - int maxSize, - long unusedResourceTimeoutMillis, - K key, - ResourceFactory factory - ) - { - this.maxSize = maxSize; - this.key = key; - this.factory = factory; - this.unusedResourceTimeoutMillis = unusedResourceTimeoutMillis; - this.resourceHolderList = new ArrayDeque<>(); - } - - /** - * Returns a resource or null if this holder is already closed or the current thread is interrupted. - * - * Try to return a previously created resource if it isGood(). Else, generate a new resource - */ - @Nullable - V get() - { - final V poolVal; - // resourceHolderList can't have nulls, so we'll use a null to signal that we need to create a new resource. - boolean expired = false; - synchronized (this) { - while (!closed && (numLentResources == maxSize)) { - try { - this.wait(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } - } - - if (closed) { - log.info(StringUtils.format("get() called even though I'm closed. key[%s]", key)); - return null; - } else if (numLentResources < maxSize) { - // Attempt to take an existing resource or create one if list is empty, and increment numLentResources - if (resourceHolderList.isEmpty()) { - poolVal = factory.generate(key); - } else { - ResourceHolder holder = resourceHolderList.removeFirst(); - poolVal = holder.getResource(); - if (System.currentTimeMillis() - holder.getLastAccessedTime() > unusedResourceTimeoutMillis) { - expired = true; - } - } - numLentResources++; - } else { - throw new IllegalStateException("Unexpected state: More objects lent than permissible"); - } - } - - final V retVal; - // At this point, we must either return a valid resource. Or throw and exception decrement "numLentResources" - try { - if (poolVal != null && !expired && factory.isGood(poolVal)) { - retVal = poolVal; - } else { - if (poolVal != null) { - factory.close(poolVal); - } - retVal = factory.generate(key); - } - } - catch (Throwable e) { - synchronized (this) { - numLentResources--; - this.notifyAll(); - } - Throwables.propagateIfPossible(e); - throw new RuntimeException(e); - } - - return retVal; - } - - void giveBack(V object) - { - Preconditions.checkNotNull(object, "object"); - - synchronized (this) { - if (closed) { - log.info(StringUtils.format("giveBack called after being closed. key[%s]", key)); - factory.close(object); - return; - } - - if (resourceHolderList.size() >= maxSize) { - if (holderListContains(object)) { - log.warn( - new Exception("Exception for stacktrace"), - StringUtils.format( - "Returning object[%s] at key[%s] that has already been returned!? Skipping", - object, - key - ) - ); - } else { - log.warn( - new Exception("Exception for stacktrace"), - StringUtils.format( - "Returning object[%s] at key[%s] even though we already have all that we can hold[%s]!? Skipping", - object, - key, - resourceHolderList - ) - ); - } - return; - } - - resourceHolderList.addLast(new ResourceHolder<>(System.currentTimeMillis(), object)); - numLentResources--; - this.notifyAll(); - } - } - - private boolean holderListContains(V object) - { - return resourceHolderList.stream().anyMatch(a -> a.getResource().equals(object)); - } - - @Override - public void close() - { - synchronized (this) { - closed = true; - resourceHolderList.forEach(v -> factory.close(v.getResource())); - resourceHolderList.clear(); - this.notifyAll(); - } - } - } - - private static class ResourceHolder - { - private final long lastAccessedTime; - private final V resource; - - private ResourceHolder(long lastAccessedTime, V resource) - { - this.resource = resource; - this.lastAccessedTime = lastAccessedTime; - } - - private long getLastAccessedTime() - { - return lastAccessedTime; - } - - public V getResource() - { - return resource; - } - - } } diff --git a/processing/src/test/java/org/apache/druid/java/util/http/client/FriendlyServersTest.java b/processing/src/test/java/org/apache/druid/java/util/http/client/FriendlyServersTest.java index f0526532081..d636e67d6f7 100644 --- a/processing/src/test/java/org/apache/druid/java/util/http/client/FriendlyServersTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/http/client/FriendlyServersTest.java @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.emitter.core.NoopEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.eclipse.jetty.server.Connector; @@ -96,7 +97,7 @@ public class FriendlyServersTest final Lifecycle lifecycle = new Lifecycle(); try { final HttpClientConfig config = HttpClientConfig.builder().build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter())); final StatusResponseHolder response = client .go( new Request( @@ -166,7 +167,7 @@ public class FriendlyServersTest new HttpClientProxyConfig("localhost", serverSocket.getLocalPort(), "bob", "sally") ) .build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter())); final StatusResponseHolder response = client .go( new Request( @@ -233,7 +234,7 @@ public class FriendlyServersTest final HttpClientConfig config = HttpClientConfig.builder() .withCompressionCodec(HttpClientConfig.CompressionCodec.IDENTITY) .build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter())); final StatusResponseHolder response = client .go( new Request( @@ -284,12 +285,12 @@ public class FriendlyServersTest try { final SSLContext mySsl = HttpClientInit.sslContextWithTrustedKeyStore(keyStorePath, "abc123"); final HttpClientConfig trustingConfig = HttpClientConfig.builder().withSslContext(mySsl).build(); - final HttpClient trustingClient = HttpClientInit.createClient(trustingConfig, lifecycle, new NoopEmitter()); + final HttpClient trustingClient = HttpClientInit.createClient(trustingConfig, lifecycle, new ServiceEmitter("", "", new NoopEmitter())); final HttpClientConfig skepticalConfig = HttpClientConfig.builder() .withSslContext(SSLContext.getDefault()) .build(); - final HttpClient skepticalClient = HttpClientInit.createClient(skepticalConfig, lifecycle, new NoopEmitter()); + final HttpClient skepticalClient = HttpClientInit.createClient(skepticalConfig, lifecycle, new ServiceEmitter("", "", new NoopEmitter())); // Correct name ("localhost") { @@ -365,7 +366,7 @@ public class FriendlyServersTest final Lifecycle lifecycle = new Lifecycle(); try { final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter())); { final HttpResponseStatus status = client diff --git a/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java b/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java index 3cce8fb6bec..4d24f3f2dc5 100644 --- a/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/http/client/JankyServersTest.java @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.emitter.core.NoopEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.jboss.netty.channel.ChannelException; @@ -156,7 +157,7 @@ public class JankyServersTest final Lifecycle lifecycle = new Lifecycle(); try { final HttpClientConfig config = HttpClientConfig.builder().withReadTimeout(new Duration(100)).build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter())); final ListenableFuture future = client .go( new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", silentServerSocket.getLocalPort()))), @@ -184,7 +185,7 @@ public class JankyServersTest final Lifecycle lifecycle = new Lifecycle(); try { final HttpClientConfig config = HttpClientConfig.builder().withReadTimeout(new Duration(86400L * 365)).build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter())); final ListenableFuture future = client .go( new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", silentServerSocket.getLocalPort()))), @@ -216,8 +217,7 @@ public class JankyServersTest .withSslContext(SSLContext.getDefault()) .withSslHandshakeTimeout(new Duration(100)) .build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); - + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter())); final ListenableFuture response = client .go( new Request(HttpMethod.GET, new URL(StringUtils.format("https://localhost:%d/", silentServerSocket.getLocalPort()))), @@ -245,7 +245,7 @@ public class JankyServersTest final Lifecycle lifecycle = new Lifecycle(); try { final HttpClientConfig config = HttpClientConfig.builder().build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter())); final ListenableFuture response = client .go( new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", closingServerSocket.getLocalPort()))), @@ -273,8 +273,7 @@ public class JankyServersTest final Lifecycle lifecycle = new Lifecycle(); try { final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); - + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter())); final ListenableFuture response = client .go( new Request(HttpMethod.GET, new URL(StringUtils.format("https://localhost:%d/", closingServerSocket.getLocalPort()))), @@ -382,7 +381,7 @@ public class JankyServersTest final Lifecycle lifecycle = new Lifecycle(); try { final HttpClientConfig config = HttpClientConfig.builder().build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter())); final ListenableFuture response = client .go( new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", echoServerSocket.getLocalPort()))), @@ -405,8 +404,7 @@ public class JankyServersTest final Lifecycle lifecycle = new Lifecycle(); try { final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build(); - final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); - + final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter())); final ListenableFuture response = client .go( new Request(HttpMethod.GET, new URL(StringUtils.format("https://localhost:%d/", echoServerSocket.getLocalPort()))), diff --git a/processing/src/test/java/org/apache/druid/java/util/http/client/pool/ResourcePoolTest.java b/processing/src/test/java/org/apache/druid/java/util/http/client/pool/ResourcePoolTest.java index 2961f650469..57e2516b083 100644 --- a/processing/src/test/java/org/apache/druid/java/util/http/client/pool/ResourcePoolTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/http/client/pool/ResourcePoolTest.java @@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit; public class ResourcePoolTest { ResourceFactory resourceFactory; - ResourcePool pool; + DefaultResourcePoolImpl pool; @Before public void setUp() @@ -52,7 +52,7 @@ public class ResourcePoolTest resourceFactory = (ResourceFactory) EasyMock.createMock(ResourceFactory.class); EasyMock.replay(resourceFactory); - pool = new ResourcePool( + pool = new DefaultResourcePoolImpl( resourceFactory, new ResourcePoolConfig(2, TimeUnit.MINUTES.toMillis(4)), eagerInitialization @@ -418,7 +418,7 @@ public class ResourcePoolTest { resourceFactory = (ResourceFactory) EasyMock.createMock(ResourceFactory.class); - pool = new ResourcePool( + pool = new DefaultResourcePoolImpl<>( resourceFactory, new ResourcePoolConfig(2, TimeUnit.MILLISECONDS.toMillis(10)), true diff --git a/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java b/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java index b27b08bfa28..1a34482b760 100644 --- a/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java +++ b/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java @@ -26,6 +26,7 @@ import com.google.inject.servlet.GuiceFilter; import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.emitter.core.NoopEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.HttpClientConfig; import org.apache.druid.java.util.http.client.HttpClientInit; @@ -130,7 +131,7 @@ public abstract class BaseJettyTest .withEagerInitialization(true) .build(), druidLifecycle, - new NoopEmitter() + new ServiceEmitter("", "", new NoopEmitter()) ); } catch (Exception e) { diff --git a/server/src/test/java/org/apache/druid/server/initialization/JettyCertRenewTest.java b/server/src/test/java/org/apache/druid/server/initialization/JettyCertRenewTest.java index 3a0c5eca32d..4f361a2119c 100644 --- a/server/src/test/java/org/apache/druid/server/initialization/JettyCertRenewTest.java +++ b/server/src/test/java/org/apache/druid/server/initialization/JettyCertRenewTest.java @@ -35,6 +35,7 @@ import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.annotations.Self; import org.apache.druid.initialization.Initialization; import org.apache.druid.java.util.emitter.core.NoopEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.HttpClientConfig; import org.apache.druid.java.util.http.client.HttpClientInit; @@ -387,7 +388,7 @@ public class JettyCertRenewTest extends BaseJettyTest client = HttpClientInit.createClient( getSslConfig(), lifecycle, - new NoopEmitter() + new ServiceEmitter("", "", new NoopEmitter()) ); } catch (Exception e) { diff --git a/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java b/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java index a98f6874d19..4fac4ff563a 100644 --- a/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java +++ b/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java @@ -36,6 +36,7 @@ import org.apache.druid.guice.annotations.Self; import org.apache.druid.initialization.Initialization; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.core.NoopEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.HttpClientConfig; import org.apache.druid.java.util.http.client.HttpClientInit; @@ -503,7 +504,7 @@ public class JettyTest extends BaseJettyTest client = HttpClientInit.createClient( sslConfig, lifecycle, - new NoopEmitter() + new ServiceEmitter("", "", new NoopEmitter()) ); } catch (Exception e) {