diff --git a/src/main/java/org/elasticsearch/cache/recycler/CacheRecycler.java b/src/main/java/org/elasticsearch/cache/recycler/CacheRecycler.java index ef21ecb10b6..4eb12a8dba8 100644 --- a/src/main/java/org/elasticsearch/cache/recycler/CacheRecycler.java +++ b/src/main/java/org/elasticsearch/cache/recycler/CacheRecycler.java @@ -24,11 +24,14 @@ import com.google.common.base.Strings; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.recycler.*; +import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import java.util.Locale; +import static org.elasticsearch.common.recycler.Recyclers.*; + @SuppressWarnings("unchecked") public class CacheRecycler extends AbstractComponent { @@ -66,8 +69,9 @@ public class CacheRecycler extends AbstractComponent { final Type type = Type.parse(settings.get("type")); int limit = settings.getAsInt("limit", 10); int smartSize = settings.getAsInt("smart_size", 1024); + final int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); - hashMap = build(type, limit, smartSize, new Recycler.C() { + hashMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public ObjectObjectOpenHashMap newInstance(int sizing) { return new ObjectObjectOpenHashMap(size(sizing)); @@ -78,7 +82,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - hashSet = build(type, limit, smartSize, new Recycler.C() { + hashSet = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public ObjectOpenHashSet newInstance(int sizing) { return new ObjectOpenHashSet(size(sizing), 0.5f); @@ -89,7 +93,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - doubleObjectMap = build(type, limit, smartSize, new Recycler.C() { + doubleObjectMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public DoubleObjectOpenHashMap newInstance(int sizing) { return new DoubleObjectOpenHashMap(size(sizing)); @@ -100,7 +104,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - longObjectMap = build(type, limit, smartSize, new Recycler.C() { + longObjectMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public LongObjectOpenHashMap newInstance(int sizing) { return new LongObjectOpenHashMap(size(sizing)); @@ -111,7 +115,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - longLongMap = build(type, limit, smartSize, new Recycler.C() { + longLongMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public LongLongOpenHashMap newInstance(int sizing) { return new LongLongOpenHashMap(size(sizing)); @@ -122,7 +126,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - intIntMap = build(type, limit, smartSize, new Recycler.C() { + intIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public IntIntOpenHashMap newInstance(int sizing) { return new IntIntOpenHashMap(size(sizing)); @@ -133,7 +137,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - floatIntMap = build(type, limit, smartSize, new Recycler.C() { + floatIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public FloatIntOpenHashMap newInstance(int sizing) { return new FloatIntOpenHashMap(size(sizing)); @@ -144,7 +148,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - doubleIntMap = build(type, limit, smartSize, new Recycler.C() { + doubleIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public DoubleIntOpenHashMap newInstance(int sizing) { return new DoubleIntOpenHashMap(size(sizing)); @@ -155,7 +159,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - longIntMap = build(type, limit, smartSize, new Recycler.C() { + longIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public LongIntOpenHashMap newInstance(int sizing) { return new LongIntOpenHashMap(size(sizing)); @@ -166,7 +170,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - objectIntMap = build(type, limit, smartSize, new Recycler.C() { + objectIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public ObjectIntOpenHashMap newInstance(int sizing) { return new ObjectIntOpenHashMap(size(sizing)); @@ -177,7 +181,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - intObjectMap = build(type, limit, smartSize, new Recycler.C() { + intObjectMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public IntObjectOpenHashMap newInstance(int sizing) { return new IntObjectOpenHashMap(size(sizing)); @@ -188,7 +192,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - objectFloatMap = build(type, limit, smartSize, new Recycler.C() { + objectFloatMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public ObjectFloatOpenHashMap newInstance(int sizing) { return new ObjectFloatOpenHashMap(size(sizing)); @@ -253,12 +257,12 @@ public class CacheRecycler extends AbstractComponent { return sizing > 0 ? sizing : 256; } - private Recycler build(Type type, int limit, int smartSize, Recycler.C c) { + private Recycler build(Type type, int limit, int smartSize, int availableProcessors, Recycler.C c) { Recycler recycler; try { - recycler = type.build(c, limit); + recycler = type.build(c, limit, availableProcessors); if (smartSize > 0) { - recycler = new Recycler.Sizing(recycler, smartSize); + recycler = sizing(recycler, none(c), smartSize); } } catch (IllegalArgumentException ex) { throw new ElasticsearchIllegalArgumentException("no type support [" + type + "] for recycler"); @@ -270,40 +274,44 @@ public class CacheRecycler extends AbstractComponent { public static enum Type { SOFT_THREAD_LOCAL { @Override - Recycler build(Recycler.C c, int limit) { - return new SoftThreadLocalRecycler(c, limit); - } - @Override - boolean perThread() { - return true; + Recycler build(Recycler.C c, int limit, int availableProcessors) { + return threadLocal(softFactory(dequeFactory(c, limit))); } }, THREAD_LOCAL { @Override - Recycler build(Recycler.C c, int limit) { - return new ThreadLocalRecycler(c, limit); - } - @Override - boolean perThread() { - return true; + Recycler build(Recycler.C c, int limit, int availableProcessors) { + return threadLocal(dequeFactory(c, limit)); } }, QUEUE { @Override - Recycler build(Recycler.C c, int limit) { - return new QueueRecycler(c, limit); + Recycler build(Recycler.C c, int limit, int availableProcessors) { + return concurrentDeque(c, limit); + } + }, + SOFT_CONCURRENT { + @Override + Recycler build(Recycler.C c, int limit, int availableProcessors) { + return concurrent(softFactory(dequeFactory(c, limit)), availableProcessors); + } + }, + CONCURRENT { + @Override + Recycler build(Recycler.C c, int limit, int availableProcessors) { + return concurrent(dequeFactory(c, limit), availableProcessors); } }, NONE { @Override - Recycler build(Recycler.C c, int limit) { - return new NoneRecycler(c); + Recycler build(Recycler.C c, int limit, int availableProcessors) { + return none(c); } }; public static Type parse(String type) { if (Strings.isNullOrEmpty(type)) { - return SOFT_THREAD_LOCAL; + return SOFT_CONCURRENT; } try { return Type.valueOf(type.toUpperCase(Locale.ROOT)); @@ -312,9 +320,6 @@ public class CacheRecycler extends AbstractComponent { } } - abstract Recycler build(Recycler.C c, int limit); - boolean perThread() { - return false; - } + abstract Recycler build(Recycler.C c, int limit, int availableProcessors); } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/cache/recycler/PageCacheRecycler.java b/src/main/java/org/elasticsearch/cache/recycler/PageCacheRecycler.java index 59b1bbae6da..12cef79a802 100644 --- a/src/main/java/org/elasticsearch/cache/recycler/PageCacheRecycler.java +++ b/src/main/java/org/elasticsearch/cache/recycler/PageCacheRecycler.java @@ -19,18 +19,20 @@ package org.elasticsearch.cache.recycler; -import org.elasticsearch.cache.recycler.CacheRecycler.Type; +import com.google.common.base.Strings; +import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.recycler.NoneRecycler; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; +import java.util.Locale; + +import static org.elasticsearch.common.recycler.Recyclers.*; /** A recycler of fixed-size pages. */ public class PageCacheRecycler extends AbstractComponent { @@ -79,17 +81,9 @@ public class PageCacheRecycler extends AbstractComponent { public PageCacheRecycler(Settings settings, ThreadPool threadPool) { super(settings); final Type type = Type.parse(componentSettings.get(TYPE)); - long limit = componentSettings.getAsMemory(LIMIT_HEAP, "10%").bytes(); - if (type.perThread()) { - final long limitPerThread = componentSettings.getAsBytesSize(LIMIT_PER_THREAD, new ByteSizeValue(-1)).bytes(); - if (limitPerThread != -1) { - // if the per_thread limit is set, it has precedence - limit = limitPerThread; - } else { - // divide memory equally to all search threads - limit /= maximumSearchThreadPoolSize(threadPool, settings); - } - } + final long limit = componentSettings.getAsMemory(LIMIT_HEAP, "10%").bytes(); + final int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); + final int searchThreadPoolSize = maximumSearchThreadPoolSize(threadPool, settings); // We have a global amount of memory that we need to divide across data types. // Since some types are more useful than other ones we give them different weights. @@ -113,7 +107,7 @@ public class PageCacheRecycler extends AbstractComponent { final double totalWeight = bytesWeight + intsWeight + longsWeight + doublesWeight + objectsWeight; - bytePage = build(type, maxCount(limit, BigArrays.BYTE_PAGE_SIZE, bytesWeight, totalWeight), new Recycler.C() { + bytePage = build(type, maxCount(limit, BigArrays.BYTE_PAGE_SIZE, bytesWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C() { @Override public byte[] newInstance(int sizing) { return new byte[BigArrays.BYTE_PAGE_SIZE]; @@ -121,7 +115,7 @@ public class PageCacheRecycler extends AbstractComponent { @Override public void clear(byte[] value) {} }); - intPage = build(type, maxCount(limit, BigArrays.INT_PAGE_SIZE, intsWeight, totalWeight), new Recycler.C() { + intPage = build(type, maxCount(limit, BigArrays.INT_PAGE_SIZE, intsWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C() { @Override public int[] newInstance(int sizing) { return new int[BigArrays.INT_PAGE_SIZE]; @@ -129,7 +123,7 @@ public class PageCacheRecycler extends AbstractComponent { @Override public void clear(int[] value) {} }); - longPage = build(type, maxCount(limit, BigArrays.LONG_PAGE_SIZE, longsWeight, totalWeight), new Recycler.C() { + longPage = build(type, maxCount(limit, BigArrays.LONG_PAGE_SIZE, longsWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C() { @Override public long[] newInstance(int sizing) { return new long[BigArrays.LONG_PAGE_SIZE]; @@ -137,7 +131,7 @@ public class PageCacheRecycler extends AbstractComponent { @Override public void clear(long[] value) {} }); - doublePage = build(type, maxCount(limit, BigArrays.DOUBLE_PAGE_SIZE, doublesWeight, totalWeight), new Recycler.C() { + doublePage = build(type, maxCount(limit, BigArrays.DOUBLE_PAGE_SIZE, doublesWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C() { @Override public double[] newInstance(int sizing) { return new double[BigArrays.DOUBLE_PAGE_SIZE]; @@ -145,7 +139,7 @@ public class PageCacheRecycler extends AbstractComponent { @Override public void clear(double[] value) {} }); - objectPage = build(type, maxCount(limit, BigArrays.OBJECT_PAGE_SIZE, objectsWeight, totalWeight), new Recycler.C() { + objectPage = build(type, maxCount(limit, BigArrays.OBJECT_PAGE_SIZE, objectsWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C() { @Override public Object[] newInstance(int sizing) { return new Object[BigArrays.OBJECT_PAGE_SIZE]; @@ -194,13 +188,65 @@ public class PageCacheRecycler extends AbstractComponent { return objectPage.obtain(); } - private static Recycler build(Type type, int limit, Recycler.C c) { + private static Recycler build(Type type, int limit, int estimatedThreadPoolSize, int availableProcessors, Recycler.C c) { final Recycler recycler; if (limit == 0) { - recycler = new NoneRecycler(c); + recycler = none(c); } else { - recycler = type.build(c, limit); + recycler = type.build(c, limit, estimatedThreadPoolSize, availableProcessors); } return recycler; } + + public static enum Type { + SOFT_THREAD_LOCAL { + @Override + Recycler build(Recycler.C c, int limit, int estimatedThreadPoolSize, int availableProcessors) { + return threadLocal(softFactory(dequeFactory(c, limit / estimatedThreadPoolSize))); + } + }, + THREAD_LOCAL { + @Override + Recycler build(Recycler.C c, int limit, int estimatedThreadPoolSize, int availableProcessors) { + return threadLocal(dequeFactory(c, limit / estimatedThreadPoolSize)); + } + }, + QUEUE { + @Override + Recycler build(Recycler.C c, int limit, int estimatedThreadPoolSize, int availableProcessors) { + return concurrentDeque(c, limit); + } + }, + SOFT_CONCURRENT { + @Override + Recycler build(Recycler.C c, int limit, int estimatedThreadPoolSize, int availableProcessors) { + return concurrent(softFactory(dequeFactory(c, limit / availableProcessors)), availableProcessors); + } + }, + CONCURRENT { + @Override + Recycler build(Recycler.C c, int limit, int estimatedThreadPoolSize, int availableProcessors) { + return concurrent(dequeFactory(c, limit / availableProcessors), availableProcessors); + } + }, + NONE { + @Override + Recycler build(Recycler.C c, int limit, int estimatedThreadPoolSize, int availableProcessors) { + return none(c); + } + }; + + public static Type parse(String type) { + if (Strings.isNullOrEmpty(type)) { + return SOFT_CONCURRENT; + } + try { + return Type.valueOf(type.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new ElasticsearchIllegalArgumentException("no type support [" + type + "]"); + } + } + + abstract Recycler build(Recycler.C c, int limit, int estimatedThreadPoolSize, int availableProcessors); + } } diff --git a/src/main/java/org/elasticsearch/common/recycler/AbstractRecycler.java b/src/main/java/org/elasticsearch/common/recycler/AbstractRecycler.java new file mode 100644 index 00000000000..6730c5119a3 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/recycler/AbstractRecycler.java @@ -0,0 +1,40 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.recycler; + + +abstract class AbstractRecycler implements Recycler { + + protected final Recycler.C c; + + protected AbstractRecycler(Recycler.C c) { + this.c = c; + } + + public V obtain() { + return obtain(-1); + } + + @Override + public void close() { + // no-op by default + } + +} diff --git a/src/main/java/org/elasticsearch/common/recycler/ConcurrentDequeRecycler.java b/src/main/java/org/elasticsearch/common/recycler/ConcurrentDequeRecycler.java new file mode 100644 index 00000000000..4c22df2d08e --- /dev/null +++ b/src/main/java/org/elasticsearch/common/recycler/ConcurrentDequeRecycler.java @@ -0,0 +1,68 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.recycler; + +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; + +import java.util.Deque; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A {@link Recycler} implementation based on a concurrent {@link Deque}. This implementation is thread-safe. + */ +public class ConcurrentDequeRecycler extends DequeRecycler { + + // we maintain size separately because concurrent deque implementations typically have linear-time size() impls + final AtomicInteger size; + + public ConcurrentDequeRecycler(C c, int maxSize) { + super(c, ConcurrentCollections.newDeque(), maxSize); + this.size = new AtomicInteger(); + } + + @Override + public void close() { + assert deque.size() == size.get(); + super.close(); + size.set(0); + } + + @Override + public V obtain(int sizing) { + final V v = super.obtain(sizing); + if (v.isRecycled()) { + size.decrementAndGet(); + } + return v; + } + + @Override + protected boolean beforeRelease() { + return size.incrementAndGet() <= maxSize; + } + + @Override + protected void afterRelease(boolean recycled) { + if (!recycled) { + size.decrementAndGet(); + } + } + +} diff --git a/src/main/java/org/elasticsearch/common/recycler/QueueRecycler.java b/src/main/java/org/elasticsearch/common/recycler/DequeRecycler.java similarity index 61% rename from src/main/java/org/elasticsearch/common/recycler/QueueRecycler.java rename to src/main/java/org/elasticsearch/common/recycler/DequeRecycler.java index 27b0a1366b9..c10ebcc2884 100644 --- a/src/main/java/org/elasticsearch/common/recycler/QueueRecycler.java +++ b/src/main/java/org/elasticsearch/common/recycler/DequeRecycler.java @@ -20,54 +20,51 @@ package org.elasticsearch.common.recycler; import org.elasticsearch.ElasticsearchIllegalStateException; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.Deque; /** + * A {@link Recycler} implementation based on a {@link Deque}. This implementation is NOT thread-safe. */ -public class QueueRecycler extends Recycler { +public class DequeRecycler extends AbstractRecycler { - final Queue queue; - final AtomicInteger size; + final Deque deque; final int maxSize; - public QueueRecycler(C c, int maxSize) { - this(c, ConcurrentCollections.newQueue(), maxSize); - } - - public QueueRecycler(C c, Queue queue, int maxSize) { + public DequeRecycler(C c, Deque queue, int maxSize) { super(c); - this.queue = queue; + this.deque = queue; this.maxSize = maxSize; - // we maintain size separately because concurrent queue implementations typically have linear-time size() impls - this.size = new AtomicInteger(); } @Override public void close() { - assert queue.size() == size.get(); - queue.clear(); - size.set(0); + deque.clear(); } @Override public V obtain(int sizing) { - final T v = queue.poll(); + final T v = deque.pollFirst(); if (v == null) { - return new QV(c.newInstance(sizing), false); + return new DV(c.newInstance(sizing), false); } - size.decrementAndGet(); - return new QV(v, true); + return new DV(v, true); } - class QV implements Recycler.V { + /** Called before releasing an object, returns true if the object should be recycled and false otherwise. */ + protected boolean beforeRelease() { + return deque.size() < maxSize; + } + + /** Called after a release. */ + protected void afterRelease(boolean recycled) {} + + private class DV implements Recycler.V { T value; final boolean recycled; - QV(T value, boolean recycled) { + DV(T value, boolean recycled) { this.value = value; this.recycled = recycled; } @@ -87,13 +84,13 @@ public class QueueRecycler extends Recycler { if (value == null) { throw new ElasticsearchIllegalStateException("recycler entry already released..."); } - if (size.incrementAndGet() <= maxSize) { + final boolean recycle = beforeRelease(); + if (recycle) { c.clear(value); - queue.offer(value); - } else { - size.decrementAndGet(); + deque.addFirst(value); } value = null; + afterRelease(recycle); return true; } } diff --git a/src/main/java/org/elasticsearch/common/recycler/FilterRecycler.java b/src/main/java/org/elasticsearch/common/recycler/FilterRecycler.java new file mode 100644 index 00000000000..9bb6b86d41d --- /dev/null +++ b/src/main/java/org/elasticsearch/common/recycler/FilterRecycler.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.recycler; + +abstract class FilterRecycler implements Recycler { + + /** Get the delegate instance to foward calls to. */ + protected abstract Recycler getDelegate(); + + /** Wrap a recycled reference. */ + protected Recycler.V wrap(Recycler.V delegate) { + return delegate; + } + + @Override + public Recycler.V obtain(int sizing) { + return wrap(getDelegate().obtain(sizing)); + } + + @Override + public Recycler.V obtain() { + return wrap(getDelegate().obtain()); + } + + @Override + public void close() { + getDelegate().close(); + } + +} diff --git a/src/main/java/org/elasticsearch/common/recycler/NoneRecycler.java b/src/main/java/org/elasticsearch/common/recycler/NoneRecycler.java index 09c2c21f73b..0b7a3a4d9ea 100644 --- a/src/main/java/org/elasticsearch/common/recycler/NoneRecycler.java +++ b/src/main/java/org/elasticsearch/common/recycler/NoneRecycler.java @@ -23,7 +23,7 @@ import org.elasticsearch.ElasticsearchIllegalStateException; /** */ -public class NoneRecycler extends Recycler { +public class NoneRecycler extends AbstractRecycler { public NoneRecycler(C c) { super(c); diff --git a/src/main/java/org/elasticsearch/common/recycler/Recycler.java b/src/main/java/org/elasticsearch/common/recycler/Recycler.java index 6d00cad9e13..b74acba477a 100644 --- a/src/main/java/org/elasticsearch/common/recycler/Recycler.java +++ b/src/main/java/org/elasticsearch/common/recycler/Recycler.java @@ -23,31 +23,10 @@ import org.elasticsearch.common.lease.Releasable; /** */ -public abstract class Recycler { +public interface Recycler { - public static class Sizing extends Recycler { - - private final Recycler recycler; - private final int smartSize; - - public Sizing(Recycler recycler, int smartSize) { - super(recycler.c); - this.recycler = recycler; - this.smartSize = smartSize; - } - - @Override - public void close() { - recycler.close(); - } - - @Override - public V obtain(int sizing) { - if (sizing > 0 && sizing < smartSize) { - return new NoneRecycler.NV(c.newInstance(sizing)); - } - return recycler.obtain(sizing); - } + public static interface Factory { + Recycler build(); } public static interface C { @@ -69,17 +48,9 @@ public abstract class Recycler { } - protected final C c; + void close(); - protected Recycler(C c) { - this.c = c; - } + V obtain(); - public abstract void close(); - - public V obtain() { - return obtain(-1); - } - - public abstract V obtain(int sizing); + V obtain(int sizing); } diff --git a/src/main/java/org/elasticsearch/common/recycler/Recyclers.java b/src/main/java/org/elasticsearch/common/recycler/Recyclers.java new file mode 100644 index 00000000000..c1797bd64ba --- /dev/null +++ b/src/main/java/org/elasticsearch/common/recycler/Recyclers.java @@ -0,0 +1,252 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.recycler; + +import com.carrotsearch.hppc.hash.MurmurHash3; +import com.google.common.collect.Queues; +import org.apache.lucene.util.CloseableThreadLocal; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalArgumentException; + +import java.lang.ref.SoftReference; + +public enum Recyclers { + ; + + /** Return a {@link Recycler} that never recycles entries. */ + public static Recycler none(Recycler.C c) { + return new NoneRecycler(c); + } + + /** Return a concurrent recycler based on a deque. */ + public static Recycler concurrentDeque(Recycler.C c, int limit) { + return new ConcurrentDequeRecycler(c, limit); + } + + /** Return a recycler based on a deque. */ + public static Recycler deque(Recycler.C c, int limit) { + return new DequeRecycler(c, Queues.newArrayDeque(), limit); + } + + /** Return a recycler based on a deque. */ + public static Recycler.Factory dequeFactory(final Recycler.C c, final int limit) { + return new Recycler.Factory() { + @Override + public Recycler build() { + return deque(c, limit); + } + }; + } + + /** Wrap two recyclers and forward to calls to smallObjectRecycler when size < minSize and to + * defaultRecycler otherwise. */ + public static Recycler sizing(final Recycler defaultRecycler, final Recycler smallObjectRecycler, final int minSize) { + return new FilterRecycler() { + + @Override + protected Recycler getDelegate() { + return defaultRecycler; + } + + @Override + public Recycler.V obtain(int sizing) { + if (sizing > 0 && sizing < minSize) { + return smallObjectRecycler.obtain(sizing); + } + return super.obtain(sizing); + } + + @Override + public void close() { + defaultRecycler.close(); + smallObjectRecycler.close(); + } + + }; + } + + /** Create a thread-local recycler, where each thread will have its own instance, create through the provided factory. */ + public static Recycler threadLocal(final Recycler.Factory factory) { + return new FilterRecycler() { + + private final CloseableThreadLocal> recyclers; + + { + recyclers = new CloseableThreadLocal>() { + @Override + protected Recycler initialValue() { + return factory.build(); + } + }; + } + + @Override + protected Recycler getDelegate() { + return recyclers.get(); + } + + @Override + public void close() { + recyclers.close(); + } + + }; + } + + /** Create a recycler that is wrapped inside a soft reference, so that it cannot cause {@link OutOfMemoryError}s. */ + public static Recycler soft(final Recycler.Factory factory) { + return new FilterRecycler() { + + SoftReference> ref; + + { + ref = new SoftReference>(null); + } + + @Override + protected Recycler getDelegate() { + Recycler recycler = ref.get(); + if (recycler == null) { + recycler = factory.build(); + ref = new SoftReference>(recycler); + } + return recycler; + } + + }; + } + + /** Create a recycler that wraps data in a SoftReference. + * @see #soft(org.elasticsearch.common.recycler.Recycler.Factory) */ + public static Recycler.Factory softFactory(final Recycler.Factory factory) { + return new Recycler.Factory() { + @Override + public Recycler build() { + return soft(factory); + } + }; + } + + /** Wrap the provided recycler so that calls to {@link Recycler#obtain()} and {@link Recycler.V#release()} are protected by + * a lock. */ + public static Recycler locked(final Recycler recycler) { + return new FilterRecycler() { + + private final Object lock; + + { + this.lock = new Object(); + } + + @Override + protected Recycler getDelegate() { + return recycler; + } + + @Override + public org.elasticsearch.common.recycler.Recycler.V obtain(int sizing) { + synchronized (lock) { + return super.obtain(sizing); + } + } + + @Override + public org.elasticsearch.common.recycler.Recycler.V obtain() { + synchronized (lock) { + return super.obtain(); + } + } + + @Override + protected Recycler.V wrap(final Recycler.V delegate) { + return new Recycler.V() { + + @Override + public boolean release() throws ElasticsearchException { + synchronized (lock) { + return delegate.release(); + } + } + + @Override + public T v() { + return delegate.v(); + } + + @Override + public boolean isRecycled() { + return delegate.isRecycled(); + } + + }; + } + + }; + } + + /** Create a concurrent implementation that can support concurrent access from concurrencyLevel threads with little contention. */ + public static Recycler concurrent(final Recycler.Factory factory, final int concurrencyLevel) { + if (concurrencyLevel < 1) { + throw new ElasticsearchIllegalArgumentException("concurrencyLevel must be >= 1"); + } + if (concurrencyLevel == 1) { + return locked(factory.build()); + } + return new FilterRecycler() { + + private final Recycler[] recyclers; + { + @SuppressWarnings("unchecked") + final Recycler[] recyclers = new Recycler[concurrencyLevel]; + this.recyclers = recyclers; + for (int i = 0; i < concurrencyLevel; ++i) { + recyclers[i] = locked(factory.build()); + } + } + + final int slot() { + final long id = Thread.currentThread().getId(); + // don't trust Thread.hashCode to have equiprobable low bits + int slot = (int) MurmurHash3.hash(id); + // make positive, otherwise % may return negative numbers + slot &= 0x7FFFFFFF; + slot %= concurrencyLevel; + return slot; + } + + @Override + protected Recycler getDelegate() { + return recyclers[slot()]; + } + + @Override + public void close() { + for (Recycler recycler : recyclers) { + recycler.close(); + } + } + + }; + } + + public static Recycler concurrent(final Recycler.Factory factory) { + return concurrent(factory, Runtime.getRuntime().availableProcessors()); + } +} diff --git a/src/main/java/org/elasticsearch/common/recycler/SoftThreadLocalRecycler.java b/src/main/java/org/elasticsearch/common/recycler/SoftThreadLocalRecycler.java deleted file mode 100644 index 16063f4d1e2..00000000000 --- a/src/main/java/org/elasticsearch/common/recycler/SoftThreadLocalRecycler.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.recycler; - -import org.apache.lucene.util.CloseableThreadLocal; - -import java.lang.ref.SoftReference; - -/** - */ -public class SoftThreadLocalRecycler extends Recycler { - - private final CloseableThreadLocal>> threadLocal = new CloseableThreadLocal>>(); - - final int stackLimit; - - public SoftThreadLocalRecycler(C c, int stackLimit) { - super(c); - this.stackLimit = stackLimit; - } - - @Override - public void close() { - threadLocal.close(); - } - - @Override - public V obtain(int sizing) { - SoftReference> ref = threadLocal.get(); - ThreadLocalRecycler.Stack stack = (ref == null) ? null : ref.get(); - if (stack == null) { - stack = new ThreadLocalRecycler.Stack(stackLimit, Thread.currentThread()); - threadLocal.set(new SoftReference>(stack)); - } - - final T o = stack.pop(); - if (o == null) { - return new ThreadLocalRecycler.TV(stack, c, c.newInstance(sizing), false); - } else { - return new ThreadLocalRecycler.TV(stack, c, o, true); - } - } -} diff --git a/src/main/java/org/elasticsearch/common/recycler/ThreadLocalRecycler.java b/src/main/java/org/elasticsearch/common/recycler/ThreadLocalRecycler.java deleted file mode 100644 index da917655c31..00000000000 --- a/src/main/java/org/elasticsearch/common/recycler/ThreadLocalRecycler.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.recycler; - -import org.apache.lucene.util.ArrayUtil; -import org.apache.lucene.util.CloseableThreadLocal; -import org.apache.lucene.util.RamUsageEstimator; -import org.elasticsearch.ElasticsearchIllegalStateException; - -/** - */ -public class ThreadLocalRecycler extends Recycler { - - private final CloseableThreadLocal> threadLocal = new CloseableThreadLocal>() { - @Override - protected Stack initialValue() { - return new Stack(stackLimit, Thread.currentThread()); - } - }; - - final int stackLimit; - - public ThreadLocalRecycler(C c, int stackLimit) { - super(c); - this.stackLimit = stackLimit; - } - - @Override - public void close() { - threadLocal.close(); - } - - @Override - public V obtain(int sizing) { - Stack stack = threadLocal.get(); - final T o = stack.pop(); - if (o == null) { - return new TV(stack, c, c.newInstance(sizing), false); - } else { - return new TV(stack, c, o, true); - } - } - - static class TV implements Recycler.V { - - final Stack stack; - final C c; - T value; - final boolean recycled; - - TV(Stack stack, C c, T value, boolean recycled) { - this.stack = stack; - this.c = c; - this.value = value; - this.recycled = recycled; - } - - @Override - public T v() { - return value; - } - - @Override - public boolean isRecycled() { - return recycled; - } - - @Override - public boolean release() { - assert Thread.currentThread() == stack.thread; - if (value == null) { - throw new ElasticsearchIllegalStateException("recycler entry already released..."); - } - c.clear(value); - stack.push(value); - value = null; - return true; - } - } - - - static final class Stack { - - final int stackLimit; - final Thread thread; - private T[] elements; - private int size; - - @SuppressWarnings({"unchecked", "SuspiciousArrayCast"}) - Stack(int stackLimit, Thread thread) { - this.stackLimit = stackLimit; - this.thread = thread; - elements = newArray(stackLimit < 10 ? stackLimit : 10); - } - - T pop() { - int size = this.size; - if (size == 0) { - return null; - } - size--; - T ret = elements[size]; - elements[size] = null; - this.size = size; - return ret; - } - - void push(T o) { - int size = this.size; - if (size == elements.length) { - if (size >= stackLimit) { - return; - } - T[] newElements = newArray(Math.min(stackLimit, ArrayUtil.oversize(size + 1, RamUsageEstimator.NUM_BYTES_OBJECT_REF))); - System.arraycopy(elements, 0, newElements, 0, size); - elements = newElements; - } - - elements[size] = o; - this.size = size + 1; - } - - @SuppressWarnings({"unchecked", "SuspiciousArrayCast"}) - private static T[] newArray(int length) { - return (T[]) new Object[length]; - } - } -} diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentCollections.java b/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentCollections.java index 7e2882bc5d6..6edf4d11786 100644 --- a/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentCollections.java +++ b/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentCollections.java @@ -21,8 +21,10 @@ package org.elasticsearch.common.util.concurrent; import com.google.common.collect.Sets; import jsr166e.ConcurrentHashMapV8; +import jsr166y.ConcurrentLinkedDeque; import jsr166y.LinkedTransferQueue; +import java.util.Deque; import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -83,6 +85,10 @@ public abstract class ConcurrentCollections { return new ConcurrentLinkedQueue(); } + public static Deque newDeque() { + return new ConcurrentLinkedDeque(); + } + public static BlockingQueue newBlockingQueue() { return new LinkedTransferQueue(); } diff --git a/src/test/java/org/elasticsearch/benchmark/common/recycler/RecyclerBenchmark.java b/src/test/java/org/elasticsearch/benchmark/common/recycler/RecyclerBenchmark.java new file mode 100644 index 00000000000..e8cdc91f532 --- /dev/null +++ b/src/test/java/org/elasticsearch/benchmark/common/recycler/RecyclerBenchmark.java @@ -0,0 +1,123 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.benchmark.common.recycler; + +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.common.recycler.Recycler; + +import java.util.Map; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.elasticsearch.common.recycler.Recyclers.*; + +/** Benchmark that tries to measure the overhead of object recycling depending on concurrent access. */ +public class RecyclerBenchmark { + + private static final long NUM_RECYCLES = 5000000L; + private static final Random RANDOM = new Random(0); + + private static long bench(final Recycler recycler, long numRecycles, int numThreads) throws InterruptedException { + final AtomicLong recycles = new AtomicLong(numRecycles); + final CountDownLatch latch = new CountDownLatch(1); + final Thread[] threads = new Thread[numThreads]; + for (int i = 0; i < numThreads; ++i){ + // Thread ids happen to be generated sequentially, so we also generate random threads so that distribution of IDs + // is not perfect for the concurrent recycler + for (int j = RANDOM.nextInt(5); j >= 0; --j) { + new Thread(); + } + + threads[i] = new Thread() { + @Override + public void run() { + try { + latch.await(); + } catch (InterruptedException e) { + return; + } + while (recycles.getAndDecrement() > 0) { + final Recycler.V v = recycler.obtain(); + v.release(); + } + } + }; + } + for (Thread thread : threads) { + thread.start(); + } + final long start = System.nanoTime(); + latch.countDown(); + for (Thread thread : threads) { + thread.join(); + } + return System.nanoTime() - start; + } + + public static void main(String[] args) throws InterruptedException { + final int limit = 100; + final Recycler.C c = new Recycler.C() { + + @Override + public Object newInstance(int sizing) { + return new Object(); + } + + @Override + public void clear(Object value) {} + }; + + final ImmutableMap> recyclers = ImmutableMap.>builder() + .put("none", none(c)) + .put("concurrent-queue", concurrentDeque(c, limit)) + .put("thread-local", threadLocal(dequeFactory(c, limit))) + .put("soft-thread-local", threadLocal(softFactory(dequeFactory(c, limit)))) + .put("locked", locked(deque(c, limit))) + .put("concurrent", concurrent(dequeFactory(c, limit), Runtime.getRuntime().availableProcessors())) + .put("soft-concurrent", concurrent(softFactory(dequeFactory(c, limit)), Runtime.getRuntime().availableProcessors())).build(); + + // warmup + final long start = System.nanoTime(); + while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10)) { + for (Recycler recycler : recyclers.values()) { + bench(recycler, NUM_RECYCLES, 2); + } + } + + // run + for (int numThreads = 1; numThreads <= 4 * Runtime.getRuntime().availableProcessors(); numThreads *= 2) { + System.out.println("## " + numThreads + " threads\n"); + System.gc(); + Thread.sleep(1000); + for (Recycler recycler : recyclers.values()) { + bench(recycler, NUM_RECYCLES, numThreads); + } + for (int i = 0; i < 5; ++i) { + for (Map.Entry> entry : recyclers.entrySet()) { + System.out.println(entry.getKey() + "\t" + TimeUnit.NANOSECONDS.toMillis(bench(entry.getValue(), NUM_RECYCLES, numThreads))); + } + System.out.println(); + } + } + } + +} diff --git a/src/test/java/org/elasticsearch/common/recycler/ConcurrentRecyclerTests.java b/src/test/java/org/elasticsearch/common/recycler/ConcurrentRecyclerTests.java new file mode 100644 index 00000000000..758041d0d8e --- /dev/null +++ b/src/test/java/org/elasticsearch/common/recycler/ConcurrentRecyclerTests.java @@ -0,0 +1,29 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.recycler; + +public class ConcurrentRecyclerTests extends AbstractRecyclerTests { + + @Override + protected Recycler newRecycler() { + return Recyclers.concurrent(Recyclers.dequeFactory(RECYCLER_C, randomIntBetween(5, 10)), randomIntBetween(1,5)); + } + +} diff --git a/src/test/java/org/elasticsearch/common/recycler/LockedRecyclerTests.java b/src/test/java/org/elasticsearch/common/recycler/LockedRecyclerTests.java new file mode 100644 index 00000000000..9ffdf7aa370 --- /dev/null +++ b/src/test/java/org/elasticsearch/common/recycler/LockedRecyclerTests.java @@ -0,0 +1,29 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.recycler; + +public class LockedRecyclerTests extends AbstractRecyclerTests { + + @Override + protected Recycler newRecycler() { + return Recyclers.locked(Recyclers.deque(RECYCLER_C, randomIntBetween(5, 10))); + } + +} diff --git a/src/test/java/org/elasticsearch/common/recycler/NoneRecyclerTests.java b/src/test/java/org/elasticsearch/common/recycler/NoneRecyclerTests.java index f830494f54e..a60c0ba18b3 100644 --- a/src/test/java/org/elasticsearch/common/recycler/NoneRecyclerTests.java +++ b/src/test/java/org/elasticsearch/common/recycler/NoneRecyclerTests.java @@ -23,7 +23,7 @@ public class NoneRecyclerTests extends AbstractRecyclerTests { @Override protected Recycler newRecycler() { - return new NoneRecycler(RECYCLER_C); + return Recyclers.none(RECYCLER_C); } } diff --git a/src/test/java/org/elasticsearch/common/recycler/QueueRecyclerTests.java b/src/test/java/org/elasticsearch/common/recycler/QueueRecyclerTests.java index 2deeee04f28..f693c30ccf7 100644 --- a/src/test/java/org/elasticsearch/common/recycler/QueueRecyclerTests.java +++ b/src/test/java/org/elasticsearch/common/recycler/QueueRecyclerTests.java @@ -23,7 +23,7 @@ public class QueueRecyclerTests extends AbstractRecyclerTests { @Override protected Recycler newRecycler() { - return new QueueRecycler(RECYCLER_C, randomIntBetween(5, 10)); + return Recyclers.concurrentDeque(RECYCLER_C, randomIntBetween(5, 10)); } } diff --git a/src/test/java/org/elasticsearch/common/recycler/SoftConcurrentRecyclerTests.java b/src/test/java/org/elasticsearch/common/recycler/SoftConcurrentRecyclerTests.java new file mode 100644 index 00000000000..0320ff54805 --- /dev/null +++ b/src/test/java/org/elasticsearch/common/recycler/SoftConcurrentRecyclerTests.java @@ -0,0 +1,29 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.recycler; + +public class SoftConcurrentRecyclerTests extends AbstractRecyclerTests { + + @Override + protected Recycler newRecycler() { + return Recyclers.concurrent(Recyclers.softFactory(Recyclers.dequeFactory(RECYCLER_C, randomIntBetween(5, 10))), randomIntBetween(1, 5)); + } + +} diff --git a/src/test/java/org/elasticsearch/common/recycler/SoftThreadLocalRecyclerTests.java b/src/test/java/org/elasticsearch/common/recycler/SoftThreadLocalRecyclerTests.java index 5bfa4d21233..2a5d2536bc6 100644 --- a/src/test/java/org/elasticsearch/common/recycler/SoftThreadLocalRecyclerTests.java +++ b/src/test/java/org/elasticsearch/common/recycler/SoftThreadLocalRecyclerTests.java @@ -23,7 +23,7 @@ public class SoftThreadLocalRecyclerTests extends AbstractRecyclerTests { @Override protected Recycler newRecycler() { - return new SoftThreadLocalRecycler(RECYCLER_C, randomIntBetween(5, 10)); + return Recyclers.threadLocal(Recyclers.softFactory(Recyclers.dequeFactory(RECYCLER_C, 10))); } } diff --git a/src/test/java/org/elasticsearch/common/recycler/ThreadLocalRecyclerTests.java b/src/test/java/org/elasticsearch/common/recycler/ThreadLocalRecyclerTests.java index c6fba545eb5..5ab68928a43 100644 --- a/src/test/java/org/elasticsearch/common/recycler/ThreadLocalRecyclerTests.java +++ b/src/test/java/org/elasticsearch/common/recycler/ThreadLocalRecyclerTests.java @@ -23,7 +23,7 @@ public class ThreadLocalRecyclerTests extends AbstractRecyclerTests { @Override protected Recycler newRecycler() { - return new ThreadLocalRecycler(RECYCLER_C, randomIntBetween(5, 10)); + return Recyclers.threadLocal(Recyclers.dequeFactory(RECYCLER_C, 10)); } }