From e01f8c250d9c79911180b2e383fb184f4d278222 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Tue, 7 Jan 2014 14:42:44 +0100 Subject: [PATCH] Change the default recycler type. Recycling is not thread-local anymore by default but instead there are several pools of objects to recycle that threads may use depending on their id. Each pool is protected by its own lock so up to ${number of pools} threads may recycler objects concurrently. Recyclers have also been refactored for better composability, for example there is a soft recycler that creates a recycler that wraps data around a SoftReference and a thread-local recycler that can take any factory or recyclers and instantiates a dedicated instance per thread. RecyclerBenchmark has been added to try to figure out the overhead of object recycling depending on the recycler type and the number of threads trying to recycle objects concurrently. Close #4647 --- .../cache/recycler/CacheRecycler.java | 79 +++--- .../cache/recycler/PageCacheRecycler.java | 90 +++++-- .../common/recycler/AbstractRecycler.java | 40 +++ .../recycler/ConcurrentDequeRecycler.java | 68 +++++ ...{QueueRecycler.java => DequeRecycler.java} | 51 ++-- .../common/recycler/FilterRecycler.java | 47 ++++ .../common/recycler/NoneRecycler.java | 2 +- .../common/recycler/Recycler.java | 41 +-- .../common/recycler/Recyclers.java | 252 ++++++++++++++++++ .../recycler/SoftThreadLocalRecycler.java | 60 ----- .../common/recycler/ThreadLocalRecycler.java | 145 ---------- .../concurrent/ConcurrentCollections.java | 6 + .../common/recycler/RecyclerBenchmark.java | 123 +++++++++ .../recycler/ConcurrentRecyclerTests.java | 29 ++ .../common/recycler/LockedRecyclerTests.java | 29 ++ .../common/recycler/NoneRecyclerTests.java | 2 +- .../common/recycler/QueueRecyclerTests.java | 2 +- .../recycler/SoftConcurrentRecyclerTests.java | 29 ++ .../SoftThreadLocalRecyclerTests.java | 2 +- .../recycler/ThreadLocalRecyclerTests.java | 2 +- 20 files changed, 768 insertions(+), 331 deletions(-) create mode 100644 src/main/java/org/elasticsearch/common/recycler/AbstractRecycler.java create mode 100644 src/main/java/org/elasticsearch/common/recycler/ConcurrentDequeRecycler.java rename src/main/java/org/elasticsearch/common/recycler/{QueueRecycler.java => DequeRecycler.java} (61%) create mode 100644 src/main/java/org/elasticsearch/common/recycler/FilterRecycler.java create mode 100644 src/main/java/org/elasticsearch/common/recycler/Recyclers.java delete mode 100644 src/main/java/org/elasticsearch/common/recycler/SoftThreadLocalRecycler.java delete mode 100644 src/main/java/org/elasticsearch/common/recycler/ThreadLocalRecycler.java create mode 100644 src/test/java/org/elasticsearch/benchmark/common/recycler/RecyclerBenchmark.java create mode 100644 src/test/java/org/elasticsearch/common/recycler/ConcurrentRecyclerTests.java create mode 100644 src/test/java/org/elasticsearch/common/recycler/LockedRecyclerTests.java create mode 100644 src/test/java/org/elasticsearch/common/recycler/SoftConcurrentRecyclerTests.java diff --git a/src/main/java/org/elasticsearch/cache/recycler/CacheRecycler.java b/src/main/java/org/elasticsearch/cache/recycler/CacheRecycler.java index ef21ecb10b6..4eb12a8dba8 100644 --- a/src/main/java/org/elasticsearch/cache/recycler/CacheRecycler.java +++ b/src/main/java/org/elasticsearch/cache/recycler/CacheRecycler.java @@ -24,11 +24,14 @@ import com.google.common.base.Strings; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.recycler.*; +import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import java.util.Locale; +import static org.elasticsearch.common.recycler.Recyclers.*; + @SuppressWarnings("unchecked") public class CacheRecycler extends AbstractComponent { @@ -66,8 +69,9 @@ public class CacheRecycler extends AbstractComponent { final Type type = Type.parse(settings.get("type")); int limit = settings.getAsInt("limit", 10); int smartSize = settings.getAsInt("smart_size", 1024); + final int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); - hashMap = build(type, limit, smartSize, new Recycler.C() { + hashMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public ObjectObjectOpenHashMap newInstance(int sizing) { return new ObjectObjectOpenHashMap(size(sizing)); @@ -78,7 +82,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - hashSet = build(type, limit, smartSize, new Recycler.C() { + hashSet = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public ObjectOpenHashSet newInstance(int sizing) { return new ObjectOpenHashSet(size(sizing), 0.5f); @@ -89,7 +93,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - doubleObjectMap = build(type, limit, smartSize, new Recycler.C() { + doubleObjectMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public DoubleObjectOpenHashMap newInstance(int sizing) { return new DoubleObjectOpenHashMap(size(sizing)); @@ -100,7 +104,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - longObjectMap = build(type, limit, smartSize, new Recycler.C() { + longObjectMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public LongObjectOpenHashMap newInstance(int sizing) { return new LongObjectOpenHashMap(size(sizing)); @@ -111,7 +115,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - longLongMap = build(type, limit, smartSize, new Recycler.C() { + longLongMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public LongLongOpenHashMap newInstance(int sizing) { return new LongLongOpenHashMap(size(sizing)); @@ -122,7 +126,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - intIntMap = build(type, limit, smartSize, new Recycler.C() { + intIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public IntIntOpenHashMap newInstance(int sizing) { return new IntIntOpenHashMap(size(sizing)); @@ -133,7 +137,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - floatIntMap = build(type, limit, smartSize, new Recycler.C() { + floatIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public FloatIntOpenHashMap newInstance(int sizing) { return new FloatIntOpenHashMap(size(sizing)); @@ -144,7 +148,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - doubleIntMap = build(type, limit, smartSize, new Recycler.C() { + doubleIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public DoubleIntOpenHashMap newInstance(int sizing) { return new DoubleIntOpenHashMap(size(sizing)); @@ -155,7 +159,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - longIntMap = build(type, limit, smartSize, new Recycler.C() { + longIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public LongIntOpenHashMap newInstance(int sizing) { return new LongIntOpenHashMap(size(sizing)); @@ -166,7 +170,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - objectIntMap = build(type, limit, smartSize, new Recycler.C() { + objectIntMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public ObjectIntOpenHashMap newInstance(int sizing) { return new ObjectIntOpenHashMap(size(sizing)); @@ -177,7 +181,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - intObjectMap = build(type, limit, smartSize, new Recycler.C() { + intObjectMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public IntObjectOpenHashMap newInstance(int sizing) { return new IntObjectOpenHashMap(size(sizing)); @@ -188,7 +192,7 @@ public class CacheRecycler extends AbstractComponent { value.clear(); } }); - objectFloatMap = build(type, limit, smartSize, new Recycler.C() { + objectFloatMap = build(type, limit, smartSize, availableProcessors, new Recycler.C() { @Override public ObjectFloatOpenHashMap newInstance(int sizing) { return new ObjectFloatOpenHashMap(size(sizing)); @@ -253,12 +257,12 @@ public class CacheRecycler extends AbstractComponent { return sizing > 0 ? sizing : 256; } - private Recycler build(Type type, int limit, int smartSize, Recycler.C c) { + private Recycler build(Type type, int limit, int smartSize, int availableProcessors, Recycler.C c) { Recycler recycler; try { - recycler = type.build(c, limit); + recycler = type.build(c, limit, availableProcessors); if (smartSize > 0) { - recycler = new Recycler.Sizing(recycler, smartSize); + recycler = sizing(recycler, none(c), smartSize); } } catch (IllegalArgumentException ex) { throw new ElasticsearchIllegalArgumentException("no type support [" + type + "] for recycler"); @@ -270,40 +274,44 @@ public class CacheRecycler extends AbstractComponent { public static enum Type { SOFT_THREAD_LOCAL { @Override - Recycler build(Recycler.C c, int limit) { - return new SoftThreadLocalRecycler(c, limit); - } - @Override - boolean perThread() { - return true; + Recycler build(Recycler.C c, int limit, int availableProcessors) { + return threadLocal(softFactory(dequeFactory(c, limit))); } }, THREAD_LOCAL { @Override - Recycler build(Recycler.C c, int limit) { - return new ThreadLocalRecycler(c, limit); - } - @Override - boolean perThread() { - return true; + Recycler build(Recycler.C c, int limit, int availableProcessors) { + return threadLocal(dequeFactory(c, limit)); } }, QUEUE { @Override - Recycler build(Recycler.C c, int limit) { - return new QueueRecycler(c, limit); + Recycler build(Recycler.C c, int limit, int availableProcessors) { + return concurrentDeque(c, limit); + } + }, + SOFT_CONCURRENT { + @Override + Recycler build(Recycler.C c, int limit, int availableProcessors) { + return concurrent(softFactory(dequeFactory(c, limit)), availableProcessors); + } + }, + CONCURRENT { + @Override + Recycler build(Recycler.C c, int limit, int availableProcessors) { + return concurrent(dequeFactory(c, limit), availableProcessors); } }, NONE { @Override - Recycler build(Recycler.C c, int limit) { - return new NoneRecycler(c); + Recycler build(Recycler.C c, int limit, int availableProcessors) { + return none(c); } }; public static Type parse(String type) { if (Strings.isNullOrEmpty(type)) { - return SOFT_THREAD_LOCAL; + return SOFT_CONCURRENT; } try { return Type.valueOf(type.toUpperCase(Locale.ROOT)); @@ -312,9 +320,6 @@ public class CacheRecycler extends AbstractComponent { } } - abstract Recycler build(Recycler.C c, int limit); - boolean perThread() { - return false; - } + abstract Recycler build(Recycler.C c, int limit, int availableProcessors); } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/cache/recycler/PageCacheRecycler.java b/src/main/java/org/elasticsearch/cache/recycler/PageCacheRecycler.java index 59b1bbae6da..12cef79a802 100644 --- a/src/main/java/org/elasticsearch/cache/recycler/PageCacheRecycler.java +++ b/src/main/java/org/elasticsearch/cache/recycler/PageCacheRecycler.java @@ -19,18 +19,20 @@ package org.elasticsearch.cache.recycler; -import org.elasticsearch.cache.recycler.CacheRecycler.Type; +import com.google.common.base.Strings; +import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.recycler.NoneRecycler; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; +import java.util.Locale; + +import static org.elasticsearch.common.recycler.Recyclers.*; /** A recycler of fixed-size pages. */ public class PageCacheRecycler extends AbstractComponent { @@ -79,17 +81,9 @@ public class PageCacheRecycler extends AbstractComponent { public PageCacheRecycler(Settings settings, ThreadPool threadPool) { super(settings); final Type type = Type.parse(componentSettings.get(TYPE)); - long limit = componentSettings.getAsMemory(LIMIT_HEAP, "10%").bytes(); - if (type.perThread()) { - final long limitPerThread = componentSettings.getAsBytesSize(LIMIT_PER_THREAD, new ByteSizeValue(-1)).bytes(); - if (limitPerThread != -1) { - // if the per_thread limit is set, it has precedence - limit = limitPerThread; - } else { - // divide memory equally to all search threads - limit /= maximumSearchThreadPoolSize(threadPool, settings); - } - } + final long limit = componentSettings.getAsMemory(LIMIT_HEAP, "10%").bytes(); + final int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); + final int searchThreadPoolSize = maximumSearchThreadPoolSize(threadPool, settings); // We have a global amount of memory that we need to divide across data types. // Since some types are more useful than other ones we give them different weights. @@ -113,7 +107,7 @@ public class PageCacheRecycler extends AbstractComponent { final double totalWeight = bytesWeight + intsWeight + longsWeight + doublesWeight + objectsWeight; - bytePage = build(type, maxCount(limit, BigArrays.BYTE_PAGE_SIZE, bytesWeight, totalWeight), new Recycler.C() { + bytePage = build(type, maxCount(limit, BigArrays.BYTE_PAGE_SIZE, bytesWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C() { @Override public byte[] newInstance(int sizing) { return new byte[BigArrays.BYTE_PAGE_SIZE]; @@ -121,7 +115,7 @@ public class PageCacheRecycler extends AbstractComponent { @Override public void clear(byte[] value) {} }); - intPage = build(type, maxCount(limit, BigArrays.INT_PAGE_SIZE, intsWeight, totalWeight), new Recycler.C() { + intPage = build(type, maxCount(limit, BigArrays.INT_PAGE_SIZE, intsWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C() { @Override public int[] newInstance(int sizing) { return new int[BigArrays.INT_PAGE_SIZE]; @@ -129,7 +123,7 @@ public class PageCacheRecycler extends AbstractComponent { @Override public void clear(int[] value) {} }); - longPage = build(type, maxCount(limit, BigArrays.LONG_PAGE_SIZE, longsWeight, totalWeight), new Recycler.C() { + longPage = build(type, maxCount(limit, BigArrays.LONG_PAGE_SIZE, longsWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C() { @Override public long[] newInstance(int sizing) { return new long[BigArrays.LONG_PAGE_SIZE]; @@ -137,7 +131,7 @@ public class PageCacheRecycler extends AbstractComponent { @Override public void clear(long[] value) {} }); - doublePage = build(type, maxCount(limit, BigArrays.DOUBLE_PAGE_SIZE, doublesWeight, totalWeight), new Recycler.C() { + doublePage = build(type, maxCount(limit, BigArrays.DOUBLE_PAGE_SIZE, doublesWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C() { @Override public double[] newInstance(int sizing) { return new double[BigArrays.DOUBLE_PAGE_SIZE]; @@ -145,7 +139,7 @@ public class PageCacheRecycler extends AbstractComponent { @Override public void clear(double[] value) {} }); - objectPage = build(type, maxCount(limit, BigArrays.OBJECT_PAGE_SIZE, objectsWeight, totalWeight), new Recycler.C() { + objectPage = build(type, maxCount(limit, BigArrays.OBJECT_PAGE_SIZE, objectsWeight, totalWeight), searchThreadPoolSize, availableProcessors, new Recycler.C() { @Override public Object[] newInstance(int sizing) { return new Object[BigArrays.OBJECT_PAGE_SIZE]; @@ -194,13 +188,65 @@ public class PageCacheRecycler extends AbstractComponent { return objectPage.obtain(); } - private static Recycler build(Type type, int limit, Recycler.C c) { + private static Recycler build(Type type, int limit, int estimatedThreadPoolSize, int availableProcessors, Recycler.C c) { final Recycler recycler; if (limit == 0) { - recycler = new NoneRecycler(c); + recycler = none(c); } else { - recycler = type.build(c, limit); + recycler = type.build(c, limit, estimatedThreadPoolSize, availableProcessors); } return recycler; } + + public static enum Type { + SOFT_THREAD_LOCAL { + @Override + Recycler build(Recycler.C c, int limit, int estimatedThreadPoolSize, int availableProcessors) { + return threadLocal(softFactory(dequeFactory(c, limit / estimatedThreadPoolSize))); + } + }, + THREAD_LOCAL { + @Override + Recycler build(Recycler.C c, int limit, int estimatedThreadPoolSize, int availableProcessors) { + return threadLocal(dequeFactory(c, limit / estimatedThreadPoolSize)); + } + }, + QUEUE { + @Override + Recycler build(Recycler.C c, int limit, int estimatedThreadPoolSize, int availableProcessors) { + return concurrentDeque(c, limit); + } + }, + SOFT_CONCURRENT { + @Override + Recycler build(Recycler.C c, int limit, int estimatedThreadPoolSize, int availableProcessors) { + return concurrent(softFactory(dequeFactory(c, limit / availableProcessors)), availableProcessors); + } + }, + CONCURRENT { + @Override + Recycler build(Recycler.C c, int limit, int estimatedThreadPoolSize, int availableProcessors) { + return concurrent(dequeFactory(c, limit / availableProcessors), availableProcessors); + } + }, + NONE { + @Override + Recycler build(Recycler.C c, int limit, int estimatedThreadPoolSize, int availableProcessors) { + return none(c); + } + }; + + public static Type parse(String type) { + if (Strings.isNullOrEmpty(type)) { + return SOFT_CONCURRENT; + } + try { + return Type.valueOf(type.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new ElasticsearchIllegalArgumentException("no type support [" + type + "]"); + } + } + + abstract Recycler build(Recycler.C c, int limit, int estimatedThreadPoolSize, int availableProcessors); + } } diff --git a/src/main/java/org/elasticsearch/common/recycler/AbstractRecycler.java b/src/main/java/org/elasticsearch/common/recycler/AbstractRecycler.java new file mode 100644 index 00000000000..6730c5119a3 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/recycler/AbstractRecycler.java @@ -0,0 +1,40 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.recycler; + + +abstract class AbstractRecycler implements Recycler { + + protected final Recycler.C c; + + protected AbstractRecycler(Recycler.C c) { + this.c = c; + } + + public V obtain() { + return obtain(-1); + } + + @Override + public void close() { + // no-op by default + } + +} diff --git a/src/main/java/org/elasticsearch/common/recycler/ConcurrentDequeRecycler.java b/src/main/java/org/elasticsearch/common/recycler/ConcurrentDequeRecycler.java new file mode 100644 index 00000000000..4c22df2d08e --- /dev/null +++ b/src/main/java/org/elasticsearch/common/recycler/ConcurrentDequeRecycler.java @@ -0,0 +1,68 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.recycler; + +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; + +import java.util.Deque; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A {@link Recycler} implementation based on a concurrent {@link Deque}. This implementation is thread-safe. + */ +public class ConcurrentDequeRecycler extends DequeRecycler { + + // we maintain size separately because concurrent deque implementations typically have linear-time size() impls + final AtomicInteger size; + + public ConcurrentDequeRecycler(C c, int maxSize) { + super(c, ConcurrentCollections.newDeque(), maxSize); + this.size = new AtomicInteger(); + } + + @Override + public void close() { + assert deque.size() == size.get(); + super.close(); + size.set(0); + } + + @Override + public V obtain(int sizing) { + final V v = super.obtain(sizing); + if (v.isRecycled()) { + size.decrementAndGet(); + } + return v; + } + + @Override + protected boolean beforeRelease() { + return size.incrementAndGet() <= maxSize; + } + + @Override + protected void afterRelease(boolean recycled) { + if (!recycled) { + size.decrementAndGet(); + } + } + +} diff --git a/src/main/java/org/elasticsearch/common/recycler/QueueRecycler.java b/src/main/java/org/elasticsearch/common/recycler/DequeRecycler.java similarity index 61% rename from src/main/java/org/elasticsearch/common/recycler/QueueRecycler.java rename to src/main/java/org/elasticsearch/common/recycler/DequeRecycler.java index 27b0a1366b9..c10ebcc2884 100644 --- a/src/main/java/org/elasticsearch/common/recycler/QueueRecycler.java +++ b/src/main/java/org/elasticsearch/common/recycler/DequeRecycler.java @@ -20,54 +20,51 @@ package org.elasticsearch.common.recycler; import org.elasticsearch.ElasticsearchIllegalStateException; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.Deque; /** + * A {@link Recycler} implementation based on a {@link Deque}. This implementation is NOT thread-safe. */ -public class QueueRecycler extends Recycler { +public class DequeRecycler extends AbstractRecycler { - final Queue queue; - final AtomicInteger size; + final Deque deque; final int maxSize; - public QueueRecycler(C c, int maxSize) { - this(c, ConcurrentCollections.newQueue(), maxSize); - } - - public QueueRecycler(C c, Queue queue, int maxSize) { + public DequeRecycler(C c, Deque queue, int maxSize) { super(c); - this.queue = queue; + this.deque = queue; this.maxSize = maxSize; - // we maintain size separately because concurrent queue implementations typically have linear-time size() impls - this.size = new AtomicInteger(); } @Override public void close() { - assert queue.size() == size.get(); - queue.clear(); - size.set(0); + deque.clear(); } @Override public V obtain(int sizing) { - final T v = queue.poll(); + final T v = deque.pollFirst(); if (v == null) { - return new QV(c.newInstance(sizing), false); + return new DV(c.newInstance(sizing), false); } - size.decrementAndGet(); - return new QV(v, true); + return new DV(v, true); } - class QV implements Recycler.V { + /** Called before releasing an object, returns true if the object should be recycled and false otherwise. */ + protected boolean beforeRelease() { + return deque.size() < maxSize; + } + + /** Called after a release. */ + protected void afterRelease(boolean recycled) {} + + private class DV implements Recycler.V { T value; final boolean recycled; - QV(T value, boolean recycled) { + DV(T value, boolean recycled) { this.value = value; this.recycled = recycled; } @@ -87,13 +84,13 @@ public class QueueRecycler extends Recycler { if (value == null) { throw new ElasticsearchIllegalStateException("recycler entry already released..."); } - if (size.incrementAndGet() <= maxSize) { + final boolean recycle = beforeRelease(); + if (recycle) { c.clear(value); - queue.offer(value); - } else { - size.decrementAndGet(); + deque.addFirst(value); } value = null; + afterRelease(recycle); return true; } } diff --git a/src/main/java/org/elasticsearch/common/recycler/FilterRecycler.java b/src/main/java/org/elasticsearch/common/recycler/FilterRecycler.java new file mode 100644 index 00000000000..9bb6b86d41d --- /dev/null +++ b/src/main/java/org/elasticsearch/common/recycler/FilterRecycler.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.recycler; + +abstract class FilterRecycler implements Recycler { + + /** Get the delegate instance to foward calls to. */ + protected abstract Recycler getDelegate(); + + /** Wrap a recycled reference. */ + protected Recycler.V wrap(Recycler.V delegate) { + return delegate; + } + + @Override + public Recycler.V obtain(int sizing) { + return wrap(getDelegate().obtain(sizing)); + } + + @Override + public Recycler.V obtain() { + return wrap(getDelegate().obtain()); + } + + @Override + public void close() { + getDelegate().close(); + } + +} diff --git a/src/main/java/org/elasticsearch/common/recycler/NoneRecycler.java b/src/main/java/org/elasticsearch/common/recycler/NoneRecycler.java index 09c2c21f73b..0b7a3a4d9ea 100644 --- a/src/main/java/org/elasticsearch/common/recycler/NoneRecycler.java +++ b/src/main/java/org/elasticsearch/common/recycler/NoneRecycler.java @@ -23,7 +23,7 @@ import org.elasticsearch.ElasticsearchIllegalStateException; /** */ -public class NoneRecycler extends Recycler { +public class NoneRecycler extends AbstractRecycler { public NoneRecycler(C c) { super(c); diff --git a/src/main/java/org/elasticsearch/common/recycler/Recycler.java b/src/main/java/org/elasticsearch/common/recycler/Recycler.java index 6d00cad9e13..b74acba477a 100644 --- a/src/main/java/org/elasticsearch/common/recycler/Recycler.java +++ b/src/main/java/org/elasticsearch/common/recycler/Recycler.java @@ -23,31 +23,10 @@ import org.elasticsearch.common.lease.Releasable; /** */ -public abstract class Recycler { +public interface Recycler { - public static class Sizing extends Recycler { - - private final Recycler recycler; - private final int smartSize; - - public Sizing(Recycler recycler, int smartSize) { - super(recycler.c); - this.recycler = recycler; - this.smartSize = smartSize; - } - - @Override - public void close() { - recycler.close(); - } - - @Override - public V obtain(int sizing) { - if (sizing > 0 && sizing < smartSize) { - return new NoneRecycler.NV(c.newInstance(sizing)); - } - return recycler.obtain(sizing); - } + public static interface Factory { + Recycler build(); } public static interface C { @@ -69,17 +48,9 @@ public abstract class Recycler { } - protected final C c; + void close(); - protected Recycler(C c) { - this.c = c; - } + V obtain(); - public abstract void close(); - - public V obtain() { - return obtain(-1); - } - - public abstract V obtain(int sizing); + V obtain(int sizing); } diff --git a/src/main/java/org/elasticsearch/common/recycler/Recyclers.java b/src/main/java/org/elasticsearch/common/recycler/Recyclers.java new file mode 100644 index 00000000000..c1797bd64ba --- /dev/null +++ b/src/main/java/org/elasticsearch/common/recycler/Recyclers.java @@ -0,0 +1,252 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.recycler; + +import com.carrotsearch.hppc.hash.MurmurHash3; +import com.google.common.collect.Queues; +import org.apache.lucene.util.CloseableThreadLocal; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalArgumentException; + +import java.lang.ref.SoftReference; + +public enum Recyclers { + ; + + /** Return a {@link Recycler} that never recycles entries. */ + public static Recycler none(Recycler.C c) { + return new NoneRecycler(c); + } + + /** Return a concurrent recycler based on a deque. */ + public static Recycler concurrentDeque(Recycler.C c, int limit) { + return new ConcurrentDequeRecycler(c, limit); + } + + /** Return a recycler based on a deque. */ + public static Recycler deque(Recycler.C c, int limit) { + return new DequeRecycler(c, Queues.newArrayDeque(), limit); + } + + /** Return a recycler based on a deque. */ + public static Recycler.Factory dequeFactory(final Recycler.C c, final int limit) { + return new Recycler.Factory() { + @Override + public Recycler build() { + return deque(c, limit); + } + }; + } + + /** Wrap two recyclers and forward to calls to smallObjectRecycler when size < minSize and to + * defaultRecycler otherwise. */ + public static Recycler sizing(final Recycler defaultRecycler, final Recycler smallObjectRecycler, final int minSize) { + return new FilterRecycler() { + + @Override + protected Recycler getDelegate() { + return defaultRecycler; + } + + @Override + public Recycler.V obtain(int sizing) { + if (sizing > 0 && sizing < minSize) { + return smallObjectRecycler.obtain(sizing); + } + return super.obtain(sizing); + } + + @Override + public void close() { + defaultRecycler.close(); + smallObjectRecycler.close(); + } + + }; + } + + /** Create a thread-local recycler, where each thread will have its own instance, create through the provided factory. */ + public static Recycler threadLocal(final Recycler.Factory factory) { + return new FilterRecycler() { + + private final CloseableThreadLocal> recyclers; + + { + recyclers = new CloseableThreadLocal>() { + @Override + protected Recycler initialValue() { + return factory.build(); + } + }; + } + + @Override + protected Recycler getDelegate() { + return recyclers.get(); + } + + @Override + public void close() { + recyclers.close(); + } + + }; + } + + /** Create a recycler that is wrapped inside a soft reference, so that it cannot cause {@link OutOfMemoryError}s. */ + public static Recycler soft(final Recycler.Factory factory) { + return new FilterRecycler() { + + SoftReference> ref; + + { + ref = new SoftReference>(null); + } + + @Override + protected Recycler getDelegate() { + Recycler recycler = ref.get(); + if (recycler == null) { + recycler = factory.build(); + ref = new SoftReference>(recycler); + } + return recycler; + } + + }; + } + + /** Create a recycler that wraps data in a SoftReference. + * @see #soft(org.elasticsearch.common.recycler.Recycler.Factory) */ + public static Recycler.Factory softFactory(final Recycler.Factory factory) { + return new Recycler.Factory() { + @Override + public Recycler build() { + return soft(factory); + } + }; + } + + /** Wrap the provided recycler so that calls to {@link Recycler#obtain()} and {@link Recycler.V#release()} are protected by + * a lock. */ + public static Recycler locked(final Recycler recycler) { + return new FilterRecycler() { + + private final Object lock; + + { + this.lock = new Object(); + } + + @Override + protected Recycler getDelegate() { + return recycler; + } + + @Override + public org.elasticsearch.common.recycler.Recycler.V obtain(int sizing) { + synchronized (lock) { + return super.obtain(sizing); + } + } + + @Override + public org.elasticsearch.common.recycler.Recycler.V obtain() { + synchronized (lock) { + return super.obtain(); + } + } + + @Override + protected Recycler.V wrap(final Recycler.V delegate) { + return new Recycler.V() { + + @Override + public boolean release() throws ElasticsearchException { + synchronized (lock) { + return delegate.release(); + } + } + + @Override + public T v() { + return delegate.v(); + } + + @Override + public boolean isRecycled() { + return delegate.isRecycled(); + } + + }; + } + + }; + } + + /** Create a concurrent implementation that can support concurrent access from concurrencyLevel threads with little contention. */ + public static Recycler concurrent(final Recycler.Factory factory, final int concurrencyLevel) { + if (concurrencyLevel < 1) { + throw new ElasticsearchIllegalArgumentException("concurrencyLevel must be >= 1"); + } + if (concurrencyLevel == 1) { + return locked(factory.build()); + } + return new FilterRecycler() { + + private final Recycler[] recyclers; + { + @SuppressWarnings("unchecked") + final Recycler[] recyclers = new Recycler[concurrencyLevel]; + this.recyclers = recyclers; + for (int i = 0; i < concurrencyLevel; ++i) { + recyclers[i] = locked(factory.build()); + } + } + + final int slot() { + final long id = Thread.currentThread().getId(); + // don't trust Thread.hashCode to have equiprobable low bits + int slot = (int) MurmurHash3.hash(id); + // make positive, otherwise % may return negative numbers + slot &= 0x7FFFFFFF; + slot %= concurrencyLevel; + return slot; + } + + @Override + protected Recycler getDelegate() { + return recyclers[slot()]; + } + + @Override + public void close() { + for (Recycler recycler : recyclers) { + recycler.close(); + } + } + + }; + } + + public static Recycler concurrent(final Recycler.Factory factory) { + return concurrent(factory, Runtime.getRuntime().availableProcessors()); + } +} diff --git a/src/main/java/org/elasticsearch/common/recycler/SoftThreadLocalRecycler.java b/src/main/java/org/elasticsearch/common/recycler/SoftThreadLocalRecycler.java deleted file mode 100644 index 16063f4d1e2..00000000000 --- a/src/main/java/org/elasticsearch/common/recycler/SoftThreadLocalRecycler.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.recycler; - -import org.apache.lucene.util.CloseableThreadLocal; - -import java.lang.ref.SoftReference; - -/** - */ -public class SoftThreadLocalRecycler extends Recycler { - - private final CloseableThreadLocal>> threadLocal = new CloseableThreadLocal>>(); - - final int stackLimit; - - public SoftThreadLocalRecycler(C c, int stackLimit) { - super(c); - this.stackLimit = stackLimit; - } - - @Override - public void close() { - threadLocal.close(); - } - - @Override - public V obtain(int sizing) { - SoftReference> ref = threadLocal.get(); - ThreadLocalRecycler.Stack stack = (ref == null) ? null : ref.get(); - if (stack == null) { - stack = new ThreadLocalRecycler.Stack(stackLimit, Thread.currentThread()); - threadLocal.set(new SoftReference>(stack)); - } - - final T o = stack.pop(); - if (o == null) { - return new ThreadLocalRecycler.TV(stack, c, c.newInstance(sizing), false); - } else { - return new ThreadLocalRecycler.TV(stack, c, o, true); - } - } -} diff --git a/src/main/java/org/elasticsearch/common/recycler/ThreadLocalRecycler.java b/src/main/java/org/elasticsearch/common/recycler/ThreadLocalRecycler.java deleted file mode 100644 index da917655c31..00000000000 --- a/src/main/java/org/elasticsearch/common/recycler/ThreadLocalRecycler.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.recycler; - -import org.apache.lucene.util.ArrayUtil; -import org.apache.lucene.util.CloseableThreadLocal; -import org.apache.lucene.util.RamUsageEstimator; -import org.elasticsearch.ElasticsearchIllegalStateException; - -/** - */ -public class ThreadLocalRecycler extends Recycler { - - private final CloseableThreadLocal> threadLocal = new CloseableThreadLocal>() { - @Override - protected Stack initialValue() { - return new Stack(stackLimit, Thread.currentThread()); - } - }; - - final int stackLimit; - - public ThreadLocalRecycler(C c, int stackLimit) { - super(c); - this.stackLimit = stackLimit; - } - - @Override - public void close() { - threadLocal.close(); - } - - @Override - public V obtain(int sizing) { - Stack stack = threadLocal.get(); - final T o = stack.pop(); - if (o == null) { - return new TV(stack, c, c.newInstance(sizing), false); - } else { - return new TV(stack, c, o, true); - } - } - - static class TV implements Recycler.V { - - final Stack stack; - final C c; - T value; - final boolean recycled; - - TV(Stack stack, C c, T value, boolean recycled) { - this.stack = stack; - this.c = c; - this.value = value; - this.recycled = recycled; - } - - @Override - public T v() { - return value; - } - - @Override - public boolean isRecycled() { - return recycled; - } - - @Override - public boolean release() { - assert Thread.currentThread() == stack.thread; - if (value == null) { - throw new ElasticsearchIllegalStateException("recycler entry already released..."); - } - c.clear(value); - stack.push(value); - value = null; - return true; - } - } - - - static final class Stack { - - final int stackLimit; - final Thread thread; - private T[] elements; - private int size; - - @SuppressWarnings({"unchecked", "SuspiciousArrayCast"}) - Stack(int stackLimit, Thread thread) { - this.stackLimit = stackLimit; - this.thread = thread; - elements = newArray(stackLimit < 10 ? stackLimit : 10); - } - - T pop() { - int size = this.size; - if (size == 0) { - return null; - } - size--; - T ret = elements[size]; - elements[size] = null; - this.size = size; - return ret; - } - - void push(T o) { - int size = this.size; - if (size == elements.length) { - if (size >= stackLimit) { - return; - } - T[] newElements = newArray(Math.min(stackLimit, ArrayUtil.oversize(size + 1, RamUsageEstimator.NUM_BYTES_OBJECT_REF))); - System.arraycopy(elements, 0, newElements, 0, size); - elements = newElements; - } - - elements[size] = o; - this.size = size + 1; - } - - @SuppressWarnings({"unchecked", "SuspiciousArrayCast"}) - private static T[] newArray(int length) { - return (T[]) new Object[length]; - } - } -} diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentCollections.java b/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentCollections.java index 7e2882bc5d6..6edf4d11786 100644 --- a/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentCollections.java +++ b/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentCollections.java @@ -21,8 +21,10 @@ package org.elasticsearch.common.util.concurrent; import com.google.common.collect.Sets; import jsr166e.ConcurrentHashMapV8; +import jsr166y.ConcurrentLinkedDeque; import jsr166y.LinkedTransferQueue; +import java.util.Deque; import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -83,6 +85,10 @@ public abstract class ConcurrentCollections { return new ConcurrentLinkedQueue(); } + public static Deque newDeque() { + return new ConcurrentLinkedDeque(); + } + public static BlockingQueue newBlockingQueue() { return new LinkedTransferQueue(); } diff --git a/src/test/java/org/elasticsearch/benchmark/common/recycler/RecyclerBenchmark.java b/src/test/java/org/elasticsearch/benchmark/common/recycler/RecyclerBenchmark.java new file mode 100644 index 00000000000..e8cdc91f532 --- /dev/null +++ b/src/test/java/org/elasticsearch/benchmark/common/recycler/RecyclerBenchmark.java @@ -0,0 +1,123 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.benchmark.common.recycler; + +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.common.recycler.Recycler; + +import java.util.Map; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.elasticsearch.common.recycler.Recyclers.*; + +/** Benchmark that tries to measure the overhead of object recycling depending on concurrent access. */ +public class RecyclerBenchmark { + + private static final long NUM_RECYCLES = 5000000L; + private static final Random RANDOM = new Random(0); + + private static long bench(final Recycler recycler, long numRecycles, int numThreads) throws InterruptedException { + final AtomicLong recycles = new AtomicLong(numRecycles); + final CountDownLatch latch = new CountDownLatch(1); + final Thread[] threads = new Thread[numThreads]; + for (int i = 0; i < numThreads; ++i){ + // Thread ids happen to be generated sequentially, so we also generate random threads so that distribution of IDs + // is not perfect for the concurrent recycler + for (int j = RANDOM.nextInt(5); j >= 0; --j) { + new Thread(); + } + + threads[i] = new Thread() { + @Override + public void run() { + try { + latch.await(); + } catch (InterruptedException e) { + return; + } + while (recycles.getAndDecrement() > 0) { + final Recycler.V v = recycler.obtain(); + v.release(); + } + } + }; + } + for (Thread thread : threads) { + thread.start(); + } + final long start = System.nanoTime(); + latch.countDown(); + for (Thread thread : threads) { + thread.join(); + } + return System.nanoTime() - start; + } + + public static void main(String[] args) throws InterruptedException { + final int limit = 100; + final Recycler.C c = new Recycler.C() { + + @Override + public Object newInstance(int sizing) { + return new Object(); + } + + @Override + public void clear(Object value) {} + }; + + final ImmutableMap> recyclers = ImmutableMap.>builder() + .put("none", none(c)) + .put("concurrent-queue", concurrentDeque(c, limit)) + .put("thread-local", threadLocal(dequeFactory(c, limit))) + .put("soft-thread-local", threadLocal(softFactory(dequeFactory(c, limit)))) + .put("locked", locked(deque(c, limit))) + .put("concurrent", concurrent(dequeFactory(c, limit), Runtime.getRuntime().availableProcessors())) + .put("soft-concurrent", concurrent(softFactory(dequeFactory(c, limit)), Runtime.getRuntime().availableProcessors())).build(); + + // warmup + final long start = System.nanoTime(); + while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10)) { + for (Recycler recycler : recyclers.values()) { + bench(recycler, NUM_RECYCLES, 2); + } + } + + // run + for (int numThreads = 1; numThreads <= 4 * Runtime.getRuntime().availableProcessors(); numThreads *= 2) { + System.out.println("## " + numThreads + " threads\n"); + System.gc(); + Thread.sleep(1000); + for (Recycler recycler : recyclers.values()) { + bench(recycler, NUM_RECYCLES, numThreads); + } + for (int i = 0; i < 5; ++i) { + for (Map.Entry> entry : recyclers.entrySet()) { + System.out.println(entry.getKey() + "\t" + TimeUnit.NANOSECONDS.toMillis(bench(entry.getValue(), NUM_RECYCLES, numThreads))); + } + System.out.println(); + } + } + } + +} diff --git a/src/test/java/org/elasticsearch/common/recycler/ConcurrentRecyclerTests.java b/src/test/java/org/elasticsearch/common/recycler/ConcurrentRecyclerTests.java new file mode 100644 index 00000000000..758041d0d8e --- /dev/null +++ b/src/test/java/org/elasticsearch/common/recycler/ConcurrentRecyclerTests.java @@ -0,0 +1,29 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.recycler; + +public class ConcurrentRecyclerTests extends AbstractRecyclerTests { + + @Override + protected Recycler newRecycler() { + return Recyclers.concurrent(Recyclers.dequeFactory(RECYCLER_C, randomIntBetween(5, 10)), randomIntBetween(1,5)); + } + +} diff --git a/src/test/java/org/elasticsearch/common/recycler/LockedRecyclerTests.java b/src/test/java/org/elasticsearch/common/recycler/LockedRecyclerTests.java new file mode 100644 index 00000000000..9ffdf7aa370 --- /dev/null +++ b/src/test/java/org/elasticsearch/common/recycler/LockedRecyclerTests.java @@ -0,0 +1,29 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.recycler; + +public class LockedRecyclerTests extends AbstractRecyclerTests { + + @Override + protected Recycler newRecycler() { + return Recyclers.locked(Recyclers.deque(RECYCLER_C, randomIntBetween(5, 10))); + } + +} diff --git a/src/test/java/org/elasticsearch/common/recycler/NoneRecyclerTests.java b/src/test/java/org/elasticsearch/common/recycler/NoneRecyclerTests.java index f830494f54e..a60c0ba18b3 100644 --- a/src/test/java/org/elasticsearch/common/recycler/NoneRecyclerTests.java +++ b/src/test/java/org/elasticsearch/common/recycler/NoneRecyclerTests.java @@ -23,7 +23,7 @@ public class NoneRecyclerTests extends AbstractRecyclerTests { @Override protected Recycler newRecycler() { - return new NoneRecycler(RECYCLER_C); + return Recyclers.none(RECYCLER_C); } } diff --git a/src/test/java/org/elasticsearch/common/recycler/QueueRecyclerTests.java b/src/test/java/org/elasticsearch/common/recycler/QueueRecyclerTests.java index 2deeee04f28..f693c30ccf7 100644 --- a/src/test/java/org/elasticsearch/common/recycler/QueueRecyclerTests.java +++ b/src/test/java/org/elasticsearch/common/recycler/QueueRecyclerTests.java @@ -23,7 +23,7 @@ public class QueueRecyclerTests extends AbstractRecyclerTests { @Override protected Recycler newRecycler() { - return new QueueRecycler(RECYCLER_C, randomIntBetween(5, 10)); + return Recyclers.concurrentDeque(RECYCLER_C, randomIntBetween(5, 10)); } } diff --git a/src/test/java/org/elasticsearch/common/recycler/SoftConcurrentRecyclerTests.java b/src/test/java/org/elasticsearch/common/recycler/SoftConcurrentRecyclerTests.java new file mode 100644 index 00000000000..0320ff54805 --- /dev/null +++ b/src/test/java/org/elasticsearch/common/recycler/SoftConcurrentRecyclerTests.java @@ -0,0 +1,29 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.recycler; + +public class SoftConcurrentRecyclerTests extends AbstractRecyclerTests { + + @Override + protected Recycler newRecycler() { + return Recyclers.concurrent(Recyclers.softFactory(Recyclers.dequeFactory(RECYCLER_C, randomIntBetween(5, 10))), randomIntBetween(1, 5)); + } + +} diff --git a/src/test/java/org/elasticsearch/common/recycler/SoftThreadLocalRecyclerTests.java b/src/test/java/org/elasticsearch/common/recycler/SoftThreadLocalRecyclerTests.java index 5bfa4d21233..2a5d2536bc6 100644 --- a/src/test/java/org/elasticsearch/common/recycler/SoftThreadLocalRecyclerTests.java +++ b/src/test/java/org/elasticsearch/common/recycler/SoftThreadLocalRecyclerTests.java @@ -23,7 +23,7 @@ public class SoftThreadLocalRecyclerTests extends AbstractRecyclerTests { @Override protected Recycler newRecycler() { - return new SoftThreadLocalRecycler(RECYCLER_C, randomIntBetween(5, 10)); + return Recyclers.threadLocal(Recyclers.softFactory(Recyclers.dequeFactory(RECYCLER_C, 10))); } } diff --git a/src/test/java/org/elasticsearch/common/recycler/ThreadLocalRecyclerTests.java b/src/test/java/org/elasticsearch/common/recycler/ThreadLocalRecyclerTests.java index c6fba545eb5..5ab68928a43 100644 --- a/src/test/java/org/elasticsearch/common/recycler/ThreadLocalRecyclerTests.java +++ b/src/test/java/org/elasticsearch/common/recycler/ThreadLocalRecyclerTests.java @@ -23,7 +23,7 @@ public class ThreadLocalRecyclerTests extends AbstractRecyclerTests { @Override protected Recycler newRecycler() { - return new ThreadLocalRecycler(RECYCLER_C, randomIntBetween(5, 10)); + return Recyclers.threadLocal(Recyclers.dequeFactory(RECYCLER_C, 10)); } }