From 5b11de8958d2ae4fc99eec9a79d69796d459e4f3 Mon Sep 17 00:00:00 2001 From: kimchy Date: Tue, 23 Mar 2010 10:31:15 +0200 Subject: [PATCH] clean thread locals (even static ones!) on Server#close or TransportClient#close --- .../client/transport/TransportClient.java | 3 + .../index/mapper/json/JsonDocumentMapper.java | 9 +- .../mapper/json/JsonNumberFieldMapper.java | 13 +- .../query/json/JsonIndexQueryParser.java | 15 +- .../elasticsearch/rest/JsonRestResponse.java | 9 +- .../rest/StringRestResponse.java | 9 +- .../server/internal/InternalServer.java | 3 + .../org/elasticsearch/util/ThreadLocals.java | 166 ++++++++++++++++++ .../java/org/elasticsearch/util/Unicode.java | 16 +- .../util/concurrent/ThreadLocalRandom.java | 12 +- .../util/io/ByteArrayDataOutputStream.java | 9 +- .../util/io/FastByteArrayOutputStream.java | 10 +- .../util/io/FastCharArrayWriter.java | 9 +- .../util/io/StringBuilderWriter.java | 9 +- .../util/io/compression/GZIPCompressor.java | 9 +- .../util/io/compression/LzfCompressor.java | 9 +- .../util/io/compression/ZipCompressor.java | 9 +- .../util/io/stream/BytesStreamOutput.java | 18 +- .../util/io/stream/HandlesStreamInput.java | 9 +- .../util/io/stream/StreamInput.java | 11 +- .../util/json/BinaryJsonBuilder.java | 19 +- .../util/json/StringJsonBuilder.java | 19 +- .../elasticsearch/util/ThreadLocalsTests.java | 49 ++++++ 23 files changed, 337 insertions(+), 107 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/util/ThreadLocals.java create mode 100644 modules/elasticsearch/src/test/java/org/elasticsearch/util/ThreadLocalsTests.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClient.java index 98e211f88ef..4d46c434dd2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -57,6 +57,7 @@ import org.elasticsearch.timer.TimerModule; import org.elasticsearch.timer.TimerService; import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.util.ThreadLocals; import org.elasticsearch.util.Tuple; import org.elasticsearch.util.settings.ImmutableSettings; import org.elasticsearch.util.settings.Settings; @@ -204,6 +205,8 @@ public class TransportClient implements Client { injector.getInstance(TimerService.class).close(); injector.getInstance(ThreadPool.class).shutdown(); + + ThreadLocals.clearReferencesThreadLocals(); } @Override public AdminClient admin() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/json/JsonDocumentMapper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/json/JsonDocumentMapper.java index cd7cc703aa5..c3039e1cfc6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/json/JsonDocumentMapper.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/json/JsonDocumentMapper.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.analysis.NamedAnalyzer; import org.elasticsearch.index.mapper.*; import org.elasticsearch.util.Nullable; import org.elasticsearch.util.Preconditions; +import org.elasticsearch.util.ThreadLocals; import org.elasticsearch.util.json.Jackson; import org.elasticsearch.util.json.JsonBuilder; import org.elasticsearch.util.json.StringJsonBuilder; @@ -134,9 +135,9 @@ public class JsonDocumentMapper implements DocumentMapper, ToJson { } - private ThreadLocal cache = new ThreadLocal() { - @Override protected JsonParseContext initialValue() { - return new JsonParseContext(JsonDocumentMapper.this, new JsonPath(0)); + private ThreadLocal> cache = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new JsonParseContext(JsonDocumentMapper.this, new JsonPath(0))); } }; @@ -277,7 +278,7 @@ public class JsonDocumentMapper implements DocumentMapper, ToJson { } @Override public ParsedDocument parse(String type, String id, byte[] source, ParseListener listener) { - JsonParseContext jsonContext = cache.get(); + JsonParseContext jsonContext = cache.get().get(); if (type != null && !type.equals(this.type)) { throw new MapperParsingException("Type mismatch, provide type [" + type + "] but mapper is of type [" + this.type + "]"); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/json/JsonNumberFieldMapper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/json/JsonNumberFieldMapper.java index ba621629056..236f3853ffe 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/json/JsonNumberFieldMapper.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/json/JsonNumberFieldMapper.java @@ -25,6 +25,7 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.Fieldable; import org.apache.lucene.util.NumericUtils; import org.elasticsearch.index.analysis.NamedAnalyzer; +import org.elasticsearch.util.ThreadLocals; import org.elasticsearch.util.gnu.trove.TIntObjectHashMap; import org.elasticsearch.util.json.JsonBuilder; @@ -77,9 +78,9 @@ public abstract class JsonNumberFieldMapper extends JsonFieldM } } - private static final ThreadLocal>> cachedStreams = new ThreadLocal>>() { - @Override protected TIntObjectHashMap> initialValue() { - return new TIntObjectHashMap>(); + private static final ThreadLocal>>> cachedStreams = new ThreadLocal>>>() { + @Override protected ThreadLocals.CleanableValue>> initialValue() { + return new ThreadLocals.CleanableValue>>(new TIntObjectHashMap>()); } }; @@ -152,10 +153,10 @@ public abstract class JsonNumberFieldMapper extends JsonFieldM * sicne it implements the end method. */ protected CachedNumericTokenStream popCachedStream(int precisionStep) { - Deque deque = cachedStreams.get().get(precisionStep); + Deque deque = cachedStreams.get().get().get(precisionStep); if (deque == null) { deque = new ArrayDeque(); - cachedStreams.get().put(precisionStep, deque); + cachedStreams.get().get().put(precisionStep, deque); deque.add(new CachedNumericTokenStream(new NumericTokenStream(precisionStep), precisionStep)); } if (deque.isEmpty()) { @@ -189,7 +190,7 @@ public abstract class JsonNumberFieldMapper extends JsonFieldM */ public void close() throws IOException { numericTokenStream.close(); - cachedStreams.get().get(precisionStep).add(this); + cachedStreams.get().get().get(precisionStep).add(this); } /** diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/query/json/JsonIndexQueryParser.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/query/json/JsonIndexQueryParser.java index 839d37cb4aa..28b7c49055a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/query/json/JsonIndexQueryParser.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/query/json/JsonIndexQueryParser.java @@ -35,6 +35,7 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryParsingException; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.util.Nullable; +import org.elasticsearch.util.ThreadLocals; import org.elasticsearch.util.io.FastCharArrayReader; import org.elasticsearch.util.io.FastCharArrayWriter; import org.elasticsearch.util.io.FastStringReader; @@ -57,9 +58,9 @@ public class JsonIndexQueryParser extends AbstractIndexComponent implements Inde public static final String JSON_FILTER_PREFIX = "index.queryparser.json.filter"; } - private ThreadLocal cache = new ThreadLocal() { - @Override protected JsonQueryParseContext initialValue() { - return new JsonQueryParseContext(index, queryParserRegistry, mapperService, filterCache); + private ThreadLocal> cache = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new JsonQueryParseContext(index, queryParserRegistry, mapperService, filterCache)); } }; @@ -125,7 +126,7 @@ public class JsonIndexQueryParser extends AbstractIndexComponent implements Inde try { FastCharArrayWriter unsafeChars = queryBuilder.buildAsUnsafeChars(); jp = jsonFactory.createJsonParser(new FastCharArrayReader(unsafeChars.unsafeCharArray(), 0, unsafeChars.size())); - return parse(cache.get(), jp); + return parse(cache.get().get(), jp); } catch (QueryParsingException e) { throw e; } catch (Exception e) { @@ -145,7 +146,7 @@ public class JsonIndexQueryParser extends AbstractIndexComponent implements Inde JsonParser jp = null; try { jp = jsonFactory.createJsonParser(source); - return parse(cache.get(), jp); + return parse(cache.get().get(), jp); } catch (QueryParsingException e) { throw e; } catch (Exception e) { @@ -165,7 +166,7 @@ public class JsonIndexQueryParser extends AbstractIndexComponent implements Inde JsonParser jp = null; try { jp = jsonFactory.createJsonParser(new FastStringReader(source)); - return parse(cache.get(), jp); + return parse(cache.get().get(), jp); } catch (QueryParsingException e) { throw e; } catch (Exception e) { @@ -183,7 +184,7 @@ public class JsonIndexQueryParser extends AbstractIndexComponent implements Inde public Query parse(JsonParser jsonParser) { try { - return parse(cache.get(), jsonParser); + return parse(cache.get().get(), jsonParser); } catch (IOException e) { throw new QueryParsingException(index, "Failed to parse", e); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/JsonRestResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/JsonRestResponse.java index 1f9e70d76d8..7672079e5d5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/JsonRestResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/JsonRestResponse.java @@ -20,6 +20,7 @@ package org.elasticsearch.rest; import org.apache.lucene.util.UnicodeUtil; +import org.elasticsearch.util.ThreadLocals; import org.elasticsearch.util.json.JsonBuilder; import java.io.IOException; @@ -38,9 +39,9 @@ public class JsonRestResponse extends AbstractRestResponse { System.arraycopy(U_END_JSONP.result, 0, END_JSONP, 0, U_END_JSONP.length); } - private static ThreadLocal prefixCache = new ThreadLocal() { - @Override protected UnicodeUtil.UTF8Result initialValue() { - return new UnicodeUtil.UTF8Result(); + private static ThreadLocal> prefixCache = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new UnicodeUtil.UTF8Result()); } }; @@ -115,7 +116,7 @@ public class JsonRestResponse extends AbstractRestResponse { if (callback == null) { return null; } - UnicodeUtil.UTF8Result result = prefixCache.get(); + UnicodeUtil.UTF8Result result = prefixCache.get().get(); UnicodeUtil.UTF16toUTF8(callback, 0, callback.length(), result); result.result[result.length] = '('; result.length++; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/StringRestResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/StringRestResponse.java index d96e5fef21c..ce8821f2d7e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/StringRestResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/StringRestResponse.java @@ -20,15 +20,16 @@ package org.elasticsearch.rest; import org.apache.lucene.util.UnicodeUtil; +import org.elasticsearch.util.ThreadLocals; /** * @author kimchy (Shay Banon) */ public class StringRestResponse extends Utf8RestResponse { - private static ThreadLocal cache = new ThreadLocal() { - @Override protected UnicodeUtil.UTF8Result initialValue() { - return new UnicodeUtil.UTF8Result(); + private static ThreadLocal> cache = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new UnicodeUtil.UTF8Result()); } }; @@ -41,7 +42,7 @@ public class StringRestResponse extends Utf8RestResponse { } private static UnicodeUtil.UTF8Result convert(String content) { - UnicodeUtil.UTF8Result result = cache.get(); + UnicodeUtil.UTF8Result result = cache.get().get(); UnicodeUtil.UTF16toUTF8(content, 0, content.length(), result); return result; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/server/internal/InternalServer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/server/internal/InternalServer.java index bfe7783317e..ceb5c698077 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/server/internal/InternalServer.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/server/internal/InternalServer.java @@ -57,6 +57,7 @@ import org.elasticsearch.timer.TimerModule; import org.elasticsearch.timer.TimerService; import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.util.ThreadLocals; import org.elasticsearch.util.Tuple; import org.elasticsearch.util.component.Lifecycle; import org.elasticsearch.util.guice.Injectors; @@ -238,6 +239,8 @@ public final class InternalServer implements Server { // ignore } + ThreadLocals.clearReferencesThreadLocals(); + logger.info("{{}}: Closed", Version.full()); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/ThreadLocals.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/ThreadLocals.java new file mode 100644 index 00000000000..7030e3c16f6 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/ThreadLocals.java @@ -0,0 +1,166 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.util; + +import org.elasticsearch.util.logging.Loggers; +import org.slf4j.Logger; + +import java.lang.ref.Reference; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +/** + * @author kimchy (shay.banon) + */ +public class ThreadLocals { + + private static final Logger logger = Loggers.getLogger(ThreadLocals.class); + + public static class CleanableValue { + + private T value; + + public CleanableValue(T value) { + this.value = value; + } + + public T get() { + return value; + } + + public void set(T value) { + this.value = value; + } + } + + public static void clearReferencesThreadLocals() { + try { + Thread[] threads = getThreads(); + // Make the fields in the Thread class that store ThreadLocals + // accessible + Field threadLocalsField = Thread.class.getDeclaredField("threadLocals"); + threadLocalsField.setAccessible(true); + Field inheritableThreadLocalsField = Thread.class.getDeclaredField("inheritableThreadLocals"); + inheritableThreadLocalsField.setAccessible(true); + // Make the underlying array of ThreadLoad.ThreadLocalMap.Entry objects + // accessible + Class tlmClass = Class.forName("java.lang.ThreadLocal$ThreadLocalMap"); + Field tableField = tlmClass.getDeclaredField("table"); + tableField.setAccessible(true); + + for (int i = 0; i < threads.length; i++) { + Object threadLocalMap; + if (threads[i] != null) { + // Clear the first map + threadLocalMap = threadLocalsField.get(threads[i]); + clearThreadLocalMap(threadLocalMap, tableField); + // Clear the second map + threadLocalMap = + inheritableThreadLocalsField.get(threads[i]); + clearThreadLocalMap(threadLocalMap, tableField); + } + } + } catch (Exception e) { + logger.debug("Failed to clean thread locals", e); + } + } + + + /* + * Clears the given thread local map object. Also pass in the field that + * points to the internal table to save re-calculating it on every + * call to this method. + */ + + private static void clearThreadLocalMap(Object map, Field internalTableField) throws NoSuchMethodException, IllegalAccessException, NoSuchFieldException, InvocationTargetException { + if (map != null) { + Method mapRemove = map.getClass().getDeclaredMethod("remove", ThreadLocal.class); + mapRemove.setAccessible(true); + Object[] table = (Object[]) internalTableField.get(map); + int staleEntriesCount = 0; + if (table != null) { + for (int j = 0; j < table.length; j++) { + if (table[j] != null) { + boolean remove = false; + // Check the key + Object key = ((Reference) table[j]).get(); + // Check the value + Field valueField = table[j].getClass().getDeclaredField("value"); + valueField.setAccessible(true); + Object value = valueField.get(table[j]); + if ((value != null && CleanableValue.class.isAssignableFrom(value.getClass()))) { + remove = true; + } + if (remove) { + Object[] args = new Object[4]; + if (key != null) { + args[0] = key.getClass().getCanonicalName(); + args[1] = key.toString(); + } + args[2] = value.getClass().getCanonicalName(); + args[3] = value.toString(); + if (logger.isDebugEnabled()) { + logger.debug("ThreadLocal with key of type [{0}] (value [{1}]) and a value of type [{2}] (value [{3}]): The ThreadLocal has been forcibly removed.", args); + } + if (key == null) { + staleEntriesCount++; + } else { + mapRemove.invoke(map, key); + } + } + } + } + } + if (staleEntriesCount > 0) { + Method mapRemoveStale = map.getClass().getDeclaredMethod("expungeStaleEntries"); + mapRemoveStale.setAccessible(true); + mapRemoveStale.invoke(map); + } + } + } + + /* + * Get the set of current threads as an array. + */ + + private static Thread[] getThreads() { + // Get the current thread group + ThreadGroup tg = Thread.currentThread().getThreadGroup(); + // Find the root thread group + while (tg.getParent() != null) { + tg = tg.getParent(); + } + + int threadCountGuess = tg.activeCount() + 50; + Thread[] threads = new Thread[threadCountGuess]; + int threadCountActual = tg.enumerate(threads); + // Make sure we don't miss any threads + while (threadCountActual == threadCountGuess) { + threadCountGuess *= 2; + threads = new Thread[threadCountGuess]; + // Note tg.enumerate(Thread[]) silently ignores any threads that + // can't fit into the array + threadCountActual = tg.enumerate(threads); + } + + return threads; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/Unicode.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/Unicode.java index af1b011247f..719ccea6c61 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/Unicode.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/Unicode.java @@ -28,15 +28,15 @@ import java.util.Arrays; */ public class Unicode { - private static ThreadLocal cachedUtf8Result = new ThreadLocal() { - @Override protected UnicodeUtil.UTF8Result initialValue() { - return new UnicodeUtil.UTF8Result(); + private static ThreadLocal> cachedUtf8Result = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new UnicodeUtil.UTF8Result()); } }; - private static ThreadLocal cachedUtf16Result = new ThreadLocal() { - @Override protected UTF16Result initialValue() { - return new UTF16Result(); + private static ThreadLocal> cachedUtf16Result = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new UTF16Result()); } }; @@ -61,7 +61,7 @@ public class Unicode { if (source == null) { return null; } - UnicodeUtil.UTF8Result result = cachedUtf8Result.get(); + UnicodeUtil.UTF8Result result = cachedUtf8Result.get().get(); UnicodeUtil.UTF16toUTF8(source, 0, source.length(), result); return result; } @@ -99,7 +99,7 @@ public class Unicode { if (source == null) { return null; } - UTF16Result result = cachedUtf16Result.get(); + UTF16Result result = cachedUtf16Result.get().get(); UTF8toUTF16(source, offset, length, result); return result; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ThreadLocalRandom.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ThreadLocalRandom.java index aaab53a19b4..520ce86ce48 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ThreadLocalRandom.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ThreadLocalRandom.java @@ -25,6 +25,8 @@ package org.elasticsearch.util.concurrent; +import org.elasticsearch.util.ThreadLocals; + import java.util.Random; /** @@ -77,10 +79,10 @@ public class ThreadLocalRandom extends Random { /** * The actual ThreadLocal */ - private static final ThreadLocal localRandom = - new ThreadLocal() { - protected ThreadLocalRandom initialValue() { - return new ThreadLocalRandom(); + private static final ThreadLocal> localRandom = + new ThreadLocal>() { + protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new ThreadLocalRandom()); } }; @@ -100,7 +102,7 @@ public class ThreadLocalRandom extends Random { * @return the current thread's {@code ThreadLocalRandom} */ public static ThreadLocalRandom current() { - return localRandom.get(); + return localRandom.get().get(); } /** diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/ByteArrayDataOutputStream.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/ByteArrayDataOutputStream.java index 3d84a6369be..9df488bd84d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/ByteArrayDataOutputStream.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/ByteArrayDataOutputStream.java @@ -19,6 +19,7 @@ package org.elasticsearch.util.io; +import org.elasticsearch.util.ThreadLocals; import org.elasticsearch.util.concurrent.NotThreadSafe; import java.io.DataOutputStream; @@ -34,9 +35,9 @@ public class ByteArrayDataOutputStream extends DataOutputStream { */ public static class Cached { - private static final ThreadLocal cache = new ThreadLocal() { - @Override protected ByteArrayDataOutputStream initialValue() { - return new ByteArrayDataOutputStream(); + private static final ThreadLocal> cache = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new ByteArrayDataOutputStream()); } }; @@ -44,7 +45,7 @@ public class ByteArrayDataOutputStream extends DataOutputStream { * Returns the cached thread local byte strean, with its internal stream cleared. */ public static ByteArrayDataOutputStream cached() { - ByteArrayDataOutputStream os = cache.get(); + ByteArrayDataOutputStream os = cache.get().get(); ((FastByteArrayOutputStream) os.out).reset(); return os; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/FastByteArrayOutputStream.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/FastByteArrayOutputStream.java index 72085ffa30b..0f407837c36 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/FastByteArrayOutputStream.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/FastByteArrayOutputStream.java @@ -19,6 +19,8 @@ package org.elasticsearch.util.io; +import org.elasticsearch.util.ThreadLocals; + import java.io.IOException; import java.io.OutputStream; import java.io.UnsupportedEncodingException; @@ -36,9 +38,9 @@ public class FastByteArrayOutputStream extends OutputStream { */ public static class Cached { - private static final ThreadLocal cache = new ThreadLocal() { - @Override protected FastByteArrayOutputStream initialValue() { - return new FastByteArrayOutputStream(); + private static final ThreadLocal> cache = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new FastByteArrayOutputStream()); } }; @@ -46,7 +48,7 @@ public class FastByteArrayOutputStream extends OutputStream { * Returns the cached thread local byte stream, with its internal stream cleared. */ public static FastByteArrayOutputStream cached() { - FastByteArrayOutputStream os = cache.get(); + FastByteArrayOutputStream os = cache.get().get(); os.reset(); return os; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/FastCharArrayWriter.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/FastCharArrayWriter.java index 1b87078bf1e..f3119d882b3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/FastCharArrayWriter.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/FastCharArrayWriter.java @@ -19,6 +19,7 @@ package org.elasticsearch.util.io; +import org.elasticsearch.util.ThreadLocals; import org.elasticsearch.util.concurrent.NotThreadSafe; import java.io.IOException; @@ -38,9 +39,9 @@ public class FastCharArrayWriter extends Writer { */ public static class Cached { - private static final ThreadLocal cache = new ThreadLocal() { - @Override protected FastCharArrayWriter initialValue() { - return new FastCharArrayWriter(); + private static final ThreadLocal> cache = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new FastCharArrayWriter()); } }; @@ -48,7 +49,7 @@ public class FastCharArrayWriter extends Writer { * Returns the cached thread local byte stream, with its internal stream cleared. */ public static FastCharArrayWriter cached() { - FastCharArrayWriter os = cache.get(); + FastCharArrayWriter os = cache.get().get(); os.reset(); return os; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/StringBuilderWriter.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/StringBuilderWriter.java index 073bf834659..c89c05944c5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/StringBuilderWriter.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/StringBuilderWriter.java @@ -19,6 +19,7 @@ package org.elasticsearch.util.io; +import org.elasticsearch.util.ThreadLocals; import org.elasticsearch.util.concurrent.NotThreadSafe; import java.io.Writer; @@ -39,9 +40,9 @@ public class StringBuilderWriter extends Writer { */ public static class Cached { - private static final ThreadLocal cache = new ThreadLocal() { - @Override protected StringBuilderWriter initialValue() { - return new StringBuilderWriter(); + private static final ThreadLocal> cache = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new StringBuilderWriter()); } }; @@ -49,7 +50,7 @@ public class StringBuilderWriter extends Writer { * Returns the cached thread local writer, with its internal {@link StringBuilder} cleared. */ public static StringBuilderWriter cached() { - StringBuilderWriter writer = cache.get(); + StringBuilderWriter writer = cache.get().get(); writer.getBuilder().setLength(0); return writer; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/compression/GZIPCompressor.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/compression/GZIPCompressor.java index c18ed67dfc6..3c13a20610d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/compression/GZIPCompressor.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/compression/GZIPCompressor.java @@ -21,6 +21,7 @@ package org.elasticsearch.util.io.compression; import org.apache.lucene.util.UnicodeUtil; import org.elasticsearch.util.SizeUnit; +import org.elasticsearch.util.ThreadLocals; import org.elasticsearch.util.Unicode; import org.elasticsearch.util.io.FastByteArrayInputStream; import org.elasticsearch.util.io.FastByteArrayOutputStream; @@ -36,9 +37,9 @@ public class GZIPCompressor implements Compressor { private static class Cached { - private static final ThreadLocal cache = new ThreadLocal() { - @Override protected CompressHolder initialValue() { - return new CompressHolder(); + private static final ThreadLocal> cache = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new CompressHolder()); } }; @@ -46,7 +47,7 @@ public class GZIPCompressor implements Compressor { * Returns the cached thread local byte strean, with its internal stream cleared. */ public static CompressHolder cached() { - CompressHolder ch = cache.get(); + CompressHolder ch = cache.get().get(); ch.bos.reset(); return ch; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/compression/LzfCompressor.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/compression/LzfCompressor.java index 1b9fe319ad3..0e2f2a36ac1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/compression/LzfCompressor.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/compression/LzfCompressor.java @@ -20,6 +20,7 @@ package org.elasticsearch.util.io.compression; import org.apache.lucene.util.UnicodeUtil; +import org.elasticsearch.util.ThreadLocals; import org.elasticsearch.util.Unicode; import org.elasticsearch.util.io.compression.lzf.LZFDecoder; import org.elasticsearch.util.io.compression.lzf.LZFEncoder; @@ -33,14 +34,14 @@ public class LzfCompressor implements Compressor { private static class Cached { - private static final ThreadLocal cache = new ThreadLocal() { - @Override protected CompressHolder initialValue() { - return new CompressHolder(); + private static final ThreadLocal> cache = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new CompressHolder()); } }; public static CompressHolder cached() { - return cache.get(); + return cache.get().get(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/compression/ZipCompressor.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/compression/ZipCompressor.java index 8e9d699bbd6..9222b834b29 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/compression/ZipCompressor.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/compression/ZipCompressor.java @@ -21,6 +21,7 @@ package org.elasticsearch.util.io.compression; import org.apache.lucene.util.UnicodeUtil; import org.elasticsearch.util.SizeUnit; +import org.elasticsearch.util.ThreadLocals; import org.elasticsearch.util.Unicode; import org.elasticsearch.util.io.FastByteArrayOutputStream; @@ -36,9 +37,9 @@ public class ZipCompressor implements Compressor { private static class Cached { - private static final ThreadLocal cache = new ThreadLocal() { - @Override protected CompressHolder initialValue() { - return new CompressHolder(); + private static final ThreadLocal> cache = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new CompressHolder()); } }; @@ -46,7 +47,7 @@ public class ZipCompressor implements Compressor { * Returns the cached thread local byte strean, with its internal stream cleared. */ public static CompressHolder cached() { - CompressHolder ch = cache.get(); + CompressHolder ch = cache.get().get(); ch.bos.reset(); return ch; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/BytesStreamOutput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/BytesStreamOutput.java index 969e54182ac..c8b06855087 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/BytesStreamOutput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/BytesStreamOutput.java @@ -19,6 +19,8 @@ package org.elasticsearch.util.io.stream; +import org.elasticsearch.util.ThreadLocals; + import java.io.IOException; import java.util.Arrays; @@ -32,15 +34,15 @@ public class BytesStreamOutput extends StreamOutput { */ public static class Cached { - private static final ThreadLocal cache = new ThreadLocal() { - @Override protected BytesStreamOutput initialValue() { - return new BytesStreamOutput(); + private static final ThreadLocal> cache = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new BytesStreamOutput()); } }; - private static final ThreadLocal cacheHandles = new ThreadLocal() { - @Override protected HandlesStreamOutput initialValue() { - return new HandlesStreamOutput(new BytesStreamOutput()); + private static final ThreadLocal> cacheHandles = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new HandlesStreamOutput(new BytesStreamOutput())); } }; @@ -48,13 +50,13 @@ public class BytesStreamOutput extends StreamOutput { * Returns the cached thread local byte stream, with its internal stream cleared. */ public static BytesStreamOutput cached() { - BytesStreamOutput os = cache.get(); + BytesStreamOutput os = cache.get().get(); os.reset(); return os; } public static HandlesStreamOutput cachedHandles() throws IOException { - HandlesStreamOutput os = cacheHandles.get(); + HandlesStreamOutput os = cacheHandles.get().get(); os.reset(); return os; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/HandlesStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/HandlesStreamInput.java index 8fd99d11426..203aeea4eba 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/HandlesStreamInput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/HandlesStreamInput.java @@ -19,6 +19,7 @@ package org.elasticsearch.util.io.stream; +import org.elasticsearch.util.ThreadLocals; import org.elasticsearch.util.gnu.trove.TIntObjectHashMap; import java.io.IOException; @@ -30,9 +31,9 @@ public class HandlesStreamInput extends StreamInput { public static class Cached { - private static final ThreadLocal cache = new ThreadLocal() { - @Override protected HandlesStreamInput initialValue() { - return new HandlesStreamInput(); + private static final ThreadLocal> cache = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new HandlesStreamInput()); } }; @@ -40,7 +41,7 @@ public class HandlesStreamInput extends StreamInput { * Returns the cached thread local byte stream, with its internal stream cleared. */ public static HandlesStreamInput cached(StreamInput in) { - HandlesStreamInput os = cache.get(); + HandlesStreamInput os = cache.get().get(); os.reset(in); return os; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/StreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/StreamInput.java index a330b18ae08..da4b8d2c9aa 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/StreamInput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/io/stream/StreamInput.java @@ -19,6 +19,7 @@ package org.elasticsearch.util.io.stream; +import org.elasticsearch.util.ThreadLocals; import org.elasticsearch.util.Unicode; import java.io.EOFException; @@ -30,9 +31,9 @@ import java.io.InputStream; */ public abstract class StreamInput extends InputStream { - private static ThreadLocal cachedBytes = new ThreadLocal() { - @Override protected byte[] initialValue() { - return new byte[256]; + private static ThreadLocal> cachedBytes = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new byte[256]); } }; @@ -108,10 +109,10 @@ public abstract class StreamInput extends InputStream { */ public String readUTF() throws IOException { int length = readVInt(); - byte[] bytes = cachedBytes.get(); + byte[] bytes = cachedBytes.get().get(); if (bytes == null || length > bytes.length) { bytes = new byte[(int) (length * 1.25)]; - cachedBytes.set(bytes); + cachedBytes.get().set(bytes); } readBytes(bytes, 0, length); return Unicode.fromBytes(bytes, 0, length); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/json/BinaryJsonBuilder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/json/BinaryJsonBuilder.java index 3017850dca5..b5649da23b6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/json/BinaryJsonBuilder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/json/BinaryJsonBuilder.java @@ -23,6 +23,7 @@ import org.codehaus.jackson.JsonEncoding; import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonGenerator; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.util.ThreadLocals; import org.elasticsearch.util.Unicode; import org.elasticsearch.util.io.FastByteArrayOutputStream; @@ -38,16 +39,10 @@ public class BinaryJsonBuilder extends JsonBuilder { */ public static class Cached { - private BinaryJsonBuilder builder; - - public Cached(BinaryJsonBuilder builder) { - this.builder = builder; - } - - private static final ThreadLocal cache = new ThreadLocal() { - @Override protected Cached initialValue() { + private static final ThreadLocal> cache = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { try { - return new Cached(new BinaryJsonBuilder()); + return new ThreadLocals.CleanableValue(new BinaryJsonBuilder()); } catch (IOException e) { throw new ElasticSearchException("Failed to create json generator", e); } @@ -58,9 +53,9 @@ public class BinaryJsonBuilder extends JsonBuilder { * Returns the cached thread local generator, with its internal {@link StringBuilder} cleared. */ static BinaryJsonBuilder cached() throws IOException { - Cached cached = cache.get(); - cached.builder.reset(); - return cached.builder; + ThreadLocals.CleanableValue cached = cache.get(); + cached.get().reset(); + return cached.get(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/json/StringJsonBuilder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/json/StringJsonBuilder.java index fff9fb42713..c34191b8042 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/json/StringJsonBuilder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/json/StringJsonBuilder.java @@ -23,6 +23,7 @@ import org.apache.lucene.util.UnicodeUtil; import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonGenerator; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.util.ThreadLocals; import org.elasticsearch.util.Unicode; import org.elasticsearch.util.concurrent.NotThreadSafe; import org.elasticsearch.util.io.FastCharArrayWriter; @@ -40,16 +41,10 @@ public class StringJsonBuilder extends JsonBuilder { */ public static class Cached { - private StringJsonBuilder builder; - - public Cached(StringJsonBuilder builder) { - this.builder = builder; - } - - private static final ThreadLocal cache = new ThreadLocal() { - @Override protected Cached initialValue() { + private static final ThreadLocal> cache = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { try { - return new Cached(new StringJsonBuilder()); + return new ThreadLocals.CleanableValue(new StringJsonBuilder()); } catch (IOException e) { throw new ElasticSearchException("Failed to create json generator", e); } @@ -60,9 +55,9 @@ public class StringJsonBuilder extends JsonBuilder { * Returns the cached thread local generator, with its internal {@link StringBuilder} cleared. */ static StringJsonBuilder cached() throws IOException { - Cached cached = cache.get(); - cached.builder.reset(); - return cached.builder; + StringJsonBuilder sb = cache.get().get(); + sb.reset(); + return sb; } } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/util/ThreadLocalsTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/util/ThreadLocalsTests.java new file mode 100644 index 00000000000..1ac39f8dd1d --- /dev/null +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/util/ThreadLocalsTests.java @@ -0,0 +1,49 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.util; + +import org.testng.annotations.Test; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +/** + * @author kimchy (shay.banon) + */ +@Test +public class ThreadLocalsTests { + + private static final ThreadLocal> local = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + return new ThreadLocals.CleanableValue(new AtomicInteger()); + } + }; + + @Test public void testCleanThreadLocals() { + assertThat(local.get().get().get(), equalTo(0)); + local.get().get().incrementAndGet(); + assertThat(local.get().get().get(), equalTo(1)); + ThreadLocals.clearReferencesThreadLocals(); + assertThat(local.get().get().get(), equalTo(0)); + ThreadLocals.clearReferencesThreadLocals(); + } +}