mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-25 22:36:20 +00:00
Add int indicating size of transport header (#50085)
Currently we do not know the size of the transport header (map of request response headers, features array, and action name). This means that we must read the entire transport message to dependably act on the headers. This commit adds an int indicating the size of the transport headers. With this addition we can act upon the headers prior to reading the entire message.
This commit is contained in:
parent
e9e2e5fc71
commit
38b67f719e
@ -146,6 +146,10 @@ public abstract class StreamOutput extends OutputStream {
|
||||
this.features = Collections.unmodifiableSet(new HashSet<>(features));
|
||||
}
|
||||
|
||||
public Set<String> getFeatures() {
|
||||
return this.features;
|
||||
}
|
||||
|
||||
public long position() throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
@ -19,9 +19,7 @@
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.compress.Compressor;
|
||||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
@ -62,10 +60,6 @@ public abstract class InboundMessage extends NetworkMessage implements Closeable
|
||||
}
|
||||
|
||||
InboundMessage deserialize(BytesReference reference) throws IOException {
|
||||
int messageLengthBytes = reference.length();
|
||||
final int totalMessageSize = messageLengthBytes + TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE;
|
||||
// we have additional bytes to read, outside of the header
|
||||
boolean hasMessageBytesToRead = (totalMessageSize - TcpHeader.HEADER_SIZE) > 0;
|
||||
StreamInput streamInput = reference.streamInput();
|
||||
boolean success = false;
|
||||
try (ThreadContext.StoredContext existing = threadContext.stashContext()) {
|
||||
@ -74,23 +68,13 @@ public abstract class InboundMessage extends NetworkMessage implements Closeable
|
||||
Version remoteVersion = Version.fromId(streamInput.readInt());
|
||||
final boolean isHandshake = TransportStatus.isHandshake(status);
|
||||
ensureVersionCompatibility(remoteVersion, version, isHandshake);
|
||||
if (TransportStatus.isCompress(status) && hasMessageBytesToRead && streamInput.available() > 0) {
|
||||
Compressor compressor = getCompressor(reference);
|
||||
if (compressor == null) {
|
||||
int maxToRead = Math.min(reference.length(), 10);
|
||||
StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [")
|
||||
.append(maxToRead).append("] content bytes out of [").append(reference.length())
|
||||
.append("] readable bytes with message size [").append(messageLengthBytes).append("] ").append("] are [");
|
||||
for (int i = 0; i < maxToRead; i++) {
|
||||
sb.append(reference.get(i)).append(",");
|
||||
}
|
||||
sb.append("]");
|
||||
throw new IllegalStateException(sb.toString());
|
||||
}
|
||||
streamInput = compressor.streamInput(streamInput);
|
||||
|
||||
if (remoteVersion.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
|
||||
// Consume the variable header size
|
||||
streamInput.readInt();
|
||||
} else {
|
||||
streamInput = decompressingStream(status, remoteVersion, streamInput);
|
||||
}
|
||||
streamInput = new NamedWriteableAwareStreamInput(streamInput, namedWriteableRegistry);
|
||||
streamInput.setVersion(remoteVersion);
|
||||
|
||||
threadContext.readHeaders(streamInput);
|
||||
|
||||
@ -108,8 +92,17 @@ public abstract class InboundMessage extends NetworkMessage implements Closeable
|
||||
features = Collections.emptySet();
|
||||
}
|
||||
final String action = streamInput.readString();
|
||||
|
||||
if (remoteVersion.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
|
||||
streamInput = decompressingStream(status, remoteVersion, streamInput);
|
||||
}
|
||||
streamInput = namedWriteableStream(streamInput, remoteVersion);
|
||||
message = new Request(threadContext, remoteVersion, status, requestId, action, features, streamInput);
|
||||
} else {
|
||||
if (remoteVersion.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
|
||||
streamInput = decompressingStream(status, remoteVersion, streamInput);
|
||||
}
|
||||
streamInput = namedWriteableStream(streamInput, remoteVersion);
|
||||
message = new Response(threadContext, remoteVersion, status, requestId, streamInput);
|
||||
}
|
||||
success = true;
|
||||
@ -120,13 +113,26 @@ public abstract class InboundMessage extends NetworkMessage implements Closeable
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
static Compressor getCompressor(BytesReference message) {
|
||||
final int offset = TcpHeader.REQUEST_ID_SIZE + TcpHeader.STATUS_SIZE + TcpHeader.VERSION_ID_SIZE;
|
||||
return CompressorFactory.COMPRESSOR.isCompressed(message.slice(offset, message.length() - offset))
|
||||
? CompressorFactory.COMPRESSOR : null;
|
||||
static StreamInput decompressingStream(byte status, Version remoteVersion, StreamInput streamInput) throws IOException {
|
||||
if (TransportStatus.isCompress(status) && streamInput.available() > 0) {
|
||||
try {
|
||||
StreamInput decompressor = CompressorFactory.COMPRESSOR.streamInput(streamInput);
|
||||
decompressor.setVersion(remoteVersion);
|
||||
return decompressor;
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new IllegalStateException("stream marked as compressed, but is missing deflate header");
|
||||
}
|
||||
} else {
|
||||
return streamInput;
|
||||
}
|
||||
}
|
||||
|
||||
private StreamInput namedWriteableStream(StreamInput delegate, Version remoteVersion) {
|
||||
NamedWriteableAwareStreamInput streamInput = new NamedWriteableAwareStreamInput(delegate, namedWriteableRegistry);
|
||||
streamInput.setVersion(remoteVersion);
|
||||
return streamInput;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -30,7 +30,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
|
||||
abstract class OutboundMessage extends NetworkMessage implements Writeable {
|
||||
abstract class OutboundMessage extends NetworkMessage {
|
||||
|
||||
private final Writeable message;
|
||||
|
||||
@ -42,22 +42,39 @@ abstract class OutboundMessage extends NetworkMessage implements Writeable {
|
||||
BytesReference serialize(BytesStreamOutput bytesStream) throws IOException {
|
||||
storedContext.restore();
|
||||
bytesStream.setVersion(version);
|
||||
bytesStream.skip(TcpHeader.HEADER_SIZE);
|
||||
bytesStream.skip(TcpHeader.headerSize(version));
|
||||
|
||||
// The compressible bytes stream will not close the underlying bytes stream
|
||||
BytesReference reference;
|
||||
int variableHeaderLength = -1;
|
||||
final long preHeaderPosition = bytesStream.position();
|
||||
|
||||
if (version.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
|
||||
writeVariableHeader(bytesStream);
|
||||
variableHeaderLength = Math.toIntExact(bytesStream.position() - preHeaderPosition);
|
||||
}
|
||||
|
||||
try (CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bytesStream, TransportStatus.isCompress(status))) {
|
||||
stream.setVersion(version);
|
||||
threadContext.writeTo(stream);
|
||||
writeTo(stream);
|
||||
stream.setFeatures(bytesStream.getFeatures());
|
||||
|
||||
if (variableHeaderLength == -1) {
|
||||
writeVariableHeader(stream);
|
||||
}
|
||||
reference = writeMessage(stream);
|
||||
}
|
||||
|
||||
bytesStream.seek(0);
|
||||
TcpHeader.writeHeader(bytesStream, requestId, status, version, reference.length() - TcpHeader.HEADER_SIZE);
|
||||
final int contentSize = reference.length() - TcpHeader.headerSize(version);
|
||||
TcpHeader.writeHeader(bytesStream, requestId, status, version, contentSize, variableHeaderLength);
|
||||
return reference;
|
||||
}
|
||||
|
||||
private BytesReference writeMessage(CompressibleBytesOutputStream stream) throws IOException {
|
||||
protected void writeVariableHeader(StreamOutput stream) throws IOException {
|
||||
threadContext.writeTo(stream);
|
||||
}
|
||||
|
||||
protected BytesReference writeMessage(CompressibleBytesOutputStream stream) throws IOException {
|
||||
final BytesReference zeroCopyBuffer;
|
||||
if (message instanceof BytesTransportRequest) {
|
||||
BytesTransportRequest bRequest = (BytesTransportRequest) message;
|
||||
@ -96,11 +113,12 @@ abstract class OutboundMessage extends NetworkMessage implements Writeable {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
protected void writeVariableHeader(StreamOutput stream) throws IOException {
|
||||
super.writeVariableHeader(stream);
|
||||
if (version.onOrAfter(Version.V_6_3_0)) {
|
||||
out.writeStringArray(features);
|
||||
stream.writeStringArray(features);
|
||||
}
|
||||
out.writeString(action);
|
||||
stream.writeString(action);
|
||||
}
|
||||
|
||||
private static byte setStatus(boolean compress, boolean isHandshake, Writeable message) {
|
||||
@ -128,8 +146,9 @@ abstract class OutboundMessage extends NetworkMessage implements Writeable {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.setFeatures(features);
|
||||
protected void writeVariableHeader(StreamOutput stream) throws IOException {
|
||||
super.writeVariableHeader(stream);
|
||||
stream.setFeatures(features);
|
||||
}
|
||||
|
||||
private static byte setStatus(boolean compress, boolean isHandshake, Writeable message) {
|
||||
|
@ -25,7 +25,10 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
public class TcpHeader {
|
||||
public static final int MARKER_BYTES_SIZE = 2 * 1;
|
||||
|
||||
public static final Version VERSION_WITH_HEADER_SIZE = Version.V_7_6_0;
|
||||
|
||||
public static final int MARKER_BYTES_SIZE = 2;
|
||||
|
||||
public static final int MESSAGE_LENGTH_SIZE = 4;
|
||||
|
||||
@ -35,15 +38,36 @@ public class TcpHeader {
|
||||
|
||||
public static final int VERSION_ID_SIZE = 4;
|
||||
|
||||
public static final int HEADER_SIZE = MARKER_BYTES_SIZE + MESSAGE_LENGTH_SIZE + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE;
|
||||
public static final int VARIABLE_HEADER_SIZE = 4;
|
||||
|
||||
public static void writeHeader(StreamOutput output, long requestId, byte status, Version version, int messageSize) throws IOException {
|
||||
private static final int PRE_76_HEADER_SIZE = MARKER_BYTES_SIZE + MESSAGE_LENGTH_SIZE + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE;
|
||||
|
||||
private static final int HEADER_SIZE = PRE_76_HEADER_SIZE + VARIABLE_HEADER_SIZE;
|
||||
|
||||
public static int headerSize(Version version) {
|
||||
if (version.onOrAfter(VERSION_WITH_HEADER_SIZE)) {
|
||||
return HEADER_SIZE;
|
||||
} else {
|
||||
return PRE_76_HEADER_SIZE;
|
||||
}
|
||||
}
|
||||
|
||||
public static void writeHeader(StreamOutput output, long requestId, byte status, Version version, int contentSize,
|
||||
int variableHeaderSize) throws IOException {
|
||||
output.writeByte((byte)'E');
|
||||
output.writeByte((byte)'S');
|
||||
// write the size, the size indicates the remaining message size, not including the size int
|
||||
output.writeInt(messageSize + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE);
|
||||
if (version.onOrAfter(VERSION_WITH_HEADER_SIZE)) {
|
||||
output.writeInt(contentSize + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE + VARIABLE_HEADER_SIZE);
|
||||
} else {
|
||||
output.writeInt(contentSize + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE);
|
||||
}
|
||||
output.writeLong(requestId);
|
||||
output.writeByte(status);
|
||||
output.writeInt(version.id);
|
||||
if (version.onOrAfter(VERSION_WITH_HEADER_SIZE)) {
|
||||
assert variableHeaderSize != -1 : "Variable header size not set";
|
||||
output.writeInt(variableHeaderSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -22,8 +22,6 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.compress.Compressor;
|
||||
import org.elasticsearch.common.compress.NotCompressedException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
@ -77,26 +75,24 @@ public final class TransportLogger {
|
||||
final byte status = streamInput.readByte();
|
||||
final boolean isRequest = TransportStatus.isRequest(status);
|
||||
final String type = isRequest ? "request" : "response";
|
||||
final String version = Version.fromId(streamInput.readInt()).toString();
|
||||
Version version = Version.fromId(streamInput.readInt());
|
||||
sb.append(" [length: ").append(messageLengthWithHeader);
|
||||
sb.append(", request id: ").append(requestId);
|
||||
sb.append(", type: ").append(type);
|
||||
sb.append(", version: ").append(version);
|
||||
|
||||
if (version.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
|
||||
sb.append(", header size: ").append(streamInput.readInt()).append('B');
|
||||
} else {
|
||||
streamInput = InboundMessage.Reader.decompressingStream(status, version, streamInput);
|
||||
}
|
||||
|
||||
// read and discard headers
|
||||
ThreadContext.readHeadersFromStream(streamInput);
|
||||
|
||||
if (isRequest) {
|
||||
if (TransportStatus.isCompress(status)) {
|
||||
Compressor compressor;
|
||||
compressor = InboundMessage.getCompressor(message);
|
||||
if (compressor == null) {
|
||||
throw new IllegalStateException(new NotCompressedException());
|
||||
}
|
||||
streamInput = compressor.streamInput(streamInput);
|
||||
}
|
||||
|
||||
// read and discard headers
|
||||
ThreadContext.readHeadersFromStream(streamInput);
|
||||
// now we decode the features
|
||||
if (streamInput.getVersion().onOrAfter(Version.V_6_3_0)) {
|
||||
// discard features
|
||||
streamInput.readStringArray();
|
||||
}
|
||||
sb.append(", action: ").append(streamInput.readString());
|
||||
|
@ -191,14 +191,14 @@ public class InboundMessageTests extends ESTestCase {
|
||||
reference = request.serialize(streamOutput);
|
||||
}
|
||||
final byte[] serialized = BytesReference.toBytes(reference);
|
||||
final int statusPosition = TcpHeader.HEADER_SIZE - TcpHeader.VERSION_ID_SIZE - 1;
|
||||
final int statusPosition = TcpHeader.headerSize(Version.CURRENT) - TcpHeader.VERSION_ID_SIZE - TcpHeader.VARIABLE_HEADER_SIZE - 1;
|
||||
// force status byte to signal compressed on the otherwise uncompressed message
|
||||
serialized[statusPosition] = TransportStatus.setCompress(serialized[statusPosition]);
|
||||
reference = new BytesArray(serialized);
|
||||
InboundMessage.Reader reader = new InboundMessage.Reader(Version.CURRENT, registry, threadContext);
|
||||
BytesReference sliced = reference.slice(6, reference.length() - 6);
|
||||
final IllegalStateException iste = expectThrows(IllegalStateException.class, () -> reader.deserialize(sliced));
|
||||
assertThat(iste.getMessage(), Matchers.startsWith("stream marked as compressed, but no compressor found,"));
|
||||
assertThat(iste.getMessage(), Matchers.equalTo("stream marked as compressed, but is missing deflate header"));
|
||||
}
|
||||
|
||||
private void testVersionIncompatibility(Version version, Version currentVersion, boolean isHandshake) throws IOException {
|
||||
|
@ -24,7 +24,6 @@ import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction;
|
||||
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.bytes.CompositeBytesReference;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -61,6 +60,7 @@ public class TransportLoggerTests extends ESTestCase {
|
||||
", request id: \\d+" +
|
||||
", type: request" +
|
||||
", version: .*" +
|
||||
", header size: \\d+B" +
|
||||
", action: cluster:monitor/stats]" +
|
||||
" WRITE: \\d+B";
|
||||
final MockLogAppender.LoggingExpectation writeExpectation =
|
||||
@ -72,6 +72,7 @@ public class TransportLoggerTests extends ESTestCase {
|
||||
", request id: \\d+" +
|
||||
", type: request" +
|
||||
", version: .*" +
|
||||
", header size: \\d+B" +
|
||||
", action: cluster:monitor/stats]" +
|
||||
" READ: \\d+B";
|
||||
|
||||
@ -88,27 +89,11 @@ public class TransportLoggerTests extends ESTestCase {
|
||||
}
|
||||
|
||||
private BytesReference buildRequest() throws IOException {
|
||||
try (BytesStreamOutput messageOutput = new BytesStreamOutput()) {
|
||||
messageOutput.setVersion(Version.CURRENT);
|
||||
ThreadContext context = new ThreadContext(Settings.EMPTY);
|
||||
context.writeTo(messageOutput);
|
||||
messageOutput.writeStringArray(new String[0]);
|
||||
messageOutput.writeString(ClusterStatsAction.NAME);
|
||||
new ClusterStatsRequest().writeTo(messageOutput);
|
||||
BytesReference messageBody = messageOutput.bytes();
|
||||
final BytesReference header = buildHeader(randomInt(30), messageBody.length());
|
||||
return new CompositeBytesReference(header, messageBody);
|
||||
}
|
||||
}
|
||||
|
||||
private BytesReference buildHeader(long requestId, int length) throws IOException {
|
||||
try (BytesStreamOutput headerOutput = new BytesStreamOutput(TcpHeader.HEADER_SIZE)) {
|
||||
headerOutput.setVersion(Version.CURRENT);
|
||||
TcpHeader.writeHeader(headerOutput, requestId, TransportStatus.setRequest((byte) 0), Version.CURRENT, length);
|
||||
final BytesReference bytes = headerOutput.bytes();
|
||||
assert bytes.length() == TcpHeader.HEADER_SIZE : "header size mismatch expected: " + TcpHeader.HEADER_SIZE + " but was: "
|
||||
+ bytes.length();
|
||||
return bytes;
|
||||
boolean compress = randomBoolean();
|
||||
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
|
||||
OutboundMessage.Request request = new OutboundMessage.Request(new ThreadContext(Settings.EMPTY), new String[0],
|
||||
new ClusterStatsRequest(), Version.CURRENT, ClusterStatsAction.NAME, randomInt(30), false, compress);
|
||||
return request.serialize(bytesStreamOutput);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user