Recycler: better lifecycle control for pooled instances (#5214)

- introduce an additional destroy() callback that allows finer control
over the internals of recycled data
- introduce AbstractRecyclerC as a superclass for Recycler factories
(Adrien), with an empty destroy() by default
- add tests for destroy()
- clean up the Recycler tests (reduce copy & paste)
Holger Hoffstätte 2014-02-24 17:59:50 +01:00 committed by Holger Hoffstätte
parent 16e350b266
commit bac09e2926
16 changed files with 229 additions and 66 deletions
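
The new contract splits the old two-stage lifecycle (newInstance/clear) into three stages: newInstance(int) creates a pooled object, recycle(T) — the renamed clear(T) — resets it whenever it is released back into the pool, and destroy(T) lets the factory release internal resources when an object leaves the pool for good (on eviction over capacity, or when the recycler is closed). A minimal sketch of a factory under the new contract; the pooled StringBuilder type is purely illustrative and not part of this commit:

    // Hypothetical factory pooling StringBuilders, for illustration only.
    Recycler.C<StringBuilder> factory = new AbstractRecyclerC<StringBuilder>() {
        @Override
        public StringBuilder newInstance(int sizing) {
            return new StringBuilder(sizing);   // pool is empty: create a fresh instance
        }
        @Override
        public void recycle(StringBuilder value) {
            value.setLength(0);                 // reset state before the instance is pooled again
        }
        @Override
        public void destroy(StringBuilder value) {
            // optional: release internal resources before the instance is dropped;
            // the AbstractRecyclerC default is empty and simply lets GC take over
        }
    };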

CacheRecycler.java

@@ -24,6 +24,7 @@ import com.google.common.base.Strings;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.recycler.AbstractRecyclerC;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -71,135 +72,140 @@ public class CacheRecycler extends AbstractComponent {
int smartSize = settings.getAsInt("smart_size", 1024);
final int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
hashMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<ObjectObjectOpenHashMap>() {
hashMap = build(type, limit, smartSize, availableProcessors, new AbstractRecyclerC<ObjectObjectOpenHashMap>() {
@Override
public ObjectObjectOpenHashMap newInstance(int sizing) {
return new ObjectObjectOpenHashMap(size(sizing));
}
@Override
public void clear(ObjectObjectOpenHashMap value) {
public void recycle(ObjectObjectOpenHashMap value) {
value.clear();
}
});
hashSet = build(type, limit, smartSize, availableProcessors, new Recycler.C<ObjectOpenHashSet>() {
hashSet = build(type, limit, smartSize, availableProcessors, new AbstractRecyclerC<ObjectOpenHashSet>() {
@Override
public ObjectOpenHashSet newInstance(int sizing) {
return new ObjectOpenHashSet(size(sizing), 0.5f);
}
@Override
public void clear(ObjectOpenHashSet value) {
public void recycle(ObjectOpenHashSet value) {
value.clear();
}
});
doubleObjectMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<DoubleObjectOpenHashMap>() {
doubleObjectMap = build(type, limit, smartSize, availableProcessors, new AbstractRecyclerC<DoubleObjectOpenHashMap>() {
@Override
public DoubleObjectOpenHashMap newInstance(int sizing) {
return new DoubleObjectOpenHashMap(size(sizing));
}
@Override
public void clear(DoubleObjectOpenHashMap value) {
public void recycle(DoubleObjectOpenHashMap value) {
value.clear();
}
});
longObjectMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<LongObjectOpenHashMap>() {
longObjectMap = build(type, limit, smartSize, availableProcessors, new AbstractRecyclerC<LongObjectOpenHashMap>() {
@Override
public LongObjectOpenHashMap newInstance(int sizing) {
return new LongObjectOpenHashMap(size(sizing));
}
@Override
public void clear(LongObjectOpenHashMap value) {
public void recycle(LongObjectOpenHashMap value) {
value.clear();
}
});
longLongMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<LongLongOpenHashMap>() {
longLongMap = build(type, limit, smartSize, availableProcessors, new AbstractRecyclerC<LongLongOpenHashMap>() {
@Override
public LongLongOpenHashMap newInstance(int sizing) {
return new LongLongOpenHashMap(size(sizing));
}
@Override
public void clear(LongLongOpenHashMap value) {
public void recycle(LongLongOpenHashMap value) {
value.clear();
}
});
intIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<IntIntOpenHashMap>() {
intIntMap = build(type, limit, smartSize, availableProcessors, new AbstractRecyclerC<IntIntOpenHashMap>() {
@Override
public IntIntOpenHashMap newInstance(int sizing) {
return new IntIntOpenHashMap(size(sizing));
}
@Override
public void clear(IntIntOpenHashMap value) {
public void recycle(IntIntOpenHashMap value) {
value.clear();
}
});
floatIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<FloatIntOpenHashMap>() {
floatIntMap = build(type, limit, smartSize, availableProcessors, new AbstractRecyclerC<FloatIntOpenHashMap>() {
@Override
public FloatIntOpenHashMap newInstance(int sizing) {
return new FloatIntOpenHashMap(size(sizing));
}
@Override
public void clear(FloatIntOpenHashMap value) {
public void recycle(FloatIntOpenHashMap value) {
value.clear();
}
});
doubleIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<DoubleIntOpenHashMap>() {
doubleIntMap = build(type, limit, smartSize, availableProcessors, new AbstractRecyclerC<DoubleIntOpenHashMap>() {
@Override
public DoubleIntOpenHashMap newInstance(int sizing) {
return new DoubleIntOpenHashMap(size(sizing));
}
@Override
public void clear(DoubleIntOpenHashMap value) {
public void recycle(DoubleIntOpenHashMap value) {
value.clear();
}
});
longIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<LongIntOpenHashMap>() {
longIntMap = build(type, limit, smartSize, availableProcessors, new AbstractRecyclerC<LongIntOpenHashMap>() {
@Override
public LongIntOpenHashMap newInstance(int sizing) {
return new LongIntOpenHashMap(size(sizing));
}
@Override
public void clear(LongIntOpenHashMap value) {
public void recycle(LongIntOpenHashMap value) {
value.clear();
}
@Override
public void destroy(LongIntOpenHashMap value) {
// drop instance for GC
}
});
objectIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<ObjectIntOpenHashMap>() {
objectIntMap = build(type, limit, smartSize, availableProcessors, new AbstractRecyclerC<ObjectIntOpenHashMap>() {
@Override
public ObjectIntOpenHashMap newInstance(int sizing) {
return new ObjectIntOpenHashMap(size(sizing));
}
@Override
public void clear(ObjectIntOpenHashMap value) {
public void recycle(ObjectIntOpenHashMap value) {
value.clear();
}
});
intObjectMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<IntObjectOpenHashMap>() {
intObjectMap = build(type, limit, smartSize, availableProcessors, new AbstractRecyclerC<IntObjectOpenHashMap>() {
@Override
public IntObjectOpenHashMap newInstance(int sizing) {
return new IntObjectOpenHashMap(size(sizing));
}
@Override
public void clear(IntObjectOpenHashMap value) {
public void recycle(IntObjectOpenHashMap value) {
value.clear();
}
});
objectFloatMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<ObjectFloatOpenHashMap>() {
objectFloatMap = build(type, limit, smartSize, availableProcessors, new AbstractRecyclerC<ObjectFloatOpenHashMap>() {
@Override
public ObjectFloatOpenHashMap newInstance(int sizing) {
return new ObjectFloatOpenHashMap(size(sizing));
}
@Override
public void clear(ObjectFloatOpenHashMap value) {
public void recycle(ObjectFloatOpenHashMap value) {
value.clear();
}
});

PageCacheRecycler.java

@@ -19,6 +19,8 @@
package org.elasticsearch.cache.recycler;
import org.elasticsearch.common.recycler.AbstractRecyclerC;
import com.google.common.base.Strings;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.component.AbstractComponent;
@@ -107,45 +109,53 @@ public class PageCacheRecycler extends AbstractComponent {
final double totalWeight = bytesWeight + intsWeight + longsWeight + doublesWeight + objectsWeight;
bytePage = build(type, maxCount(limit, BigArrays.BYTE_PAGE_SIZE, bytesWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C<byte[]>() {
bytePage = build(type, maxCount(limit, BigArrays.BYTE_PAGE_SIZE, bytesWeight, totalWeight), searchThreadPoolSize, availableProcessors, new AbstractRecyclerC<byte[]>() {
@Override
public byte[] newInstance(int sizing) {
return new byte[BigArrays.BYTE_PAGE_SIZE];
}
@Override
public void clear(byte[] value) {}
public void recycle(byte[] value) {
// nothing to do
}
});
intPage = build(type, maxCount(limit, BigArrays.INT_PAGE_SIZE, intsWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C<int[]>() {
intPage = build(type, maxCount(limit, BigArrays.INT_PAGE_SIZE, intsWeight, totalWeight), searchThreadPoolSize, availableProcessors, new AbstractRecyclerC<int[]>() {
@Override
public int[] newInstance(int sizing) {
return new int[BigArrays.INT_PAGE_SIZE];
}
@Override
public void clear(int[] value) {}
public void recycle(int[] value) {
// nothing to do
}
});
longPage = build(type, maxCount(limit, BigArrays.LONG_PAGE_SIZE, longsWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C<long[]>() {
longPage = build(type, maxCount(limit, BigArrays.LONG_PAGE_SIZE, longsWeight, totalWeight), searchThreadPoolSize, availableProcessors, new AbstractRecyclerC<long[]>() {
@Override
public long[] newInstance(int sizing) {
return new long[BigArrays.LONG_PAGE_SIZE];
}
@Override
public void clear(long[] value) {}
public void recycle(long[] value) {
// nothing to do
}
});
doublePage = build(type, maxCount(limit, BigArrays.DOUBLE_PAGE_SIZE, doublesWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C<double[]>() {
doublePage = build(type, maxCount(limit, BigArrays.DOUBLE_PAGE_SIZE, doublesWeight, totalWeight), searchThreadPoolSize, availableProcessors, new AbstractRecyclerC<double[]>() {
@Override
public double[] newInstance(int sizing) {
return new double[BigArrays.DOUBLE_PAGE_SIZE];
}
@Override
public void clear(double[] value) {}
public void recycle(double[] value) {
// nothing to do
}
});
objectPage = build(type, maxCount(limit, BigArrays.OBJECT_PAGE_SIZE, objectsWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C<Object[]>() {
objectPage = build(type, maxCount(limit, BigArrays.OBJECT_PAGE_SIZE, objectsWeight, totalWeight), searchThreadPoolSize, availableProcessors, new AbstractRecyclerC<Object[]>() {
@Override
public Object[] newInstance(int sizing) {
return new Object[BigArrays.OBJECT_PAGE_SIZE];
}
@Override
public void clear(Object[] value) {
public void recycle(Object[] value) {
Arrays.fill(value, null); // we need to remove the strong refs on the objects stored in the array
}
});
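
Note that the Object[] page is the only page type whose recycle() does real work: without the Arrays.fill(value, null) above, a pooled page would keep strong references to whatever its previous user stored in it, keeping those objects reachable for as long as the page sits in the pool.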

AbstractRecyclerC.java

@@ -0,0 +1,32 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.recycler;
public abstract class AbstractRecyclerC<T> implements Recycler.C<T> {
public abstract T newInstance(int sizing);
public abstract void recycle(T value);
public void destroy(T value) {
// by default we simply drop the object for GC.
}
}
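
Concrete factories only have to implement the two abstract methods; destroy() is opt-in. As a hypothetical example (not part of this commit), a factory pooling java.util.zip.Deflater instances — which hold native memory — shows where a non-empty destroy() would pay off:

    import java.util.zip.Deflater;

    // Hypothetical factory: Deflater holds native resources that end() must free.
    class DeflaterFactory extends AbstractRecyclerC<Deflater> {
        @Override
        public Deflater newInstance(int sizing) {
            return new Deflater();  // sizing is irrelevant for this type
        }
        @Override
        public void recycle(Deflater value) {
            value.reset();          // make the instance reusable before re-pooling
        }
        @Override
        public void destroy(Deflater value) {
            value.end();            // free native memory once the instance leaves the pool
        }
    }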

DequeRecycler.java

@@ -39,6 +39,11 @@ public class DequeRecycler<T> extends AbstractRecycler<T> {
@Override
public void close() {
// call destroy() for every cached object
for (T t : deque) {
c.destroy(t);
}
// finally get rid of all references
deque.clear();
}
@@ -57,7 +62,9 @@ public class DequeRecycler<T> extends AbstractRecycler<T> {
}
/** Called after a release. */
protected void afterRelease(boolean recycled) {}
protected void afterRelease(boolean recycled) {
// nothing to do
}
private class DV implements Recycler.V<T> {
@@ -86,9 +93,12 @@ public class DequeRecycler<T> extends AbstractRecycler<T> {
}
final boolean recycle = beforeRelease();
if (recycle) {
c.clear(value);
c.recycle(value);
deque.addFirst(value);
}
else {
c.destroy(value);
}
value = null;
afterRelease(recycle);
return true;
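
With this change a released value takes one of two paths: if the deque still has room, recycle() resets it and it re-enters the pool; otherwise destroy() disposes of it. A sketch of the resulting call sequence, assuming a deque recycler capped at a single entry (Recyclers.deque as used by the tests below):

    Recycler<byte[]> recycler = Recyclers.deque(RECYCLER_C, 1); // capacity of 1
    Recycler.V<byte[]> a = recycler.obtain();   // newInstance()
    Recycler.V<byte[]> b = recycler.obtain();   // newInstance() again
    a.release();                                // recycle(): pooled, deque now full
    b.release();                                // destroy(): no room left in the deque
    recycler.close();                           // destroy() for the instance still pooled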

NoneRecycler.java

@@ -36,7 +36,7 @@ public class NoneRecycler<T> extends AbstractRecycler<T> {
@Override
public void close() {
// no-op
}
public static class NV<T> implements Recycler.V<T> {

Recycler.java

@@ -34,8 +34,11 @@ public interface Recycler<T> {
/** Create a new empty instance of the given size. */
T newInstance(int sizing);
/** Clear the data. This operation is called when the data-structure is released. */
void clear(T value);
/** Recycle the data. This operation is called when the data structure is released. */
void recycle(T value);
/** Destroy the data. This operation allows the data structure to release any internal resources before GC. */
void destroy(T value);
}
public static interface V<T> extends Releasable {

Recyclers.java

@@ -104,6 +104,7 @@ public enum Recyclers {
@Override
public void close() {
recyclers.get().close();
recyclers.close();
}
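
recyclers.get().close() closes only the calling thread's recycler; the added recyclers.close() also disposes of the thread-local holder itself, so per-thread recyclers are not left behind on shutdown.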

RecyclerBenchmark.java

@@ -19,6 +19,8 @@
package org.elasticsearch.benchmark.common.recycler;
import org.elasticsearch.common.recycler.AbstractRecyclerC;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.recycler.Recycler;
@@ -75,7 +77,7 @@ public class RecyclerBenchmark {
public static void main(String[] args) throws InterruptedException {
final int limit = 100;
final Recycler.C<Object> c = new Recycler.C<Object>() {
final Recycler.C<Object> c = new AbstractRecyclerC<Object>() {
@Override
public Object newInstance(int sizing) {
@@ -83,7 +85,9 @@ public class RecyclerBenchmark {
}
@Override
public void clear(Object value) {}
public void recycle(Object value) {
// do nothing
}
};
final ImmutableMap<String, Recycler<Object>> recyclers = ImmutableMap.<String, Recycler<Object>>builder()

AbstractRecyclerTests.java

@@ -20,60 +20,103 @@
package org.elasticsearch.common.recycler;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.recycler.Recycler.V;
import org.elasticsearch.test.ElasticsearchTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public abstract class AbstractRecyclerTests extends ElasticsearchTestCase {
protected static final Recycler.C<byte[]> RECYCLER_C = new Recycler.C<byte[]>() {
// marker states for data
protected static final byte FRESH = 1;
protected static final byte RECYCLED = 2;
protected static final byte DEAD = 42;
protected static final Recycler.C<byte[]> RECYCLER_C = new AbstractRecyclerC<byte[]>() {
@Override
public byte[] newInstance(int sizing) {
return new byte[10];
byte[] value = new byte[10];
// "fresh" is intentionally not 0 to ensure we covered this code path
Arrays.fill(value, FRESH);
return value;
}
@Override
public void clear(byte[] value) {
Arrays.fill(value, (byte) 0);
public void recycle(byte[] value) {
Arrays.fill(value, RECYCLED);
}
@Override
public void destroy(byte[] value) {
// we cannot really free the internals of a byte[], so mark it for verification
Arrays.fill(value, DEAD);
}
};
protected abstract Recycler<byte[]> newRecycler();
protected void assertFresh(byte[] data) {
assertNotNull(data);
for (int i = 0; i < data.length; ++i) {
assertEquals(FRESH, data[i]);
}
}
protected void assertRecycled(byte[] data) {
assertNotNull(data);
for (int i = 0; i < data.length; ++i) {
assertEquals(RECYCLED, data[i]);
}
}
protected void assertDead(byte[] data) {
assertNotNull(data);
for (int i = 0; i < data.length; ++i) {
assertEquals(DEAD, data[i]);
}
}
protected abstract Recycler<byte[]> newRecycler(int limit);
protected int limit = randomIntBetween(5, 10);
public void testReuse() {
Recycler<byte[]> r = newRecycler();
Recycler<byte[]> r = newRecycler(limit);
Recycler.V<byte[]> o = r.obtain();
assertFalse(o.isRecycled());
final byte[] b1 = o.v();
assertFresh(b1);
o.release();
assertRecycled(b1);
o = r.obtain();
final byte[] b2 = o.v();
if (o.isRecycled()) {
assertRecycled(b2);
assertSame(b1, b2);
} else {
assertFresh(b2);
assertNotSame(b1, b2);
}
o.release();
r.close();
}
public void testClear() {
Recycler<byte[]> r = newRecycler();
public void testRecycle() {
Recycler<byte[]> r = newRecycler(limit);
Recycler.V<byte[]> o = r.obtain();
assertFresh(o.v());
getRandom().nextBytes(o.v());
o.release();
o = r.obtain();
for (int i = 0; i < o.v().length; ++i) {
assertEquals(0, o.v()[i]);
}
assertRecycled(o.v());
o.release();
r.close();
}
public void testDoubleRelease() {
final Recycler<byte[]> r = newRecycler();
final Recycler<byte[]> r = newRecycler(limit);
final Recycler.V<byte[]> v1 = r.obtain();
v1.release();
try {
@@ -89,4 +132,50 @@ public abstract class AbstractRecyclerTests extends ElasticsearchTestCase {
r.close();
}
public void testDestroyWhenOverCapacity() {
Recycler<byte[]> r = newRecycler(limit);
// get & keep reference to new/recycled data
Recycler.V<byte[]> o = r.obtain();
byte[] data = o.v();
assertFresh(data);
// now exhaust the recycler
List<V<byte[]>> vals = new ArrayList<V<byte[]>>(limit);
for (int i = 0; i < limit; ++i) {
vals.add(r.obtain());
}
// Recycler size increases on release, not on obtain!
for (V<byte[]> v: vals) {
v.release();
}
// release first ref, verify for destruction
o.release();
assertDead(data);
// close the rest
r.close();
}
public void testClose() {
Recycler<byte[]> r = newRecycler(limit);
// get & keep reference to pooled data
Recycler.V<byte[]> o = r.obtain();
byte[] data = o.v();
assertFresh(data);
// randomize & return to pool
getRandom().nextBytes(data);
o.release();
// verify that recycle() ran
assertRecycled(data);
// closing the recycler should mark recycled instances via destroy()
r.close();
assertDead(data);
}
}

ConcurrentRecyclerTests.java

@@ -22,8 +22,8 @@ package org.elasticsearch.common.recycler;
public class ConcurrentRecyclerTests extends AbstractRecyclerTests {
@Override
protected Recycler<byte[]> newRecycler() {
return Recyclers.concurrent(Recyclers.dequeFactory(RECYCLER_C, randomIntBetween(5, 10)), randomIntBetween(1,5));
protected Recycler<byte[]> newRecycler(int limit) {
return Recyclers.concurrent(Recyclers.dequeFactory(RECYCLER_C, limit), randomIntBetween(1,5));
}
}

LockedRecyclerTests.java

@@ -22,8 +22,8 @@ package org.elasticsearch.common.recycler;
public class LockedRecyclerTests extends AbstractRecyclerTests {
@Override
protected Recycler<byte[]> newRecycler() {
return Recyclers.locked(Recyclers.deque(RECYCLER_C, randomIntBetween(5, 10)));
protected Recycler<byte[]> newRecycler(int limit) {
return Recyclers.locked(Recyclers.deque(RECYCLER_C, limit));
}
}

NoneRecyclerTests.java

@@ -22,8 +22,16 @@ package org.elasticsearch.common.recycler;
public class NoneRecyclerTests extends AbstractRecyclerTests {
@Override
protected Recycler<byte[]> newRecycler() {
protected Recycler<byte[]> newRecycler(int limit) {
return Recyclers.none(RECYCLER_C);
}
protected void assertRecycled(byte[] data) {
// will never match
}
protected void assertDead(byte[] data) {
// will never match
}
}

QueueRecyclerTests.java

@@ -22,8 +22,8 @@ package org.elasticsearch.common.recycler;
public class QueueRecyclerTests extends AbstractRecyclerTests {
@Override
protected Recycler<byte[]> newRecycler() {
return Recyclers.concurrentDeque(RECYCLER_C, randomIntBetween(5, 10));
protected Recycler<byte[]> newRecycler(int limit) {
return Recyclers.concurrentDeque(RECYCLER_C, limit);
}
}

SoftConcurrentRecyclerTests.java

@@ -22,8 +22,8 @@ package org.elasticsearch.common.recycler;
public class SoftConcurrentRecyclerTests extends AbstractRecyclerTests {
@Override
protected Recycler<byte[]> newRecycler() {
return Recyclers.concurrent(Recyclers.softFactory(Recyclers.dequeFactory(RECYCLER_C, randomIntBetween(5, 10))), randomIntBetween(1, 5));
protected Recycler<byte[]> newRecycler(int limit) {
return Recyclers.concurrent(Recyclers.softFactory(Recyclers.dequeFactory(RECYCLER_C, limit)), randomIntBetween(1, 5));
}
}

SoftThreadLocalRecyclerTests.java

@@ -22,8 +22,8 @@ package org.elasticsearch.common.recycler;
public class SoftThreadLocalRecyclerTests extends AbstractRecyclerTests {
@Override
protected Recycler<byte[]> newRecycler() {
return Recyclers.threadLocal(Recyclers.softFactory(Recyclers.dequeFactory(RECYCLER_C, 10)));
protected Recycler<byte[]> newRecycler(int limit) {
return Recyclers.threadLocal(Recyclers.softFactory(Recyclers.dequeFactory(RECYCLER_C, limit)));
}
}

ThreadLocalRecyclerTests.java

@@ -22,8 +22,8 @@ package org.elasticsearch.common.recycler;
public class ThreadLocalRecyclerTests extends AbstractRecyclerTests {
@Override
protected Recycler<byte[]> newRecycler() {
return Recyclers.threadLocal(Recyclers.dequeFactory(RECYCLER_C, 10));
protected Recycler<byte[]> newRecycler(int limit) {
return Recyclers.threadLocal(Recyclers.dequeFactory(RECYCLER_C, limit));
}
}