From 990f3715809ec2318a6884375460d820b7d214b5 Mon Sep 17 00:00:00 2001 From: kimchy Date: Sun, 24 Apr 2011 23:28:35 +0300 Subject: [PATCH] better cached streams logic --- .../client/transport/TransportClient.java | 2 ++ .../common/io/CachedStreams.java | 32 +++++++++++++++++++ .../common/io/FastByteArrayOutputStream.java | 24 ++++++++------ .../common/io/stream/CachedStreamInput.java | 27 ++++++++++------ .../common/io/stream/CachedStreamOutput.java | 29 +++++++++++------ .../node/internal/InternalNode.java | 2 ++ 6 files changed, 87 insertions(+), 29 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/common/io/CachedStreams.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClient.java index 39a6f772365..cdd0b735f8b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -51,6 +51,7 @@ import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.ModulesBuilder; +import org.elasticsearch.common.io.CachedStreams; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; @@ -231,6 +232,7 @@ public class TransportClient extends AbstractClient { } CacheRecycler.clear(); + CachedStreams.clear(); ThreadLocals.clearReferencesThreadLocals(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/CachedStreams.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/CachedStreams.java new file mode 100644 index 00000000000..916b5761a1b --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/CachedStreams.java @@ -0,0 +1,32 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.io; + +import org.elasticsearch.common.io.stream.CachedStreamInput; +import org.elasticsearch.common.io.stream.CachedStreamOutput; + +public class CachedStreams { + + public static void clear() { + FastByteArrayOutputStream.Cached.clear(); + CachedStreamInput.clear(); + CachedStreamOutput.clear(); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FastByteArrayOutputStream.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FastByteArrayOutputStream.java index fdf1fa96fd8..d65f1fe6ca7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FastByteArrayOutputStream.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FastByteArrayOutputStream.java @@ -19,11 +19,10 @@ package org.elasticsearch.common.io; -import org.elasticsearch.common.thread.ThreadLocals; - import java.io.IOException; import java.io.OutputStream; import java.io.UnsupportedEncodingException; +import java.lang.ref.SoftReference; import java.util.Arrays; /** @@ -38,19 +37,24 @@ public class FastByteArrayOutputStream extends OutputStream { */ public static class Cached { - private static final ThreadLocal> cache = new ThreadLocal>() { - @Override protected ThreadLocals.CleanableValue initialValue() { - return new ThreadLocals.CleanableValue(new FastByteArrayOutputStream()); - } - }; + private static final ThreadLocal> cache = new ThreadLocal>(); /** * Returns the cached thread local byte stream, with its internal stream cleared. */ public static FastByteArrayOutputStream cached() { - FastByteArrayOutputStream os = cache.get().get(); - os.reset(); - return os; + SoftReference ref = cache.get(); + FastByteArrayOutputStream fos = ref == null ? null : ref.get(); + if (fos == null) { + fos = new FastByteArrayOutputStream(); + cache.set(new SoftReference(fos)); + } + fos.reset(); + return fos; + } + + public static void clear() { + cache.remove(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamInput.java index e2cd88f65a9..b5825a84ec0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamInput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamInput.java @@ -19,9 +19,8 @@ package org.elasticsearch.common.io.stream; -import org.elasticsearch.common.thread.ThreadLocals; - import java.io.IOException; +import java.lang.ref.SoftReference; /** * @author kimchy (shay.banon) @@ -38,28 +37,38 @@ public class CachedStreamInput { } } - private static final ThreadLocal> cache = new ThreadLocal>() { - @Override protected ThreadLocals.CleanableValue initialValue() { + private static final ThreadLocal> cache = new ThreadLocal>(); + + static Entry instance() { + SoftReference ref = cache.get(); + Entry entry = ref == null ? null : ref.get(); + if (entry == null) { HandlesStreamInput handles = new HandlesStreamInput(); LZFStreamInput lzf = new LZFStreamInput(null, true); - return new ThreadLocals.CleanableValue(new Entry(handles, lzf)); + entry = new Entry(handles, lzf); + cache.set(new SoftReference(entry)); } - }; + return entry; + } + + public static void clear() { + cache.remove(); + } public static LZFStreamInput cachedLzf(StreamInput in) throws IOException { - LZFStreamInput lzf = cache.get().get().lzf; + LZFStreamInput lzf = instance().lzf; lzf.reset(in); return lzf; } public static HandlesStreamInput cachedHandles(StreamInput in) { - HandlesStreamInput handles = cache.get().get().handles; + HandlesStreamInput handles = instance().handles; handles.reset(in); return handles; } public static HandlesStreamInput cachedHandlesLzf(StreamInput in) throws IOException { - Entry entry = cache.get().get(); + Entry entry = instance(); entry.lzf.reset(in); entry.handles.reset(entry.lzf); return entry.handles; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java index 935e9a7f859..57c83fa312d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java @@ -19,9 +19,8 @@ package org.elasticsearch.common.io.stream; -import org.elasticsearch.common.thread.ThreadLocals; - import java.io.IOException; +import java.lang.ref.SoftReference; /** * @author kimchy (shay.banon) @@ -40,39 +39,49 @@ public class CachedStreamOutput { } } - private static final ThreadLocal> cache = new ThreadLocal>() { - @Override protected ThreadLocals.CleanableValue initialValue() { + private static final ThreadLocal> cache = new ThreadLocal>(); + + static Entry instance() { + SoftReference ref = cache.get(); + Entry entry = ref == null ? null : ref.get(); + if (entry == null) { BytesStreamOutput bytes = new BytesStreamOutput(); HandlesStreamOutput handles = new HandlesStreamOutput(bytes); LZFStreamOutput lzf = new LZFStreamOutput(bytes, true); - return new ThreadLocals.CleanableValue(new Entry(bytes, handles, lzf)); + entry = new Entry(bytes, handles, lzf); + cache.set(new SoftReference(entry)); } - }; + return entry; + } + + public static void clear() { + cache.remove(); + } /** * Returns the cached thread local byte stream, with its internal stream cleared. */ public static BytesStreamOutput cachedBytes() { - BytesStreamOutput os = cache.get().get().bytes; + BytesStreamOutput os = instance().bytes; os.reset(); return os; } public static LZFStreamOutput cachedLZFBytes() throws IOException { - LZFStreamOutput lzf = cache.get().get().lzf; + LZFStreamOutput lzf = instance().lzf; lzf.reset(); return lzf; } public static HandlesStreamOutput cachedHandlesLzfBytes() throws IOException { - Entry entry = cache.get().get(); + Entry entry = instance(); HandlesStreamOutput os = entry.handles; os.reset(entry.lzf); return os; } public static HandlesStreamOutput cachedHandlesBytes() throws IOException { - Entry entry = cache.get().get(); + Entry entry = instance(); HandlesStreamOutput os = entry.handles; os.reset(entry.bytes); return os; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java index d043032dcd4..ec5de0e7f59 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.Injectors; import org.elasticsearch.common.inject.ModulesBuilder; +import org.elasticsearch.common.io.CachedStreams; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.network.NetworkModule; @@ -304,6 +305,7 @@ public final class InternalNode implements Node { stopWatch.stop(); CacheRecycler.clear(); + CachedStreams.clear(); ThreadLocals.clearReferencesThreadLocals(); if (logger.isTraceEnabled()) {