Don't return leaked Objects back to StupidPool, because this is dangerous. Reuse Cleaners in StupidPool. Make StupidPools named. Add StupidPool.leakedObjectsCount(). Minor fixes (#3631)

This commit is contained in:
Roman Leventov 2016-12-26 14:35:35 +08:00 committed by Himanshu
parent 76cb06a8d8
commit 33800122ad
19 changed files with 248 additions and 58 deletions

View File

@ -65,6 +65,7 @@ public class StupidPoolConcurrencyBenchmark
{
private final AtomicLong numPools = new AtomicLong(0L);
private final StupidPool<Object> pool = new StupidPool<>(
"simpleObject pool",
new Supplier<Object>()
{
@Override

View File

@ -312,6 +312,7 @@ public class GroupByBenchmark
}
StupidPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByBenchmark-computeBufferPool",
new OffheapBufferGenerator("compute", 250_000_000),
0,
Integer.MAX_VALUE

View File

@ -272,7 +272,12 @@ public class TopNBenchmark
}
factory = new TopNQueryRunnerFactory(
new StupidPool<>(new OffheapBufferGenerator("compute", 250000000), 0, Integer.MAX_VALUE),
new StupidPool<>(
"TopNBenchmark-compute-bufferPool",
new OffheapBufferGenerator("compute", 250000000),
0,
Integer.MAX_VALUE
),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);

View File

@ -19,14 +19,19 @@
package io.druid.collections;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import sun.misc.Cleaner;
import java.lang.ref.WeakReference;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
@ -34,25 +39,35 @@ public class StupidPool<T>
{
private static final Logger log = new Logger(StupidPool.class);
private final String name;
private final Supplier<T> generator;
private final Queue<T> objects = new ConcurrentLinkedQueue<>();
/**
* StupidPool Implementation Note
* It is assumed that StupidPools are never reclaimed by the GC, either stored in static fields or global singleton
* injector like Guice. Otherwise false positive "Not closed! Object leaked from..." could be reported. To avoid
* this, StupidPool should be made closeable (or implement {@link io.druid.java.util.common.lifecycle.LifecycleStop}
* and registered in the global lifecycle), in this close() method all {@link ObjectResourceHolder}s should be drained
* from the {@code objects} queue, and notifier.disable() called for them.
*/
private final Queue<ObjectResourceHolder> objects = new ConcurrentLinkedQueue<>();
/**
* {@link ConcurrentLinkedQueue}'s size() is O(n) queue traversal apparently for the sake of being 100%
* wait-free, that is not required by {@code StupidPool}. In {@code poolSize} we account the queue size
* ourselves, to avoid traversal of {@link #objects} in {@link #tryReturnToPool}.
*/
private final AtomicLong poolSize = new AtomicLong(0);
private final AtomicLong leakedObjectsCounter = new AtomicLong(0);
//note that this is just the max entries in the cache, pool can still create as many buffers as needed.
private final int objectsCacheMaxCount;
public StupidPool(
Supplier<T> generator
)
public StupidPool(String name, Supplier<T> generator)
{
this(generator, 0, Integer.MAX_VALUE);
this(name, generator, 0, Integer.MAX_VALUE);
}
public StupidPool(
Supplier<T> generator,
int initCount,
int objectsCacheMaxCount
)
public StupidPool(String name, Supplier<T> generator, int initCount, int objectsCacheMaxCount)
{
Preconditions.checkArgument(
initCount <= objectsCacheMaxCount,
@ -60,28 +75,117 @@ public class StupidPool<T>
initCount,
objectsCacheMaxCount
);
this.name = name;
this.generator = generator;
this.objectsCacheMaxCount = objectsCacheMaxCount;
for (int i = 0; i < initCount; i++) {
objects.add(generator.get());
objects.add(makeObjectWithHandler());
poolSize.incrementAndGet();
}
}
/**
 * Describes this pool by its name, the maximum number of cached entries and the current value of the
 * pool size counter.
 */
@Override
public String toString()
{
  final StringBuilder description = new StringBuilder("StupidPool{");
  description.append("name=").append(name);
  description.append(", objectsCacheMaxCount=").append(objectsCacheMaxCount);
  description.append(", poolSize=").append(poolSize());
  return description.append("}").toString();
}
public ResourceHolder<T> take()
{
final T obj = objects.poll();
return obj == null ? new ObjectResourceHolder(generator.get()) : new ObjectResourceHolder(obj);
ObjectResourceHolder resourceHolder = objects.poll();
if (resourceHolder == null) {
return makeObjectWithHandler();
} else {
poolSize.decrementAndGet();
return resourceHolder;
}
}
/**
 * Creates a brand new pooled object via the generator and wraps it into a holder, together with a
 * freshly created {@link ObjectId}, leak notifier and Cleaner.
 */
private ObjectResourceHolder makeObjectWithHandler()
{
  final T freshObject = generator.get();
  final ObjectId id = new ObjectId();
  final ObjectLeakNotifier leakNotifier = new ObjectLeakNotifier(this);
  // The Cleaner's referent is the id rather than the pooled object itself (e. g. a ByteBuffer): if the object
  // leaked after being taken from the pool while its ResourceHolder was never closed, a Cleaner registered on
  // the object would not notify about the leak.
  final Cleaner leakCleaner = Cleaner.create(id, leakNotifier);
  return new ObjectResourceHolder(freshObject, id, leakCleaner, leakNotifier);
}
/**
 * Returns the current value of the {@code poolSize} counter, which this class maintains itself instead of
 * calling {@code size()} on the {@code objects} queue.
 */
@VisibleForTesting
long poolSize()
{
  final long cachedHolders = poolSize.get();
  return cachedHolders;
}
/**
 * Returns the current value of the leaked objects counter, which is incremented each time an
 * {@link ObjectLeakNotifier} that has not been disabled is run.
 */
@VisibleForTesting
long leakedObjectsCount()
{
  final long leaksSoFar = leakedObjectsCounter.get();
  return leaksSoFar;
}
/**
 * Tries to put a released object back into the {@code objects} queue, wrapped into a new
 * {@link ObjectResourceHolder} that reuses the given objectId, cleaner and notifier.
 *
 * A slot is reserved first by incrementing {@code poolSize} in a compare-and-set loop. If the counter has already
 * reached {@code objectsCacheMaxCount}, the object is not cached: its notifier is disabled, its cleaner is run and
 * the method returns without touching the queue.
 *
 * @param object   the pooled object being released
 * @param objectId the referent of {@code cleaner} associated with the object
 * @param cleaner  the Cleaner registered for {@code objectId}
 * @param notifier the leak notifier run by {@code cleaner}
 */
private void tryReturnToPool(T object, ObjectId objectId, Cleaner cleaner, ObjectLeakNotifier notifier)
{
long currentPoolSize;
// Retry until either the pool is found to be full, or the increment of poolSize succeeds without interference
// from a concurrent update.
do {
currentPoolSize = poolSize.get();
if (currentPoolSize >= objectsCacheMaxCount) {
notifier.disable();
// Effectively does nothing, because notifier is disabled above. The purpose of this call is to deregister the
// cleaner from the internal global linked list of all cleaners in the JVM, and let it be reclaimed itself.
cleaner.clean();
// Important to use the objectId after notifier.disable() (in the logging statement below), otherwise VM may
// already decide that the objectId is unreachable and run Cleaner before notifier.disable(), that would be
// reported as a false-positive "leak". Ideally reachabilityFence(objectId) should be inserted here.
log.debug("cache num entries is exceeding in [%s], objectId [%s]", this, objectId);
return;
}
} while (!poolSize.compareAndSet(currentPoolSize, currentPoolSize + 1));
// The slot is reserved in poolSize at this point; a failed offer is handled by impossibleOffsetFailed(), which
// gives the slot back.
if (!objects.offer(new ObjectResourceHolder(object, objectId, cleaner, notifier))) {
impossibleOffsetFailed(object, objectId, cleaner, notifier);
}
}
/**
 * Handles a failed {@code objects.offer()} in {@link #tryReturnToPool}: gives back the slot that was reserved in
 * {@code poolSize}, disables the leak notifier, runs the cleaner and logs an error.
 *
 * This should be impossible, because {@link ConcurrentLinkedQueue#offer(Object)} doesn't even have
 * `return false;` in its body in OpenJDK 8.
 *
 * @param object   the object that could not be returned to the queue
 * @param objectId the referent of {@code cleaner} associated with the object
 * @param cleaner  the Cleaner registered for {@code objectId}
 * @param notifier the leak notifier run by {@code cleaner}
 */
private void impossibleOffsetFailed(T object, ObjectId objectId, Cleaner cleaner, ObjectLeakNotifier notifier)
{
  poolSize.decrementAndGet();
  notifier.disable();
  // Effectively does nothing, because notifier is disabled above. The purpose of this call is to deregister the
  // cleaner from the internal global linked list of all cleaners in the JVM, and let it be reclaimed itself.
  cleaner.clean();
  // The message has three placeholders: the object, the pool ("in [%s]") and the objectId. The pool argument
  // was missing, which left the format string with fewer arguments than placeholders.
  log.error(
      new ISE("Queue offer failed"),
      "Could not offer object [%s] back into the queue in [%s], objectId [%s]",
      object,
      this,
      objectId
  );
}
private class ObjectResourceHolder implements ResourceHolder<T>
{
private AtomicBoolean closed = new AtomicBoolean(false);
private final T object;
private final AtomicReference<T> objectRef;
private ObjectId objectId;
private Cleaner cleaner;
private ObjectLeakNotifier notifier;
public ObjectResourceHolder(final T object)
ObjectResourceHolder(
final T object,
final ObjectId objectId,
final Cleaner cleaner,
final ObjectLeakNotifier notifier
)
{
this.object = object;
this.objectRef = new AtomicReference<>(object);
this.objectId = objectId;
this.cleaner = cleaner;
this.notifier = notifier;
}
// WARNING: it is entirely possible for a caller to hold onto the object and call ObjectResourceHolder.close,
@ -89,7 +193,8 @@ public class StupidPool<T>
@Override
public T get()
{
if (closed.get()) {
final T object = objectRef.get();
if (object == null) {
throw new ISE("Already Closed!");
}
@ -99,30 +204,69 @@ public class StupidPool<T>
@Override
public void close()
{
if (!closed.compareAndSet(false, true)) {
log.warn(new ISE("Already Closed!"), "Already closed");
return;
}
if (objects.size() < objectsCacheMaxCount) {
if (!objects.offer(object)) {
log.warn(new ISE("Queue offer failed"), "Could not offer object [%s] back into the queue", object);
final T object = objectRef.get();
if (object != null && objectRef.compareAndSet(object, null)) {
try {
tryReturnToPool(object, objectId, cleaner, notifier);
}
} else {
log.debug("cache num entries is exceeding max limit [%s]", objectsCacheMaxCount);
}
}
@Override
protected void finalize() throws Throwable
{
try {
if (!closed.get()) {
log.warn("Not closed! Object was[%s]. Allowing gc to prevent leak.", object);
finally {
// Need to null reference to objectId because if ObjectResourceHolder is closed, but leaked, this reference
// will prevent reporting leaks of ResourceHandlers when this object and objectId are taken from the pool
// again.
objectId = null;
// Nulling cleaner and notifier is not strictly needed, but harmless for sure.
cleaner = null;
notifier = null;
}
}
finally {
super.finalize();
}
}
}
/**
 * The action run by the Cleaner created in {@link #makeObjectWithHandler}. Unless it has been disabled
 * beforehand, running it increments the pool's leaked objects counter and logs a warning. It reports at most
 * once, because running it also marks it as disabled.
 */
private static class ObjectLeakNotifier implements Runnable
{
  /**
   * The {@link StupidPool} is referenced only weakly, so that it does not leak through the internal global chain
   * of Cleaners.
   */
  final WeakReference<StupidPool<?>> poolReference;
  final AtomicLong leakedObjectsCounter;
  final AtomicBoolean disabled = new AtomicBoolean(false);

  ObjectLeakNotifier(StupidPool<?> pool)
  {
    this.poolReference = new WeakReference<StupidPool<?>>(pool);
    this.leakedObjectsCounter = pool.leakedObjectsCounter;
  }

  @Override
  public void run()
  {
    try {
      final boolean alreadyDisabled = disabled.getAndSet(true);
      if (alreadyDisabled) {
        return;
      }
      leakedObjectsCounter.incrementAndGet();
      log.warn("Not closed! Object leaked from %s. Allowing gc to prevent leak.", poolReference.get());
    }
    // Exceptions must not be thrown in Cleaner.clean(), which calls this ObjectLeakNotifier.run() method
    catch (Exception e) {
      try {
        log.error(e, "Exception in ObjectLeakNotifier.run()");
      }
      catch (Exception ignore) {
        // ignore
      }
    }
  }

  /**
   * Marks this notifier as disabled, so that a later {@link #run()} neither counts nor logs a leak.
   */
  public void disable()
  {
    disabled.set(true);
  }
}
/**
 * Stateless marker object that plays the role of the referent for Cleaner, see the comment in
 * {@link #makeObjectWithHandler}.
 */
private static class ObjectId
{
}
}

View File

@ -20,9 +20,7 @@
package io.druid.collections;
import com.google.common.base.Supplier;
import io.druid.java.util.common.ISE;
import org.easymock.EasyMock;
import org.hamcrest.core.IsInstanceOf;
import org.junit.After;
@ -45,7 +43,7 @@ public class StupidPoolTest
generator = EasyMock.createMock(Supplier.class);
EasyMock.expect(generator.get()).andReturn(defaultString).anyTimes();
EasyMock.replay(generator);
poolOfString = new StupidPool<>(generator);
poolOfString = new StupidPool<>("poolOfString", generator);
resourceHolderObj = poolOfString.take();
}
@ -72,10 +70,27 @@ public class StupidPoolTest
resourceHolderObj.get();
}
@Test
public void testFinalizeInResourceHolder()
@Test(timeout = 60_000)
public void testResourceHandlerClearedByJVM() throws InterruptedException
{
resourceHolderObj = null;
System.runFinalization();
if (System.getProperty("java.version").startsWith("1.7")) {
// This test is unreliable on Java 7, probably because GC is not triggered by System.gc(). That is not a problem:
// it is enough for this test to pass on at least one version of Java to prove that StupidPool doesn't introduce
// leaks itself and actually cleans up the leaked objects.
return;
}
String leakedString = createDanglingObjectHandler();
// Wait until the leak of the dangling object handler is detected and counted by the pool
for (int i = 0; i < 6000 && poolOfString.leakedObjectsCount() == 0; i++) {
System.gc();
byte[] garbage = new byte[10_000_000];
Thread.sleep(10);
}
Assert.assertEquals(leakedString, 1, poolOfString.leakedObjectsCount());
}
/**
 * Takes a holder from {@code poolOfString} and returns only the object it wraps. The holder itself is never
 * closed and no reference to it is kept.
 */
private String createDanglingObjectHandler()
{
return poolOfString.take().get();
}
}

View File

@ -55,6 +55,7 @@ public class DistinctCountTopNQueryTest
{
TopNQueryEngine engine = new TopNQueryEngine(
new StupidPool<ByteBuffer>(
"TopNQueryEngine-bufferPool",
new Supplier<ByteBuffer>()
{
@Override

View File

@ -69,6 +69,7 @@ public class ApproximateHistogramTopNQueryTest
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
new StupidPool<ByteBuffer>(
"TopNQueryRunnerFactory-bufferPool",
new Supplier<ByteBuffer>()
{
@Override

View File

@ -37,6 +37,7 @@ public class CompressedPools
public static final int BUFFER_SIZE = 0x10000;
private static final StupidPool<BufferRecycler> bufferRecyclerPool = new StupidPool<BufferRecycler>(
"bufferRecyclerPool",
new Supplier<BufferRecycler>()
{
private final AtomicLong counter = new AtomicLong(0);
@ -56,6 +57,7 @@ public class CompressedPools
}
private static final StupidPool<byte[]> outputBytesPool = new StupidPool<byte[]>(
"outputBytesPool",
new Supplier<byte[]>()
{
private final AtomicLong counter = new AtomicLong(0);
@ -75,6 +77,7 @@ public class CompressedPools
}
private static final StupidPool<ByteBuffer> bigEndByteBufPool = new StupidPool<ByteBuffer>(
"bigEndByteBufPool",
new Supplier<ByteBuffer>()
{
private final AtomicLong counter = new AtomicLong(0);
@ -89,6 +92,7 @@ public class CompressedPools
);
private static final StupidPool<ByteBuffer> littleEndByteBufPool = new StupidPool<ByteBuffer>(
"littleEndByteBufPool",
new Supplier<ByteBuffer>()
{
private final AtomicLong counter = new AtomicLong(0);

View File

@ -40,6 +40,7 @@ import java.nio.ByteBuffer;
public class TestQueryRunners
{
public static final StupidPool<ByteBuffer> pool = new StupidPool<ByteBuffer>(
"TestQueryRunners-bufferPool",
new Supplier<ByteBuffer>()
{
@Override

View File

@ -256,6 +256,7 @@ public class AggregationTestHelper
TopNQueryRunnerFactory factory = new TopNQueryRunnerFactory(
new StupidPool<>(
"TopNQueryRunnerFactory-bufferPool",
new Supplier<ByteBuffer>()
{
@Override

View File

@ -60,10 +60,10 @@ import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.JavaScriptAggregatorFactory;
import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.last.LongLastAggregatorFactory;
@ -262,6 +262,7 @@ public class GroupByQueryRunnerTest
{
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final StupidPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByQueryEngine-bufferPool",
new Supplier<ByteBuffer>()
{
@Override

View File

@ -88,6 +88,7 @@ public class TopNQueryRunnerBenchmark extends AbstractBenchmark
{
QueryRunnerFactory factory = new TopNQueryRunnerFactory(
new StupidPool<ByteBuffer>(
"TopNQueryRunnerFactory-directBufferPool",
new Supplier<ByteBuffer>()
{
@Override

View File

@ -116,6 +116,7 @@ public class TopNQueryRunnerTest
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
new StupidPool<ByteBuffer>(
"TopNQueryRunnerFactory-bufferPool",
new Supplier<ByteBuffer>()
{
@Override

View File

@ -76,6 +76,7 @@ public class TopNUnionQueryTest
QueryRunnerTestHelper.makeUnionQueryRunners(
new TopNQueryRunnerFactory(
new StupidPool<ByteBuffer>(
"TopNQueryRunnerFactory-bufferPool",
new Supplier<ByteBuffer>()
{
@Override

View File

@ -136,6 +136,7 @@ public class IncrementalIndexTest
return new OffheapIncrementalIndex(
0L, QueryGranularities.NONE, factories, 1000000,
new StupidPool<ByteBuffer>(
"OffheapIncrementalIndex-bufferPool",
new Supplier<ByteBuffer>()
{
@Override
@ -168,6 +169,7 @@ public class IncrementalIndexTest
return new OffheapIncrementalIndex(
0L, QueryGranularities.NONE, false, factories, 1000000,
new StupidPool<ByteBuffer>(
"OffheapIncrementalIndex-bufferPool",
new Supplier<ByteBuffer>()
{
@Override

View File

@ -223,6 +223,7 @@ public class IncrementalIndexStorageAdapterTest
}
),
new StupidPool(
"GroupByQueryEngine-bufferPool",
new Supplier<ByteBuffer>()
{
@Override
@ -306,6 +307,7 @@ public class IncrementalIndexStorageAdapterTest
TopNQueryEngine engine = new TopNQueryEngine(
new StupidPool<ByteBuffer>(
"TopNQueryEngine-bufferPool",
new Supplier<ByteBuffer>()
{
@Override

View File

@ -116,16 +116,22 @@ public class IncrementalIndexTest
public IncrementalIndex createIndex()
{
return new OffheapIncrementalIndex(
schema, true, true, sortFacts, 1000000, new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(256 * 1024);
}
}
)
schema,
true,
true,
sortFacts,
1000000,
new StupidPool<ByteBuffer>(
"OffheapIncrementalIndex-bufferPool",
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(256 * 1024);
}
}
)
);
}
}

View File

@ -109,6 +109,7 @@ public class DruidProcessingModule implements Module
{
verifyDirectMemory(config);
return new StupidPool<>(
"intermediate processing pool",
new OffheapBufferGenerator("intermediate processing", config.intermediateComputeSizeBytes()),
config.getNumThreads(),
config.poolCacheMaxCount()

View File

@ -178,6 +178,7 @@ public class CalciteTests
TopNQuery.class,
new TopNQueryRunnerFactory(
new StupidPool<>(
"TopNQueryRunnerFactory-bufferPool",
new Supplier<ByteBuffer>()
{
@Override