Change the default recycler type.

Recycling is no longer thread-local by default. Instead, there are several
pools of objects to recycle, and each thread picks a pool based on its id.
Each pool is protected by its own lock, so up to ${number of pools} threads
may recycle objects concurrently.
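
Concretely, a thread picks its pool by hashing its id, as in the slot
selection of Recyclers.concurrent() further down. A minimal sketch
(MurmurHash3 is the hppc helper used by the new Recyclers class; the slot()
helper below is only illustrative):

// sketch of how a thread is routed to one of the locked pools
static int slot(int numberOfPools) {
    final long id = Thread.currentThread().getId();
    // hash the id rather than trusting its low bits to be equiprobable
    int slot = (int) com.carrotsearch.hppc.hash.MurmurHash3.hash(id);
    slot &= 0x7FFFFFFF; // make positive, otherwise % may return negative numbers
    return slot % numberOfPools;
}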

Recyclers have also been refactored for better composability: for example,
there is a soft recycler that wraps its pooled data in a SoftReference, and a
thread-local recycler that can take any recycler factory and instantiates a
dedicated instance per thread.
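
For illustration, a minimal sketch of this composition using the new
Recyclers helpers introduced by this commit (the page size and the limit of
100 below are placeholder values):

import static org.elasticsearch.common.recycler.Recyclers.*;

Recycler.C<long[]> c = new Recycler.C<long[]>() {
    @Override
    public long[] newInstance(int sizing) {
        return new long[1024]; // allocate a fresh page
    }
    @Override
    public void clear(long[] value) {} // nothing to reset for primitive arrays
};

int availableProcessors = Runtime.getRuntime().availableProcessors();

// one deque-backed recycler per thread, reclaimable under memory pressure
Recycler<long[]> perThread = threadLocal(softFactory(dequeFactory(c, 100)));

// several locked deque-backed pools, threads spread across them by their id
Recycler<long[]> pooled = concurrent(dequeFactory(c, 100), availableProcessors);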

A RecyclerBenchmark has been added to measure the overhead of object
recycling depending on the recycler type and the number of threads that
recycle objects concurrently.
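
Per thread, the measured work is just a loop of obtain/release round-trips
until a shared counter runs out, as in the bench() method further down:

while (recycles.getAndDecrement() > 0) {
    final Recycler.V<?> v = recycler.obtain(); // pooled or freshly allocated object
    v.release();                               // return it so it can be recycled
}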

Close #4647
Adrien Grand 2014-01-07 14:42:44 +01:00
parent 3ab73ab957
commit e01f8c250d
20 changed files with 768 additions and 331 deletions

View File

@ -24,11 +24,14 @@ import com.google.common.base.Strings;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.recycler.*;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import java.util.Locale;
import static org.elasticsearch.common.recycler.Recyclers.*;
@SuppressWarnings("unchecked")
public class CacheRecycler extends AbstractComponent {
@ -66,8 +69,9 @@ public class CacheRecycler extends AbstractComponent {
final Type type = Type.parse(settings.get("type"));
int limit = settings.getAsInt("limit", 10);
int smartSize = settings.getAsInt("smart_size", 1024);
final int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
hashMap = build(type, limit, smartSize, new Recycler.C<ObjectObjectOpenHashMap>() {
hashMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<ObjectObjectOpenHashMap>() {
@Override
public ObjectObjectOpenHashMap newInstance(int sizing) {
return new ObjectObjectOpenHashMap(size(sizing));
@ -78,7 +82,7 @@ public class CacheRecycler extends AbstractComponent {
value.clear();
}
});
hashSet = build(type, limit, smartSize, new Recycler.C<ObjectOpenHashSet>() {
hashSet = build(type, limit, smartSize, availableProcessors, new Recycler.C<ObjectOpenHashSet>() {
@Override
public ObjectOpenHashSet newInstance(int sizing) {
return new ObjectOpenHashSet(size(sizing), 0.5f);
@ -89,7 +93,7 @@ public class CacheRecycler extends AbstractComponent {
value.clear();
}
});
doubleObjectMap = build(type, limit, smartSize, new Recycler.C<DoubleObjectOpenHashMap>() {
doubleObjectMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<DoubleObjectOpenHashMap>() {
@Override
public DoubleObjectOpenHashMap newInstance(int sizing) {
return new DoubleObjectOpenHashMap(size(sizing));
@ -100,7 +104,7 @@ public class CacheRecycler extends AbstractComponent {
value.clear();
}
});
longObjectMap = build(type, limit, smartSize, new Recycler.C<LongObjectOpenHashMap>() {
longObjectMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<LongObjectOpenHashMap>() {
@Override
public LongObjectOpenHashMap newInstance(int sizing) {
return new LongObjectOpenHashMap(size(sizing));
@ -111,7 +115,7 @@ public class CacheRecycler extends AbstractComponent {
value.clear();
}
});
longLongMap = build(type, limit, smartSize, new Recycler.C<LongLongOpenHashMap>() {
longLongMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<LongLongOpenHashMap>() {
@Override
public LongLongOpenHashMap newInstance(int sizing) {
return new LongLongOpenHashMap(size(sizing));
@ -122,7 +126,7 @@ public class CacheRecycler extends AbstractComponent {
value.clear();
}
});
intIntMap = build(type, limit, smartSize, new Recycler.C<IntIntOpenHashMap>() {
intIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<IntIntOpenHashMap>() {
@Override
public IntIntOpenHashMap newInstance(int sizing) {
return new IntIntOpenHashMap(size(sizing));
@ -133,7 +137,7 @@ public class CacheRecycler extends AbstractComponent {
value.clear();
}
});
floatIntMap = build(type, limit, smartSize, new Recycler.C<FloatIntOpenHashMap>() {
floatIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<FloatIntOpenHashMap>() {
@Override
public FloatIntOpenHashMap newInstance(int sizing) {
return new FloatIntOpenHashMap(size(sizing));
@ -144,7 +148,7 @@ public class CacheRecycler extends AbstractComponent {
value.clear();
}
});
doubleIntMap = build(type, limit, smartSize, new Recycler.C<DoubleIntOpenHashMap>() {
doubleIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<DoubleIntOpenHashMap>() {
@Override
public DoubleIntOpenHashMap newInstance(int sizing) {
return new DoubleIntOpenHashMap(size(sizing));
@ -155,7 +159,7 @@ public class CacheRecycler extends AbstractComponent {
value.clear();
}
});
longIntMap = build(type, limit, smartSize, new Recycler.C<LongIntOpenHashMap>() {
longIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<LongIntOpenHashMap>() {
@Override
public LongIntOpenHashMap newInstance(int sizing) {
return new LongIntOpenHashMap(size(sizing));
@ -166,7 +170,7 @@ public class CacheRecycler extends AbstractComponent {
value.clear();
}
});
objectIntMap = build(type, limit, smartSize, new Recycler.C<ObjectIntOpenHashMap>() {
objectIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<ObjectIntOpenHashMap>() {
@Override
public ObjectIntOpenHashMap newInstance(int sizing) {
return new ObjectIntOpenHashMap(size(sizing));
@ -177,7 +181,7 @@ public class CacheRecycler extends AbstractComponent {
value.clear();
}
});
intObjectMap = build(type, limit, smartSize, new Recycler.C<IntObjectOpenHashMap>() {
intObjectMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<IntObjectOpenHashMap>() {
@Override
public IntObjectOpenHashMap newInstance(int sizing) {
return new IntObjectOpenHashMap(size(sizing));
@ -188,7 +192,7 @@ public class CacheRecycler extends AbstractComponent {
value.clear();
}
});
objectFloatMap = build(type, limit, smartSize, new Recycler.C<ObjectFloatOpenHashMap>() {
objectFloatMap = build(type, limit, smartSize, availableProcessors, new Recycler.C<ObjectFloatOpenHashMap>() {
@Override
public ObjectFloatOpenHashMap newInstance(int sizing) {
return new ObjectFloatOpenHashMap(size(sizing));
@ -253,12 +257,12 @@ public class CacheRecycler extends AbstractComponent {
return sizing > 0 ? sizing : 256;
}
private <T> Recycler<T> build(Type type, int limit, int smartSize, Recycler.C<T> c) {
private <T> Recycler<T> build(Type type, int limit, int smartSize, int availableProcessors, Recycler.C<T> c) {
Recycler<T> recycler;
try {
recycler = type.build(c, limit);
recycler = type.build(c, limit, availableProcessors);
if (smartSize > 0) {
recycler = new Recycler.Sizing<T>(recycler, smartSize);
recycler = sizing(recycler, none(c), smartSize);
}
} catch (IllegalArgumentException ex) {
throw new ElasticsearchIllegalArgumentException("no type support [" + type + "] for recycler");
@ -270,40 +274,44 @@ public class CacheRecycler extends AbstractComponent {
public static enum Type {
SOFT_THREAD_LOCAL {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit) {
return new SoftThreadLocalRecycler<T>(c, limit);
}
@Override
boolean perThread() {
return true;
<T> Recycler<T> build(Recycler.C<T> c, int limit, int availableProcessors) {
return threadLocal(softFactory(dequeFactory(c, limit)));
}
},
THREAD_LOCAL {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit) {
return new ThreadLocalRecycler<T>(c, limit);
}
@Override
boolean perThread() {
return true;
<T> Recycler<T> build(Recycler.C<T> c, int limit, int availableProcessors) {
return threadLocal(dequeFactory(c, limit));
}
},
QUEUE {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit) {
return new QueueRecycler<T>(c, limit);
<T> Recycler<T> build(Recycler.C<T> c, int limit, int availableProcessors) {
return concurrentDeque(c, limit);
}
},
SOFT_CONCURRENT {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit, int availableProcessors) {
return concurrent(softFactory(dequeFactory(c, limit)), availableProcessors);
}
},
CONCURRENT {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit, int availableProcessors) {
return concurrent(dequeFactory(c, limit), availableProcessors);
}
},
NONE {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit) {
return new NoneRecycler<T>(c);
<T> Recycler<T> build(Recycler.C<T> c, int limit, int availableProcessors) {
return none(c);
}
};
public static Type parse(String type) {
if (Strings.isNullOrEmpty(type)) {
return SOFT_THREAD_LOCAL;
return SOFT_CONCURRENT;
}
try {
return Type.valueOf(type.toUpperCase(Locale.ROOT));
@ -312,9 +320,6 @@ public class CacheRecycler extends AbstractComponent {
}
}
abstract <T> Recycler<T> build(Recycler.C<T> c, int limit);
boolean perThread() {
return false;
}
abstract <T> Recycler<T> build(Recycler.C<T> c, int limit, int availableProcessors);
}
}

View File

@ -19,18 +19,20 @@
package org.elasticsearch.cache.recycler;
import org.elasticsearch.cache.recycler.CacheRecycler.Type;
import com.google.common.base.Strings;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.recycler.NoneRecycler;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Locale;
import static org.elasticsearch.common.recycler.Recyclers.*;
/** A recycler of fixed-size pages. */
public class PageCacheRecycler extends AbstractComponent {
@ -79,17 +81,9 @@ public class PageCacheRecycler extends AbstractComponent {
public PageCacheRecycler(Settings settings, ThreadPool threadPool) {
super(settings);
final Type type = Type.parse(componentSettings.get(TYPE));
long limit = componentSettings.getAsMemory(LIMIT_HEAP, "10%").bytes();
if (type.perThread()) {
final long limitPerThread = componentSettings.getAsBytesSize(LIMIT_PER_THREAD, new ByteSizeValue(-1)).bytes();
if (limitPerThread != -1) {
// if the per_thread limit is set, it has precedence
limit = limitPerThread;
} else {
// divide memory equally to all search threads
limit /= maximumSearchThreadPoolSize(threadPool, settings);
}
}
final long limit = componentSettings.getAsMemory(LIMIT_HEAP, "10%").bytes();
final int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
final int searchThreadPoolSize = maximumSearchThreadPoolSize(threadPool, settings);
// We have a global amount of memory that we need to divide across data types.
// Since some types are more useful than other ones we give them different weights.
@ -113,7 +107,7 @@ public class PageCacheRecycler extends AbstractComponent {
final double totalWeight = bytesWeight + intsWeight + longsWeight + doublesWeight + objectsWeight;
bytePage = build(type, maxCount(limit, BigArrays.BYTE_PAGE_SIZE, bytesWeight, totalWeight), new Recycler.C<byte[]>() {
bytePage = build(type, maxCount(limit, BigArrays.BYTE_PAGE_SIZE, bytesWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C<byte[]>() {
@Override
public byte[] newInstance(int sizing) {
return new byte[BigArrays.BYTE_PAGE_SIZE];
@ -121,7 +115,7 @@ public class PageCacheRecycler extends AbstractComponent {
@Override
public void clear(byte[] value) {}
});
intPage = build(type, maxCount(limit, BigArrays.INT_PAGE_SIZE, intsWeight, totalWeight), new Recycler.C<int[]>() {
intPage = build(type, maxCount(limit, BigArrays.INT_PAGE_SIZE, intsWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C<int[]>() {
@Override
public int[] newInstance(int sizing) {
return new int[BigArrays.INT_PAGE_SIZE];
@ -129,7 +123,7 @@ public class PageCacheRecycler extends AbstractComponent {
@Override
public void clear(int[] value) {}
});
longPage = build(type, maxCount(limit, BigArrays.LONG_PAGE_SIZE, longsWeight, totalWeight), new Recycler.C<long[]>() {
longPage = build(type, maxCount(limit, BigArrays.LONG_PAGE_SIZE, longsWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C<long[]>() {
@Override
public long[] newInstance(int sizing) {
return new long[BigArrays.LONG_PAGE_SIZE];
@ -137,7 +131,7 @@ public class PageCacheRecycler extends AbstractComponent {
@Override
public void clear(long[] value) {}
});
doublePage = build(type, maxCount(limit, BigArrays.DOUBLE_PAGE_SIZE, doublesWeight, totalWeight), new Recycler.C<double[]>() {
doublePage = build(type, maxCount(limit, BigArrays.DOUBLE_PAGE_SIZE, doublesWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C<double[]>() {
@Override
public double[] newInstance(int sizing) {
return new double[BigArrays.DOUBLE_PAGE_SIZE];
@ -145,7 +139,7 @@ public class PageCacheRecycler extends AbstractComponent {
@Override
public void clear(double[] value) {}
});
objectPage = build(type, maxCount(limit, BigArrays.OBJECT_PAGE_SIZE, objectsWeight, totalWeight), new Recycler.C<Object[]>() {
objectPage = build(type, maxCount(limit, BigArrays.OBJECT_PAGE_SIZE, objectsWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C<Object[]>() {
@Override
public Object[] newInstance(int sizing) {
return new Object[BigArrays.OBJECT_PAGE_SIZE];
@ -194,13 +188,65 @@ public class PageCacheRecycler extends AbstractComponent {
return objectPage.obtain();
}
private static <T> Recycler<T> build(Type type, int limit, Recycler.C<T> c) {
private static <T> Recycler<T> build(Type type, int limit, int estimatedThreadPoolSize, int availableProcessors, Recycler.C<T> c) {
final Recycler<T> recycler;
if (limit == 0) {
recycler = new NoneRecycler<T>(c);
recycler = none(c);
} else {
recycler = type.build(c, limit);
recycler = type.build(c, limit, estimatedThreadPoolSize, availableProcessors);
}
return recycler;
}
public static enum Type {
SOFT_THREAD_LOCAL {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit, int estimatedThreadPoolSize, int availableProcessors) {
return threadLocal(softFactory(dequeFactory(c, limit / estimatedThreadPoolSize)));
}
},
THREAD_LOCAL {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit, int estimatedThreadPoolSize, int availableProcessors) {
return threadLocal(dequeFactory(c, limit / estimatedThreadPoolSize));
}
},
QUEUE {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit, int estimatedThreadPoolSize, int availableProcessors) {
return concurrentDeque(c, limit);
}
},
SOFT_CONCURRENT {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit, int estimatedThreadPoolSize, int availableProcessors) {
return concurrent(softFactory(dequeFactory(c, limit / availableProcessors)), availableProcessors);
}
},
CONCURRENT {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit, int estimatedThreadPoolSize, int availableProcessors) {
return concurrent(dequeFactory(c, limit / availableProcessors), availableProcessors);
}
},
NONE {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit, int estimatedThreadPoolSize, int availableProcessors) {
return none(c);
}
};
public static Type parse(String type) {
if (Strings.isNullOrEmpty(type)) {
return SOFT_CONCURRENT;
}
try {
return Type.valueOf(type.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
throw new ElasticsearchIllegalArgumentException("no type support [" + type + "]");
}
}
abstract <T> Recycler<T> build(Recycler.C<T> c, int limit, int estimatedThreadPoolSize, int availableProcessors);
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.recycler;
abstract class AbstractRecycler<T> implements Recycler<T> {
protected final Recycler.C<T> c;
protected AbstractRecycler(Recycler.C<T> c) {
this.c = c;
}
public V<T> obtain() {
return obtain(-1);
}
@Override
public void close() {
// no-op by default
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.recycler;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A {@link Recycler} implementation based on a concurrent {@link Deque}. This implementation is thread-safe.
*/
public class ConcurrentDequeRecycler<T> extends DequeRecycler<T> {
// we maintain size separately because concurrent deque implementations typically have linear-time size() impls
final AtomicInteger size;
public ConcurrentDequeRecycler(C<T> c, int maxSize) {
super(c, ConcurrentCollections.<T>newDeque(), maxSize);
this.size = new AtomicInteger();
}
@Override
public void close() {
assert deque.size() == size.get();
super.close();
size.set(0);
}
@Override
public V<T> obtain(int sizing) {
final V<T> v = super.obtain(sizing);
if (v.isRecycled()) {
size.decrementAndGet();
}
return v;
}
@Override
protected boolean beforeRelease() {
return size.incrementAndGet() <= maxSize;
}
@Override
protected void afterRelease(boolean recycled) {
if (!recycled) {
size.decrementAndGet();
}
}
}

View File

@ -20,54 +20,51 @@
package org.elasticsearch.common.recycler;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Deque;
/**
* A {@link Recycler} implementation based on a {@link Deque}. This implementation is NOT thread-safe.
*/
public class QueueRecycler<T> extends Recycler<T> {
public class DequeRecycler<T> extends AbstractRecycler<T> {
final Queue<T> queue;
final AtomicInteger size;
final Deque<T> deque;
final int maxSize;
public QueueRecycler(C<T> c, int maxSize) {
this(c, ConcurrentCollections.<T>newQueue(), maxSize);
}
public QueueRecycler(C<T> c, Queue<T> queue, int maxSize) {
public DequeRecycler(C<T> c, Deque<T> queue, int maxSize) {
super(c);
this.queue = queue;
this.deque = queue;
this.maxSize = maxSize;
// we maintain size separately because concurrent queue implementations typically have linear-time size() impls
this.size = new AtomicInteger();
}
@Override
public void close() {
assert queue.size() == size.get();
queue.clear();
size.set(0);
deque.clear();
}
@Override
public V<T> obtain(int sizing) {
final T v = queue.poll();
final T v = deque.pollFirst();
if (v == null) {
return new QV(c.newInstance(sizing), false);
return new DV(c.newInstance(sizing), false);
}
size.decrementAndGet();
return new QV(v, true);
return new DV(v, true);
}
class QV implements Recycler.V<T> {
/** Called before releasing an object, returns true if the object should be recycled and false otherwise. */
protected boolean beforeRelease() {
return deque.size() < maxSize;
}
/** Called after a release. */
protected void afterRelease(boolean recycled) {}
private class DV implements Recycler.V<T> {
T value;
final boolean recycled;
QV(T value, boolean recycled) {
DV(T value, boolean recycled) {
this.value = value;
this.recycled = recycled;
}
@ -87,13 +84,13 @@ public class QueueRecycler<T> extends Recycler<T> {
if (value == null) {
throw new ElasticsearchIllegalStateException("recycler entry already released...");
}
if (size.incrementAndGet() <= maxSize) {
final boolean recycle = beforeRelease();
if (recycle) {
c.clear(value);
queue.offer(value);
} else {
size.decrementAndGet();
deque.addFirst(value);
}
value = null;
afterRelease(recycle);
return true;
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.recycler;
abstract class FilterRecycler<T> implements Recycler<T> {
/** Get the delegate instance to forward calls to. */
protected abstract Recycler<T> getDelegate();
/** Wrap a recycled reference. */
protected Recycler.V<T> wrap(Recycler.V<T> delegate) {
return delegate;
}
@Override
public Recycler.V<T> obtain(int sizing) {
return wrap(getDelegate().obtain(sizing));
}
@Override
public Recycler.V<T> obtain() {
return wrap(getDelegate().obtain());
}
@Override
public void close() {
getDelegate().close();
}
}

View File

@ -23,7 +23,7 @@ import org.elasticsearch.ElasticsearchIllegalStateException;
/**
*/
public class NoneRecycler<T> extends Recycler<T> {
public class NoneRecycler<T> extends AbstractRecycler<T> {
public NoneRecycler(C<T> c) {
super(c);

View File

@ -23,31 +23,10 @@ import org.elasticsearch.common.lease.Releasable;
/**
*/
public abstract class Recycler<T> {
public interface Recycler<T> {
public static class Sizing<T> extends Recycler<T> {
private final Recycler<T> recycler;
private final int smartSize;
public Sizing(Recycler<T> recycler, int smartSize) {
super(recycler.c);
this.recycler = recycler;
this.smartSize = smartSize;
}
@Override
public void close() {
recycler.close();
}
@Override
public V<T> obtain(int sizing) {
if (sizing > 0 && sizing < smartSize) {
return new NoneRecycler.NV<T>(c.newInstance(sizing));
}
return recycler.obtain(sizing);
}
public static interface Factory<T> {
Recycler<T> build();
}
public static interface C<T> {
@ -69,17 +48,9 @@ public abstract class Recycler<T> {
}
protected final C<T> c;
void close();
protected Recycler(C<T> c) {
this.c = c;
}
public abstract void close();
public V<T> obtain() {
return obtain(-1);
}
public abstract V<T> obtain(int sizing);
V<T> obtain();
V<T> obtain(int sizing);
}

View File

@ -0,0 +1,252 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.recycler;
import com.carrotsearch.hppc.hash.MurmurHash3;
import com.google.common.collect.Queues;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import java.lang.ref.SoftReference;
public enum Recyclers {
;
/** Return a {@link Recycler} that never recycles entries. */
public static <T> Recycler<T> none(Recycler.C<T> c) {
return new NoneRecycler<T>(c);
}
/** Return a concurrent recycler based on a deque. */
public static <T> Recycler<T> concurrentDeque(Recycler.C<T> c, int limit) {
return new ConcurrentDequeRecycler<T>(c, limit);
}
/** Return a recycler based on a deque. */
public static <T> Recycler<T> deque(Recycler.C<T> c, int limit) {
return new DequeRecycler<T>(c, Queues.<T>newArrayDeque(), limit);
}
/** Return a recycler based on a deque. */
public static <T> Recycler.Factory<T> dequeFactory(final Recycler.C<T> c, final int limit) {
return new Recycler.Factory<T>() {
@Override
public Recycler<T> build() {
return deque(c, limit);
}
};
}
/** Wrap two recyclers and forward calls to <code>smallObjectRecycler</code> when <code>size &lt; minSize</code> and to
* <code>defaultRecycler</code> otherwise. */
public static <T> Recycler<T> sizing(final Recycler<T> defaultRecycler, final Recycler<T> smallObjectRecycler, final int minSize) {
return new FilterRecycler<T>() {
@Override
protected Recycler<T> getDelegate() {
return defaultRecycler;
}
@Override
public Recycler.V<T> obtain(int sizing) {
if (sizing > 0 && sizing < minSize) {
return smallObjectRecycler.obtain(sizing);
}
return super.obtain(sizing);
}
@Override
public void close() {
defaultRecycler.close();
smallObjectRecycler.close();
}
};
}
/** Create a thread-local recycler, where each thread will have its own instance, created through the provided factory. */
public static <T> Recycler<T> threadLocal(final Recycler.Factory<T> factory) {
return new FilterRecycler<T>() {
private final CloseableThreadLocal<Recycler<T>> recyclers;
{
recyclers = new CloseableThreadLocal<Recycler<T>>() {
@Override
protected Recycler<T> initialValue() {
return factory.build();
}
};
}
@Override
protected Recycler<T> getDelegate() {
return recyclers.get();
}
@Override
public void close() {
recyclers.close();
}
};
}
/** Create a recycler that is wrapped inside a soft reference, so that it cannot cause {@link OutOfMemoryError}s. */
public static <T> Recycler<T> soft(final Recycler.Factory<T> factory) {
return new FilterRecycler<T>() {
SoftReference<Recycler<T>> ref;
{
ref = new SoftReference<Recycler<T>>(null);
}
@Override
protected Recycler<T> getDelegate() {
Recycler<T> recycler = ref.get();
if (recycler == null) {
recycler = factory.build();
ref = new SoftReference<Recycler<T>>(recycler);
}
return recycler;
}
};
}
/** Create a recycler that wraps data in a SoftReference.
* @see #soft(org.elasticsearch.common.recycler.Recycler.Factory) */
public static <T> Recycler.Factory<T> softFactory(final Recycler.Factory<T> factory) {
return new Recycler.Factory<T>() {
@Override
public Recycler<T> build() {
return soft(factory);
}
};
}
/** Wrap the provided recycler so that calls to {@link Recycler#obtain()} and {@link Recycler.V#release()} are protected by
* a lock. */
public static <T> Recycler<T> locked(final Recycler<T> recycler) {
return new FilterRecycler<T>() {
private final Object lock;
{
this.lock = new Object();
}
@Override
protected Recycler<T> getDelegate() {
return recycler;
}
@Override
public org.elasticsearch.common.recycler.Recycler.V<T> obtain(int sizing) {
synchronized (lock) {
return super.obtain(sizing);
}
}
@Override
public org.elasticsearch.common.recycler.Recycler.V<T> obtain() {
synchronized (lock) {
return super.obtain();
}
}
@Override
protected Recycler.V<T> wrap(final Recycler.V<T> delegate) {
return new Recycler.V<T>() {
@Override
public boolean release() throws ElasticsearchException {
synchronized (lock) {
return delegate.release();
}
}
@Override
public T v() {
return delegate.v();
}
@Override
public boolean isRecycled() {
return delegate.isRecycled();
}
};
}
};
}
/** Create a concurrent implementation that can support concurrent access from <code>concurrencyLevel</code> threads with little contention. */
public static <T> Recycler<T> concurrent(final Recycler.Factory<T> factory, final int concurrencyLevel) {
if (concurrencyLevel < 1) {
throw new ElasticsearchIllegalArgumentException("concurrencyLevel must be >= 1");
}
if (concurrencyLevel == 1) {
return locked(factory.build());
}
return new FilterRecycler<T>() {
private final Recycler<T>[] recyclers;
{
@SuppressWarnings("unchecked")
final Recycler<T>[] recyclers = new Recycler[concurrencyLevel];
this.recyclers = recyclers;
for (int i = 0; i < concurrencyLevel; ++i) {
recyclers[i] = locked(factory.build());
}
}
final int slot() {
final long id = Thread.currentThread().getId();
// don't trust the thread id to have equiprobable low bits
int slot = (int) MurmurHash3.hash(id);
// make positive, otherwise % may return negative numbers
slot &= 0x7FFFFFFF;
slot %= concurrencyLevel;
return slot;
}
@Override
protected Recycler<T> getDelegate() {
return recyclers[slot()];
}
@Override
public void close() {
for (Recycler<T> recycler : recyclers) {
recycler.close();
}
}
};
}
public static <T> Recycler<T> concurrent(final Recycler.Factory<T> factory) {
return concurrent(factory, Runtime.getRuntime().availableProcessors());
}
}

View File

@ -1,60 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.recycler;
import org.apache.lucene.util.CloseableThreadLocal;
import java.lang.ref.SoftReference;
/**
*/
public class SoftThreadLocalRecycler<T> extends Recycler<T> {
private final CloseableThreadLocal<SoftReference<ThreadLocalRecycler.Stack<T>>> threadLocal = new CloseableThreadLocal<SoftReference<ThreadLocalRecycler.Stack<T>>>();
final int stackLimit;
public SoftThreadLocalRecycler(C<T> c, int stackLimit) {
super(c);
this.stackLimit = stackLimit;
}
@Override
public void close() {
threadLocal.close();
}
@Override
public V<T> obtain(int sizing) {
SoftReference<ThreadLocalRecycler.Stack<T>> ref = threadLocal.get();
ThreadLocalRecycler.Stack<T> stack = (ref == null) ? null : ref.get();
if (stack == null) {
stack = new ThreadLocalRecycler.Stack<T>(stackLimit, Thread.currentThread());
threadLocal.set(new SoftReference<ThreadLocalRecycler.Stack<T>>(stack));
}
final T o = stack.pop();
if (o == null) {
return new ThreadLocalRecycler.TV<T>(stack, c, c.newInstance(sizing), false);
} else {
return new ThreadLocalRecycler.TV<T>(stack, c, o, true);
}
}
}

View File

@ -1,145 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.recycler;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.CloseableThreadLocal;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.ElasticsearchIllegalStateException;
/**
*/
public class ThreadLocalRecycler<T> extends Recycler<T> {
private final CloseableThreadLocal<Stack<T>> threadLocal = new CloseableThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(stackLimit, Thread.currentThread());
}
};
final int stackLimit;
public ThreadLocalRecycler(C<T> c, int stackLimit) {
super(c);
this.stackLimit = stackLimit;
}
@Override
public void close() {
threadLocal.close();
}
@Override
public V<T> obtain(int sizing) {
Stack<T> stack = threadLocal.get();
final T o = stack.pop();
if (o == null) {
return new TV<T>(stack, c, c.newInstance(sizing), false);
} else {
return new TV<T>(stack, c, o, true);
}
}
static class TV<T> implements Recycler.V<T> {
final Stack<T> stack;
final C<T> c;
T value;
final boolean recycled;
TV(Stack<T> stack, C<T> c, T value, boolean recycled) {
this.stack = stack;
this.c = c;
this.value = value;
this.recycled = recycled;
}
@Override
public T v() {
return value;
}
@Override
public boolean isRecycled() {
return recycled;
}
@Override
public boolean release() {
assert Thread.currentThread() == stack.thread;
if (value == null) {
throw new ElasticsearchIllegalStateException("recycler entry already released...");
}
c.clear(value);
stack.push(value);
value = null;
return true;
}
}
static final class Stack<T> {
final int stackLimit;
final Thread thread;
private T[] elements;
private int size;
@SuppressWarnings({"unchecked", "SuspiciousArrayCast"})
Stack(int stackLimit, Thread thread) {
this.stackLimit = stackLimit;
this.thread = thread;
elements = newArray(stackLimit < 10 ? stackLimit : 10);
}
T pop() {
int size = this.size;
if (size == 0) {
return null;
}
size--;
T ret = elements[size];
elements[size] = null;
this.size = size;
return ret;
}
void push(T o) {
int size = this.size;
if (size == elements.length) {
if (size >= stackLimit) {
return;
}
T[] newElements = newArray(Math.min(stackLimit, ArrayUtil.oversize(size + 1, RamUsageEstimator.NUM_BYTES_OBJECT_REF)));
System.arraycopy(elements, 0, newElements, 0, size);
elements = newElements;
}
elements[size] = o;
this.size = size + 1;
}
@SuppressWarnings({"unchecked", "SuspiciousArrayCast"})
private static <T> T[] newArray(int length) {
return (T[]) new Object[length];
}
}
}

View File

@ -21,8 +21,10 @@ package org.elasticsearch.common.util.concurrent;
import com.google.common.collect.Sets;
import jsr166e.ConcurrentHashMapV8;
import jsr166y.ConcurrentLinkedDeque;
import jsr166y.LinkedTransferQueue;
import java.util.Deque;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@ -83,6 +85,10 @@ public abstract class ConcurrentCollections {
return new ConcurrentLinkedQueue<T>();
}
public static <T> Deque<T> newDeque() {
return new ConcurrentLinkedDeque<T>();
}
public static <T> BlockingQueue<T> newBlockingQueue() {
return new LinkedTransferQueue<T>();
}

View File

@ -0,0 +1,123 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.benchmark.common.recycler;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.recycler.Recycler;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.common.recycler.Recyclers.*;
/** Benchmark that tries to measure the overhead of object recycling depending on concurrent access. */
public class RecyclerBenchmark {
private static final long NUM_RECYCLES = 5000000L;
private static final Random RANDOM = new Random(0);
private static long bench(final Recycler<?> recycler, long numRecycles, int numThreads) throws InterruptedException {
final AtomicLong recycles = new AtomicLong(numRecycles);
final CountDownLatch latch = new CountDownLatch(1);
final Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; ++i){
// Thread ids happen to be generated sequentially, so we also create a random number of throwaway threads so that the
// distribution of ids is not artificially regular for the concurrent recycler
for (int j = RANDOM.nextInt(5); j >= 0; --j) {
new Thread();
}
threads[i] = new Thread() {
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
return;
}
while (recycles.getAndDecrement() > 0) {
final Recycler.V<?> v = recycler.obtain();
v.release();
}
}
};
}
for (Thread thread : threads) {
thread.start();
}
final long start = System.nanoTime();
latch.countDown();
for (Thread thread : threads) {
thread.join();
}
return System.nanoTime() - start;
}
public static void main(String[] args) throws InterruptedException {
final int limit = 100;
final Recycler.C<Object> c = new Recycler.C<Object>() {
@Override
public Object newInstance(int sizing) {
return new Object();
}
@Override
public void clear(Object value) {}
};
final ImmutableMap<String, Recycler<Object>> recyclers = ImmutableMap.<String, Recycler<Object>>builder()
.put("none", none(c))
.put("concurrent-queue", concurrentDeque(c, limit))
.put("thread-local", threadLocal(dequeFactory(c, limit)))
.put("soft-thread-local", threadLocal(softFactory(dequeFactory(c, limit))))
.put("locked", locked(deque(c, limit)))
.put("concurrent", concurrent(dequeFactory(c, limit), Runtime.getRuntime().availableProcessors()))
.put("soft-concurrent", concurrent(softFactory(dequeFactory(c, limit)), Runtime.getRuntime().availableProcessors())).build();
// warmup
final long start = System.nanoTime();
while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10)) {
for (Recycler<?> recycler : recyclers.values()) {
bench(recycler, NUM_RECYCLES, 2);
}
}
// run
for (int numThreads = 1; numThreads <= 4 * Runtime.getRuntime().availableProcessors(); numThreads *= 2) {
System.out.println("## " + numThreads + " threads\n");
System.gc();
Thread.sleep(1000);
for (Recycler<?> recycler : recyclers.values()) {
bench(recycler, NUM_RECYCLES, numThreads);
}
for (int i = 0; i < 5; ++i) {
for (Map.Entry<String, Recycler<Object>> entry : recyclers.entrySet()) {
System.out.println(entry.getKey() + "\t" + TimeUnit.NANOSECONDS.toMillis(bench(entry.getValue(), NUM_RECYCLES, numThreads)));
}
System.out.println();
}
}
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.recycler;
public class ConcurrentRecyclerTests extends AbstractRecyclerTests {
@Override
protected Recycler<byte[]> newRecycler() {
return Recyclers.concurrent(Recyclers.dequeFactory(RECYCLER_C, randomIntBetween(5, 10)), randomIntBetween(1,5));
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.recycler;
public class LockedRecyclerTests extends AbstractRecyclerTests {
@Override
protected Recycler<byte[]> newRecycler() {
return Recyclers.locked(Recyclers.deque(RECYCLER_C, randomIntBetween(5, 10)));
}
}

View File

@ -23,7 +23,7 @@ public class NoneRecyclerTests extends AbstractRecyclerTests {
@Override
protected Recycler<byte[]> newRecycler() {
return new NoneRecycler<byte[]>(RECYCLER_C);
return Recyclers.none(RECYCLER_C);
}
}

View File

@ -23,7 +23,7 @@ public class QueueRecyclerTests extends AbstractRecyclerTests {
@Override
protected Recycler<byte[]> newRecycler() {
return new QueueRecycler<byte[]>(RECYCLER_C, randomIntBetween(5, 10));
return Recyclers.concurrentDeque(RECYCLER_C, randomIntBetween(5, 10));
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.recycler;
public class SoftConcurrentRecyclerTests extends AbstractRecyclerTests {
@Override
protected Recycler<byte[]> newRecycler() {
return Recyclers.concurrent(Recyclers.softFactory(Recyclers.dequeFactory(RECYCLER_C, randomIntBetween(5, 10))), randomIntBetween(1, 5));
}
}

View File

@ -23,7 +23,7 @@ public class SoftThreadLocalRecyclerTests extends AbstractRecyclerTests {
@Override
protected Recycler<byte[]> newRecycler() {
return new SoftThreadLocalRecycler<byte[]>(RECYCLER_C, randomIntBetween(5, 10));
return Recyclers.threadLocal(Recyclers.softFactory(Recyclers.dequeFactory(RECYCLER_C, 10)));
}
}

View File

@ -23,7 +23,7 @@ public class ThreadLocalRecyclerTests extends AbstractRecyclerTests {
@Override
protected Recycler<byte[]> newRecycler() {
return new ThreadLocalRecycler<byte[]>(RECYCLER_C, randomIntBetween(5, 10));
return Recyclers.threadLocal(Recyclers.dequeFactory(RECYCLER_C, 10));
}
}