Revert "Close channel on handshake error with old version (#56989)"
This reverts commit c81a189da9
.
This commit is contained in:
parent
c81a189da9
commit
99f7115f22
|
@ -72,7 +72,7 @@ public class InboundDecoder implements Releasable {
|
|||
} else {
|
||||
totalNetworkSize = messageLength + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE;
|
||||
|
||||
Header header = readHeader(version, messageLength, reference);
|
||||
Header header = readHeader(messageLength, reference);
|
||||
bytesConsumed += headerBytesToRead;
|
||||
if (header.isCompressed()) {
|
||||
decompressor = new TransportDecompressor(recycler);
|
||||
|
@ -166,8 +166,7 @@ public class InboundDecoder implements Releasable {
|
|||
}
|
||||
}
|
||||
|
||||
// exposed for use in tests
|
||||
static Header readHeader(Version version, int networkMessageSize, BytesReference bytesReference) throws IOException {
|
||||
private Header readHeader(int networkMessageSize, BytesReference bytesReference) throws IOException {
|
||||
try (StreamInput streamInput = bytesReference.streamInput()) {
|
||||
streamInput.skip(TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE);
|
||||
long requestId = streamInput.readLong();
|
||||
|
|
|
@ -143,14 +143,7 @@ public class InboundHandler {
|
|||
try {
|
||||
handshaker.handleHandshake(transportChannel, requestId, stream);
|
||||
} catch (Exception e) {
|
||||
if (Version.CURRENT.isCompatible(header.getVersion())) {
|
||||
sendErrorResponse(action, transportChannel, e);
|
||||
} else {
|
||||
logger.warn(new ParameterizedMessage(
|
||||
"could not send error response to handshake received on [{}] using wire format version [{}], closing channel",
|
||||
channel, header.getVersion()), e);
|
||||
channel.close();
|
||||
}
|
||||
sendErrorResponse(action, transportChannel, e);
|
||||
}
|
||||
} else {
|
||||
final TransportChannel transportChannel = new TcpTransportChannel(outboundHandler, channel, action, requestId, version,
|
||||
|
|
|
@ -19,37 +19,25 @@
|
|||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.bytes.ReleasableBytesReference;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.MockLogAppender;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
@ -177,82 +165,4 @@ public class InboundHandlerTests extends ESTestCase {
|
|||
assertEquals(responseValue, responseCaptor.get().value);
|
||||
}
|
||||
}
|
||||
|
||||
public void testSendsErrorResponseToHandshakeFromCompatibleVersion() throws Exception {
|
||||
// Nodes use their minimum compatibility version for the TCP handshake, so a node from v(major-1).x will report its version as
|
||||
// v(major-2).last in the TCP handshake, with which we are not really compatible. We put extra effort into making sure that if
|
||||
// successful we can respond correctly in a format this old, but we do not guarantee that we can respond correctly with an error
|
||||
// response. However if the two nodes are from the same major version then we do guarantee compatibility of error responses.
|
||||
|
||||
final Version remoteVersion = VersionUtils.randomCompatibleVersion(random(), version);
|
||||
final long requestId = randomNonNegativeLong();
|
||||
final Header requestHeader = new Header(between(0, 100), requestId,
|
||||
TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), remoteVersion);
|
||||
final InboundMessage requestMessage = unreadableInboundHandshake(remoteVersion, requestHeader);
|
||||
requestHeader.actionName = TransportHandshaker.HANDSHAKE_ACTION_NAME;
|
||||
requestHeader.headers = Tuple.tuple(org.elasticsearch.common.collect.Map.of(), org.elasticsearch.common.collect.Map.of());
|
||||
handler.inboundMessage(channel, requestMessage);
|
||||
|
||||
final BytesReference responseBytesReference = channel.getMessageCaptor().get();
|
||||
final Header responseHeader = InboundDecoder.readHeader(remoteVersion, responseBytesReference.length(), responseBytesReference);
|
||||
assertTrue(responseHeader.isResponse());
|
||||
assertTrue(responseHeader.isError());
|
||||
}
|
||||
|
||||
|
||||
public void testClosesChannelOnErrorInHandshakeWithIncompatibleVersion() throws Exception {
|
||||
// Nodes use their minimum compatibility version for the TCP handshake, so a node from v(major-1).x will report its version as
|
||||
// v(major-2).last in the TCP handshake, with which we are not really compatible. We put extra effort into making sure that if
|
||||
// successful we can respond correctly in a format this old, but we do not guarantee that we can respond correctly with an error
|
||||
// response so we must just close the connection on an error. To avoid the failure disappearing into a black hole we at least log
|
||||
// it.
|
||||
|
||||
final MockLogAppender mockAppender = new MockLogAppender();
|
||||
mockAppender.start();
|
||||
mockAppender.addExpectation(
|
||||
new MockLogAppender.SeenEventExpectation(
|
||||
"expected message",
|
||||
InboundHandler.class.getCanonicalName(),
|
||||
Level.WARN,
|
||||
"could not send error response to handshake"));
|
||||
final Logger inboundHandlerLogger = LogManager.getLogger(InboundHandler.class);
|
||||
Loggers.addAppender(inboundHandlerLogger, mockAppender);
|
||||
|
||||
try {
|
||||
final AtomicBoolean isClosed = new AtomicBoolean();
|
||||
channel.addCloseListener(ActionListener.wrap(() -> assertTrue(isClosed.compareAndSet(false, true))));
|
||||
|
||||
final Version remoteVersion = Version.fromId(randomIntBetween(0, version.minimumCompatibilityVersion().id - 1));
|
||||
final long requestId = randomNonNegativeLong();
|
||||
final Header requestHeader = new Header(between(0, 100), requestId,
|
||||
TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)), remoteVersion);
|
||||
final InboundMessage requestMessage = unreadableInboundHandshake(remoteVersion, requestHeader);
|
||||
requestHeader.actionName = TransportHandshaker.HANDSHAKE_ACTION_NAME;
|
||||
requestHeader.headers = Tuple.tuple(org.elasticsearch.common.collect.Map.of(), org.elasticsearch.common.collect.Map.of());
|
||||
handler.inboundMessage(channel, requestMessage);
|
||||
assertTrue(isClosed.get());
|
||||
assertNull(channel.getMessageCaptor().get());
|
||||
mockAppender.assertAllExpectationsMatched();
|
||||
} finally {
|
||||
Loggers.removeAppender(inboundHandlerLogger, mockAppender);
|
||||
mockAppender.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private static InboundMessage unreadableInboundHandshake(Version remoteVersion, Header requestHeader) {
|
||||
return new InboundMessage(requestHeader, ReleasableBytesReference.wrap(BytesArray.EMPTY), () -> { }) {
|
||||
@Override
|
||||
public StreamInput openOrGetStreamInput() {
|
||||
final StreamInput streamInput = new InputStreamStreamInput(new InputStream() {
|
||||
@Override
|
||||
public int read() {
|
||||
throw new ElasticsearchException("unreadable handshake");
|
||||
}
|
||||
});
|
||||
streamInput.setVersion(remoteVersion);
|
||||
return streamInput;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue