remove thread local recycler

the thread local recycler requires obtain and recycle to be called on the same thread, while other recyclers do not. Also, it can create heavy recycle usage since it depends on the threads that its being used on. The concurrent / pinned thread base one is by far better than the pure thread local (and is the default) one since it more easily bounds the elements recycled, while still allowing to mix obtain and recycle across threads.

We will end up using the paged recyclers more and more, for example, in our networking output buffer, where obtaining will happen on one thread, while recycling can potentially occur on another thread (the callback thread). Since the limit of binding to a thread of the 2 calls is not really needed, and our best implementation supports going cross threads, there is no real need to impose this restriction.
This commit is contained in:
Shay Banon 2014-02-25 14:35:25 +01:00
parent 46fe348b8b
commit 83ae1bd55e
7 changed files with 36 additions and 130 deletions

View File

@ -278,18 +278,6 @@ public class CacheRecycler extends AbstractComponent {
}
public static enum Type {
SOFT_THREAD_LOCAL {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit, int availableProcessors) {
return threadLocal(softFactory(dequeFactory(c, limit)));
}
},
THREAD_LOCAL {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit, int availableProcessors) {
return threadLocal(dequeFactory(c, limit));
}
},
QUEUE {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit, int availableProcessors) {

View File

@ -19,12 +19,11 @@
package org.elasticsearch.cache.recycler;
import org.elasticsearch.common.recycler.AbstractRecyclerC;
import com.google.common.base.Strings;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.recycler.AbstractRecyclerC;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
@ -209,18 +208,6 @@ public class PageCacheRecycler extends AbstractComponent {
}
public static enum Type {
SOFT_THREAD_LOCAL {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit, int estimatedThreadPoolSize, int availableProcessors) {
return threadLocal(softFactory(dequeFactory(c, limit / estimatedThreadPoolSize)));
}
},
THREAD_LOCAL {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit, int estimatedThreadPoolSize, int availableProcessors) {
return threadLocal(dequeFactory(c, limit / estimatedThreadPoolSize));
}
},
QUEUE {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit, int estimatedThreadPoolSize, int availableProcessors) {

View File

@ -22,6 +22,8 @@ package org.elasticsearch.common.recycler;
import org.elasticsearch.common.lease.Releasable;
/**
* A recycled object, note, implementations should support calling obtain and then recycle
* on different threads.
*/
public interface Recycler<T> {

View File

@ -21,7 +21,6 @@ package org.elasticsearch.common.recycler;
import com.carrotsearch.hppc.hash.MurmurHash3;
import com.google.common.collect.Queues;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
@ -30,22 +29,30 @@ import java.lang.ref.SoftReference;
public enum Recyclers {
;
/** Return a {@link Recycler} that never recycles entries. */
/**
* Return a {@link Recycler} that never recycles entries.
*/
public static <T> Recycler<T> none(Recycler.C<T> c) {
return new NoneRecycler<T>(c);
}
/** Return a concurrent recycler based on a deque. */
/**
* Return a concurrent recycler based on a deque.
*/
public static <T> Recycler<T> concurrentDeque(Recycler.C<T> c, int limit) {
return new ConcurrentDequeRecycler<T>(c, limit);
}
/** Return a recycler based on a deque. */
/**
* Return a recycler based on a deque.
*/
public static <T> Recycler<T> deque(Recycler.C<T> c, int limit) {
return new DequeRecycler<T>(c, Queues.<T>newArrayDeque(), limit);
}
/** Return a recycler based on a deque. */
/**
* Return a recycler based on a deque.
*/
public static <T> Recycler.Factory<T> dequeFactory(final Recycler.C<T> c, final int limit) {
return new Recycler.Factory<T>() {
@Override
@ -55,8 +62,10 @@ public enum Recyclers {
};
}
/** Wrap two recyclers and forward to calls to <code>smallObjectRecycler</code> when <code>size &lt; minSize</code> and to
* <code>defaultRecycler</code> otherwise. */
/**
* Wrap two recyclers and forward to calls to <code>smallObjectRecycler</code> when <code>size &lt; minSize</code> and to
* <code>defaultRecycler</code> otherwise.
*/
public static <T> Recycler<T> sizing(final Recycler<T> defaultRecycler, final Recycler<T> smallObjectRecycler, final int minSize) {
return new FilterRecycler<T>() {
@ -82,36 +91,9 @@ public enum Recyclers {
};
}
/** Create a thread-local recycler, where each thread will have its own instance, create through the provided factory. */
public static <T> Recycler<T> threadLocal(final Recycler.Factory<T> factory) {
return new FilterRecycler<T>() {
private final CloseableThreadLocal<Recycler<T>> recyclers;
{
recyclers = new CloseableThreadLocal<Recycler<T>>() {
@Override
protected Recycler<T> initialValue() {
return factory.build();
}
};
}
@Override
protected Recycler<T> getDelegate() {
return recyclers.get();
}
@Override
public void close() {
recyclers.get().close();
recyclers.close();
}
};
}
/** Create a recycler that is wrapped inside a soft reference, so that it cannot cause {@link OutOfMemoryError}s. */
/**
* Create a recycler that is wrapped inside a soft reference, so that it cannot cause {@link OutOfMemoryError}s.
*/
public static <T> Recycler<T> soft(final Recycler.Factory<T> factory) {
return new FilterRecycler<T>() {
@ -134,8 +116,11 @@ public enum Recyclers {
};
}
/** Create a recycler that wraps data in a SoftReference.
* @see #soft(org.elasticsearch.common.recycler.Recycler.Factory) */
/**
* Create a recycler that wraps data in a SoftReference.
*
* @see #soft(org.elasticsearch.common.recycler.Recycler.Factory)
*/
public static <T> Recycler.Factory<T> softFactory(final Recycler.Factory<T> factory) {
return new Recycler.Factory<T>() {
@Override
@ -145,8 +130,10 @@ public enum Recyclers {
};
}
/** Wrap the provided recycler so that calls to {@link Recycler#obtain()} and {@link Recycler.V#release()} are protected by
* a lock. */
/**
* Wrap the provided recycler so that calls to {@link Recycler#obtain()} and {@link Recycler.V#release()} are protected by
* a lock.
*/
public static <T> Recycler<T> locked(final Recycler<T> recycler) {
return new FilterRecycler<T>() {
@ -202,7 +189,9 @@ public enum Recyclers {
};
}
/** Create a concurrent implementation that can support concurrent access from <code>concurrencyLevel</code> threads with little contention. */
/**
* Create a concurrent implementation that can support concurrent access from <code>concurrencyLevel</code> threads with little contention.
*/
public static <T> Recycler<T> concurrent(final Recycler.Factory<T> factory, final int concurrencyLevel) {
if (concurrencyLevel < 1) {
throw new ElasticsearchIllegalArgumentException("concurrencyLevel must be >= 1");
@ -213,6 +202,7 @@ public enum Recyclers {
return new FilterRecycler<T>() {
private final Recycler<T>[] recyclers;
{
@SuppressWarnings("unchecked")
final Recycler<T>[] recyclers = new Recycler[concurrencyLevel];

View File

@ -19,9 +19,8 @@
package org.elasticsearch.benchmark.common.recycler;
import org.elasticsearch.common.recycler.AbstractRecyclerC;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.recycler.AbstractRecyclerC;
import org.elasticsearch.common.recycler.Recycler;
import java.util.Map;
@ -93,8 +92,6 @@ public class RecyclerBenchmark {
final ImmutableMap<String, Recycler<Object>> recyclers = ImmutableMap.<String, Recycler<Object>>builder()
.put("none", none(c))
.put("concurrent-queue", concurrentDeque(c, limit))
.put("thread-local", threadLocal(dequeFactory(c, limit)))
.put("soft-thread-local", threadLocal(softFactory(dequeFactory(c, limit))))
.put("locked", locked(deque(c, limit)))
.put("concurrent", concurrent(dequeFactory(c, limit), Runtime.getRuntime().availableProcessors()))
.put("soft-concurrent", concurrent(softFactory(dequeFactory(c, limit)), Runtime.getRuntime().availableProcessors())).build();

View File

@ -1,29 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.recycler;
public class SoftThreadLocalRecyclerTests extends AbstractRecyclerTests {
@Override
protected Recycler<byte[]> newRecycler(int limit) {
return Recyclers.threadLocal(Recyclers.softFactory(Recyclers.dequeFactory(RECYCLER_C, limit)));
}
}

View File

@ -1,29 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.recycler;
public class ThreadLocalRecyclerTests extends AbstractRecyclerTests {
@Override
protected Recycler<byte[]> newRecycler(int limit) {
return Recyclers.threadLocal(Recyclers.dequeFactory(RECYCLER_C, limit));
}
}