clean thread locals (even static ones!) on Server#close or TransportClient#close

This commit is contained in:
kimchy 2010-03-23 10:31:15 +02:00
parent 14af9d28fd
commit 5b11de8958
23 changed files with 337 additions and 107 deletions

View File

@ -57,6 +57,7 @@ import org.elasticsearch.timer.TimerModule;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.ThreadLocals;
import org.elasticsearch.util.Tuple;
import org.elasticsearch.util.settings.ImmutableSettings;
import org.elasticsearch.util.settings.Settings;
@ -204,6 +205,8 @@ public class TransportClient implements Client {
injector.getInstance(TimerService.class).close();
injector.getInstance(ThreadPool.class).shutdown();
ThreadLocals.clearReferencesThreadLocals();
}
@Override public AdminClient admin() {

View File

@ -28,6 +28,7 @@ import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.util.Nullable;
import org.elasticsearch.util.Preconditions;
import org.elasticsearch.util.ThreadLocals;
import org.elasticsearch.util.json.Jackson;
import org.elasticsearch.util.json.JsonBuilder;
import org.elasticsearch.util.json.StringJsonBuilder;
@ -134,9 +135,9 @@ public class JsonDocumentMapper implements DocumentMapper, ToJson {
}
private ThreadLocal<JsonParseContext> cache = new ThreadLocal<JsonParseContext>() {
@Override protected JsonParseContext initialValue() {
return new JsonParseContext(JsonDocumentMapper.this, new JsonPath(0));
private ThreadLocal<ThreadLocals.CleanableValue<JsonParseContext>> cache = new ThreadLocal<ThreadLocals.CleanableValue<JsonParseContext>>() {
@Override protected ThreadLocals.CleanableValue<JsonParseContext> initialValue() {
return new ThreadLocals.CleanableValue<JsonParseContext>(new JsonParseContext(JsonDocumentMapper.this, new JsonPath(0)));
}
};
@ -277,7 +278,7 @@ public class JsonDocumentMapper implements DocumentMapper, ToJson {
}
@Override public ParsedDocument parse(String type, String id, byte[] source, ParseListener listener) {
JsonParseContext jsonContext = cache.get();
JsonParseContext jsonContext = cache.get().get();
if (type != null && !type.equals(this.type)) {
throw new MapperParsingException("Type mismatch, provide type [" + type + "] but mapper is of type [" + this.type + "]");

View File

@ -25,6 +25,7 @@ import org.apache.lucene.document.Field;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.util.ThreadLocals;
import org.elasticsearch.util.gnu.trove.TIntObjectHashMap;
import org.elasticsearch.util.json.JsonBuilder;
@ -77,9 +78,9 @@ public abstract class JsonNumberFieldMapper<T extends Number> extends JsonFieldM
}
}
private static final ThreadLocal<TIntObjectHashMap<Deque<CachedNumericTokenStream>>> cachedStreams = new ThreadLocal<TIntObjectHashMap<Deque<CachedNumericTokenStream>>>() {
@Override protected TIntObjectHashMap<Deque<CachedNumericTokenStream>> initialValue() {
return new TIntObjectHashMap<Deque<CachedNumericTokenStream>>();
private static final ThreadLocal<ThreadLocals.CleanableValue<TIntObjectHashMap<Deque<CachedNumericTokenStream>>>> cachedStreams = new ThreadLocal<ThreadLocals.CleanableValue<TIntObjectHashMap<Deque<CachedNumericTokenStream>>>>() {
@Override protected ThreadLocals.CleanableValue<TIntObjectHashMap<Deque<CachedNumericTokenStream>>> initialValue() {
return new ThreadLocals.CleanableValue<TIntObjectHashMap<Deque<CachedNumericTokenStream>>>(new TIntObjectHashMap<Deque<CachedNumericTokenStream>>());
}
};
@ -152,10 +153,10 @@ public abstract class JsonNumberFieldMapper<T extends Number> extends JsonFieldM
* sicne it implements the end method.
*/
protected CachedNumericTokenStream popCachedStream(int precisionStep) {
Deque<CachedNumericTokenStream> deque = cachedStreams.get().get(precisionStep);
Deque<CachedNumericTokenStream> deque = cachedStreams.get().get().get(precisionStep);
if (deque == null) {
deque = new ArrayDeque<CachedNumericTokenStream>();
cachedStreams.get().put(precisionStep, deque);
cachedStreams.get().get().put(precisionStep, deque);
deque.add(new CachedNumericTokenStream(new NumericTokenStream(precisionStep), precisionStep));
}
if (deque.isEmpty()) {
@ -189,7 +190,7 @@ public abstract class JsonNumberFieldMapper<T extends Number> extends JsonFieldM
*/
public void close() throws IOException {
numericTokenStream.close();
cachedStreams.get().get(precisionStep).add(this);
cachedStreams.get().get().get(precisionStep).add(this);
}
/**

View File

@ -35,6 +35,7 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryParsingException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.util.Nullable;
import org.elasticsearch.util.ThreadLocals;
import org.elasticsearch.util.io.FastCharArrayReader;
import org.elasticsearch.util.io.FastCharArrayWriter;
import org.elasticsearch.util.io.FastStringReader;
@ -57,9 +58,9 @@ public class JsonIndexQueryParser extends AbstractIndexComponent implements Inde
public static final String JSON_FILTER_PREFIX = "index.queryparser.json.filter";
}
private ThreadLocal<JsonQueryParseContext> cache = new ThreadLocal<JsonQueryParseContext>() {
@Override protected JsonQueryParseContext initialValue() {
return new JsonQueryParseContext(index, queryParserRegistry, mapperService, filterCache);
private ThreadLocal<ThreadLocals.CleanableValue<JsonQueryParseContext>> cache = new ThreadLocal<ThreadLocals.CleanableValue<JsonQueryParseContext>>() {
@Override protected ThreadLocals.CleanableValue<JsonQueryParseContext> initialValue() {
return new ThreadLocals.CleanableValue<JsonQueryParseContext>(new JsonQueryParseContext(index, queryParserRegistry, mapperService, filterCache));
}
};
@ -125,7 +126,7 @@ public class JsonIndexQueryParser extends AbstractIndexComponent implements Inde
try {
FastCharArrayWriter unsafeChars = queryBuilder.buildAsUnsafeChars();
jp = jsonFactory.createJsonParser(new FastCharArrayReader(unsafeChars.unsafeCharArray(), 0, unsafeChars.size()));
return parse(cache.get(), jp);
return parse(cache.get().get(), jp);
} catch (QueryParsingException e) {
throw e;
} catch (Exception e) {
@ -145,7 +146,7 @@ public class JsonIndexQueryParser extends AbstractIndexComponent implements Inde
JsonParser jp = null;
try {
jp = jsonFactory.createJsonParser(source);
return parse(cache.get(), jp);
return parse(cache.get().get(), jp);
} catch (QueryParsingException e) {
throw e;
} catch (Exception e) {
@ -165,7 +166,7 @@ public class JsonIndexQueryParser extends AbstractIndexComponent implements Inde
JsonParser jp = null;
try {
jp = jsonFactory.createJsonParser(new FastStringReader(source));
return parse(cache.get(), jp);
return parse(cache.get().get(), jp);
} catch (QueryParsingException e) {
throw e;
} catch (Exception e) {
@ -183,7 +184,7 @@ public class JsonIndexQueryParser extends AbstractIndexComponent implements Inde
public Query parse(JsonParser jsonParser) {
try {
return parse(cache.get(), jsonParser);
return parse(cache.get().get(), jsonParser);
} catch (IOException e) {
throw new QueryParsingException(index, "Failed to parse", e);
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.rest;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.util.ThreadLocals;
import org.elasticsearch.util.json.JsonBuilder;
import java.io.IOException;
@ -38,9 +39,9 @@ public class JsonRestResponse extends AbstractRestResponse {
System.arraycopy(U_END_JSONP.result, 0, END_JSONP, 0, U_END_JSONP.length);
}
private static ThreadLocal<UnicodeUtil.UTF8Result> prefixCache = new ThreadLocal<UnicodeUtil.UTF8Result>() {
@Override protected UnicodeUtil.UTF8Result initialValue() {
return new UnicodeUtil.UTF8Result();
private static ThreadLocal<ThreadLocals.CleanableValue<UnicodeUtil.UTF8Result>> prefixCache = new ThreadLocal<ThreadLocals.CleanableValue<UnicodeUtil.UTF8Result>>() {
@Override protected ThreadLocals.CleanableValue<UnicodeUtil.UTF8Result> initialValue() {
return new ThreadLocals.CleanableValue<UnicodeUtil.UTF8Result>(new UnicodeUtil.UTF8Result());
}
};
@ -115,7 +116,7 @@ public class JsonRestResponse extends AbstractRestResponse {
if (callback == null) {
return null;
}
UnicodeUtil.UTF8Result result = prefixCache.get();
UnicodeUtil.UTF8Result result = prefixCache.get().get();
UnicodeUtil.UTF16toUTF8(callback, 0, callback.length(), result);
result.result[result.length] = '(';
result.length++;

View File

@ -20,15 +20,16 @@
package org.elasticsearch.rest;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.util.ThreadLocals;
/**
* @author kimchy (Shay Banon)
*/
public class StringRestResponse extends Utf8RestResponse {
private static ThreadLocal<UnicodeUtil.UTF8Result> cache = new ThreadLocal<UnicodeUtil.UTF8Result>() {
@Override protected UnicodeUtil.UTF8Result initialValue() {
return new UnicodeUtil.UTF8Result();
private static ThreadLocal<ThreadLocals.CleanableValue<UnicodeUtil.UTF8Result>> cache = new ThreadLocal<ThreadLocals.CleanableValue<UnicodeUtil.UTF8Result>>() {
@Override protected ThreadLocals.CleanableValue<UnicodeUtil.UTF8Result> initialValue() {
return new ThreadLocals.CleanableValue<UnicodeUtil.UTF8Result>(new UnicodeUtil.UTF8Result());
}
};
@ -41,7 +42,7 @@ public class StringRestResponse extends Utf8RestResponse {
}
private static UnicodeUtil.UTF8Result convert(String content) {
UnicodeUtil.UTF8Result result = cache.get();
UnicodeUtil.UTF8Result result = cache.get().get();
UnicodeUtil.UTF16toUTF8(content, 0, content.length(), result);
return result;
}

View File

@ -57,6 +57,7 @@ import org.elasticsearch.timer.TimerModule;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.ThreadLocals;
import org.elasticsearch.util.Tuple;
import org.elasticsearch.util.component.Lifecycle;
import org.elasticsearch.util.guice.Injectors;
@ -238,6 +239,8 @@ public final class InternalServer implements Server {
// ignore
}
ThreadLocals.clearReferencesThreadLocals();
logger.info("{{}}: Closed", Version.full());
}

View File

@ -0,0 +1,166 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util;
import org.elasticsearch.util.logging.Loggers;
import org.slf4j.Logger;
import java.lang.ref.Reference;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
* @author kimchy (shay.banon)
*/
public class ThreadLocals {
private static final Logger logger = Loggers.getLogger(ThreadLocals.class);
public static class CleanableValue<T> {
private T value;
public CleanableValue(T value) {
this.value = value;
}
public T get() {
return value;
}
public void set(T value) {
this.value = value;
}
}
public static void clearReferencesThreadLocals() {
try {
Thread[] threads = getThreads();
// Make the fields in the Thread class that store ThreadLocals
// accessible
Field threadLocalsField = Thread.class.getDeclaredField("threadLocals");
threadLocalsField.setAccessible(true);
Field inheritableThreadLocalsField = Thread.class.getDeclaredField("inheritableThreadLocals");
inheritableThreadLocalsField.setAccessible(true);
// Make the underlying array of ThreadLoad.ThreadLocalMap.Entry objects
// accessible
Class<?> tlmClass = Class.forName("java.lang.ThreadLocal$ThreadLocalMap");
Field tableField = tlmClass.getDeclaredField("table");
tableField.setAccessible(true);
for (int i = 0; i < threads.length; i++) {
Object threadLocalMap;
if (threads[i] != null) {
// Clear the first map
threadLocalMap = threadLocalsField.get(threads[i]);
clearThreadLocalMap(threadLocalMap, tableField);
// Clear the second map
threadLocalMap =
inheritableThreadLocalsField.get(threads[i]);
clearThreadLocalMap(threadLocalMap, tableField);
}
}
} catch (Exception e) {
logger.debug("Failed to clean thread locals", e);
}
}
/*
* Clears the given thread local map object. Also pass in the field that
* points to the internal table to save re-calculating it on every
* call to this method.
*/
private static void clearThreadLocalMap(Object map, Field internalTableField) throws NoSuchMethodException, IllegalAccessException, NoSuchFieldException, InvocationTargetException {
if (map != null) {
Method mapRemove = map.getClass().getDeclaredMethod("remove", ThreadLocal.class);
mapRemove.setAccessible(true);
Object[] table = (Object[]) internalTableField.get(map);
int staleEntriesCount = 0;
if (table != null) {
for (int j = 0; j < table.length; j++) {
if (table[j] != null) {
boolean remove = false;
// Check the key
Object key = ((Reference<?>) table[j]).get();
// Check the value
Field valueField = table[j].getClass().getDeclaredField("value");
valueField.setAccessible(true);
Object value = valueField.get(table[j]);
if ((value != null && CleanableValue.class.isAssignableFrom(value.getClass()))) {
remove = true;
}
if (remove) {
Object[] args = new Object[4];
if (key != null) {
args[0] = key.getClass().getCanonicalName();
args[1] = key.toString();
}
args[2] = value.getClass().getCanonicalName();
args[3] = value.toString();
if (logger.isDebugEnabled()) {
logger.debug("ThreadLocal with key of type [{0}] (value [{1}]) and a value of type [{2}] (value [{3}]): The ThreadLocal has been forcibly removed.", args);
}
if (key == null) {
staleEntriesCount++;
} else {
mapRemove.invoke(map, key);
}
}
}
}
}
if (staleEntriesCount > 0) {
Method mapRemoveStale = map.getClass().getDeclaredMethod("expungeStaleEntries");
mapRemoveStale.setAccessible(true);
mapRemoveStale.invoke(map);
}
}
}
/*
* Get the set of current threads as an array.
*/
private static Thread[] getThreads() {
// Get the current thread group
ThreadGroup tg = Thread.currentThread().getThreadGroup();
// Find the root thread group
while (tg.getParent() != null) {
tg = tg.getParent();
}
int threadCountGuess = tg.activeCount() + 50;
Thread[] threads = new Thread[threadCountGuess];
int threadCountActual = tg.enumerate(threads);
// Make sure we don't miss any threads
while (threadCountActual == threadCountGuess) {
threadCountGuess *= 2;
threads = new Thread[threadCountGuess];
// Note tg.enumerate(Thread[]) silently ignores any threads that
// can't fit into the array
threadCountActual = tg.enumerate(threads);
}
return threads;
}
}

View File

@ -28,15 +28,15 @@ import java.util.Arrays;
*/
public class Unicode {
private static ThreadLocal<UnicodeUtil.UTF8Result> cachedUtf8Result = new ThreadLocal<UnicodeUtil.UTF8Result>() {
@Override protected UnicodeUtil.UTF8Result initialValue() {
return new UnicodeUtil.UTF8Result();
private static ThreadLocal<ThreadLocals.CleanableValue<UnicodeUtil.UTF8Result>> cachedUtf8Result = new ThreadLocal<ThreadLocals.CleanableValue<UnicodeUtil.UTF8Result>>() {
@Override protected ThreadLocals.CleanableValue<UnicodeUtil.UTF8Result> initialValue() {
return new ThreadLocals.CleanableValue<UnicodeUtil.UTF8Result>(new UnicodeUtil.UTF8Result());
}
};
private static ThreadLocal<UTF16Result> cachedUtf16Result = new ThreadLocal<UTF16Result>() {
@Override protected UTF16Result initialValue() {
return new UTF16Result();
private static ThreadLocal<ThreadLocals.CleanableValue<UTF16Result>> cachedUtf16Result = new ThreadLocal<ThreadLocals.CleanableValue<UTF16Result>>() {
@Override protected ThreadLocals.CleanableValue<UTF16Result> initialValue() {
return new ThreadLocals.CleanableValue<UTF16Result>(new UTF16Result());
}
};
@ -61,7 +61,7 @@ public class Unicode {
if (source == null) {
return null;
}
UnicodeUtil.UTF8Result result = cachedUtf8Result.get();
UnicodeUtil.UTF8Result result = cachedUtf8Result.get().get();
UnicodeUtil.UTF16toUTF8(source, 0, source.length(), result);
return result;
}
@ -99,7 +99,7 @@ public class Unicode {
if (source == null) {
return null;
}
UTF16Result result = cachedUtf16Result.get();
UTF16Result result = cachedUtf16Result.get().get();
UTF8toUTF16(source, offset, length, result);
return result;
}

View File

@ -25,6 +25,8 @@
package org.elasticsearch.util.concurrent;
import org.elasticsearch.util.ThreadLocals;
import java.util.Random;
/**
@ -77,10 +79,10 @@ public class ThreadLocalRandom extends Random {
/**
* The actual ThreadLocal
*/
private static final ThreadLocal<ThreadLocalRandom> localRandom =
new ThreadLocal<ThreadLocalRandom>() {
protected ThreadLocalRandom initialValue() {
return new ThreadLocalRandom();
private static final ThreadLocal<ThreadLocals.CleanableValue<ThreadLocalRandom>> localRandom =
new ThreadLocal<ThreadLocals.CleanableValue<ThreadLocalRandom>>() {
protected ThreadLocals.CleanableValue<ThreadLocalRandom> initialValue() {
return new ThreadLocals.CleanableValue<ThreadLocalRandom>(new ThreadLocalRandom());
}
};
@ -100,7 +102,7 @@ public class ThreadLocalRandom extends Random {
* @return the current thread's {@code ThreadLocalRandom}
*/
public static ThreadLocalRandom current() {
return localRandom.get();
return localRandom.get().get();
}
/**

View File

@ -19,6 +19,7 @@
package org.elasticsearch.util.io;
import org.elasticsearch.util.ThreadLocals;
import org.elasticsearch.util.concurrent.NotThreadSafe;
import java.io.DataOutputStream;
@ -34,9 +35,9 @@ public class ByteArrayDataOutputStream extends DataOutputStream {
*/
public static class Cached {
private static final ThreadLocal<ByteArrayDataOutputStream> cache = new ThreadLocal<ByteArrayDataOutputStream>() {
@Override protected ByteArrayDataOutputStream initialValue() {
return new ByteArrayDataOutputStream();
private static final ThreadLocal<ThreadLocals.CleanableValue<ByteArrayDataOutputStream>> cache = new ThreadLocal<ThreadLocals.CleanableValue<ByteArrayDataOutputStream>>() {
@Override protected ThreadLocals.CleanableValue<ByteArrayDataOutputStream> initialValue() {
return new ThreadLocals.CleanableValue<ByteArrayDataOutputStream>(new ByteArrayDataOutputStream());
}
};
@ -44,7 +45,7 @@ public class ByteArrayDataOutputStream extends DataOutputStream {
* Returns the cached thread local byte strean, with its internal stream cleared.
*/
public static ByteArrayDataOutputStream cached() {
ByteArrayDataOutputStream os = cache.get();
ByteArrayDataOutputStream os = cache.get().get();
((FastByteArrayOutputStream) os.out).reset();
return os;
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.util.io;
import org.elasticsearch.util.ThreadLocals;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
@ -36,9 +38,9 @@ public class FastByteArrayOutputStream extends OutputStream {
*/
public static class Cached {
private static final ThreadLocal<FastByteArrayOutputStream> cache = new ThreadLocal<FastByteArrayOutputStream>() {
@Override protected FastByteArrayOutputStream initialValue() {
return new FastByteArrayOutputStream();
private static final ThreadLocal<ThreadLocals.CleanableValue<FastByteArrayOutputStream>> cache = new ThreadLocal<ThreadLocals.CleanableValue<FastByteArrayOutputStream>>() {
@Override protected ThreadLocals.CleanableValue<FastByteArrayOutputStream> initialValue() {
return new ThreadLocals.CleanableValue<FastByteArrayOutputStream>(new FastByteArrayOutputStream());
}
};
@ -46,7 +48,7 @@ public class FastByteArrayOutputStream extends OutputStream {
* Returns the cached thread local byte stream, with its internal stream cleared.
*/
public static FastByteArrayOutputStream cached() {
FastByteArrayOutputStream os = cache.get();
FastByteArrayOutputStream os = cache.get().get();
os.reset();
return os;
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.util.io;
import org.elasticsearch.util.ThreadLocals;
import org.elasticsearch.util.concurrent.NotThreadSafe;
import java.io.IOException;
@ -38,9 +39,9 @@ public class FastCharArrayWriter extends Writer {
*/
public static class Cached {
private static final ThreadLocal<FastCharArrayWriter> cache = new ThreadLocal<FastCharArrayWriter>() {
@Override protected FastCharArrayWriter initialValue() {
return new FastCharArrayWriter();
private static final ThreadLocal<ThreadLocals.CleanableValue<FastCharArrayWriter>> cache = new ThreadLocal<ThreadLocals.CleanableValue<FastCharArrayWriter>>() {
@Override protected ThreadLocals.CleanableValue<FastCharArrayWriter> initialValue() {
return new ThreadLocals.CleanableValue<FastCharArrayWriter>(new FastCharArrayWriter());
}
};
@ -48,7 +49,7 @@ public class FastCharArrayWriter extends Writer {
* Returns the cached thread local byte stream, with its internal stream cleared.
*/
public static FastCharArrayWriter cached() {
FastCharArrayWriter os = cache.get();
FastCharArrayWriter os = cache.get().get();
os.reset();
return os;
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.util.io;
import org.elasticsearch.util.ThreadLocals;
import org.elasticsearch.util.concurrent.NotThreadSafe;
import java.io.Writer;
@ -39,9 +40,9 @@ public class StringBuilderWriter extends Writer {
*/
public static class Cached {
private static final ThreadLocal<StringBuilderWriter> cache = new ThreadLocal<StringBuilderWriter>() {
@Override protected StringBuilderWriter initialValue() {
return new StringBuilderWriter();
private static final ThreadLocal<ThreadLocals.CleanableValue<StringBuilderWriter>> cache = new ThreadLocal<ThreadLocals.CleanableValue<StringBuilderWriter>>() {
@Override protected ThreadLocals.CleanableValue<StringBuilderWriter> initialValue() {
return new ThreadLocals.CleanableValue<StringBuilderWriter>(new StringBuilderWriter());
}
};
@ -49,7 +50,7 @@ public class StringBuilderWriter extends Writer {
* Returns the cached thread local writer, with its internal {@link StringBuilder} cleared.
*/
public static StringBuilderWriter cached() {
StringBuilderWriter writer = cache.get();
StringBuilderWriter writer = cache.get().get();
writer.getBuilder().setLength(0);
return writer;
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.util.io.compression;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.ThreadLocals;
import org.elasticsearch.util.Unicode;
import org.elasticsearch.util.io.FastByteArrayInputStream;
import org.elasticsearch.util.io.FastByteArrayOutputStream;
@ -36,9 +37,9 @@ public class GZIPCompressor implements Compressor {
private static class Cached {
private static final ThreadLocal<CompressHolder> cache = new ThreadLocal<CompressHolder>() {
@Override protected CompressHolder initialValue() {
return new CompressHolder();
private static final ThreadLocal<ThreadLocals.CleanableValue<CompressHolder>> cache = new ThreadLocal<ThreadLocals.CleanableValue<CompressHolder>>() {
@Override protected ThreadLocals.CleanableValue<CompressHolder> initialValue() {
return new ThreadLocals.CleanableValue<CompressHolder>(new CompressHolder());
}
};
@ -46,7 +47,7 @@ public class GZIPCompressor implements Compressor {
* Returns the cached thread local byte strean, with its internal stream cleared.
*/
public static CompressHolder cached() {
CompressHolder ch = cache.get();
CompressHolder ch = cache.get().get();
ch.bos.reset();
return ch;
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.util.io.compression;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.util.ThreadLocals;
import org.elasticsearch.util.Unicode;
import org.elasticsearch.util.io.compression.lzf.LZFDecoder;
import org.elasticsearch.util.io.compression.lzf.LZFEncoder;
@ -33,14 +34,14 @@ public class LzfCompressor implements Compressor {
private static class Cached {
private static final ThreadLocal<CompressHolder> cache = new ThreadLocal<CompressHolder>() {
@Override protected CompressHolder initialValue() {
return new CompressHolder();
private static final ThreadLocal<ThreadLocals.CleanableValue<CompressHolder>> cache = new ThreadLocal<ThreadLocals.CleanableValue<CompressHolder>>() {
@Override protected ThreadLocals.CleanableValue<CompressHolder> initialValue() {
return new ThreadLocals.CleanableValue<CompressHolder>(new CompressHolder());
}
};
public static CompressHolder cached() {
return cache.get();
return cache.get().get();
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.util.io.compression;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.ThreadLocals;
import org.elasticsearch.util.Unicode;
import org.elasticsearch.util.io.FastByteArrayOutputStream;
@ -36,9 +37,9 @@ public class ZipCompressor implements Compressor {
private static class Cached {
private static final ThreadLocal<CompressHolder> cache = new ThreadLocal<CompressHolder>() {
@Override protected CompressHolder initialValue() {
return new CompressHolder();
private static final ThreadLocal<ThreadLocals.CleanableValue<CompressHolder>> cache = new ThreadLocal<ThreadLocals.CleanableValue<CompressHolder>>() {
@Override protected ThreadLocals.CleanableValue<CompressHolder> initialValue() {
return new ThreadLocals.CleanableValue<CompressHolder>(new CompressHolder());
}
};
@ -46,7 +47,7 @@ public class ZipCompressor implements Compressor {
* Returns the cached thread local byte strean, with its internal stream cleared.
*/
public static CompressHolder cached() {
CompressHolder ch = cache.get();
CompressHolder ch = cache.get().get();
ch.bos.reset();
return ch;
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.util.io.stream;
import org.elasticsearch.util.ThreadLocals;
import java.io.IOException;
import java.util.Arrays;
@ -32,15 +34,15 @@ public class BytesStreamOutput extends StreamOutput {
*/
public static class Cached {
private static final ThreadLocal<BytesStreamOutput> cache = new ThreadLocal<BytesStreamOutput>() {
@Override protected BytesStreamOutput initialValue() {
return new BytesStreamOutput();
private static final ThreadLocal<ThreadLocals.CleanableValue<BytesStreamOutput>> cache = new ThreadLocal<ThreadLocals.CleanableValue<BytesStreamOutput>>() {
@Override protected ThreadLocals.CleanableValue<BytesStreamOutput> initialValue() {
return new ThreadLocals.CleanableValue<BytesStreamOutput>(new BytesStreamOutput());
}
};
private static final ThreadLocal<HandlesStreamOutput> cacheHandles = new ThreadLocal<HandlesStreamOutput>() {
@Override protected HandlesStreamOutput initialValue() {
return new HandlesStreamOutput(new BytesStreamOutput());
private static final ThreadLocal<ThreadLocals.CleanableValue<HandlesStreamOutput>> cacheHandles = new ThreadLocal<ThreadLocals.CleanableValue<HandlesStreamOutput>>() {
@Override protected ThreadLocals.CleanableValue<HandlesStreamOutput> initialValue() {
return new ThreadLocals.CleanableValue<HandlesStreamOutput>(new HandlesStreamOutput(new BytesStreamOutput()));
}
};
@ -48,13 +50,13 @@ public class BytesStreamOutput extends StreamOutput {
* Returns the cached thread local byte stream, with its internal stream cleared.
*/
public static BytesStreamOutput cached() {
BytesStreamOutput os = cache.get();
BytesStreamOutput os = cache.get().get();
os.reset();
return os;
}
public static HandlesStreamOutput cachedHandles() throws IOException {
HandlesStreamOutput os = cacheHandles.get();
HandlesStreamOutput os = cacheHandles.get().get();
os.reset();
return os;
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.util.io.stream;
import org.elasticsearch.util.ThreadLocals;
import org.elasticsearch.util.gnu.trove.TIntObjectHashMap;
import java.io.IOException;
@ -30,9 +31,9 @@ public class HandlesStreamInput extends StreamInput {
public static class Cached {
private static final ThreadLocal<HandlesStreamInput> cache = new ThreadLocal<HandlesStreamInput>() {
@Override protected HandlesStreamInput initialValue() {
return new HandlesStreamInput();
private static final ThreadLocal<ThreadLocals.CleanableValue<HandlesStreamInput>> cache = new ThreadLocal<ThreadLocals.CleanableValue<HandlesStreamInput>>() {
@Override protected ThreadLocals.CleanableValue<HandlesStreamInput> initialValue() {
return new ThreadLocals.CleanableValue<HandlesStreamInput>(new HandlesStreamInput());
}
};
@ -40,7 +41,7 @@ public class HandlesStreamInput extends StreamInput {
* Returns the cached thread local byte stream, with its internal stream cleared.
*/
public static HandlesStreamInput cached(StreamInput in) {
HandlesStreamInput os = cache.get();
HandlesStreamInput os = cache.get().get();
os.reset(in);
return os;
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.util.io.stream;
import org.elasticsearch.util.ThreadLocals;
import org.elasticsearch.util.Unicode;
import java.io.EOFException;
@ -30,9 +31,9 @@ import java.io.InputStream;
*/
public abstract class StreamInput extends InputStream {
private static ThreadLocal<byte[]> cachedBytes = new ThreadLocal<byte[]>() {
@Override protected byte[] initialValue() {
return new byte[256];
private static ThreadLocal<ThreadLocals.CleanableValue<byte[]>> cachedBytes = new ThreadLocal<ThreadLocals.CleanableValue<byte[]>>() {
@Override protected ThreadLocals.CleanableValue<byte[]> initialValue() {
return new ThreadLocals.CleanableValue<byte[]>(new byte[256]);
}
};
@ -108,10 +109,10 @@ public abstract class StreamInput extends InputStream {
*/
public String readUTF() throws IOException {
int length = readVInt();
byte[] bytes = cachedBytes.get();
byte[] bytes = cachedBytes.get().get();
if (bytes == null || length > bytes.length) {
bytes = new byte[(int) (length * 1.25)];
cachedBytes.set(bytes);
cachedBytes.get().set(bytes);
}
readBytes(bytes, 0, length);
return Unicode.fromBytes(bytes, 0, length);

View File

@ -23,6 +23,7 @@ import org.codehaus.jackson.JsonEncoding;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.util.ThreadLocals;
import org.elasticsearch.util.Unicode;
import org.elasticsearch.util.io.FastByteArrayOutputStream;
@ -38,16 +39,10 @@ public class BinaryJsonBuilder extends JsonBuilder<BinaryJsonBuilder> {
*/
public static class Cached {
private BinaryJsonBuilder builder;
public Cached(BinaryJsonBuilder builder) {
this.builder = builder;
}
private static final ThreadLocal<Cached> cache = new ThreadLocal<Cached>() {
@Override protected Cached initialValue() {
private static final ThreadLocal<ThreadLocals.CleanableValue<BinaryJsonBuilder>> cache = new ThreadLocal<ThreadLocals.CleanableValue<BinaryJsonBuilder>>() {
@Override protected ThreadLocals.CleanableValue<BinaryJsonBuilder> initialValue() {
try {
return new Cached(new BinaryJsonBuilder());
return new ThreadLocals.CleanableValue<BinaryJsonBuilder>(new BinaryJsonBuilder());
} catch (IOException e) {
throw new ElasticSearchException("Failed to create json generator", e);
}
@ -58,9 +53,9 @@ public class BinaryJsonBuilder extends JsonBuilder<BinaryJsonBuilder> {
* Returns the cached thread local generator, with its internal {@link StringBuilder} cleared.
*/
static BinaryJsonBuilder cached() throws IOException {
Cached cached = cache.get();
cached.builder.reset();
return cached.builder;
ThreadLocals.CleanableValue<BinaryJsonBuilder> cached = cache.get();
cached.get().reset();
return cached.get();
}
}

View File

@ -23,6 +23,7 @@ import org.apache.lucene.util.UnicodeUtil;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.util.ThreadLocals;
import org.elasticsearch.util.Unicode;
import org.elasticsearch.util.concurrent.NotThreadSafe;
import org.elasticsearch.util.io.FastCharArrayWriter;
@ -40,16 +41,10 @@ public class StringJsonBuilder extends JsonBuilder<StringJsonBuilder> {
*/
public static class Cached {
private StringJsonBuilder builder;
public Cached(StringJsonBuilder builder) {
this.builder = builder;
}
private static final ThreadLocal<Cached> cache = new ThreadLocal<Cached>() {
@Override protected Cached initialValue() {
private static final ThreadLocal<ThreadLocals.CleanableValue<StringJsonBuilder>> cache = new ThreadLocal<ThreadLocals.CleanableValue<StringJsonBuilder>>() {
@Override protected ThreadLocals.CleanableValue<StringJsonBuilder> initialValue() {
try {
return new Cached(new StringJsonBuilder());
return new ThreadLocals.CleanableValue<StringJsonBuilder>(new StringJsonBuilder());
} catch (IOException e) {
throw new ElasticSearchException("Failed to create json generator", e);
}
@ -60,9 +55,9 @@ public class StringJsonBuilder extends JsonBuilder<StringJsonBuilder> {
* Returns the cached thread local generator, with its internal {@link StringBuilder} cleared.
*/
static StringJsonBuilder cached() throws IOException {
Cached cached = cache.get();
cached.builder.reset();
return cached.builder;
StringJsonBuilder sb = cache.get().get();
sb.reset();
return sb;
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util;
import org.testng.annotations.Test;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
@Test
public class ThreadLocalsTests {
private static final ThreadLocal<ThreadLocals.CleanableValue<AtomicInteger>> local = new ThreadLocal<ThreadLocals.CleanableValue<AtomicInteger>>() {
@Override protected ThreadLocals.CleanableValue<AtomicInteger> initialValue() {
return new ThreadLocals.CleanableValue<AtomicInteger>(new AtomicInteger());
}
};
@Test public void testCleanThreadLocals() {
assertThat(local.get().get().get(), equalTo(0));
local.get().get().incrementAndGet();
assertThat(local.get().get().get(), equalTo(1));
ThreadLocals.clearReferencesThreadLocals();
assertThat(local.get().get().get(), equalTo(0));
ThreadLocals.clearReferencesThreadLocals();
}
}