Make sure we don't reuse arrays when sending and error back

We want to make sure recycling will not fail for any reason while trying to send a response back that is caused by a failure, for example, if we have circuit breaker on it (at one point), sending an error back will not be affected by it.
closes #6631
This commit is contained in:
Shay Banon 2014-06-26 17:39:00 +02:00
parent e559295228
commit 4129bb6a4f
1 changed files with 24 additions and 32 deletions

View File

@ -20,9 +20,11 @@
package org.elasticsearch.transport.netty;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.ThrowableObjectOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -105,39 +107,29 @@ public class NettyTransportChannel implements TransportChannel {
@Override
public void sendResponse(Throwable error) throws IOException {
ReleasableBytesStreamOutput stream = new ReleasableBytesStreamOutput(transport.bigArrays);
boolean addedReleaseListener = false;
BytesStreamOutput stream = new BytesStreamOutput();
try {
try {
stream.skip(NettyHeader.HEADER_SIZE);
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error);
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
too.writeObject(tx);
too.close();
} catch (NotSerializableException e) {
stream.reset();
stream.skip(NettyHeader.HEADER_SIZE);
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, new NotSerializableTransportException(error));
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
too.writeObject(tx);
too.close();
}
byte status = 0;
status = TransportStatus.setResponse(status);
status = TransportStatus.setError(status);
ReleasableBytesReference bytes = stream.bytes();
ChannelBuffer buffer = bytes.toChannelBuffer();
NettyHeader.writeHeader(buffer, requestId, status, version);
ChannelFuture future = channel.write(buffer);
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
future.addListener(listener);
addedReleaseListener = true;
} finally {
if (!addedReleaseListener) {
Releasables.close(stream.bytes());
}
stream.skip(NettyHeader.HEADER_SIZE);
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error);
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
too.writeObject(tx);
too.close();
} catch (NotSerializableException e) {
stream.reset();
stream.skip(NettyHeader.HEADER_SIZE);
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, new NotSerializableTransportException(error));
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
too.writeObject(tx);
too.close();
}
byte status = 0;
status = TransportStatus.setResponse(status);
status = TransportStatus.setError(status);
BytesReference bytes = stream.bytes();
ChannelBuffer buffer = bytes.toChannelBuffer();
NettyHeader.writeHeader(buffer, requestId, status, version);
channel.write(buffer);
}
}