better cached streams logic

This commit is contained in:
kimchy 2011-04-24 23:28:35 +03:00
parent 9b262a7363
commit 990f371580
6 changed files with 87 additions and 29 deletions

View File

@ -51,6 +51,7 @@ import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.io.CachedStreams;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
@ -231,6 +232,7 @@ public class TransportClient extends AbstractClient {
}
CacheRecycler.clear();
CachedStreams.clear();
ThreadLocals.clearReferencesThreadLocals();
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
public class CachedStreams {
public static void clear() {
FastByteArrayOutputStream.Cached.clear();
CachedStreamInput.clear();
CachedStreamOutput.clear();
}
}

View File

@ -19,11 +19,10 @@
package org.elasticsearch.common.io;
import org.elasticsearch.common.thread.ThreadLocals;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.lang.ref.SoftReference;
import java.util.Arrays;
/**
@ -38,19 +37,24 @@ public class FastByteArrayOutputStream extends OutputStream {
*/
public static class Cached {
private static final ThreadLocal<ThreadLocals.CleanableValue<FastByteArrayOutputStream>> cache = new ThreadLocal<ThreadLocals.CleanableValue<FastByteArrayOutputStream>>() {
@Override protected ThreadLocals.CleanableValue<FastByteArrayOutputStream> initialValue() {
return new ThreadLocals.CleanableValue<FastByteArrayOutputStream>(new FastByteArrayOutputStream());
}
};
private static final ThreadLocal<SoftReference<FastByteArrayOutputStream>> cache = new ThreadLocal<SoftReference<FastByteArrayOutputStream>>();
/**
* Returns the cached thread local byte stream, with its internal stream cleared.
*/
public static FastByteArrayOutputStream cached() {
FastByteArrayOutputStream os = cache.get().get();
os.reset();
return os;
SoftReference<FastByteArrayOutputStream> ref = cache.get();
FastByteArrayOutputStream fos = ref == null ? null : ref.get();
if (fos == null) {
fos = new FastByteArrayOutputStream();
cache.set(new SoftReference(fos));
}
fos.reset();
return fos;
}
public static void clear() {
cache.remove();
}
}

View File

@ -19,9 +19,8 @@
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.thread.ThreadLocals;
import java.io.IOException;
import java.lang.ref.SoftReference;
/**
* @author kimchy (shay.banon)
@ -38,28 +37,38 @@ public class CachedStreamInput {
}
}
private static final ThreadLocal<ThreadLocals.CleanableValue<Entry>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Entry>>() {
@Override protected ThreadLocals.CleanableValue<Entry> initialValue() {
private static final ThreadLocal<SoftReference<Entry>> cache = new ThreadLocal<SoftReference<Entry>>();
static Entry instance() {
SoftReference<Entry> ref = cache.get();
Entry entry = ref == null ? null : ref.get();
if (entry == null) {
HandlesStreamInput handles = new HandlesStreamInput();
LZFStreamInput lzf = new LZFStreamInput(null, true);
return new ThreadLocals.CleanableValue<Entry>(new Entry(handles, lzf));
entry = new Entry(handles, lzf);
cache.set(new SoftReference<Entry>(entry));
}
};
return entry;
}
public static void clear() {
cache.remove();
}
public static LZFStreamInput cachedLzf(StreamInput in) throws IOException {
LZFStreamInput lzf = cache.get().get().lzf;
LZFStreamInput lzf = instance().lzf;
lzf.reset(in);
return lzf;
}
public static HandlesStreamInput cachedHandles(StreamInput in) {
HandlesStreamInput handles = cache.get().get().handles;
HandlesStreamInput handles = instance().handles;
handles.reset(in);
return handles;
}
public static HandlesStreamInput cachedHandlesLzf(StreamInput in) throws IOException {
Entry entry = cache.get().get();
Entry entry = instance();
entry.lzf.reset(in);
entry.handles.reset(entry.lzf);
return entry.handles;

View File

@ -19,9 +19,8 @@
package org.elasticsearch.common.io.stream;
import org.elasticsearch.common.thread.ThreadLocals;
import java.io.IOException;
import java.lang.ref.SoftReference;
/**
* @author kimchy (shay.banon)
@ -40,39 +39,49 @@ public class CachedStreamOutput {
}
}
private static final ThreadLocal<ThreadLocals.CleanableValue<Entry>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Entry>>() {
@Override protected ThreadLocals.CleanableValue<Entry> initialValue() {
private static final ThreadLocal<SoftReference<Entry>> cache = new ThreadLocal<SoftReference<Entry>>();
static Entry instance() {
SoftReference<Entry> ref = cache.get();
Entry entry = ref == null ? null : ref.get();
if (entry == null) {
BytesStreamOutput bytes = new BytesStreamOutput();
HandlesStreamOutput handles = new HandlesStreamOutput(bytes);
LZFStreamOutput lzf = new LZFStreamOutput(bytes, true);
return new ThreadLocals.CleanableValue<Entry>(new Entry(bytes, handles, lzf));
entry = new Entry(bytes, handles, lzf);
cache.set(new SoftReference<Entry>(entry));
}
};
return entry;
}
public static void clear() {
cache.remove();
}
/**
* Returns the cached thread local byte stream, with its internal stream cleared.
*/
public static BytesStreamOutput cachedBytes() {
BytesStreamOutput os = cache.get().get().bytes;
BytesStreamOutput os = instance().bytes;
os.reset();
return os;
}
public static LZFStreamOutput cachedLZFBytes() throws IOException {
LZFStreamOutput lzf = cache.get().get().lzf;
LZFStreamOutput lzf = instance().lzf;
lzf.reset();
return lzf;
}
public static HandlesStreamOutput cachedHandlesLzfBytes() throws IOException {
Entry entry = cache.get().get();
Entry entry = instance();
HandlesStreamOutput os = entry.handles;
os.reset(entry.lzf);
return os;
}
public static HandlesStreamOutput cachedHandlesBytes() throws IOException {
Entry entry = cache.get().get();
Entry entry = instance();
HandlesStreamOutput os = entry.handles;
os.reset(entry.bytes);
return os;

View File

@ -38,6 +38,7 @@ import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Injectors;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.io.CachedStreams;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkModule;
@ -304,6 +305,7 @@ public final class InternalNode implements Node {
stopWatch.stop();
CacheRecycler.clear();
CachedStreams.clear();
ThreadLocals.clearReferencesThreadLocals();
if (logger.isTraceEnabled()) {