clean thread locals without needing a wrapper

clean thread locals smartly by identifying "our" classes, and removing them, so there is no need to wrap it in our our clenable values
This commit is contained in:
Shay Banon 2013-05-15 12:13:13 +02:00
parent 4d357660ca
commit f92eed8591
7 changed files with 41 additions and 53 deletions

View File

@ -21,7 +21,6 @@ package org.elasticsearch.common;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import java.util.Arrays;
@ -30,17 +29,17 @@ import java.util.Arrays;
*/
public class Unicode {
private static ThreadLocal<ThreadLocals.CleanableValue<BytesRef>> cachedUtf8Result = new ThreadLocal<ThreadLocals.CleanableValue<BytesRef>>() {
private static ThreadLocal<BytesRef> cachedUtf8Result = new ThreadLocal<BytesRef>() {
@Override
protected ThreadLocals.CleanableValue<BytesRef> initialValue() {
return new ThreadLocals.CleanableValue<BytesRef>(new BytesRef());
protected BytesRef initialValue() {
return new BytesRef();
}
};
private static ThreadLocal<ThreadLocals.CleanableValue<UTF16Result>> cachedUtf16Result = new ThreadLocal<ThreadLocals.CleanableValue<UTF16Result>>() {
private static ThreadLocal<UTF16Result> cachedUtf16Result = new ThreadLocal<UTF16Result>() {
@Override
protected ThreadLocals.CleanableValue<UTF16Result> initialValue() {
return new ThreadLocals.CleanableValue<UTF16Result>(new UTF16Result());
protected UTF16Result initialValue() {
return new UTF16Result();
}
};
@ -73,7 +72,7 @@ public class Unicode {
if (source == null) {
return null;
}
BytesRef result = cachedUtf8Result.get().get();
BytesRef result = cachedUtf8Result.get();
UnicodeUtil.UTF16toUTF8(source, 0, source.length(), result);
return result;
}
@ -111,7 +110,7 @@ public class Unicode {
if (source == null) {
return null;
}
UTF16Result result = cachedUtf16Result.get().get();
UTF16Result result = cachedUtf16Result.get();
UTF8toUTF16(source, offset, length, result);
return result;
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import java.lang.ref.Reference;
import java.lang.ref.SoftReference;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@ -34,23 +35,6 @@ public class ThreadLocals {
private static final ESLogger logger = Loggers.getLogger(ThreadLocals.class);
public static class CleanableValue<T> {
private T value;
public CleanableValue(T value) {
this.value = value;
}
public T get() {
return value;
}
public void set(T value) {
this.value = value;
}
}
public static void clearReferencesThreadLocals() {
try {
Thread[] threads = getThreads();
@ -107,8 +91,17 @@ public class ThreadLocals {
Field valueField = tableValue.getClass().getDeclaredField("value");
valueField.setAccessible(true);
Object value = valueField.get(tableValue);
if ((value != null && CleanableValue.class.isAssignableFrom(value.getClass()))) {
remove = true;
if (value != null) {
Object actualValue = value;
if (value instanceof SoftReference) {
actualValue = ((SoftReference) value).get();
}
if (actualValue != null) {
String actualValueClassName = actualValue.getClass().getName();
if (actualValueClassName.startsWith("org.elasticsearch") || actualValueClassName.startsWith("org.lucene")) {
remove = true;
}
}
}
if (remove) {
Object[] args = new Object[4];

View File

@ -339,10 +339,6 @@ public final class InternalNode implements Node {
}
stopWatch.stop();
CacheRecycler.clear();
CachedStreams.clear();
ThreadLocals.clearReferencesThreadLocals();
if (logger.isTraceEnabled()) {
logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint());
}
@ -350,6 +346,10 @@ public final class InternalNode implements Node {
injector.getInstance(NodeEnvironment.class).close();
Injectors.close(injector);
CacheRecycler.clear();
CachedStreams.clear();
ThreadLocals.clearReferencesThreadLocals();
logger.info("{{}}[{}]: closed", Version.CURRENT, JvmInfo.jvmInfo().pid());
}

View File

@ -21,17 +21,16 @@ package org.elasticsearch.rest;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
/**
*
*/
public class StringRestResponse extends Utf8RestResponse {
private static ThreadLocal<ThreadLocals.CleanableValue<BytesRef>> cache = new ThreadLocal<ThreadLocals.CleanableValue<BytesRef>>() {
private static ThreadLocal<BytesRef> cache = new ThreadLocal<BytesRef>() {
@Override
protected ThreadLocals.CleanableValue<BytesRef> initialValue() {
return new ThreadLocals.CleanableValue<BytesRef>(new BytesRef());
protected BytesRef initialValue() {
return new BytesRef();
}
};
@ -44,7 +43,7 @@ public class StringRestResponse extends Utf8RestResponse {
}
private static BytesRef convert(String content) {
BytesRef result = cache.get().get();
BytesRef result = cache.get();
UnicodeUtil.UTF16toUTF8(content, 0, content.length(), result);
return result;
}

View File

@ -22,7 +22,6 @@ package org.elasticsearch.rest;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
@ -41,10 +40,10 @@ public class XContentRestResponse extends AbstractRestResponse {
System.arraycopy(U_END_JSONP.bytes, U_END_JSONP.offset, END_JSONP, 0, U_END_JSONP.length);
}
private static ThreadLocal<ThreadLocals.CleanableValue<BytesRef>> prefixCache = new ThreadLocal<ThreadLocals.CleanableValue<BytesRef>>() {
private static ThreadLocal<BytesRef> prefixCache = new ThreadLocal<BytesRef>() {
@Override
protected ThreadLocals.CleanableValue<BytesRef> initialValue() {
return new ThreadLocals.CleanableValue<BytesRef>(new BytesRef());
protected BytesRef initialValue() {
return new BytesRef();
}
};
@ -55,7 +54,7 @@ public class XContentRestResponse extends AbstractRestResponse {
private final XContentBuilder builder;
public XContentRestResponse(RestRequest request, RestStatus status, XContentBuilder builder) throws IOException {
if(request == null) {
if (request == null) {
throw new ElasticSearchIllegalArgumentException("request must be set");
}
this.builder = builder;
@ -147,7 +146,7 @@ public class XContentRestResponse extends AbstractRestResponse {
if (callback == null) {
return null;
}
BytesRef result = prefixCache.get().get();
BytesRef result = prefixCache.get();
UnicodeUtil.UTF16toUTF8(callback, 0, callback.length(), result);
result.bytes[result.length] = '(';
result.length++;

View File

@ -27,7 +27,6 @@ import org.apache.lucene.index.TermContext;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.TermStatistics;
import org.elasticsearch.common.collect.XMaps;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.SearchPhase;
import org.elasticsearch.search.internal.SearchContext;
@ -39,10 +38,10 @@ import java.util.Map;
*/
public class DfsPhase implements SearchPhase {
private static ThreadLocal<ThreadLocals.CleanableValue<THashSet<Term>>> cachedTermsSet = new ThreadLocal<ThreadLocals.CleanableValue<THashSet<Term>>>() {
private static ThreadLocal<THashSet<Term>> cachedTermsSet = new ThreadLocal<THashSet<Term>>() {
@Override
protected ThreadLocals.CleanableValue<THashSet<Term>> initialValue() {
return new ThreadLocals.CleanableValue<THashSet<Term>>(new THashSet<Term>());
protected THashSet<Term> initialValue() {
return new THashSet<Term>();
}
};
@ -62,7 +61,7 @@ public class DfsPhase implements SearchPhase {
context.updateRewriteQuery(context.searcher().rewrite(context.query()));
}
termsSet = cachedTermsSet.get().get();
termsSet = cachedTermsSet.get();
if (!termsSet.isEmpty()) {
termsSet.clear();
}

View File

@ -23,7 +23,6 @@ import com.google.common.collect.Iterators;
import gnu.trove.map.hash.TIntObjectHashMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.search.SearchHit;
@ -80,15 +79,15 @@ public class InternalSearchHits implements SearchHits {
}
}
private static final ThreadLocal<ThreadLocals.CleanableValue<StreamContext>> cache = new ThreadLocal<ThreadLocals.CleanableValue<StreamContext>>() {
private static final ThreadLocal<StreamContext> cache = new ThreadLocal<StreamContext>() {
@Override
protected ThreadLocals.CleanableValue<StreamContext> initialValue() {
return new ThreadLocals.CleanableValue<StreamContext>(new StreamContext());
protected StreamContext initialValue() {
return new StreamContext();
}
};
public static StreamContext streamContext() {
return cache.get().get().reset();
return cache.get().reset();
}