diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java index 7196497f6ae..8266fbce094 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -25,10 +25,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.common.io.stream.BytesStreamInput; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.*; import org.elasticsearch.common.settings.Settings; import javax.annotation.Nullable; @@ -194,7 +191,7 @@ public class ClusterState { } public static byte[] toBytes(ClusterState state) throws IOException { - BytesStreamOutput os = BytesStreamOutput.Cached.cached(); + BytesStreamOutput os = CachedStreamOutput.cachedBytes(); writeTo(state, os); return os.copiedByteArray(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java index 776931cb8c6..a247ba6724e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java @@ -19,8 +19,6 @@ package org.elasticsearch.common.io.stream; -import org.elasticsearch.common.thread.ThreadLocals; - import java.io.IOException; import java.util.Arrays; @@ -29,45 +27,6 @@ import java.util.Arrays; */ public class BytesStreamOutput extends StreamOutput { - /** - * A thread local based cache of {@link BytesStreamOutput}. - */ - public static class Cached { - - static class Entry { - final BytesStreamOutput bytes; - final HandlesStreamOutput handles; - - Entry(BytesStreamOutput bytes, HandlesStreamOutput handles) { - this.bytes = bytes; - this.handles = handles; - } - } - - private static final ThreadLocal> cache = new ThreadLocal>() { - @Override protected ThreadLocals.CleanableValue initialValue() { - BytesStreamOutput bytes = new BytesStreamOutput(); - HandlesStreamOutput handles = new HandlesStreamOutput(bytes); - return new ThreadLocals.CleanableValue(new Entry(bytes, handles)); - } - }; - - /** - * Returns the cached thread local byte stream, with its internal stream cleared. - */ - public static BytesStreamOutput cached() { - BytesStreamOutput os = cache.get().get().bytes; - os.reset(); - return os; - } - - public static HandlesStreamOutput cachedHandles() throws IOException { - HandlesStreamOutput os = cache.get().get().handles; - os.reset(); - return os; - } - } - /** * The buffer where data is stored. */ diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java new file mode 100644 index 00000000000..d439f1e26ed --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java @@ -0,0 +1,63 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.io.stream; + +import org.elasticsearch.common.thread.ThreadLocals; + +import java.io.IOException; + +/** + * @author kimchy (shay.banon) + */ +public class CachedStreamOutput { + + static class Entry { + final BytesStreamOutput bytes; + final HandlesStreamOutput handles; + + Entry(BytesStreamOutput bytes, HandlesStreamOutput handles) { + this.bytes = bytes; + this.handles = handles; + } + } + + private static final ThreadLocal> cache = new ThreadLocal>() { + @Override protected ThreadLocals.CleanableValue initialValue() { + BytesStreamOutput bytes = new BytesStreamOutput(); + HandlesStreamOutput handles = new HandlesStreamOutput(bytes); + return new ThreadLocals.CleanableValue(new Entry(bytes, handles)); + } + }; + + /** + * Returns the cached thread local byte stream, with its internal stream cleared. + */ + public static BytesStreamOutput cachedBytes() { + BytesStreamOutput os = cache.get().get().bytes; + os.reset(); + return os; + } + + public static HandlesStreamOutput cachedHandles() throws IOException { + HandlesStreamOutput os = cache.get().get().handles; + os.reset(); + return os; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java index d3d2309b1c0..40698abadc4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java @@ -221,7 +221,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem private void sendPingRequest(int id) { synchronized (sendMutex) { try { - HandlesStreamOutput out = BytesStreamOutput.Cached.cachedHandles(); + HandlesStreamOutput out = CachedStreamOutput.cachedHandles(); out.writeInt(id); clusterName.writeTo(out); nodesProvider.nodes().localNode().writeTo(out); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index 0907077c9a5..abc2b915e2d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -135,7 +135,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem } @Override public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException { - HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles(); + HandlesStreamOutput stream = CachedStreamOutput.cachedHandles(); stream.writeLong(requestId); byte status = 0; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java index faa363d1584..09ff76b3cb8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java @@ -21,6 +21,7 @@ package org.elasticsearch.transport.local; import org.elasticsearch.common.io.ThrowableObjectOutputStream; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.HandlesStreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.transport.*; @@ -58,7 +59,7 @@ public class LocalTransportChannel implements TransportChannel { } @Override public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException { - HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles(); + HandlesStreamOutput stream = CachedStreamOutput.cachedHandles(); stream.writeLong(requestId); byte status = 0; status = Transport.Helper.setResponse(status); @@ -75,14 +76,14 @@ public class LocalTransportChannel implements TransportChannel { @Override public void sendResponse(Throwable error) throws IOException { BytesStreamOutput stream; try { - stream = BytesStreamOutput.Cached.cached(); + stream = CachedStreamOutput.cachedBytes(); writeResponseExceptionHeader(stream); RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, error); ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); too.writeObject(tx); too.close(); } catch (NotSerializableException e) { - stream = BytesStreamOutput.Cached.cached(); + stream = CachedStreamOutput.cachedBytes(); writeResponseExceptionHeader(stream); RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, new NotSerializableTransportException(error)); ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 600442ab7b8..0fa94911e2f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.HandlesStreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.netty.OpenChannelsHandler; @@ -386,7 +387,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem Channel targetChannel = nodeChannel(node); - HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles(); + HandlesStreamOutput stream = CachedStreamOutput.cachedHandles(); stream.writeBytes(LENGTH_PLACEHOLDER); // fake size stream.writeLong(requestId); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index 7276c5fcaa9..259aa034b28 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -21,6 +21,7 @@ package org.elasticsearch.transport.netty; import org.elasticsearch.common.io.ThrowableObjectOutputStream; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.HandlesStreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.netty.buffer.ChannelBuffer; @@ -67,7 +68,7 @@ public class NettyTransportChannel implements TransportChannel { } @Override public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException { - HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles(); + HandlesStreamOutput stream = CachedStreamOutput.cachedHandles(); stream.writeBytes(LENGTH_PLACEHOLDER); // fake size stream.writeLong(requestId); byte status = 0; @@ -84,14 +85,14 @@ public class NettyTransportChannel implements TransportChannel { @Override public void sendResponse(Throwable error) throws IOException { BytesStreamOutput stream; try { - stream = BytesStreamOutput.Cached.cached(); + stream = CachedStreamOutput.cachedBytes(); writeResponseExceptionHeader(stream); RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error); ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); too.writeObject(tx); too.close(); } catch (NotSerializableException e) { - stream = BytesStreamOutput.Cached.cached(); + stream = CachedStreamOutput.cachedBytes(); writeResponseExceptionHeader(stream); RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, new NotSerializableTransportException(error)); ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/common/io/streams/BytesStreamsTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/common/io/streams/BytesStreamsTests.java index 4cde9848a81..7bf6740ec53 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/common/io/streams/BytesStreamsTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/common/io/streams/BytesStreamsTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.common.io.streams; import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.testng.annotations.Test; import static org.hamcrest.MatcherAssert.*; @@ -33,7 +34,7 @@ import static org.hamcrest.Matchers.*; public class BytesStreamsTests { @Test public void testSimpleStreams() throws Exception { - BytesStreamOutput out = BytesStreamOutput.Cached.cached(); + BytesStreamOutput out = CachedStreamOutput.cachedBytes(); out.writeBoolean(false); out.writeByte((byte) 1); out.writeShort((short) -1);