[OBSDATA-1786]: Adding Channel acquire time metric for netty HTTP client (#237)

This commit is contained in:
PANKAJ KUMAR 2024-10-07 13:21:00 +05:30 committed by Pankaj Kumar
parent 2b551b4816
commit 3bc73e0f43
13 changed files with 481 additions and 386 deletions

View File

@ -86,7 +86,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`subquery/fallback/unknownReason/count`|Number of subqueries which cannot be materialized as frames due other reasons.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | |`subquery/fallback/unknownReason/count`|Number of subqueries which cannot be materialized as frames due other reasons.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`query/rowLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given row limit|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | |`query/rowLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given row limit|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`query/byteLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given byte limit|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | |`query/byteLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given byte limit|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`httpClient/channelAcquire/time`|Time in nannoseconds spent by the httpclient to acquire the channel.| | |`httpClient/channelAcquire/timeNs`|Time in nanoseconds spent by the httpclient to acquire the channel.|`server`| few milliseconds|
### Historical ### Historical

View File

@ -29,6 +29,8 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.CredentialedHttpClient; import org.apache.druid.java.util.http.client.CredentialedHttpClient;
import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.HttpClientConfig; import org.apache.druid.java.util.http.client.HttpClientConfig;
@ -392,7 +394,8 @@ public class ITTLSTest
HttpClient client = HttpClientInit.createClient( HttpClient client = HttpClientInit.createClient(
builder.build(), builder.build(),
lifecycle lifecycle,
new ServiceEmitter("", "", new NoopEmitter())
); );
HttpClient adminClient = new CredentialedHttpClient( HttpClient adminClient = new CredentialedHttpClient(
@ -419,7 +422,8 @@ public class ITTLSTest
HttpClient client = HttpClientInit.createClient( HttpClient client = HttpClientInit.createClient(
builder.build(), builder.build(),
lifecycle lifecycle,
new ServiceEmitter("", "", new NoopEmitter())
); );
HttpClient adminClient = new CredentialedHttpClient( HttpClient adminClient = new CredentialedHttpClient(

View File

@ -25,7 +25,8 @@ import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.netty.HttpClientPipelineFactory; import org.apache.druid.java.util.http.client.netty.HttpClientPipelineFactory;
import org.apache.druid.java.util.http.client.pool.ChannelResourceFactory; import org.apache.druid.java.util.http.client.pool.ChannelResourceFactory;
import org.apache.druid.java.util.http.client.pool.MetricsEmittingResourcePool; import org.apache.druid.java.util.http.client.pool.DefaultResourcePoolImpl;
import org.apache.druid.java.util.http.client.pool.MetricsEmittingResourcePoolImpl;
import org.apache.druid.java.util.http.client.pool.ResourcePoolConfig; import org.apache.druid.java.util.http.client.pool.ResourcePoolConfig;
import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.socket.nio.NioClientBossPool; import org.jboss.netty.channel.socket.nio.NioClientBossPool;
@ -81,7 +82,8 @@ public class HttpClientInit
); );
return lifecycle.addMaybeStartManagedInstance( return lifecycle.addMaybeStartManagedInstance(
new NettyHttpClient( new NettyHttpClient(
new MetricsEmittingResourcePool<>( new MetricsEmittingResourcePoolImpl<>(
new DefaultResourcePoolImpl<>(
new ChannelResourceFactory( new ChannelResourceFactory(
createBootstrap(lifecycle, timer, config.getBossPoolSize(), config.getWorkerPoolSize()), createBootstrap(lifecycle, timer, config.getBossPoolSize(), config.getWorkerPoolSize()),
config.getSslContext(), config.getSslContext(),
@ -93,7 +95,8 @@ public class HttpClientInit
config.getNumConnections(), config.getNumConnections(),
config.getUnusedConnectionTimeoutDuration().getMillis() config.getUnusedConnectionTimeoutDuration().getMillis()
), ),
config.isEagerInitialization(), config.isEagerInitialization()
),
emitter emitter
), ),
config.getReadTimeout(), config.getReadTimeout(),

View File

@ -55,6 +55,7 @@ import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
import org.jboss.netty.util.Timer; import org.jboss.netty.util.Timer;
import org.joda.time.Duration; import org.joda.time.Duration;
import java.io.IOException;
import java.net.URL; import java.net.URL;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
@ -100,7 +101,7 @@ public class NettyHttpClient extends AbstractHttpClient
} }
@LifecycleStop @LifecycleStop
public void stop() public void stop() throws IOException
{ {
pool.close(); pool.close();
} }

View File

@ -0,0 +1,381 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.http.client.pool;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A resource pool based on {@link LoadingCache}. When a resource is first requested for a new key,
* If the flag: eagerInitialization is true: use {@link EagerCreationResourceHolder}
* {@link ResourcePoolConfig#getMaxPerKey()} resources are initialized and cached in the {@link #pool}.
* Else:
* Initialize a single resource and further lazily using {@link LazyCreationResourceHolder}
* The individual resource in {@link ResourceHolderPerKey} is valid while (current time - last access time)
* <= {@link ResourcePoolConfig#getUnusedConnectionTimeoutMillis()}.
*
* A resource is closed and reinitialized if {@link ResourceFactory#isGood} returns false or it's expired based on
* {@link ResourcePoolConfig#getUnusedConnectionTimeoutMillis()}.
*
* {@link ResourcePoolConfig#getMaxPerKey() is a hard limit for the max number of resources per cache entry. The total
* number of resources in {@link ResourceHolderPerKey} cannot be larger than the limit in any case.
*/
public class DefaultResourcePoolImpl<K, V> implements ResourcePool<K, V>
{
private static final Logger log = new Logger(DefaultResourcePoolImpl.class);
private final LoadingCache<K, ResourceHolderPerKey<K, V>> pool;
private final AtomicBoolean closed = new AtomicBoolean(false);
public DefaultResourcePoolImpl(final ResourceFactory<K, V> factory, final ResourcePoolConfig config,
final boolean eagerInitialization)
{
this.pool = CacheBuilder.newBuilder().build(
new CacheLoader<K, ResourceHolderPerKey<K, V>>()
{
@Override
public ResourceHolderPerKey<K, V> load(K input)
{
if (eagerInitialization) {
return new EagerCreationResourceHolder<>(
config.getMaxPerKey(),
config.getUnusedConnectionTimeoutMillis(),
input,
factory
);
} else {
return new LazyCreationResourceHolder<>(
config.getMaxPerKey(),
config.getUnusedConnectionTimeoutMillis(),
input,
factory
);
}
}
}
);
}
/**
* Returns a {@link ResourceContainer} for the given key or null if this pool is already closed.
*/
@Override
public ResourceContainer<V> take(final K key)
{
if (closed.get()) {
log.error(StringUtils.format("take(%s) called even though I'm closed.", key));
return null;
}
final ResourceHolderPerKey<K, V> holder;
try {
holder = pool.get(key);
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
final V value = holder.get();
return new ResourceContainer<V>()
{
private final AtomicBoolean returned = new AtomicBoolean(false);
@Override
public V get()
{
Preconditions.checkState(!returned.get(), "Resource for key[%s] has been returned, cannot get().", key);
return value;
}
@Override
public void returnResource()
{
if (returned.getAndSet(true)) {
log.warn("Resource at key[%s] was returned multiple times?", key);
} else {
holder.giveBack(value);
}
}
@Override
protected void finalize() throws Throwable
{
if (!returned.get()) {
log.warn(
StringUtils.format(
"Resource[%s] at key[%s] was not returned before Container was finalized, potential resource leak.",
value,
key
)
);
returnResource();
}
super.finalize();
}
};
}
@Override
public void close()
{
closed.set(true);
final ConcurrentMap<K, ResourceHolderPerKey<K, V>> mapView = pool.asMap();
Closer closer = Closer.create();
for (Iterator<Map.Entry<K, ResourceHolderPerKey<K, V>>> iterator =
mapView.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<K, ResourceHolderPerKey<K, V>> e = iterator.next();
iterator.remove();
closer.register(e.getValue());
}
try {
closer.close();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
private static class EagerCreationResourceHolder<K, V> extends LazyCreationResourceHolder<K, V>
{
private EagerCreationResourceHolder(
int maxSize,
long unusedResourceTimeoutMillis,
K key,
ResourceFactory<K, V> factory
)
{
super(maxSize, unusedResourceTimeoutMillis, key, factory);
// Eagerly Instantiate
for (int i = 0; i < maxSize; i++) {
resourceHolderList.add(
new ResourceHolder<>(
System.currentTimeMillis(),
Preconditions.checkNotNull(
factory.generate(key),
"factory.generate(key)"
)
)
);
}
}
}
private static class LazyCreationResourceHolder<K, V> extends ResourceHolderPerKey<K, V>
{
private LazyCreationResourceHolder(
int maxSize,
long unusedResourceTimeoutMillis,
K key,
ResourceFactory<K, V> factory
)
{
super(maxSize, unusedResourceTimeoutMillis, key, factory);
}
}
private static class ResourceHolderPerKey<K, V> implements Closeable
{
protected final int maxSize;
private final K key;
private final ResourceFactory<K, V> factory;
private final long unusedResourceTimeoutMillis;
// Hold previously created / returned resources
protected final ArrayDeque<ResourceHolder<V>> resourceHolderList;
// To keep track of resources that have been successfully returned to caller.
private int numLentResources = 0;
private boolean closed = false;
protected ResourceHolderPerKey(
int maxSize,
long unusedResourceTimeoutMillis,
K key,
ResourceFactory<K, V> factory
)
{
this.maxSize = maxSize;
this.key = key;
this.factory = factory;
this.unusedResourceTimeoutMillis = unusedResourceTimeoutMillis;
this.resourceHolderList = new ArrayDeque<>();
}
/**
* Returns a resource or null if this holder is already closed or the current thread is interrupted.
*
* Try to return a previously created resource if it isGood(). Else, generate a new resource
*/
@Nullable
V get()
{
final V poolVal;
// resourceHolderList can't have nulls, so we'll use a null to signal that we need to create a new resource.
boolean expired = false;
synchronized (this) {
while (!closed && (numLentResources == maxSize)) {
try {
log.debug("Thread [%s] is blocked waiting for resource for key [%s]", Thread.currentThread().getName(), key);
this.wait();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
if (closed) {
log.info(StringUtils.format("get() called even though I'm closed. key[%s]", key));
return null;
} else if (numLentResources < maxSize) {
// Attempt to take an existing resource or create one if list is empty, and increment numLentResources
if (resourceHolderList.isEmpty()) {
poolVal = factory.generate(key);
} else {
ResourceHolder<V> holder = resourceHolderList.removeFirst();
poolVal = holder.getResource();
if (System.currentTimeMillis() - holder.getLastAccessedTime() > unusedResourceTimeoutMillis) {
expired = true;
}
}
numLentResources++;
} else {
throw new IllegalStateException("Unexpected state: More objects lent than permissible");
}
}
final V retVal;
// At this point, we must either return a valid resource. Or throw and exception decrement "numLentResources"
try {
if (poolVal != null && !expired && factory.isGood(poolVal)) {
retVal = poolVal;
} else {
if (poolVal != null) {
factory.close(poolVal);
}
retVal = factory.generate(key);
}
}
catch (Throwable e) {
synchronized (this) {
numLentResources--;
this.notifyAll();
}
Throwables.propagateIfPossible(e);
throw new RuntimeException(e);
}
return retVal;
}
void giveBack(V object)
{
Preconditions.checkNotNull(object, "object");
synchronized (this) {
if (closed) {
log.info(StringUtils.format("giveBack called after being closed. key[%s]", key));
factory.close(object);
return;
}
if (resourceHolderList.size() >= maxSize) {
if (holderListContains(object)) {
log.warn(
new Exception("Exception for stacktrace"),
StringUtils.format(
"Returning object[%s] at key[%s] that has already been returned!? Skipping",
object,
key
)
);
} else {
log.warn(
new Exception("Exception for stacktrace"),
StringUtils.format(
"Returning object[%s] at key[%s] even though we already have all that we can hold[%s]!? Skipping",
object,
key,
resourceHolderList
)
);
}
return;
}
resourceHolderList.addLast(new ResourceHolder<>(System.currentTimeMillis(), object));
numLentResources--;
this.notifyAll();
}
}
private boolean holderListContains(V object)
{
return resourceHolderList.stream().anyMatch(a -> a.getResource().equals(object));
}
@Override
public void close()
{
synchronized (this) {
closed = true;
resourceHolderList.forEach(v -> factory.close(v.getResource()));
resourceHolderList.clear();
this.notifyAll();
}
}
}
private static class ResourceHolder<V>
{
private final long lastAccessedTime;
private final V resource;
private ResourceHolder(long lastAccessedTime, V resource)
{
this.resource = resource;
this.lastAccessedTime = lastAccessedTime;
}
private long getLastAccessedTime()
{
return lastAccessedTime;
}
public V getResource()
{
return resource;
}
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.http.client.pool;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import java.io.IOException;
public class MetricsEmittingResourcePoolImpl<K, V> implements ResourcePool<K, V>
{
private final ServiceEmitter emitter;
private final ResourcePool<K, V> resourcePool;
public MetricsEmittingResourcePoolImpl(ResourcePool<K, V> resourcePool, ServiceEmitter emitter)
{
this.resourcePool = resourcePool;
Preconditions.checkNotNull(emitter, "emitter cannot be null");
this.emitter = emitter;
}
@Override
public ResourceContainer<V> take(final K key)
{
long startTime = System.nanoTime();
ResourceContainer<V> retVal = resourcePool.take(key);
long totalduration = System.nanoTime() - startTime;
emitter.emit(ServiceMetricEvent.builder().setDimension("server", key.toString()).setMetric("httpClient/channelAcquire/timeNs", totalduration));
return retVal;
}
@Override
public void close() throws IOException
{
this.resourcePool.close();
}
}

View File

@ -19,362 +19,11 @@
package org.apache.druid.java.util.http.client.pool; package org.apache.druid.java.util.http.client.pool;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import javax.annotation.Nullable;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
/** public interface ResourcePool<K, V> extends Closeable
* A resource pool based on {@link LoadingCache}. When a resource is first requested for a new key,
* If the flag: eagerInitialization is true: use {@link EagerCreationResourceHolder}
* {@link ResourcePoolConfig#getMaxPerKey()} resources are initialized and cached in the {@link #pool}.
* Else:
* Initialize a single resource and further lazily using {@link LazyCreationResourceHolder}
* The individual resource in {@link ResourceHolderPerKey} is valid while (current time - last access time)
* <= {@link ResourcePoolConfig#getUnusedConnectionTimeoutMillis()}.
*
* A resource is closed and reinitialized if {@link ResourceFactory#isGood} returns false or it's expired based on
* {@link ResourcePoolConfig#getUnusedConnectionTimeoutMillis()}.
*
* {@link ResourcePoolConfig#getMaxPerKey() is a hard limit for the max number of resources per cache entry. The total
* number of resources in {@link ResourceHolderPerKey} cannot be larger than the limit in any case.
*/
public class ResourcePool<K, V> implements Closeable
{ {
private static final Logger log = new Logger(ResourcePool.class);
private final LoadingCache<K, ResourceHolderPerKey<K, V>> pool;
private final AtomicBoolean closed = new AtomicBoolean(false);
public ResourcePool(final ResourceFactory<K, V> factory, final ResourcePoolConfig config, ResourceContainer<V> take(K key);
final boolean eagerInitialization)
{
this.pool = CacheBuilder.newBuilder().build(
new CacheLoader<K, ResourceHolderPerKey<K, V>>()
{
@Override
public ResourceHolderPerKey<K, V> load(K input)
{
if (eagerInitialization) {
return new EagerCreationResourceHolder<>(
config.getMaxPerKey(),
config.getUnusedConnectionTimeoutMillis(),
input,
factory
);
} else {
return new LazyCreationResourceHolder<>(
config.getMaxPerKey(),
config.getUnusedConnectionTimeoutMillis(),
input,
factory
);
}
}
}
);
}
/**
* Returns a {@link ResourceContainer} for the given key or null if this pool is already closed.
*/
@Nullable
public ResourceContainer<V> take(final K key)
{
if (closed.get()) {
log.error(StringUtils.format("take(%s) called even though I'm closed.", key));
return null;
}
final ResourceHolderPerKey<K, V> holder;
try {
holder = pool.get(key);
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
final V value = holder.get();
return new ResourceContainer<V>()
{
private final AtomicBoolean returned = new AtomicBoolean(false);
@Override
public V get()
{
Preconditions.checkState(!returned.get(), "Resource for key[%s] has been returned, cannot get().", key);
return value;
}
@Override
public void returnResource()
{
if (returned.getAndSet(true)) {
log.warn("Resource at key[%s] was returned multiple times?", key);
} else {
holder.giveBack(value);
}
}
@Override
protected void finalize() throws Throwable
{
if (!returned.get()) {
log.warn(
StringUtils.format(
"Resource[%s] at key[%s] was not returned before Container was finalized, potential resource leak.",
value,
key
)
);
returnResource();
}
super.finalize();
}
};
}
@Override
public void close()
{
closed.set(true);
final ConcurrentMap<K, ResourceHolderPerKey<K, V>> mapView = pool.asMap();
Closer closer = Closer.create();
for (Iterator<Map.Entry<K, ResourceHolderPerKey<K, V>>> iterator =
mapView.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<K, ResourceHolderPerKey<K, V>> e = iterator.next();
iterator.remove();
closer.register(e.getValue());
}
try {
closer.close();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
private static class EagerCreationResourceHolder<K, V> extends LazyCreationResourceHolder<K, V>
{
private EagerCreationResourceHolder(
int maxSize,
long unusedResourceTimeoutMillis,
K key,
ResourceFactory<K, V> factory
)
{
super(maxSize, unusedResourceTimeoutMillis, key, factory);
// Eagerly Instantiate
for (int i = 0; i < maxSize; i++) {
resourceHolderList.add(
new ResourceHolder<>(
System.currentTimeMillis(),
Preconditions.checkNotNull(
factory.generate(key),
"factory.generate(key)"
)
)
);
}
}
}
private static class LazyCreationResourceHolder<K, V> extends ResourceHolderPerKey<K, V>
{
private LazyCreationResourceHolder(
int maxSize,
long unusedResourceTimeoutMillis,
K key,
ResourceFactory<K, V> factory
)
{
super(maxSize, unusedResourceTimeoutMillis, key, factory);
}
}
private static class ResourceHolderPerKey<K, V> implements Closeable
{
protected final int maxSize;
private final K key;
private final ResourceFactory<K, V> factory;
private final long unusedResourceTimeoutMillis;
// Hold previously created / returned resources
protected final ArrayDeque<ResourceHolder<V>> resourceHolderList;
// To keep track of resources that have been successfully returned to caller.
private int numLentResources = 0;
private boolean closed = false;
protected ResourceHolderPerKey(
int maxSize,
long unusedResourceTimeoutMillis,
K key,
ResourceFactory<K, V> factory
)
{
this.maxSize = maxSize;
this.key = key;
this.factory = factory;
this.unusedResourceTimeoutMillis = unusedResourceTimeoutMillis;
this.resourceHolderList = new ArrayDeque<>();
}
/**
* Returns a resource or null if this holder is already closed or the current thread is interrupted.
*
* Try to return a previously created resource if it isGood(). Else, generate a new resource
*/
@Nullable
V get()
{
final V poolVal;
// resourceHolderList can't have nulls, so we'll use a null to signal that we need to create a new resource.
boolean expired = false;
synchronized (this) {
while (!closed && (numLentResources == maxSize)) {
try {
this.wait();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
if (closed) {
log.info(StringUtils.format("get() called even though I'm closed. key[%s]", key));
return null;
} else if (numLentResources < maxSize) {
// Attempt to take an existing resource or create one if list is empty, and increment numLentResources
if (resourceHolderList.isEmpty()) {
poolVal = factory.generate(key);
} else {
ResourceHolder<V> holder = resourceHolderList.removeFirst();
poolVal = holder.getResource();
if (System.currentTimeMillis() - holder.getLastAccessedTime() > unusedResourceTimeoutMillis) {
expired = true;
}
}
numLentResources++;
} else {
throw new IllegalStateException("Unexpected state: More objects lent than permissible");
}
}
final V retVal;
// At this point, we must either return a valid resource. Or throw and exception decrement "numLentResources"
try {
if (poolVal != null && !expired && factory.isGood(poolVal)) {
retVal = poolVal;
} else {
if (poolVal != null) {
factory.close(poolVal);
}
retVal = factory.generate(key);
}
}
catch (Throwable e) {
synchronized (this) {
numLentResources--;
this.notifyAll();
}
Throwables.propagateIfPossible(e);
throw new RuntimeException(e);
}
return retVal;
}
void giveBack(V object)
{
Preconditions.checkNotNull(object, "object");
synchronized (this) {
if (closed) {
log.info(StringUtils.format("giveBack called after being closed. key[%s]", key));
factory.close(object);
return;
}
if (resourceHolderList.size() >= maxSize) {
if (holderListContains(object)) {
log.warn(
new Exception("Exception for stacktrace"),
StringUtils.format(
"Returning object[%s] at key[%s] that has already been returned!? Skipping",
object,
key
)
);
} else {
log.warn(
new Exception("Exception for stacktrace"),
StringUtils.format(
"Returning object[%s] at key[%s] even though we already have all that we can hold[%s]!? Skipping",
object,
key,
resourceHolderList
)
);
}
return;
}
resourceHolderList.addLast(new ResourceHolder<>(System.currentTimeMillis(), object));
numLentResources--;
this.notifyAll();
}
}
private boolean holderListContains(V object)
{
return resourceHolderList.stream().anyMatch(a -> a.getResource().equals(object));
}
@Override
public void close()
{
synchronized (this) {
closed = true;
resourceHolderList.forEach(v -> factory.close(v.getResource()));
resourceHolderList.clear();
this.notifyAll();
}
}
}
private static class ResourceHolder<V>
{
private final long lastAccessedTime;
private final V resource;
private ResourceHolder(long lastAccessedTime, V resource)
{
this.resource = resource;
this.lastAccessedTime = lastAccessedTime;
}
private long getLastAccessedTime()
{
return lastAccessedTime;
}
public V getResource()
{
return resource;
}
}
} }

View File

@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.emitter.core.NoopEmitter; import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
@ -96,7 +97,7 @@ public class FriendlyServersTest
final Lifecycle lifecycle = new Lifecycle(); final Lifecycle lifecycle = new Lifecycle();
try { try {
final HttpClientConfig config = HttpClientConfig.builder().build(); final HttpClientConfig config = HttpClientConfig.builder().build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter()));
final StatusResponseHolder response = client final StatusResponseHolder response = client
.go( .go(
new Request( new Request(
@ -166,7 +167,7 @@ public class FriendlyServersTest
new HttpClientProxyConfig("localhost", serverSocket.getLocalPort(), "bob", "sally") new HttpClientProxyConfig("localhost", serverSocket.getLocalPort(), "bob", "sally")
) )
.build(); .build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter()));
final StatusResponseHolder response = client final StatusResponseHolder response = client
.go( .go(
new Request( new Request(
@ -233,7 +234,7 @@ public class FriendlyServersTest
final HttpClientConfig config = HttpClientConfig.builder() final HttpClientConfig config = HttpClientConfig.builder()
.withCompressionCodec(HttpClientConfig.CompressionCodec.IDENTITY) .withCompressionCodec(HttpClientConfig.CompressionCodec.IDENTITY)
.build(); .build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter()));
final StatusResponseHolder response = client final StatusResponseHolder response = client
.go( .go(
new Request( new Request(
@ -284,12 +285,12 @@ public class FriendlyServersTest
try { try {
final SSLContext mySsl = HttpClientInit.sslContextWithTrustedKeyStore(keyStorePath, "abc123"); final SSLContext mySsl = HttpClientInit.sslContextWithTrustedKeyStore(keyStorePath, "abc123");
final HttpClientConfig trustingConfig = HttpClientConfig.builder().withSslContext(mySsl).build(); final HttpClientConfig trustingConfig = HttpClientConfig.builder().withSslContext(mySsl).build();
final HttpClient trustingClient = HttpClientInit.createClient(trustingConfig, lifecycle, new NoopEmitter()); final HttpClient trustingClient = HttpClientInit.createClient(trustingConfig, lifecycle, new ServiceEmitter("", "", new NoopEmitter()));
final HttpClientConfig skepticalConfig = HttpClientConfig.builder() final HttpClientConfig skepticalConfig = HttpClientConfig.builder()
.withSslContext(SSLContext.getDefault()) .withSslContext(SSLContext.getDefault())
.build(); .build();
final HttpClient skepticalClient = HttpClientInit.createClient(skepticalConfig, lifecycle, new NoopEmitter()); final HttpClient skepticalClient = HttpClientInit.createClient(skepticalConfig, lifecycle, new ServiceEmitter("", "", new NoopEmitter()));
// Correct name ("localhost") // Correct name ("localhost")
{ {
@ -365,7 +366,7 @@ public class FriendlyServersTest
final Lifecycle lifecycle = new Lifecycle(); final Lifecycle lifecycle = new Lifecycle();
try { try {
final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build(); final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter()));
{ {
final HttpResponseStatus status = client final HttpResponseStatus status = client

View File

@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.emitter.core.NoopEmitter; import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelException;
@ -156,7 +157,7 @@ public class JankyServersTest
final Lifecycle lifecycle = new Lifecycle(); final Lifecycle lifecycle = new Lifecycle();
try { try {
final HttpClientConfig config = HttpClientConfig.builder().withReadTimeout(new Duration(100)).build(); final HttpClientConfig config = HttpClientConfig.builder().withReadTimeout(new Duration(100)).build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter()));
final ListenableFuture<StatusResponseHolder> future = client final ListenableFuture<StatusResponseHolder> future = client
.go( .go(
new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", silentServerSocket.getLocalPort()))), new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", silentServerSocket.getLocalPort()))),
@ -184,7 +185,7 @@ public class JankyServersTest
final Lifecycle lifecycle = new Lifecycle(); final Lifecycle lifecycle = new Lifecycle();
try { try {
final HttpClientConfig config = HttpClientConfig.builder().withReadTimeout(new Duration(86400L * 365)).build(); final HttpClientConfig config = HttpClientConfig.builder().withReadTimeout(new Duration(86400L * 365)).build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter()));
final ListenableFuture<StatusResponseHolder> future = client final ListenableFuture<StatusResponseHolder> future = client
.go( .go(
new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", silentServerSocket.getLocalPort()))), new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", silentServerSocket.getLocalPort()))),
@ -216,8 +217,7 @@ public class JankyServersTest
.withSslContext(SSLContext.getDefault()) .withSslContext(SSLContext.getDefault())
.withSslHandshakeTimeout(new Duration(100)) .withSslHandshakeTimeout(new Duration(100))
.build(); .build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter()));
final ListenableFuture<StatusResponseHolder> response = client final ListenableFuture<StatusResponseHolder> response = client
.go( .go(
new Request(HttpMethod.GET, new URL(StringUtils.format("https://localhost:%d/", silentServerSocket.getLocalPort()))), new Request(HttpMethod.GET, new URL(StringUtils.format("https://localhost:%d/", silentServerSocket.getLocalPort()))),
@ -245,7 +245,7 @@ public class JankyServersTest
final Lifecycle lifecycle = new Lifecycle(); final Lifecycle lifecycle = new Lifecycle();
try { try {
final HttpClientConfig config = HttpClientConfig.builder().build(); final HttpClientConfig config = HttpClientConfig.builder().build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter()));
final ListenableFuture<StatusResponseHolder> response = client final ListenableFuture<StatusResponseHolder> response = client
.go( .go(
new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", closingServerSocket.getLocalPort()))), new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", closingServerSocket.getLocalPort()))),
@ -273,8 +273,7 @@ public class JankyServersTest
final Lifecycle lifecycle = new Lifecycle(); final Lifecycle lifecycle = new Lifecycle();
try { try {
final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build(); final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter()));
final ListenableFuture<StatusResponseHolder> response = client final ListenableFuture<StatusResponseHolder> response = client
.go( .go(
new Request(HttpMethod.GET, new URL(StringUtils.format("https://localhost:%d/", closingServerSocket.getLocalPort()))), new Request(HttpMethod.GET, new URL(StringUtils.format("https://localhost:%d/", closingServerSocket.getLocalPort()))),
@ -382,7 +381,7 @@ public class JankyServersTest
final Lifecycle lifecycle = new Lifecycle(); final Lifecycle lifecycle = new Lifecycle();
try { try {
final HttpClientConfig config = HttpClientConfig.builder().build(); final HttpClientConfig config = HttpClientConfig.builder().build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter()));
final ListenableFuture<StatusResponseHolder> response = client final ListenableFuture<StatusResponseHolder> response = client
.go( .go(
new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", echoServerSocket.getLocalPort()))), new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", echoServerSocket.getLocalPort()))),
@ -405,8 +404,7 @@ public class JankyServersTest
final Lifecycle lifecycle = new Lifecycle(); final Lifecycle lifecycle = new Lifecycle();
try { try {
final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build(); final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build();
final HttpClient client = HttpClientInit.createClient(config, lifecycle, new NoopEmitter()); final HttpClient client = HttpClientInit.createClient(config, lifecycle, new ServiceEmitter("", "", new NoopEmitter()));
final ListenableFuture<StatusResponseHolder> response = client final ListenableFuture<StatusResponseHolder> response = client
.go( .go(
new Request(HttpMethod.GET, new URL(StringUtils.format("https://localhost:%d/", echoServerSocket.getLocalPort()))), new Request(HttpMethod.GET, new URL(StringUtils.format("https://localhost:%d/", echoServerSocket.getLocalPort()))),

View File

@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit;
public class ResourcePoolTest public class ResourcePoolTest
{ {
ResourceFactory<String, String> resourceFactory; ResourceFactory<String, String> resourceFactory;
ResourcePool<String, String> pool; DefaultResourcePoolImpl<String, String> pool;
@Before @Before
public void setUp() public void setUp()
@ -52,7 +52,7 @@ public class ResourcePoolTest
resourceFactory = (ResourceFactory<String, String>) EasyMock.createMock(ResourceFactory.class); resourceFactory = (ResourceFactory<String, String>) EasyMock.createMock(ResourceFactory.class);
EasyMock.replay(resourceFactory); EasyMock.replay(resourceFactory);
pool = new ResourcePool<String, String>( pool = new DefaultResourcePoolImpl<String, String>(
resourceFactory, resourceFactory,
new ResourcePoolConfig(2, TimeUnit.MINUTES.toMillis(4)), new ResourcePoolConfig(2, TimeUnit.MINUTES.toMillis(4)),
eagerInitialization eagerInitialization
@ -418,7 +418,7 @@ public class ResourcePoolTest
{ {
resourceFactory = (ResourceFactory<String, String>) EasyMock.createMock(ResourceFactory.class); resourceFactory = (ResourceFactory<String, String>) EasyMock.createMock(ResourceFactory.class);
pool = new ResourcePool<String, String>( pool = new DefaultResourcePoolImpl<>(
resourceFactory, resourceFactory,
new ResourcePoolConfig(2, TimeUnit.MILLISECONDS.toMillis(10)), new ResourcePoolConfig(2, TimeUnit.MILLISECONDS.toMillis(10)),
true true

View File

@ -26,6 +26,7 @@ import com.google.inject.servlet.GuiceFilter;
import org.apache.druid.guice.annotations.Self; import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.emitter.core.NoopEmitter; import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.HttpClientConfig; import org.apache.druid.java.util.http.client.HttpClientConfig;
import org.apache.druid.java.util.http.client.HttpClientInit; import org.apache.druid.java.util.http.client.HttpClientInit;
@ -130,7 +131,7 @@ public abstract class BaseJettyTest
.withEagerInitialization(true) .withEagerInitialization(true)
.build(), .build(),
druidLifecycle, druidLifecycle,
new NoopEmitter() new ServiceEmitter("", "", new NoopEmitter())
); );
} }
catch (Exception e) { catch (Exception e) {

View File

@ -35,6 +35,7 @@ import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.annotations.Self; import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.Initialization; import org.apache.druid.initialization.Initialization;
import org.apache.druid.java.util.emitter.core.NoopEmitter; import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.HttpClientConfig; import org.apache.druid.java.util.http.client.HttpClientConfig;
import org.apache.druid.java.util.http.client.HttpClientInit; import org.apache.druid.java.util.http.client.HttpClientInit;
@ -387,7 +388,7 @@ public class JettyCertRenewTest extends BaseJettyTest
client = HttpClientInit.createClient( client = HttpClientInit.createClient(
getSslConfig(), getSslConfig(),
lifecycle, lifecycle,
new NoopEmitter() new ServiceEmitter("", "", new NoopEmitter())
); );
} }
catch (Exception e) { catch (Exception e) {

View File

@ -36,6 +36,7 @@ import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.Initialization; import org.apache.druid.initialization.Initialization;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.core.NoopEmitter; import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.HttpClientConfig; import org.apache.druid.java.util.http.client.HttpClientConfig;
import org.apache.druid.java.util.http.client.HttpClientInit; import org.apache.druid.java.util.http.client.HttpClientInit;
@ -503,7 +504,7 @@ public class JettyTest extends BaseJettyTest
client = HttpClientInit.createClient( client = HttpClientInit.createClient(
sslConfig, sslConfig,
lifecycle, lifecycle,
new NoopEmitter() new ServiceEmitter("", "", new NoopEmitter())
); );
} }
catch (Exception e) { catch (Exception e) {