mirror of https://github.com/apache/druid.git
Don't override finalize() and reduce locking in LoadBalancingPool and ReferenceCountedResourceHandler (#3874)
* Specialize LoadBalancingPool as MemcacheClientPool, reduce locking and don't override Object.finalize() * Remove locking and don't override Object.finalize() in ReferenceCountingResourceHolder * Add leak counts in ReferenceCountingResourceHolder and MemcacheClientPool. Add tests for ReferenceCountingResourceHolder and MemcacheClientPool * Fix a race condition in ReferenceCountingResourceHolder.increment()
This commit is contained in:
parent
4c49a54517
commit
ca9f0e2b27
|
@ -1,144 +0,0 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.collections;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.base.Throwables;
|
||||
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Simple load balancing pool that always returns the least used item.
|
||||
*
|
||||
* An item's usage is incremented every time one gets requested from the pool
|
||||
* and is decremented every time close is called on the holder.
|
||||
*
|
||||
* The pool eagerly instantiates all the items in the pool when created,
|
||||
* using the given supplier.
|
||||
*
|
||||
* @param <T> type of items to pool
|
||||
*/
|
||||
public class LoadBalancingPool<T> implements Supplier<ResourceHolder<T>>
|
||||
{
|
||||
private static final Logger log = new Logger(LoadBalancingPool.class);
|
||||
|
||||
private final Supplier<T> generator;
|
||||
private final int capacity;
|
||||
private final PriorityBlockingQueue<CountingHolder> queue;
|
||||
|
||||
public LoadBalancingPool(int capacity, Supplier<T> generator)
|
||||
{
|
||||
Preconditions.checkArgument(capacity > 0, "capacity must be greater than 0");
|
||||
Preconditions.checkNotNull(generator);
|
||||
|
||||
this.generator = generator;
|
||||
this.capacity = capacity;
|
||||
this.queue = new PriorityBlockingQueue<>(capacity);
|
||||
|
||||
// eagerly intantiate all items in the pool
|
||||
init();
|
||||
}
|
||||
|
||||
private void init() {
|
||||
for(int i = 0; i < capacity; ++i) {
|
||||
queue.offer(new CountingHolder(generator.get()));
|
||||
}
|
||||
}
|
||||
|
||||
public ResourceHolder<T> get()
|
||||
{
|
||||
final CountingHolder holder;
|
||||
// items never stay out of the queue for long, so we'll get one eventually
|
||||
try {
|
||||
holder = queue.take();
|
||||
} catch(InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
|
||||
// synchronize on item to ensure count cannot get changed by
|
||||
// CountingHolder.close right after we put the item back in the queue
|
||||
synchronized (holder) {
|
||||
holder.count.incrementAndGet();
|
||||
queue.offer(holder);
|
||||
}
|
||||
return holder;
|
||||
}
|
||||
|
||||
private class CountingHolder implements ResourceHolder<T>, Comparable<CountingHolder>
|
||||
{
|
||||
private AtomicInteger count = new AtomicInteger(0);
|
||||
private final T object;
|
||||
|
||||
public CountingHolder(final T object)
|
||||
{
|
||||
this.object = object;
|
||||
}
|
||||
|
||||
@Override
|
||||
public T get()
|
||||
{
|
||||
return object;
|
||||
}
|
||||
|
||||
/**
|
||||
* Not idempotent, should only be called once when done using the resource
|
||||
*/
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
// ensures count always gets adjusted while item is removed from the queue
|
||||
synchronized (this) {
|
||||
// item may not be in queue if another thread is calling LoadBalancingPool.get()
|
||||
// at the same time; in that case let the other thread put it back.
|
||||
boolean removed = queue.remove(this);
|
||||
count.decrementAndGet();
|
||||
if (removed) {
|
||||
queue.offer(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(CountingHolder o)
|
||||
{
|
||||
return Integer.compare(count.get(), o.count.get());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void finalize() throws Throwable
|
||||
{
|
||||
try {
|
||||
final int shouldBeZero = count.get();
|
||||
if (shouldBeZero != 0) {
|
||||
log.warn("Expected 0 resource count, got [%d]! Object was[%s].", shouldBeZero, object);
|
||||
}
|
||||
}
|
||||
finally {
|
||||
super.finalize();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,27 +19,40 @@
|
|||
|
||||
package io.druid.collections;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
import sun.misc.Cleaner;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class ReferenceCountingResourceHolder<T> implements ResourceHolder<T>
|
||||
{
|
||||
private static final Logger log = new Logger(ReferenceCountingResourceHolder.class);
|
||||
|
||||
private final Object lock = new Object();
|
||||
private static final AtomicLong leakedResources = new AtomicLong();
|
||||
|
||||
public static long leakedResources()
|
||||
{
|
||||
return leakedResources.get();
|
||||
}
|
||||
|
||||
private final T object;
|
||||
private final Closeable closer;
|
||||
private final AtomicInteger refCount = new AtomicInteger(1);
|
||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
@SuppressWarnings("unused")
|
||||
private final Cleaner cleaner;
|
||||
|
||||
private int refcount = 1;
|
||||
private boolean didClose = false;
|
||||
|
||||
public ReferenceCountingResourceHolder(final T object, final Closeable closer)
|
||||
ReferenceCountingResourceHolder(final T object, final Closeable closer)
|
||||
{
|
||||
this.object = object;
|
||||
this.closer = closer;
|
||||
this.cleaner = Cleaner.create(this, new CloserRunnable(object, closer, refCount));
|
||||
}
|
||||
|
||||
public static <T extends Closeable> ReferenceCountingResourceHolder<T> fromCloseable(final T object)
|
||||
|
@ -50,92 +63,110 @@ public class ReferenceCountingResourceHolder<T> implements ResourceHolder<T>
|
|||
@Override
|
||||
public T get()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (refcount <= 0) {
|
||||
throw new ISE("Already closed!");
|
||||
}
|
||||
|
||||
return object;
|
||||
if (refCount.get() <= 0) {
|
||||
throw new ISE("Already closed!");
|
||||
}
|
||||
return object;
|
||||
}
|
||||
|
||||
public Releaser increment()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (refcount <= 0) {
|
||||
while (true) {
|
||||
int count = this.refCount.get();
|
||||
if (count <= 0) {
|
||||
throw new ISE("Already closed!");
|
||||
}
|
||||
if (refCount.compareAndSet(count, count + 1)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
refcount++;
|
||||
// This Releaser is supposed to be used from a single thread, so no synchronization/atomicity
|
||||
return new Releaser()
|
||||
{
|
||||
boolean released = false;
|
||||
|
||||
return new Releaser()
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
final AtomicBoolean didRelease = new AtomicBoolean();
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
if (didRelease.compareAndSet(false, true)) {
|
||||
decrement();
|
||||
} else {
|
||||
log.warn("WTF?! release called but we are already released!");
|
||||
}
|
||||
if (!released) {
|
||||
decrement();
|
||||
released = true;
|
||||
} else {
|
||||
log.warn(new ISE("Already closed"), "Already closed");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void finalize() throws Throwable
|
||||
{
|
||||
if (didRelease.compareAndSet(false, true)) {
|
||||
log.warn("Not released! Object was[%s], releasing on finalize of releaser.", object);
|
||||
decrement();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public int getReferenceCount()
|
||||
{
|
||||
synchronized (lock) {
|
||||
return refcount;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (!didClose) {
|
||||
didClose = true;
|
||||
decrement();
|
||||
} else {
|
||||
log.warn(new ISE("Already closed!"), "Already closed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void finalize() throws Throwable
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (!didClose) {
|
||||
log.warn("Not closed! Object was[%s], closing on finalize of holder.", object);
|
||||
didClose = true;
|
||||
decrement();
|
||||
}
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
decrement();
|
||||
} else {
|
||||
log.warn(new ISE("Already closed"), "Already closed");
|
||||
}
|
||||
}
|
||||
|
||||
private void decrement()
|
||||
{
|
||||
synchronized (lock) {
|
||||
refcount--;
|
||||
if (refcount <= 0) {
|
||||
try {
|
||||
closer.close();
|
||||
// Checking that the count is exactly equal to 0, rather than less or equal, helps to avoid calling closer.close()
|
||||
// twice if there is a race with CloserRunnable. Such a race is possible and could be avoided only with
|
||||
// reachabilityFence() (Java 9+) in ReferenceCountingResourceHolder's and Releaser's close() methods.
|
||||
if (refCount.decrementAndGet() == 0) {
|
||||
try {
|
||||
closer.close();
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class CloserRunnable implements Runnable
|
||||
{
|
||||
private final Object object;
|
||||
private final Closeable closer;
|
||||
private final AtomicInteger refCount;
|
||||
|
||||
private CloserRunnable(Object object, Closeable closer, AtomicInteger refCount)
|
||||
{
|
||||
this.object = object;
|
||||
this.closer = closer;
|
||||
this.refCount = refCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
while (true) {
|
||||
int count = refCount.get();
|
||||
if (count <= 0) {
|
||||
return;
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, "WTF?! Close failed, uh oh...");
|
||||
if (refCount.compareAndSet(count, 0)) {
|
||||
try {
|
||||
leakedResources.incrementAndGet();
|
||||
closer.close();
|
||||
return;
|
||||
}
|
||||
catch (Exception e) {
|
||||
try {
|
||||
log.error(e, "Exception in closer");
|
||||
}
|
||||
catch (Exception ignore) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
finally {
|
||||
try {
|
||||
log.warn("Not closed! Object was[%s]", object);
|
||||
}
|
||||
catch (Exception ignore) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,142 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.collections;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class ReferenceCountingResourceHolderTest
|
||||
{
|
||||
@Test
|
||||
public void testIdiomaticUsage()
|
||||
{
|
||||
// Smoke testing
|
||||
for (int i = 0; i < 100; i++) {
|
||||
runIdiomaticUsage();
|
||||
}
|
||||
}
|
||||
|
||||
private void runIdiomaticUsage()
|
||||
{
|
||||
final AtomicBoolean released = new AtomicBoolean(false);
|
||||
final ReferenceCountingResourceHolder<Closeable> resourceHolder = makeReleasingHandler(released);
|
||||
List<Thread> threads = new ArrayList<>();
|
||||
for (int i = 0; i < 100; i++) {
|
||||
Thread thread = new Thread() {
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try (Releaser r = resourceHolder.increment()) {
|
||||
try {
|
||||
Thread.sleep(1);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
thread.start();
|
||||
threads.add(thread);
|
||||
}
|
||||
for (Thread thread : threads) {
|
||||
try {
|
||||
thread.join();
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
Assert.assertFalse(released.get());
|
||||
resourceHolder.close();
|
||||
Assert.assertTrue(released.get());
|
||||
}
|
||||
|
||||
private ReferenceCountingResourceHolder<Closeable> makeReleasingHandler(final AtomicBoolean released)
|
||||
{
|
||||
return ReferenceCountingResourceHolder
|
||||
.fromCloseable((Closeable) new Closeable()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
released.set(true);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test(timeout = 60_000)
|
||||
public void testResourceHandlerClearedByJVM() throws InterruptedException
|
||||
{
|
||||
if (System.getProperty("java.version").startsWith("1.7")) {
|
||||
// This test is unreliable on Java 7, probably GC is not triggered by System.gc(). It is not a problem because
|
||||
// this test should ever pass on any version of Java to prove that ReferenceCountingResourceHolder doesn't
|
||||
// introduce leaks itself and actually cleans the leaked resources.
|
||||
return;
|
||||
}
|
||||
long initialLeakedResources = ReferenceCountingResourceHolder.leakedResources();
|
||||
final AtomicBoolean released = new AtomicBoolean(false);
|
||||
makeReleasingHandler(released); // Don't store the handler in a variable and don't close it, the object leaked
|
||||
verifyCleanerRun(released, initialLeakedResources);
|
||||
}
|
||||
|
||||
@Test(timeout = 60_000)
|
||||
public void testResourceHandlerWithReleaserClearedByJVM() throws InterruptedException
|
||||
{
|
||||
if (System.getProperty("java.version").startsWith("1.7")) {
|
||||
// This test is unreliable on Java 7, probably GC is not triggered by System.gc(). It is not a problem because
|
||||
// this test should ever pass on any version of Java to prove that ReferenceCountingResourceHolder doesn't
|
||||
// introduce leaks itself and actually cleans the leaked resources.
|
||||
return;
|
||||
}
|
||||
long initialLeakedResources = ReferenceCountingResourceHolder.leakedResources();
|
||||
final AtomicBoolean released = new AtomicBoolean(false);
|
||||
// createDanglingReleaser() need to be a separate method because otherwise JVM preserves a ref to Holder on stack
|
||||
// and Cleaner is not called
|
||||
createDanglingReleaser(released);
|
||||
verifyCleanerRun(released, initialLeakedResources);
|
||||
}
|
||||
|
||||
private void createDanglingReleaser(AtomicBoolean released)
|
||||
{
|
||||
try (ReferenceCountingResourceHolder<Closeable> handler = makeReleasingHandler(released)) {
|
||||
handler.increment(); // Releaser not close, the object leaked
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyCleanerRun(AtomicBoolean released, long initialLeakedResources) throws InterruptedException
|
||||
{
|
||||
// Wait until Closer runs
|
||||
for (int i = 0; i < 6000 && ReferenceCountingResourceHolder.leakedResources() == initialLeakedResources; i++) {
|
||||
System.gc();
|
||||
byte[] garbage = new byte[10_000_000];
|
||||
Thread.sleep(10);
|
||||
}
|
||||
Assert.assertEquals(initialLeakedResources + 1, ReferenceCountingResourceHolder.leakedResources());
|
||||
// Cleaner also runs the closer
|
||||
Assert.assertTrue(released.get());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,157 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.client.cache;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Supplier;
|
||||
import io.druid.collections.ResourceHolder;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
import net.spy.memcached.MemcachedClientIF;
|
||||
import sun.misc.Cleaner;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* Simple load balancing pool that always returns the least used {@link MemcachedClientIF}.
|
||||
*
|
||||
* A client's usage is incremented every time one gets requested from the pool
|
||||
* and is decremented every time close is called on the holder.
|
||||
*
|
||||
* The pool eagerly instantiates all the clients in the pool when created,
|
||||
* using the given supplier.
|
||||
*/
|
||||
final class MemcacheClientPool implements Supplier<ResourceHolder<MemcachedClientIF>>
|
||||
{
|
||||
private static final Logger log = new Logger(MemcacheClientPool.class);
|
||||
|
||||
private static final AtomicLong leakedClients = new AtomicLong(0);
|
||||
|
||||
public static long leakedClients()
|
||||
{
|
||||
return leakedClients.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* The number of memcached connections is not expected to be small (<= 8), so it's easier to find the least used
|
||||
* connection using a linear search over a simple array, than fiddling with PriorityBlockingQueue. This also allows
|
||||
* to reduce locking.
|
||||
*/
|
||||
private final CountingHolder[] connections;
|
||||
|
||||
MemcacheClientPool(int capacity, Supplier<MemcachedClientIF> generator)
|
||||
{
|
||||
Preconditions.checkArgument(capacity > 0, "capacity must be greater than 0");
|
||||
Preconditions.checkNotNull(generator);
|
||||
|
||||
CountingHolder[] connections = new CountingHolder[capacity];
|
||||
// eagerly instantiate all items in the pool
|
||||
for(int i = 0; i < capacity; ++i) {
|
||||
connections[i] = new CountingHolder(generator.get());
|
||||
}
|
||||
// Assign the final field after filling the array to ensure visibility of elements
|
||||
this.connections = connections;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized IdempotentCloseableHolder get()
|
||||
{
|
||||
CountingHolder leastUsedClientHolder = connections[0];
|
||||
int minCount = leastUsedClientHolder.count.get();
|
||||
for (int i = 1; i < connections.length; i++) {
|
||||
CountingHolder clientHolder = connections[i];
|
||||
int count = clientHolder.count.get();
|
||||
if (count < minCount) {
|
||||
leastUsedClientHolder = clientHolder;
|
||||
minCount = count;
|
||||
}
|
||||
}
|
||||
leastUsedClientHolder.count.incrementAndGet();
|
||||
return new IdempotentCloseableHolder(leastUsedClientHolder);
|
||||
}
|
||||
|
||||
private static class CountingHolder
|
||||
{
|
||||
private final AtomicInteger count = new AtomicInteger(0);
|
||||
private final MemcachedClientIF clientIF;
|
||||
@SuppressWarnings("unused")
|
||||
private final Cleaner cleaner;
|
||||
|
||||
private CountingHolder(final MemcachedClientIF clientIF)
|
||||
{
|
||||
this.clientIF = clientIF;
|
||||
cleaner = Cleaner.create(this, new ClientLeakNotifier(count, clientIF));
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static class IdempotentCloseableHolder implements ResourceHolder<MemcachedClientIF>
|
||||
{
|
||||
private CountingHolder countingHolder;
|
||||
|
||||
private IdempotentCloseableHolder(CountingHolder countingHolder)
|
||||
{
|
||||
this.countingHolder = countingHolder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MemcachedClientIF get()
|
||||
{
|
||||
return countingHolder.clientIF;
|
||||
}
|
||||
|
||||
int count()
|
||||
{
|
||||
return countingHolder.count.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
if (countingHolder != null) {
|
||||
countingHolder.count.decrementAndGet();
|
||||
countingHolder = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class ClientLeakNotifier implements Runnable
|
||||
{
|
||||
private final AtomicInteger count;
|
||||
private final MemcachedClientIF clientIF;
|
||||
|
||||
private ClientLeakNotifier(AtomicInteger count, MemcachedClientIF clientIF)
|
||||
{
|
||||
this.count = count;
|
||||
this.clientIF = clientIF;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
final int shouldBeZero = count.get();
|
||||
if (shouldBeZero != 0) {
|
||||
leakedClients.incrementAndGet();
|
||||
log.warn("Expected 0 resource count, got [%d]! Object was[%s].", shouldBeZero, clientIF);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -35,7 +35,6 @@ import com.google.common.primitives.Ints;
|
|||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
import com.metamx.metrics.AbstractMonitor;
|
||||
import io.druid.collections.LoadBalancingPool;
|
||||
import io.druid.collections.ResourceHolder;
|
||||
import io.druid.collections.StupidResourceHolder;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
|
@ -359,7 +358,7 @@ public class MemcachedCache implements Cache
|
|||
final Supplier<ResourceHolder<MemcachedClientIF>> clientSupplier;
|
||||
|
||||
if (config.getNumConnections() > 1) {
|
||||
clientSupplier = new LoadBalancingPool<MemcachedClientIF>(
|
||||
clientSupplier = new MemcacheClientPool(
|
||||
config.getNumConnections(),
|
||||
new Supplier<MemcachedClientIF>()
|
||||
{
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.client.cache;
|
||||
|
||||
import com.google.common.base.Suppliers;
|
||||
import net.spy.memcached.MemcachedClientIF;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class MemcacheClientPoolTest
|
||||
{
|
||||
|
||||
@Test
|
||||
public void testSimpleUsage()
|
||||
{
|
||||
MemcacheClientPool pool = new MemcacheClientPool(3, Suppliers.ofInstance((MemcachedClientIF) null));
|
||||
// First round
|
||||
MemcacheClientPool.IdempotentCloseableHolder first = pool.get();
|
||||
Assert.assertEquals(1, first.count());
|
||||
MemcacheClientPool.IdempotentCloseableHolder second = pool.get();
|
||||
Assert.assertEquals(1, second.count());
|
||||
MemcacheClientPool.IdempotentCloseableHolder third = pool.get();
|
||||
Assert.assertEquals(1, third.count());
|
||||
// Second round
|
||||
MemcacheClientPool.IdempotentCloseableHolder firstClientSecondRound = pool.get();
|
||||
Assert.assertEquals(2, firstClientSecondRound.count());
|
||||
MemcacheClientPool.IdempotentCloseableHolder secondClientSecondRound = pool.get();
|
||||
Assert.assertEquals(2, secondClientSecondRound.count());
|
||||
first.close();
|
||||
firstClientSecondRound.close();
|
||||
MemcacheClientPool.IdempotentCloseableHolder firstAgain = pool.get();
|
||||
Assert.assertEquals(1, firstAgain.count());
|
||||
|
||||
firstAgain.close();
|
||||
second.close();
|
||||
third.close();
|
||||
secondClientSecondRound.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientLeakDetected() throws InterruptedException
|
||||
{
|
||||
if (System.getProperty("java.version").startsWith("1.7")) {
|
||||
// This test is unreliable on Java 7, probably GC is not triggered by System.gc(). It is not a problem because
|
||||
// this test should ever pass on any version of Java to prove that MemcacheClientPool doesn't introduce leaks
|
||||
// itself.
|
||||
return;
|
||||
}
|
||||
long initialLeakedClients = MemcacheClientPool.leakedClients();
|
||||
createDanglingClient();
|
||||
// Wait until Closer runs
|
||||
for (int i = 0; i < 6000 && MemcacheClientPool.leakedClients() == initialLeakedClients; i++) {
|
||||
System.gc();
|
||||
byte[] garbage = new byte[10_000_000];
|
||||
Thread.sleep(10);
|
||||
}
|
||||
Assert.assertEquals(initialLeakedClients + 1, MemcacheClientPool.leakedClients());
|
||||
}
|
||||
|
||||
private void createDanglingClient()
|
||||
{
|
||||
MemcacheClientPool pool = new MemcacheClientPool(1, Suppliers.ofInstance((MemcachedClientIF) null));
|
||||
pool.get();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue