diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java index eee988031e0..995a7345df0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -222,9 +222,14 @@ public class ClusterState { } public static byte[] toBytes(ClusterState state) throws IOException { - BytesStreamOutput os = CachedStreamOutput.cachedBytes(); - writeTo(state, os); - return os.copiedByteArray(); + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); + try { + BytesStreamOutput os = cachedEntry.cachedBytes(); + writeTo(state, os); + return os.copiedByteArray(); + } finally { + CachedStreamOutput.pushEntry(cachedEntry); + } } public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/ChunkEncoder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/ChunkEncoder.java index 248f799ff55..d896e7842d1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/ChunkEncoder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/ChunkEncoder.java @@ -56,11 +56,12 @@ public class ChunkEncoder { * @param totalLength Total encoded length; used for calculating size * of hash table to use */ - public ChunkEncoder(int totalLength) { + // ES: Added recycler as a parameter so we can control its caching + public ChunkEncoder(int totalLength, BufferRecycler recycler) { int largestChunkLen = Math.max(totalLength, LZFChunk.MAX_CHUNK_LEN); int suggestedHashLen = calcHashLen(largestChunkLen); - _recycler = BufferRecycler.instance(); + _recycler = recycler; _hashTable = _recycler.allocEncodingHash(suggestedHashLen); _hashModulo = _hashTable.length - 1; // Ok, then, what's the worst case output buffer length? diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java index a6928b4f422..e2a3c8c2490 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java @@ -35,7 +35,7 @@ public class LZFEncoder { * Result consists of a sequence of chunks. */ public static byte[] encode(byte[] data, int length) throws IOException { - ChunkEncoder enc = new ChunkEncoder(length); + ChunkEncoder enc = new ChunkEncoder(length, BufferRecycler.instance()); byte[] result = encode(enc, data, length); // important: may be able to reuse buffers enc.close(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFOutputStream.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFOutputStream.java index 1c22952fe36..a22e7b1105d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFOutputStream.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFOutputStream.java @@ -18,8 +18,8 @@ public class LZFOutputStream extends OutputStream { protected int _position = 0; public LZFOutputStream(final OutputStream outputStream) { - _encoder = new ChunkEncoder(OUTPUT_BUFFER_SIZE); _recycler = BufferRecycler.instance(); + _encoder = new ChunkEncoder(OUTPUT_BUFFER_SIZE, _recycler); _outputStream = outputStream; _outputBuffer = _recycler.allocOutputBuffer(OUTPUT_BUFFER_SIZE); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java index 57c83fa312d..6310228e183 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java @@ -19,71 +19,121 @@ package org.elasticsearch.common.io.stream; +import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue; + import java.io.IOException; import java.lang.ref.SoftReference; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; /** * @author kimchy (shay.banon) */ public class CachedStreamOutput { - static class Entry { - final BytesStreamOutput bytes; - final HandlesStreamOutput handles; - final LZFStreamOutput lzf; + private static Entry newEntry() { + BytesStreamOutput bytes = new BytesStreamOutput(); + HandlesStreamOutput handles = new HandlesStreamOutput(bytes); + LZFStreamOutput lzf = new LZFStreamOutput(bytes, true); + return new Entry(bytes, handles, lzf); + } + + public static class Entry { + private final BytesStreamOutput bytes; + private final HandlesStreamOutput handles; + private final LZFStreamOutput lzf; Entry(BytesStreamOutput bytes, HandlesStreamOutput handles, LZFStreamOutput lzf) { this.bytes = bytes; this.handles = handles; this.lzf = lzf; } + + /** + * Returns the underlying bytes without any resetting. + */ + public BytesStreamOutput bytes() { + return bytes; + } + + /** + * Returns cached bytes that are also reset. + */ + public BytesStreamOutput cachedBytes() { + bytes.reset(); + return bytes; + } + + public LZFStreamOutput cachedLZFBytes() throws IOException { + lzf.reset(); + return lzf; + } + + public HandlesStreamOutput cachedHandlesLzfBytes() throws IOException { + handles.reset(lzf); + return handles; + } + + public HandlesStreamOutput cachedHandlesBytes() throws IOException { + handles.reset(bytes); + return handles; + } } - private static final ThreadLocal> cache = new ThreadLocal>(); + static class SoftWrapper { + private SoftReference ref; - static Entry instance() { - SoftReference ref = cache.get(); - Entry entry = ref == null ? null : ref.get(); - if (entry == null) { - BytesStreamOutput bytes = new BytesStreamOutput(); - HandlesStreamOutput handles = new HandlesStreamOutput(bytes); - LZFStreamOutput lzf = new LZFStreamOutput(bytes, true); - entry = new Entry(bytes, handles, lzf); - cache.set(new SoftReference(entry)); + public SoftWrapper() { } + + public void set(T ref) { + this.ref = new SoftReference(ref); + } + + public T get() { + return ref == null ? null : ref.get(); + } + + public void clear() { + ref = null; + } + } + + private static final SoftWrapper> cache = new SoftWrapper>(); + private static final AtomicInteger counter = new AtomicInteger(); + private static final int BYTES_LIMIT = 10 * 1024 * 1024; // don't cache entries that are bigger than that... + private static final int COUNT_LIMIT = 100; + + public static void clear() { + cache.clear(); + } + + public static Entry popEntry() { + Queue ref = cache.get(); + if (ref == null) { + return newEntry(); + } + Entry entry = ref.poll(); + if (entry == null) { + return newEntry(); + } + counter.decrementAndGet(); return entry; } - public static void clear() { - cache.remove(); - } - - /** - * Returns the cached thread local byte stream, with its internal stream cleared. - */ - public static BytesStreamOutput cachedBytes() { - BytesStreamOutput os = instance().bytes; - os.reset(); - return os; - } - - public static LZFStreamOutput cachedLZFBytes() throws IOException { - LZFStreamOutput lzf = instance().lzf; - lzf.reset(); - return lzf; - } - - public static HandlesStreamOutput cachedHandlesLzfBytes() throws IOException { - Entry entry = instance(); - HandlesStreamOutput os = entry.handles; - os.reset(entry.lzf); - return os; - } - - public static HandlesStreamOutput cachedHandlesBytes() throws IOException { - Entry entry = instance(); - HandlesStreamOutput os = entry.handles; - os.reset(entry.bytes); - return os; + public static void pushEntry(Entry entry) { + if (entry.bytes().unsafeByteArray().length > BYTES_LIMIT) { + return; + } + Queue ref = cache.get(); + if (ref == null) { + ref = new LinkedTransferQueue(); + cache.set(ref); + } + if (counter.incrementAndGet() > COUNT_LIMIT) { + counter.decrementAndGet(); + } else { + ref.add(entry); + } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java index 072ff2bc991..6167ac50b38 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java @@ -43,8 +43,8 @@ public class LZFStreamOutput extends StreamOutput { public LZFStreamOutput(StreamOutput out, boolean neverClose) { this.neverClose = neverClose; - _encoder = new ChunkEncoder(OUTPUT_BUFFER_SIZE); - _recycler = BufferRecycler.instance(); + _recycler = neverClose ? new BufferRecycler() : BufferRecycler.instance(); + _encoder = new ChunkEncoder(OUTPUT_BUFFER_SIZE, _recycler); _outputStream = out; _outputBuffer = _recycler.allocOutputBuffer(OUTPUT_BUFFER_SIZE); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java index 1f75d363b30..42ec103db3b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java @@ -181,11 +181,11 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem final AtomicReference response = new AtomicReference(); final CountDownLatch latch = new CountDownLatch(1); ping(new PingListener() { - @Override public void onPing(PingResponse[] pings) { - response.set(pings); - latch.countDown(); - } - }, timeout); + @Override public void onPing(PingResponse[] pings) { + response.set(pings); + latch.countDown(); + } + }, timeout); try { latch.await(); return response.get(); @@ -219,17 +219,20 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem private void sendPingRequest(int id, boolean remove) { synchronized (sendMutex) { + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); try { - HandlesStreamOutput out = CachedStreamOutput.cachedHandlesBytes(); + HandlesStreamOutput out = cachedEntry.cachedHandlesBytes(); out.writeInt(id); clusterName.writeTo(out); nodesProvider.nodes().localNode().writeTo(out); - datagramPacketSend.setData(((BytesStreamOutput) out.wrappedOut()).copiedByteArray()); + datagramPacketSend.setData(cachedEntry.bytes().copiedByteArray()); } catch (IOException e) { if (remove) { receivedResponses.remove(id); } throw new ZenPingException("Failed to serialize ping request", e); + } finally { + CachedStreamOutput.pushEntry(cachedEntry); } try { multicastSocket.send(datagramPacketSend); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index a14faeeb6d3..32794dce593 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -63,16 +63,18 @@ public class PublishClusterStateAction extends AbstractComponent { DiscoveryNode localNode = nodesProvider.nodes().localNode(); // serialize the cluster state here, so we won't do it several times per node + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); byte[] clusterStateInBytes; try { - HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesLzfBytes(); + HandlesStreamOutput stream = cachedEntry.cachedHandlesLzfBytes(); ClusterState.Builder.writeTo(clusterState, stream); stream.flush(); - BytesStreamOutput wrapped = ((BytesStreamOutput) ((LZFStreamOutput) stream.wrappedOut()).wrappedOut()); - clusterStateInBytes = wrapped.copiedByteArray(); + clusterStateInBytes = cachedEntry.bytes().copiedByteArray(); } catch (Exception e) { logger.warn("failed to serialize cluster_state before publishing it to nodes", e); return; + } finally { + CachedStreamOutput.pushEntry(cachedEntry); } for (final DiscoveryNode node : clusterState.nodes()) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index e4e91095fdb..1878b995c46 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -140,8 +140,9 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog } @Override public void add(Operation operation) throws TranslogException { + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); try { - BytesStreamOutput out = CachedStreamOutput.cachedBytes(); + BytesStreamOutput out = cachedEntry.cachedBytes(); out.writeInt(0); // marker for the size... TranslogStreams.writeTranslogOperation(out, operation); out.flush(); @@ -164,6 +165,8 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog } } catch (Exception e) { throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); + } finally { + CachedStreamOutput.pushEntry(cachedEntry); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index 631b8cfaff5..bf457cfe4d6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -135,30 +135,35 @@ public class LocalTransport extends AbstractLifecycleComponent implem } @Override public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException { - HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes(); + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); + try { + HandlesStreamOutput stream = cachedEntry.cachedHandlesBytes(); - stream.writeLong(requestId); - byte status = 0; - status = TransportStreams.statusSetRequest(status); - stream.writeByte(status); // 0 for request, 1 for response. + stream.writeLong(requestId); + byte status = 0; + status = TransportStreams.statusSetRequest(status); + stream.writeByte(status); // 0 for request, 1 for response. - stream.writeUTF(action); - message.writeTo(stream); + stream.writeUTF(action); + message.writeTo(stream); - final LocalTransport targetTransport = connectedNodes.get(node); - if (targetTransport == null) { - throw new NodeNotConnectedException(node, "Node not connected"); - } - - final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray(); - - transportServiceAdapter.sent(data.length); - - threadPool.cached().execute(new Runnable() { - @Override public void run() { - targetTransport.messageReceived(data, action, LocalTransport.this, requestId); + final LocalTransport targetTransport = connectedNodes.get(node); + if (targetTransport == null) { + throw new NodeNotConnectedException(node, "Node not connected"); } - }); + + final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray(); + + transportServiceAdapter.sent(data.length); + + threadPool.cached().execute(new Runnable() { + @Override public void run() { + targetTransport.messageReceived(data, action, LocalTransport.this, requestId); + } + }); + } finally { + CachedStreamOutput.pushEntry(cachedEntry); + } } ThreadPool threadPool() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java index dadb731fc8c..b41802c360b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java @@ -63,43 +63,53 @@ public class LocalTransportChannel implements TransportChannel { } @Override public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException { - HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes(); - stream.writeLong(requestId); - byte status = 0; - status = TransportStreams.statusSetResponse(status); - stream.writeByte(status); // 0 for request, 1 for response. - message.writeTo(stream); - final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray(); - targetTransport.threadPool().cached().execute(new Runnable() { - @Override public void run() { - targetTransport.messageReceived(data, action, sourceTransport, null); - } - }); + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); + try { + HandlesStreamOutput stream = cachedEntry.cachedHandlesBytes(); + stream.writeLong(requestId); + byte status = 0; + status = TransportStreams.statusSetResponse(status); + stream.writeByte(status); // 0 for request, 1 for response. + message.writeTo(stream); + final byte[] data = cachedEntry.bytes().copiedByteArray(); + targetTransport.threadPool().cached().execute(new Runnable() { + @Override public void run() { + targetTransport.messageReceived(data, action, sourceTransport, null); + } + }); + } finally { + CachedStreamOutput.pushEntry(cachedEntry); + } } @Override public void sendResponse(Throwable error) throws IOException { - BytesStreamOutput stream; + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); try { - stream = CachedStreamOutput.cachedBytes(); - writeResponseExceptionHeader(stream); - RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, error); - ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); - too.writeObject(tx); - too.close(); - } catch (NotSerializableException e) { - stream = CachedStreamOutput.cachedBytes(); - writeResponseExceptionHeader(stream); - RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, new NotSerializableTransportException(error)); - ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); - too.writeObject(tx); - too.close(); - } - final byte[] data = stream.copiedByteArray(); - targetTransport.threadPool().cached().execute(new Runnable() { - @Override public void run() { - targetTransport.messageReceived(data, action, sourceTransport, null); + BytesStreamOutput stream; + try { + stream = cachedEntry.cachedBytes(); + writeResponseExceptionHeader(stream); + RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, error); + ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); + too.writeObject(tx); + too.close(); + } catch (NotSerializableException e) { + stream = cachedEntry.cachedBytes(); + writeResponseExceptionHeader(stream); + RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, new NotSerializableTransportException(error)); + ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); + too.writeObject(tx); + too.close(); } - }); + final byte[] data = stream.copiedByteArray(); + targetTransport.threadPool().cached().execute(new Runnable() { + @Override public void run() { + targetTransport.messageReceived(data, action, sourceTransport, null); + } + }); + } finally { + CachedStreamOutput.pushEntry(cachedEntry); + } } private void writeResponseExceptionHeader(BytesStreamOutput stream) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 2e68321a233..f6432940490 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.netty.OpenChannelsHandler; import org.elasticsearch.common.netty.bootstrap.ClientBootstrap; @@ -307,7 +308,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem threadPool.cached().execute(new Runnable() { @Override public void run() { try { - for (Iterator it = connectedNodes.values().iterator(); it.hasNext();) { + for (Iterator it = connectedNodes.values().iterator(); it.hasNext(); ) { NodeChannels nodeChannels = it.next(); it.remove(); nodeChannels.close(); @@ -331,7 +332,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem serverBootstrap = null; } - for (Iterator it = connectedNodes.values().iterator(); it.hasNext();) { + for (Iterator it = connectedNodes.values().iterator(); it.hasNext(); ) { NodeChannels nodeChannels = it.next(); it.remove(); nodeChannels.close(); @@ -427,10 +428,11 @@ public class NettyTransport extends AbstractLifecycleComponent implem options.withCompress(true); } - byte[] data = TransportStreams.buildRequest(requestId, action, message, options); - - ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data); - ChannelFuture channelFuture = targetChannel.write(buffer); + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); + TransportStreams.buildRequest(cachedEntry, requestId, action, message, options); + ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(cachedEntry.bytes().unsafeByteArray(), 0, cachedEntry.bytes().size()); + ChannelFuture future = targetChannel.write(buffer); + future.addListener(new CacheFutureListener(cachedEntry)); // We handle close connection exception in the #exceptionCaught method, which is the main reason we want to add this future // channelFuture.addListener(new ChannelFutureListener() { // @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -641,4 +643,18 @@ public class NettyTransport extends AbstractLifecycleComponent implem } } } + + public static class CacheFutureListener implements ChannelFutureListener { + + private final CachedStreamOutput.Entry cachedEntry; + + public CacheFutureListener(CachedStreamOutput.Entry cachedEntry) { + this.cachedEntry = cachedEntry; + } + + @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { + CachedStreamOutput.pushEntry(cachedEntry); + } + } + } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index 32c402464ce..962ba30fd99 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.netty.buffer.ChannelBuffer; import org.elasticsearch.common.netty.buffer.ChannelBuffers; import org.elasticsearch.common.netty.channel.Channel; +import org.elasticsearch.common.netty.channel.ChannelFuture; import org.elasticsearch.transport.NotSerializableTransportException; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportChannel; @@ -69,31 +70,35 @@ public class NettyTransportChannel implements TransportChannel { if (transport.compress) { options.withCompress(true); } - byte[] data = TransportStreams.buildResponse(requestId, message, options); - ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data); - channel.write(buffer); + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); + TransportStreams.buildResponse(cachedEntry, requestId, message, options); + ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(cachedEntry.bytes().unsafeByteArray(), 0, cachedEntry.bytes().size()); + ChannelFuture future = channel.write(buffer); + future.addListener(new NettyTransport.CacheFutureListener(cachedEntry)); } @Override public void sendResponse(Throwable error) throws IOException { + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); BytesStreamOutput stream; try { - stream = CachedStreamOutput.cachedBytes(); + stream = cachedEntry.cachedBytes(); writeResponseExceptionHeader(stream); RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error); ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); too.writeObject(tx); too.close(); } catch (NotSerializableException e) { - stream = CachedStreamOutput.cachedBytes(); + stream = cachedEntry.cachedBytes(); writeResponseExceptionHeader(stream); RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, new NotSerializableTransportException(error)); ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); too.writeObject(tx); too.close(); } - ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(stream.copiedByteArray()); + ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(stream.unsafeByteArray(), 0, stream.size()); buffer.setInt(0, buffer.writerIndex() - 4); // update real size. - channel.write(buffer); + ChannelFuture future = channel.write(buffer); + future.addListener(new NettyTransport.CacheFutureListener(cachedEntry)); } private void writeResponseExceptionHeader(BytesStreamOutput stream) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/support/TransportStreams.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/support/TransportStreams.java index d3fd23c01ae..809a97d344d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/support/TransportStreams.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/support/TransportStreams.java @@ -19,7 +19,9 @@ package org.elasticsearch.transport.support; -import org.elasticsearch.common.io.stream.*; +import org.elasticsearch.common.io.stream.CachedStreamOutput; +import org.elasticsearch.common.io.stream.HandlesStreamOutput; +import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponseOptions; @@ -31,9 +33,10 @@ import java.io.IOException; public class TransportStreams { public static final int HEADER_SIZE = 4 + 8 + 1; + public static final byte[] HEADER_PLACEHOLDER = new byte[HEADER_SIZE]; public static void writeHeader(byte[] data, int dataLength, long requestId, byte status) { - writeInt(data, 0, dataLength + 9); // add the requestId and the status + writeInt(data, 0, dataLength - 4); // no need for the header, already there writeLong(data, 4, requestId); data[12] = status; } @@ -96,58 +99,43 @@ public class TransportStreams { return value; } - public static byte[] buildRequest(final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException { + public static void buildRequest(CachedStreamOutput.Entry cachedEntry, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException { byte status = 0; status = TransportStreams.statusSetRequest(status); - BytesStreamOutput wrapped; if (options.compress()) { status = TransportStreams.statusSetCompress(status); - HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesLzfBytes(); + HandlesStreamOutput stream = cachedEntry.cachedHandlesLzfBytes(); + cachedEntry.bytes().write(HEADER_PLACEHOLDER); stream.writeUTF(action); message.writeTo(stream); stream.flush(); - wrapped = ((BytesStreamOutput) ((LZFStreamOutput) stream.wrappedOut()).wrappedOut()); - stream.cleanHandles(); } else { - HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes(); + HandlesStreamOutput stream = cachedEntry.cachedHandlesBytes(); + cachedEntry.bytes().write(HEADER_PLACEHOLDER); stream.writeUTF(action); message.writeTo(stream); stream.flush(); - wrapped = ((BytesStreamOutput) stream.wrappedOut()); - stream.cleanHandles(); } - - byte[] data = new byte[HEADER_SIZE + wrapped.size()]; - TransportStreams.writeHeader(data, wrapped.size(), requestId, status); - System.arraycopy(wrapped.unsafeByteArray(), 0, data, HEADER_SIZE, wrapped.size()); - - return data; + TransportStreams.writeHeader(cachedEntry.bytes().unsafeByteArray(), cachedEntry.bytes().size(), requestId, status); } - public static byte[] buildResponse(final long requestId, Streamable message, TransportResponseOptions options) throws IOException { + public static void buildResponse(CachedStreamOutput.Entry cachedEntry, final long requestId, Streamable message, TransportResponseOptions options) throws IOException { byte status = 0; status = TransportStreams.statusSetResponse(status); - BytesStreamOutput wrapped; if (options.compress()) { status = TransportStreams.statusSetCompress(status); - HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesLzfBytes(); + HandlesStreamOutput stream = cachedEntry.cachedHandlesLzfBytes(); + cachedEntry.bytes().write(HEADER_PLACEHOLDER); message.writeTo(stream); stream.flush(); - wrapped = ((BytesStreamOutput) ((LZFStreamOutput) stream.wrappedOut()).wrappedOut()); } else { - HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes(); + HandlesStreamOutput stream = cachedEntry.cachedHandlesBytes(); + cachedEntry.bytes().write(HEADER_PLACEHOLDER); message.writeTo(stream); stream.flush(); - wrapped = ((BytesStreamOutput) stream.wrappedOut()); } - - - byte[] data = new byte[HEADER_SIZE + wrapped.size()]; - TransportStreams.writeHeader(data, wrapped.size(), requestId, status); - System.arraycopy(wrapped.unsafeByteArray(), 0, data, HEADER_SIZE, wrapped.size()); - - return data; + TransportStreams.writeHeader(cachedEntry.bytes().unsafeByteArray(), cachedEntry.bytes().size(), requestId, status); } } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/common/io/streams/BytesStreamsTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/common/io/streams/BytesStreamsTests.java index 7bf6740ec53..9d960ab4109 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/common/io/streams/BytesStreamsTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/common/io/streams/BytesStreamsTests.java @@ -34,7 +34,7 @@ import static org.hamcrest.Matchers.*; public class BytesStreamsTests { @Test public void testSimpleStreams() throws Exception { - BytesStreamOutput out = CachedStreamOutput.cachedBytes(); + BytesStreamOutput out = CachedStreamOutput.popEntry().cachedBytes(); out.writeBoolean(false); out.writeByte((byte) 1); out.writeShort((short) -1); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java index 1a957bfcbbc..df6cb99db28 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java @@ -39,6 +39,10 @@ public class SimpleNettyTransportTests extends AbstractSimpleTransportTests { serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress()); } + @Override public void testHelloWorld() { + super.testHelloWorld(); //To change body of overridden methods use File | Settings | File Templates. + } + @Override public void testVoidMessageCompressed() { super.testVoidMessageCompressed(); //To change body of overridden methods use File | Settings | File Templates. }