Transport: Add header token

This allows to more easily identify when illegal content is being sent to the transport port
closes #2280
This commit is contained in:
Shay Banon 2012-09-21 23:56:27 +02:00
parent a3af3d2f47
commit cc7eb263be
12 changed files with 217 additions and 199 deletions

View File

@ -185,6 +185,14 @@ public class Version implements Serializable {
return version.id >= id;
}
public boolean before(Version version) {
return version.id < id;
}
public boolean onOrBefore(Version version) {
return version.id <= id;
}
/**
* Just the version number (without -SNAPSHOT if snapshot).
*/

View File

@ -229,13 +229,13 @@ public class DiscoveryNode implements Streamable, Serializable {
@Override
public void readFrom(StreamInput in) throws IOException {
nodeName = in.readUTF().intern();
nodeId = in.readUTF().intern();
nodeName = in.readString().intern();
nodeId = in.readString().intern();
address = TransportAddressSerializers.addressFromStream(in);
int size = in.readVInt();
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
for (int i = 0; i < size; i++) {
builder.put(in.readUTF().intern(), in.readUTF().intern());
builder.put(in.readString().intern(), in.readString().intern());
}
attributes = builder.build();
version = Version.readVersion(in);
@ -243,13 +243,13 @@ public class DiscoveryNode implements Streamable, Serializable {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(nodeName);
out.writeUTF(nodeId);
out.writeString(nodeName);
out.writeString(nodeId);
addressToStream(out, address);
out.writeVInt(attributes.size());
for (Map.Entry<String, String> entry : attributes.entrySet()) {
out.writeUTF(entry.getKey());
out.writeUTF(entry.getValue());
out.writeString(entry.getKey());
out.writeString(entry.getValue());
}
Version.writeVersion(version, out);
}

View File

@ -78,6 +78,14 @@ public class BytesStreamOutput extends StreamOutput implements BytesStream {
count = newcount;
}
public void skip(int length) {
int newcount = count + length;
if (newcount > buf.length) {
buf = Arrays.copyOf(buf, Bytes.oversize(newcount, 1));
}
count = newcount;
}
@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
if (length == 0) {

View File

@ -33,7 +33,7 @@ import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.support.TransportStreams;
import org.elasticsearch.transport.support.TransportStatus;
import java.io.IOException;
import java.util.Map;
@ -163,7 +163,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
stream.writeLong(requestId);
byte status = 0;
status = TransportStreams.statusSetRequest(status);
status = TransportStatus.setRequest(status);
stream.writeByte(status); // 0 for request, 1 for response.
stream.writeUTF(action);
@ -203,7 +203,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
try {
long requestId = stream.readLong();
byte status = stream.readByte();
boolean isRequest = TransportStreams.statusIsRequest(status);
boolean isRequest = TransportStatus.isRequest(status);
if (isRequest) {
handleRequest(stream, requestId, sourceTransport);
@ -211,7 +211,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
final TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
if (TransportStreams.statusIsError(status)) {
if (TransportStatus.isError(status)) {
handlerResponseError(stream, handler);
} else {
handleResponse(stream, handler);

View File

@ -28,7 +28,7 @@ import org.elasticsearch.transport.NotSerializableTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.support.TransportStreams;
import org.elasticsearch.transport.support.TransportStatus;
import java.io.IOException;
import java.io.NotSerializableException;
@ -71,7 +71,7 @@ public class LocalTransportChannel implements TransportChannel {
StreamOutput stream = cachedEntry.handles();
stream.writeLong(requestId);
byte status = 0;
status = TransportStreams.statusSetResponse(status);
status = TransportStatus.setResponse(status);
stream.writeByte(status); // 0 for request, 1 for response.
message.writeTo(stream);
stream.close();
@ -123,8 +123,8 @@ public class LocalTransportChannel implements TransportChannel {
private void writeResponseExceptionHeader(BytesStreamOutput stream) throws IOException {
stream.writeLong(requestId);
byte status = 0;
status = TransportStreams.statusSetResponse(status);
status = TransportStreams.statusSetError(status);
status = TransportStatus.setResponse(status);
status = TransportStatus.setError(status);
stream.writeByte(status);
}
}

View File

@ -30,7 +30,7 @@ import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.support.TransportStreams;
import org.elasticsearch.transport.support.TransportStatus;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.*;
@ -72,8 +72,10 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
}
ChannelBuffer buffer = (ChannelBuffer) m;
int size = buffer.getInt(buffer.readerIndex() - 4);
transportServiceAdapter.received(size + 6);
transportServiceAdapter.received(size + 4);
// we have additional bytes to read, outside of the header
boolean hasMessageBytesToRead = (size - (NettyHeader.HEADER_SIZE - 6)) != 0;
int markedReaderIndex = buffer.readerIndex();
int expectedIndexReader = markedReaderIndex + size;
@ -84,13 +86,11 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
long requestId = buffer.readLong();
byte status = buffer.readByte();
boolean isRequest = TransportStreams.statusIsRequest(status);
boolean isRequest = TransportStatus.isRequest(status);
// we have additional bytes to read, outside of the header
boolean hasBytesToRead = (size - (TransportStreams.HEADER_SIZE - 4)) != 0;
StreamInput wrappedStream;
if (TransportStreams.statusIsCompress(status) && hasBytesToRead && buffer.readable()) {
if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) {
Compressor compressor = CompressorFactory.compressor(buffer);
if (compressor == null) {
int maxToRead = Math.min(buffer.readableBytes(), 10);
@ -121,7 +121,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
if (TransportStreams.statusIsError(status)) {
if (TransportStatus.isError(status)) {
handlerResponseError(wrappedStream, handler);
} else {
handleResponse(wrappedStream, handler);
@ -132,9 +132,9 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
}
if (buffer.readerIndex() != expectedIndexReader) {
if (buffer.readerIndex() < expectedIndexReader) {
logger.warn("Message not fully read (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStreams.statusIsError(status));
logger.warn("Message not fully read (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStatus.isError(status));
} else {
logger.warn("Message read past expected size (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStreams.statusIsError(status));
logger.warn("Message read past expected size (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStatus.isError(status));
}
buffer.readerIndex(expectedIndexReader);
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty;
import org.jboss.netty.buffer.ChannelBuffer;
/**
*/
public class NettyHeader {
public static final int HEADER_SIZE = 2 + 4 + 8 + 1;
public static void writeHeader(ChannelBuffer buffer, long requestId, byte status) {
int index = buffer.readerIndex();
buffer.setByte(index, 'E');
index += 1;
buffer.setByte(index, 'S');
index += 1;
// write the size, the size indicates the remaining message size, not including the size int
buffer.setInt(index, buffer.readableBytes() - 6);
index += 4;
buffer.setLong(index, requestId);
index += 8;
buffer.setByte(index, status);
}
}

View File

@ -26,8 +26,10 @@ import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.netty.NettyStaticSetup;
import org.elasticsearch.common.netty.OpenChannelsHandler;
@ -43,7 +45,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.support.TransportStreams;
import org.elasticsearch.transport.support.TransportStatus;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
@ -58,7 +60,10 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.CancelledKeyException;
import java.util.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
@ -472,19 +477,19 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
// ignore
}
if (isCloseConnectionException(e.getCause())) {
// disconnect the node
Channel channel = ctx.getChannel();
for (Map.Entry<DiscoveryNode, NodeChannels> entry : connectedNodes.entrySet()) {
if (entry.getValue().hasChannel(channel)) {
disconnectFromNode(entry.getKey());
}
if (logger.isTraceEnabled()) {
logger.trace("close connection exception caught on transport layer [{}], disconnecting from relevant node", e.getCause(), ctx.getChannel());
}
// close the channel, which will cause a node to be disconnected if relevant
ctx.getChannel().close();
} else if (isConnectException(e.getCause()) || e.getCause() instanceof CancelledKeyException) {
if (logger.isTraceEnabled()) {
logger.trace("(Ignoring) exception caught on netty layer [" + ctx.getChannel() + "]", e.getCause());
logger.trace("(ignoring) exception caught on transport layer [{}]", e.getCause(), ctx.getChannel());
}
} else {
logger.warn("exception caught on netty layer [" + ctx.getChannel() + "]", e.getCause());
logger.warn("exception caught on transport layer [{}], closing connection", e.getCause(), ctx.getChannel());
// close the channel, which will cause a node to be disconnected if relevant
ctx.getChannel().close();
}
}
@ -507,8 +512,27 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
TransportStreams.buildRequest(cachedEntry, requestId, action, message, options);
byte status = 0;
status = TransportStatus.setRequest(status);
if (options.compress()) {
status = TransportStatus.setCompress(status);
cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE);
StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor());
stream.writeString(action);
message.writeTo(stream);
stream.close();
} else {
StreamOutput stream = cachedEntry.handles();
cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE);
stream.writeString(action);
message.writeTo(stream);
stream.close();
}
ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer();
NettyHeader.writeHeader(buffer, requestId, status);
ChannelFuture future = targetChannel.write(buffer);
future.addListener(new CacheFutureListener(cachedEntry));
// We handle close connection exception in the #exceptionCaught method, which is the main reason we want to add this future

View File

@ -19,15 +19,17 @@
package org.elasticsearch.transport.netty;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.ThrowableObjectOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.transport.NotSerializableTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.support.TransportStreams;
import org.elasticsearch.transport.support.TransportStatus;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
@ -73,8 +75,24 @@ public class NettyTransportChannel implements TransportChannel {
options.withCompress(true);
}
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
TransportStreams.buildResponse(cachedEntry, requestId, message, options);
byte status = 0;
status = TransportStatus.setResponse(status);
if (options.compress()) {
status = TransportStatus.setCompress(status);
cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE);
StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor());
message.writeTo(stream);
stream.close();
} else {
StreamOutput stream = cachedEntry.handles();
cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE);
message.writeTo(stream);
stream.close();
}
ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer();
NettyHeader.writeHeader(buffer, requestId, status);
ChannelFuture future = channel.write(buffer);
future.addListener(new NettyTransport.CacheFutureListener(cachedEntry));
}
@ -85,7 +103,7 @@ public class NettyTransportChannel implements TransportChannel {
BytesStreamOutput stream;
try {
stream = cachedEntry.bytes();
writeResponseExceptionHeader(stream);
cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE);
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error);
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
too.writeObject(tx);
@ -93,24 +111,20 @@ public class NettyTransportChannel implements TransportChannel {
} catch (NotSerializableException e) {
cachedEntry.reset();
stream = cachedEntry.bytes();
writeResponseExceptionHeader(stream);
cachedEntry.bytes().skip(NettyHeader.HEADER_SIZE);
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, new NotSerializableTransportException(error));
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
too.writeObject(tx);
too.close();
}
ChannelBuffer buffer = stream.bytes().toChannelBuffer();
buffer.setInt(0, buffer.writerIndex() - 4); // update real size.
byte status = 0;
status = TransportStatus.setResponse(status);
status = TransportStatus.setError(status);
ChannelBuffer buffer = cachedEntry.bytes().bytes().toChannelBuffer();
NettyHeader.writeHeader(buffer, requestId, status);
ChannelFuture future = channel.write(buffer);
future.addListener(new NettyTransport.CacheFutureListener(cachedEntry));
}
private void writeResponseExceptionHeader(BytesStreamOutput stream) throws IOException {
stream.writeBytes(LENGTH_PLACEHOLDER);
stream.writeLong(requestId);
byte status = 0;
status = TransportStreams.statusSetResponse(status);
status = TransportStreams.statusSetError(status);
stream.writeByte(status);
}
}

View File

@ -18,11 +18,16 @@ public class SizeHeaderFrameDecoder extends FrameDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
if (buffer.readableBytes() < 4) {
if (buffer.readableBytes() < 6) {
return null;
}
int dataLen = buffer.getInt(buffer.readerIndex());
int readerIndex = buffer.readerIndex();
if (buffer.getByte(readerIndex) != 'E' || buffer.getByte(readerIndex + 1) != 'S') {
throw new StreamCorruptedException("invalid internal transport message format");
}
int dataLen = buffer.getInt(buffer.readerIndex() + 2);
if (dataLen <= 0) {
throw new StreamCorruptedException("invalid data length: " + dataLen);
}
@ -32,10 +37,10 @@ public class SizeHeaderFrameDecoder extends FrameDecoder {
"transport content length received [" + new ByteSizeValue(dataLen) + "] exceeded [" + new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]");
}
if (buffer.readableBytes() < dataLen + 4) {
if (buffer.readableBytes() < dataLen + 6) {
return null;
}
buffer.skipBytes(4);
buffer.skipBytes(6);
return buffer;
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.support;
/**
*/
public class TransportStatus {
private static final byte STATUS_REQRES = 1 << 0;
private static final byte STATUS_ERROR = 1 << 1;
private static final byte STATUS_COMPRESS = 1 << 2;
public static boolean isRequest(byte value) {
return (value & STATUS_REQRES) == 0;
}
public static byte setRequest(byte value) {
value &= ~STATUS_REQRES;
return value;
}
public static byte setResponse(byte value) {
value |= STATUS_REQRES;
return value;
}
public static boolean isError(byte value) {
return (value & STATUS_ERROR) != 0;
}
public static byte setError(byte value) {
value |= STATUS_ERROR;
return value;
}
public static boolean isCompress(byte value) {
return (value & STATUS_COMPRESS) != 0;
}
public static byte setCompress(byte value) {
value |= STATUS_COMPRESS;
return value;
}
}

View File

@ -1,145 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.support;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseOptions;
import java.io.IOException;
/**
*
*/
public class TransportStreams {
public static final int HEADER_SIZE = 4 + 8 + 1;
public static final byte[] HEADER_PLACEHOLDER = new byte[HEADER_SIZE];
public static void writeHeader(byte[] data, int dataOffset, int dataLength, long requestId, byte status) {
writeInt(data, dataOffset, dataLength - 4); // no need for the header, already there
writeLong(data, dataOffset + 4, requestId);
data[dataOffset + 12] = status;
}
// same as writeLong in StreamOutput
private static void writeLong(byte[] buffer, int offset, long value) {
buffer[offset++] = ((byte) (value >> 56));
buffer[offset++] = ((byte) (value >> 48));
buffer[offset++] = ((byte) (value >> 40));
buffer[offset++] = ((byte) (value >> 32));
buffer[offset++] = ((byte) (value >> 24));
buffer[offset++] = ((byte) (value >> 16));
buffer[offset++] = ((byte) (value >> 8));
buffer[offset] = ((byte) (value));
}
// same as writeInt in StreamOutput
private static void writeInt(byte[] buffer, int offset, int value) {
buffer[offset++] = ((byte) (value >> 24));
buffer[offset++] = ((byte) (value >> 16));
buffer[offset++] = ((byte) (value >> 8));
buffer[offset] = ((byte) (value));
}
private static final byte STATUS_REQRES = 1 << 0;
private static final byte STATUS_ERROR = 1 << 1;
private static final byte STATUS_COMPRESS = 1 << 2;
public static boolean statusIsRequest(byte value) {
return (value & STATUS_REQRES) == 0;
}
public static byte statusSetRequest(byte value) {
value &= ~STATUS_REQRES;
return value;
}
public static byte statusSetResponse(byte value) {
value |= STATUS_REQRES;
return value;
}
public static boolean statusIsError(byte value) {
return (value & STATUS_ERROR) != 0;
}
public static byte statusSetError(byte value) {
value |= STATUS_ERROR;
return value;
}
public static boolean statusIsCompress(byte value) {
return (value & STATUS_COMPRESS) != 0;
}
public static byte statusSetCompress(byte value) {
value |= STATUS_COMPRESS;
return value;
}
public static void buildRequest(CachedStreamOutput.Entry cachedEntry, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException {
byte status = 0;
status = TransportStreams.statusSetRequest(status);
if (options.compress()) {
status = TransportStreams.statusSetCompress(status);
cachedEntry.bytes().write(HEADER_PLACEHOLDER);
StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor());
stream.writeUTF(action);
message.writeTo(stream);
stream.close();
} else {
StreamOutput stream = cachedEntry.handles();
cachedEntry.bytes().write(HEADER_PLACEHOLDER);
stream.writeUTF(action);
message.writeTo(stream);
stream.close();
}
BytesReference bytes = cachedEntry.bytes().bytes();
TransportStreams.writeHeader(bytes.array(), bytes.arrayOffset(), bytes.length(), requestId, status);
}
public static void buildResponse(CachedStreamOutput.Entry cachedEntry, final long requestId, Streamable message, TransportResponseOptions options) throws IOException {
byte status = 0;
status = TransportStreams.statusSetResponse(status);
if (options.compress()) {
status = TransportStreams.statusSetCompress(status);
cachedEntry.bytes().write(HEADER_PLACEHOLDER);
StreamOutput stream = cachedEntry.handles(CompressorFactory.defaultCompressor());
message.writeTo(stream);
stream.close();
} else {
StreamOutput stream = cachedEntry.handles();
cachedEntry.bytes().write(HEADER_PLACEHOLDER);
message.writeTo(stream);
stream.close();
}
BytesReference bytes = cachedEntry.bytes().bytes();
TransportStreams.writeHeader(bytes.array(), bytes.arrayOffset(), bytes.length(), requestId, status);
}
}