better cached stream output logic, and reduce byte array copying when sending a message over the transport
This commit is contained in:
parent
45956a5a27
commit
fe52c5665f
|
@ -222,9 +222,14 @@ public class ClusterState {
|
|||
}
|
||||
|
||||
public static byte[] toBytes(ClusterState state) throws IOException {
|
||||
BytesStreamOutput os = CachedStreamOutput.cachedBytes();
|
||||
writeTo(state, os);
|
||||
return os.copiedByteArray();
|
||||
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
|
||||
try {
|
||||
BytesStreamOutput os = cachedEntry.cachedBytes();
|
||||
writeTo(state, os);
|
||||
return os.copiedByteArray();
|
||||
} finally {
|
||||
CachedStreamOutput.pushEntry(cachedEntry);
|
||||
}
|
||||
}
|
||||
|
||||
public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throws IOException {
|
||||
|
|
|
@ -56,11 +56,12 @@ public class ChunkEncoder {
|
|||
* @param totalLength Total encoded length; used for calculating size
|
||||
* of hash table to use
|
||||
*/
|
||||
public ChunkEncoder(int totalLength) {
|
||||
// ES: Added recycler as a parameter so we can control its caching
|
||||
public ChunkEncoder(int totalLength, BufferRecycler recycler) {
|
||||
int largestChunkLen = Math.max(totalLength, LZFChunk.MAX_CHUNK_LEN);
|
||||
|
||||
int suggestedHashLen = calcHashLen(largestChunkLen);
|
||||
_recycler = BufferRecycler.instance();
|
||||
_recycler = recycler;
|
||||
_hashTable = _recycler.allocEncodingHash(suggestedHashLen);
|
||||
_hashModulo = _hashTable.length - 1;
|
||||
// Ok, then, what's the worst case output buffer length?
|
||||
|
|
|
@ -35,7 +35,7 @@ public class LZFEncoder {
|
|||
* Result consists of a sequence of chunks.
|
||||
*/
|
||||
public static byte[] encode(byte[] data, int length) throws IOException {
|
||||
ChunkEncoder enc = new ChunkEncoder(length);
|
||||
ChunkEncoder enc = new ChunkEncoder(length, BufferRecycler.instance());
|
||||
byte[] result = encode(enc, data, length);
|
||||
// important: may be able to reuse buffers
|
||||
enc.close();
|
||||
|
|
|
@ -18,8 +18,8 @@ public class LZFOutputStream extends OutputStream {
|
|||
protected int _position = 0;
|
||||
|
||||
public LZFOutputStream(final OutputStream outputStream) {
|
||||
_encoder = new ChunkEncoder(OUTPUT_BUFFER_SIZE);
|
||||
_recycler = BufferRecycler.instance();
|
||||
_encoder = new ChunkEncoder(OUTPUT_BUFFER_SIZE, _recycler);
|
||||
_outputStream = outputStream;
|
||||
_outputBuffer = _recycler.allocOutputBuffer(OUTPUT_BUFFER_SIZE);
|
||||
}
|
||||
|
|
|
@ -19,71 +19,121 @@
|
|||
|
||||
package org.elasticsearch.common.io.stream;
|
||||
|
||||
import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.ref.SoftReference;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class CachedStreamOutput {
|
||||
|
||||
static class Entry {
|
||||
final BytesStreamOutput bytes;
|
||||
final HandlesStreamOutput handles;
|
||||
final LZFStreamOutput lzf;
|
||||
private static Entry newEntry() {
|
||||
BytesStreamOutput bytes = new BytesStreamOutput();
|
||||
HandlesStreamOutput handles = new HandlesStreamOutput(bytes);
|
||||
LZFStreamOutput lzf = new LZFStreamOutput(bytes, true);
|
||||
return new Entry(bytes, handles, lzf);
|
||||
}
|
||||
|
||||
public static class Entry {
|
||||
private final BytesStreamOutput bytes;
|
||||
private final HandlesStreamOutput handles;
|
||||
private final LZFStreamOutput lzf;
|
||||
|
||||
Entry(BytesStreamOutput bytes, HandlesStreamOutput handles, LZFStreamOutput lzf) {
|
||||
this.bytes = bytes;
|
||||
this.handles = handles;
|
||||
this.lzf = lzf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the underlying bytes without any resetting.
|
||||
*/
|
||||
public BytesStreamOutput bytes() {
|
||||
return bytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns cached bytes that are also reset.
|
||||
*/
|
||||
public BytesStreamOutput cachedBytes() {
|
||||
bytes.reset();
|
||||
return bytes;
|
||||
}
|
||||
|
||||
public LZFStreamOutput cachedLZFBytes() throws IOException {
|
||||
lzf.reset();
|
||||
return lzf;
|
||||
}
|
||||
|
||||
public HandlesStreamOutput cachedHandlesLzfBytes() throws IOException {
|
||||
handles.reset(lzf);
|
||||
return handles;
|
||||
}
|
||||
|
||||
public HandlesStreamOutput cachedHandlesBytes() throws IOException {
|
||||
handles.reset(bytes);
|
||||
return handles;
|
||||
}
|
||||
}
|
||||
|
||||
private static final ThreadLocal<SoftReference<Entry>> cache = new ThreadLocal<SoftReference<Entry>>();
|
||||
static class SoftWrapper<T> {
|
||||
private SoftReference<T> ref;
|
||||
|
||||
static Entry instance() {
|
||||
SoftReference<Entry> ref = cache.get();
|
||||
Entry entry = ref == null ? null : ref.get();
|
||||
if (entry == null) {
|
||||
BytesStreamOutput bytes = new BytesStreamOutput();
|
||||
HandlesStreamOutput handles = new HandlesStreamOutput(bytes);
|
||||
LZFStreamOutput lzf = new LZFStreamOutput(bytes, true);
|
||||
entry = new Entry(bytes, handles, lzf);
|
||||
cache.set(new SoftReference<Entry>(entry));
|
||||
public SoftWrapper() {
|
||||
}
|
||||
|
||||
public void set(T ref) {
|
||||
this.ref = new SoftReference<T>(ref);
|
||||
}
|
||||
|
||||
public T get() {
|
||||
return ref == null ? null : ref.get();
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
ref = null;
|
||||
}
|
||||
}
|
||||
|
||||
private static final SoftWrapper<Queue<Entry>> cache = new SoftWrapper<Queue<Entry>>();
|
||||
private static final AtomicInteger counter = new AtomicInteger();
|
||||
private static final int BYTES_LIMIT = 10 * 1024 * 1024; // don't cache entries that are bigger than that...
|
||||
private static final int COUNT_LIMIT = 100;
|
||||
|
||||
public static void clear() {
|
||||
cache.clear();
|
||||
}
|
||||
|
||||
public static Entry popEntry() {
|
||||
Queue<Entry> ref = cache.get();
|
||||
if (ref == null) {
|
||||
return newEntry();
|
||||
}
|
||||
Entry entry = ref.poll();
|
||||
if (entry == null) {
|
||||
return newEntry();
|
||||
}
|
||||
counter.decrementAndGet();
|
||||
return entry;
|
||||
}
|
||||
|
||||
public static void clear() {
|
||||
cache.remove();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the cached thread local byte stream, with its internal stream cleared.
|
||||
*/
|
||||
public static BytesStreamOutput cachedBytes() {
|
||||
BytesStreamOutput os = instance().bytes;
|
||||
os.reset();
|
||||
return os;
|
||||
}
|
||||
|
||||
public static LZFStreamOutput cachedLZFBytes() throws IOException {
|
||||
LZFStreamOutput lzf = instance().lzf;
|
||||
lzf.reset();
|
||||
return lzf;
|
||||
}
|
||||
|
||||
public static HandlesStreamOutput cachedHandlesLzfBytes() throws IOException {
|
||||
Entry entry = instance();
|
||||
HandlesStreamOutput os = entry.handles;
|
||||
os.reset(entry.lzf);
|
||||
return os;
|
||||
}
|
||||
|
||||
public static HandlesStreamOutput cachedHandlesBytes() throws IOException {
|
||||
Entry entry = instance();
|
||||
HandlesStreamOutput os = entry.handles;
|
||||
os.reset(entry.bytes);
|
||||
return os;
|
||||
public static void pushEntry(Entry entry) {
|
||||
if (entry.bytes().unsafeByteArray().length > BYTES_LIMIT) {
|
||||
return;
|
||||
}
|
||||
Queue<Entry> ref = cache.get();
|
||||
if (ref == null) {
|
||||
ref = new LinkedTransferQueue<Entry>();
|
||||
cache.set(ref);
|
||||
}
|
||||
if (counter.incrementAndGet() > COUNT_LIMIT) {
|
||||
counter.decrementAndGet();
|
||||
} else {
|
||||
ref.add(entry);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,8 +43,8 @@ public class LZFStreamOutput extends StreamOutput {
|
|||
|
||||
public LZFStreamOutput(StreamOutput out, boolean neverClose) {
|
||||
this.neverClose = neverClose;
|
||||
_encoder = new ChunkEncoder(OUTPUT_BUFFER_SIZE);
|
||||
_recycler = BufferRecycler.instance();
|
||||
_recycler = neverClose ? new BufferRecycler() : BufferRecycler.instance();
|
||||
_encoder = new ChunkEncoder(OUTPUT_BUFFER_SIZE, _recycler);
|
||||
_outputStream = out;
|
||||
_outputBuffer = _recycler.allocOutputBuffer(OUTPUT_BUFFER_SIZE);
|
||||
}
|
||||
|
|
|
@ -181,11 +181,11 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
|||
final AtomicReference<PingResponse[]> response = new AtomicReference<PingResponse[]>();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
ping(new PingListener() {
|
||||
@Override public void onPing(PingResponse[] pings) {
|
||||
response.set(pings);
|
||||
latch.countDown();
|
||||
}
|
||||
}, timeout);
|
||||
@Override public void onPing(PingResponse[] pings) {
|
||||
response.set(pings);
|
||||
latch.countDown();
|
||||
}
|
||||
}, timeout);
|
||||
try {
|
||||
latch.await();
|
||||
return response.get();
|
||||
|
@ -219,17 +219,20 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
|||
|
||||
private void sendPingRequest(int id, boolean remove) {
|
||||
synchronized (sendMutex) {
|
||||
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
|
||||
try {
|
||||
HandlesStreamOutput out = CachedStreamOutput.cachedHandlesBytes();
|
||||
HandlesStreamOutput out = cachedEntry.cachedHandlesBytes();
|
||||
out.writeInt(id);
|
||||
clusterName.writeTo(out);
|
||||
nodesProvider.nodes().localNode().writeTo(out);
|
||||
datagramPacketSend.setData(((BytesStreamOutput) out.wrappedOut()).copiedByteArray());
|
||||
datagramPacketSend.setData(cachedEntry.bytes().copiedByteArray());
|
||||
} catch (IOException e) {
|
||||
if (remove) {
|
||||
receivedResponses.remove(id);
|
||||
}
|
||||
throw new ZenPingException("Failed to serialize ping request", e);
|
||||
} finally {
|
||||
CachedStreamOutput.pushEntry(cachedEntry);
|
||||
}
|
||||
try {
|
||||
multicastSocket.send(datagramPacketSend);
|
||||
|
|
|
@ -63,16 +63,18 @@ public class PublishClusterStateAction extends AbstractComponent {
|
|||
DiscoveryNode localNode = nodesProvider.nodes().localNode();
|
||||
|
||||
// serialize the cluster state here, so we won't do it several times per node
|
||||
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
|
||||
byte[] clusterStateInBytes;
|
||||
try {
|
||||
HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesLzfBytes();
|
||||
HandlesStreamOutput stream = cachedEntry.cachedHandlesLzfBytes();
|
||||
ClusterState.Builder.writeTo(clusterState, stream);
|
||||
stream.flush();
|
||||
BytesStreamOutput wrapped = ((BytesStreamOutput) ((LZFStreamOutput) stream.wrappedOut()).wrappedOut());
|
||||
clusterStateInBytes = wrapped.copiedByteArray();
|
||||
clusterStateInBytes = cachedEntry.bytes().copiedByteArray();
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to serialize cluster_state before publishing it to nodes", e);
|
||||
return;
|
||||
} finally {
|
||||
CachedStreamOutput.pushEntry(cachedEntry);
|
||||
}
|
||||
|
||||
for (final DiscoveryNode node : clusterState.nodes()) {
|
||||
|
|
|
@ -140,8 +140,9 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
|||
}
|
||||
|
||||
@Override public void add(Operation operation) throws TranslogException {
|
||||
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
|
||||
try {
|
||||
BytesStreamOutput out = CachedStreamOutput.cachedBytes();
|
||||
BytesStreamOutput out = cachedEntry.cachedBytes();
|
||||
out.writeInt(0); // marker for the size...
|
||||
TranslogStreams.writeTranslogOperation(out, operation);
|
||||
out.flush();
|
||||
|
@ -164,6 +165,8 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
|||
}
|
||||
} catch (Exception e) {
|
||||
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
|
||||
} finally {
|
||||
CachedStreamOutput.pushEntry(cachedEntry);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -135,30 +135,35 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
}
|
||||
|
||||
@Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException {
|
||||
HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes();
|
||||
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
|
||||
try {
|
||||
HandlesStreamOutput stream = cachedEntry.cachedHandlesBytes();
|
||||
|
||||
stream.writeLong(requestId);
|
||||
byte status = 0;
|
||||
status = TransportStreams.statusSetRequest(status);
|
||||
stream.writeByte(status); // 0 for request, 1 for response.
|
||||
stream.writeLong(requestId);
|
||||
byte status = 0;
|
||||
status = TransportStreams.statusSetRequest(status);
|
||||
stream.writeByte(status); // 0 for request, 1 for response.
|
||||
|
||||
stream.writeUTF(action);
|
||||
message.writeTo(stream);
|
||||
stream.writeUTF(action);
|
||||
message.writeTo(stream);
|
||||
|
||||
final LocalTransport targetTransport = connectedNodes.get(node);
|
||||
if (targetTransport == null) {
|
||||
throw new NodeNotConnectedException(node, "Node not connected");
|
||||
}
|
||||
|
||||
final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();
|
||||
|
||||
transportServiceAdapter.sent(data.length);
|
||||
|
||||
threadPool.cached().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
targetTransport.messageReceived(data, action, LocalTransport.this, requestId);
|
||||
final LocalTransport targetTransport = connectedNodes.get(node);
|
||||
if (targetTransport == null) {
|
||||
throw new NodeNotConnectedException(node, "Node not connected");
|
||||
}
|
||||
});
|
||||
|
||||
final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();
|
||||
|
||||
transportServiceAdapter.sent(data.length);
|
||||
|
||||
threadPool.cached().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
targetTransport.messageReceived(data, action, LocalTransport.this, requestId);
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
CachedStreamOutput.pushEntry(cachedEntry);
|
||||
}
|
||||
}
|
||||
|
||||
ThreadPool threadPool() {
|
||||
|
|
|
@ -63,43 +63,53 @@ public class LocalTransportChannel implements TransportChannel {
|
|||
}
|
||||
|
||||
@Override public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException {
|
||||
HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes();
|
||||
stream.writeLong(requestId);
|
||||
byte status = 0;
|
||||
status = TransportStreams.statusSetResponse(status);
|
||||
stream.writeByte(status); // 0 for request, 1 for response.
|
||||
message.writeTo(stream);
|
||||
final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();
|
||||
targetTransport.threadPool().cached().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
targetTransport.messageReceived(data, action, sourceTransport, null);
|
||||
}
|
||||
});
|
||||
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
|
||||
try {
|
||||
HandlesStreamOutput stream = cachedEntry.cachedHandlesBytes();
|
||||
stream.writeLong(requestId);
|
||||
byte status = 0;
|
||||
status = TransportStreams.statusSetResponse(status);
|
||||
stream.writeByte(status); // 0 for request, 1 for response.
|
||||
message.writeTo(stream);
|
||||
final byte[] data = cachedEntry.bytes().copiedByteArray();
|
||||
targetTransport.threadPool().cached().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
targetTransport.messageReceived(data, action, sourceTransport, null);
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
CachedStreamOutput.pushEntry(cachedEntry);
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void sendResponse(Throwable error) throws IOException {
|
||||
BytesStreamOutput stream;
|
||||
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
|
||||
try {
|
||||
stream = CachedStreamOutput.cachedBytes();
|
||||
writeResponseExceptionHeader(stream);
|
||||
RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, error);
|
||||
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
|
||||
too.writeObject(tx);
|
||||
too.close();
|
||||
} catch (NotSerializableException e) {
|
||||
stream = CachedStreamOutput.cachedBytes();
|
||||
writeResponseExceptionHeader(stream);
|
||||
RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, new NotSerializableTransportException(error));
|
||||
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
|
||||
too.writeObject(tx);
|
||||
too.close();
|
||||
}
|
||||
final byte[] data = stream.copiedByteArray();
|
||||
targetTransport.threadPool().cached().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
targetTransport.messageReceived(data, action, sourceTransport, null);
|
||||
BytesStreamOutput stream;
|
||||
try {
|
||||
stream = cachedEntry.cachedBytes();
|
||||
writeResponseExceptionHeader(stream);
|
||||
RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, error);
|
||||
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
|
||||
too.writeObject(tx);
|
||||
too.close();
|
||||
} catch (NotSerializableException e) {
|
||||
stream = cachedEntry.cachedBytes();
|
||||
writeResponseExceptionHeader(stream);
|
||||
RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, new NotSerializableTransportException(error));
|
||||
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
|
||||
too.writeObject(tx);
|
||||
too.close();
|
||||
}
|
||||
});
|
||||
final byte[] data = stream.copiedByteArray();
|
||||
targetTransport.threadPool().cached().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
targetTransport.messageReceived(data, action, sourceTransport, null);
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
CachedStreamOutput.pushEntry(cachedEntry);
|
||||
}
|
||||
}
|
||||
|
||||
private void writeResponseExceptionHeader(BytesStreamOutput stream) throws IOException {
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.common.collect.ImmutableList;
|
|||
import org.elasticsearch.common.collect.Lists;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.CachedStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.netty.OpenChannelsHandler;
|
||||
import org.elasticsearch.common.netty.bootstrap.ClientBootstrap;
|
||||
|
@ -307,7 +308,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
threadPool.cached().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
try {
|
||||
for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext();) {
|
||||
for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext(); ) {
|
||||
NodeChannels nodeChannels = it.next();
|
||||
it.remove();
|
||||
nodeChannels.close();
|
||||
|
@ -331,7 +332,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
serverBootstrap = null;
|
||||
}
|
||||
|
||||
for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext();) {
|
||||
for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext(); ) {
|
||||
NodeChannels nodeChannels = it.next();
|
||||
it.remove();
|
||||
nodeChannels.close();
|
||||
|
@ -427,10 +428,11 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
options.withCompress(true);
|
||||
}
|
||||
|
||||
byte[] data = TransportStreams.buildRequest(requestId, action, message, options);
|
||||
|
||||
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data);
|
||||
ChannelFuture channelFuture = targetChannel.write(buffer);
|
||||
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
|
||||
TransportStreams.buildRequest(cachedEntry, requestId, action, message, options);
|
||||
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(cachedEntry.bytes().unsafeByteArray(), 0, cachedEntry.bytes().size());
|
||||
ChannelFuture future = targetChannel.write(buffer);
|
||||
future.addListener(new CacheFutureListener(cachedEntry));
|
||||
// We handle close connection exception in the #exceptionCaught method, which is the main reason we want to add this future
|
||||
// channelFuture.addListener(new ChannelFutureListener() {
|
||||
// @Override public void operationComplete(ChannelFuture future) throws Exception {
|
||||
|
@ -641,4 +643,18 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class CacheFutureListener implements ChannelFutureListener {
|
||||
|
||||
private final CachedStreamOutput.Entry cachedEntry;
|
||||
|
||||
public CacheFutureListener(CachedStreamOutput.Entry cachedEntry) {
|
||||
this.cachedEntry = cachedEntry;
|
||||
}
|
||||
|
||||
@Override public void operationComplete(ChannelFuture channelFuture) throws Exception {
|
||||
CachedStreamOutput.pushEntry(cachedEntry);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.Streamable;
|
|||
import org.elasticsearch.common.netty.buffer.ChannelBuffer;
|
||||
import org.elasticsearch.common.netty.buffer.ChannelBuffers;
|
||||
import org.elasticsearch.common.netty.channel.Channel;
|
||||
import org.elasticsearch.common.netty.channel.ChannelFuture;
|
||||
import org.elasticsearch.transport.NotSerializableTransportException;
|
||||
import org.elasticsearch.transport.RemoteTransportException;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
|
@ -69,31 +70,35 @@ public class NettyTransportChannel implements TransportChannel {
|
|||
if (transport.compress) {
|
||||
options.withCompress(true);
|
||||
}
|
||||
byte[] data = TransportStreams.buildResponse(requestId, message, options);
|
||||
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data);
|
||||
channel.write(buffer);
|
||||
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
|
||||
TransportStreams.buildResponse(cachedEntry, requestId, message, options);
|
||||
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(cachedEntry.bytes().unsafeByteArray(), 0, cachedEntry.bytes().size());
|
||||
ChannelFuture future = channel.write(buffer);
|
||||
future.addListener(new NettyTransport.CacheFutureListener(cachedEntry));
|
||||
}
|
||||
|
||||
@Override public void sendResponse(Throwable error) throws IOException {
|
||||
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
|
||||
BytesStreamOutput stream;
|
||||
try {
|
||||
stream = CachedStreamOutput.cachedBytes();
|
||||
stream = cachedEntry.cachedBytes();
|
||||
writeResponseExceptionHeader(stream);
|
||||
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error);
|
||||
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
|
||||
too.writeObject(tx);
|
||||
too.close();
|
||||
} catch (NotSerializableException e) {
|
||||
stream = CachedStreamOutput.cachedBytes();
|
||||
stream = cachedEntry.cachedBytes();
|
||||
writeResponseExceptionHeader(stream);
|
||||
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, new NotSerializableTransportException(error));
|
||||
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
|
||||
too.writeObject(tx);
|
||||
too.close();
|
||||
}
|
||||
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(stream.copiedByteArray());
|
||||
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(stream.unsafeByteArray(), 0, stream.size());
|
||||
buffer.setInt(0, buffer.writerIndex() - 4); // update real size.
|
||||
channel.write(buffer);
|
||||
ChannelFuture future = channel.write(buffer);
|
||||
future.addListener(new NettyTransport.CacheFutureListener(cachedEntry));
|
||||
}
|
||||
|
||||
private void writeResponseExceptionHeader(BytesStreamOutput stream) throws IOException {
|
||||
|
|
|
@ -19,7 +19,9 @@
|
|||
|
||||
package org.elasticsearch.transport.support;
|
||||
|
||||
import org.elasticsearch.common.io.stream.*;
|
||||
import org.elasticsearch.common.io.stream.CachedStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportResponseOptions;
|
||||
|
||||
|
@ -31,9 +33,10 @@ import java.io.IOException;
|
|||
public class TransportStreams {
|
||||
|
||||
public static final int HEADER_SIZE = 4 + 8 + 1;
|
||||
public static final byte[] HEADER_PLACEHOLDER = new byte[HEADER_SIZE];
|
||||
|
||||
public static void writeHeader(byte[] data, int dataLength, long requestId, byte status) {
|
||||
writeInt(data, 0, dataLength + 9); // add the requestId and the status
|
||||
writeInt(data, 0, dataLength - 4); // no need for the header, already there
|
||||
writeLong(data, 4, requestId);
|
||||
data[12] = status;
|
||||
}
|
||||
|
@ -96,58 +99,43 @@ public class TransportStreams {
|
|||
return value;
|
||||
}
|
||||
|
||||
public static byte[] buildRequest(final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException {
|
||||
public static void buildRequest(CachedStreamOutput.Entry cachedEntry, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException {
|
||||
byte status = 0;
|
||||
status = TransportStreams.statusSetRequest(status);
|
||||
|
||||
BytesStreamOutput wrapped;
|
||||
if (options.compress()) {
|
||||
status = TransportStreams.statusSetCompress(status);
|
||||
HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesLzfBytes();
|
||||
HandlesStreamOutput stream = cachedEntry.cachedHandlesLzfBytes();
|
||||
cachedEntry.bytes().write(HEADER_PLACEHOLDER);
|
||||
stream.writeUTF(action);
|
||||
message.writeTo(stream);
|
||||
stream.flush();
|
||||
wrapped = ((BytesStreamOutput) ((LZFStreamOutput) stream.wrappedOut()).wrappedOut());
|
||||
stream.cleanHandles();
|
||||
} else {
|
||||
HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes();
|
||||
HandlesStreamOutput stream = cachedEntry.cachedHandlesBytes();
|
||||
cachedEntry.bytes().write(HEADER_PLACEHOLDER);
|
||||
stream.writeUTF(action);
|
||||
message.writeTo(stream);
|
||||
stream.flush();
|
||||
wrapped = ((BytesStreamOutput) stream.wrappedOut());
|
||||
stream.cleanHandles();
|
||||
}
|
||||
|
||||
byte[] data = new byte[HEADER_SIZE + wrapped.size()];
|
||||
TransportStreams.writeHeader(data, wrapped.size(), requestId, status);
|
||||
System.arraycopy(wrapped.unsafeByteArray(), 0, data, HEADER_SIZE, wrapped.size());
|
||||
|
||||
return data;
|
||||
TransportStreams.writeHeader(cachedEntry.bytes().unsafeByteArray(), cachedEntry.bytes().size(), requestId, status);
|
||||
}
|
||||
|
||||
public static byte[] buildResponse(final long requestId, Streamable message, TransportResponseOptions options) throws IOException {
|
||||
public static void buildResponse(CachedStreamOutput.Entry cachedEntry, final long requestId, Streamable message, TransportResponseOptions options) throws IOException {
|
||||
byte status = 0;
|
||||
status = TransportStreams.statusSetResponse(status);
|
||||
|
||||
BytesStreamOutput wrapped;
|
||||
if (options.compress()) {
|
||||
status = TransportStreams.statusSetCompress(status);
|
||||
HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesLzfBytes();
|
||||
HandlesStreamOutput stream = cachedEntry.cachedHandlesLzfBytes();
|
||||
cachedEntry.bytes().write(HEADER_PLACEHOLDER);
|
||||
message.writeTo(stream);
|
||||
stream.flush();
|
||||
wrapped = ((BytesStreamOutput) ((LZFStreamOutput) stream.wrappedOut()).wrappedOut());
|
||||
} else {
|
||||
HandlesStreamOutput stream = CachedStreamOutput.cachedHandlesBytes();
|
||||
HandlesStreamOutput stream = cachedEntry.cachedHandlesBytes();
|
||||
cachedEntry.bytes().write(HEADER_PLACEHOLDER);
|
||||
message.writeTo(stream);
|
||||
stream.flush();
|
||||
wrapped = ((BytesStreamOutput) stream.wrappedOut());
|
||||
}
|
||||
|
||||
|
||||
byte[] data = new byte[HEADER_SIZE + wrapped.size()];
|
||||
TransportStreams.writeHeader(data, wrapped.size(), requestId, status);
|
||||
System.arraycopy(wrapped.unsafeByteArray(), 0, data, HEADER_SIZE, wrapped.size());
|
||||
|
||||
return data;
|
||||
TransportStreams.writeHeader(cachedEntry.bytes().unsafeByteArray(), cachedEntry.bytes().size(), requestId, status);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ import static org.hamcrest.Matchers.*;
|
|||
public class BytesStreamsTests {
|
||||
|
||||
@Test public void testSimpleStreams() throws Exception {
|
||||
BytesStreamOutput out = CachedStreamOutput.cachedBytes();
|
||||
BytesStreamOutput out = CachedStreamOutput.popEntry().cachedBytes();
|
||||
out.writeBoolean(false);
|
||||
out.writeByte((byte) 1);
|
||||
out.writeShort((short) -1);
|
||||
|
|
|
@ -39,6 +39,10 @@ public class SimpleNettyTransportTests extends AbstractSimpleTransportTests {
|
|||
serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress());
|
||||
}
|
||||
|
||||
@Override public void testHelloWorld() {
|
||||
super.testHelloWorld(); //To change body of overridden methods use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
@Override public void testVoidMessageCompressed() {
|
||||
super.testVoidMessageCompressed(); //To change body of overridden methods use File | Settings | File Templates.
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue