Fix error message if an incompatible node connects (#24884)

This message broken in recent refactoring, this commit also adds a
basic unit-test to ensure we maintain the correct version.
This commit is contained in:
Simon Willnauer 2017-05-25 15:02:43 +02:00 committed by GitHub
parent 7d03cff820
commit 1325681a03
2 changed files with 37 additions and 11 deletions

View File

@ -1324,16 +1324,8 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
streamIn = compressor.streamInput(streamIn);
}
// for handshakes we are compatible with N-2 since otherwise we can't figure out our initial version
// since we are compatible with N-1 and N+1 so we always send our minCompatVersion as the initial version in the
// handshake. This looks odd but it's required to establish the connection correctly we check for real compatibility
// once the connection is established
final Version compatibilityVersion = TransportStatus.isHandshake(status) ? getCurrentVersion().minimumCompatibilityVersion()
: getCurrentVersion();
if (version.isCompatible(compatibilityVersion) == false) {
throw new IllegalStateException("Received message from unsupported version: [" + version
+ "] minimal compatible version is: [" + compatibilityVersion.minimumCompatibilityVersion() + "]");
}
final boolean isHandshake = TransportStatus.isHandshake(status);
ensureVersionCompatibility(version, getCurrentVersion(), isHandshake);
streamIn = new NamedWriteableAwareStreamInput(streamIn, namedWriteableRegistry);
streamIn.setVersion(version);
threadPool.getThreadContext().readHeaders(streamIn);
@ -1341,7 +1333,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
handleRequest(channel, profileName, streamIn, requestId, messageLengthBytes, version, remoteAddress, status);
} else {
final TransportResponseHandler<?> handler;
if (TransportStatus.isHandshake(status)) {
if (isHandshake) {
handler = pendingHandshakes.remove(requestId);
} else {
TransportResponseHandler theHandler = transportServiceAdapter.onResponseReceived(requestId);
@ -1377,6 +1369,19 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
}
static void ensureVersionCompatibility(Version version, Version currentVersion, boolean isHandshake) {
// for handshakes we are compatible with N-2 since otherwise we can't figure out our initial version
// since we are compatible with N-1 and N+1 so we always send our minCompatVersion as the initial version in the
// handshake. This looks odd but it's required to establish the connection correctly we check for real compatibility
// once the connection is established
final Version compatibilityVersion = isHandshake ? currentVersion.minimumCompatibilityVersion() : currentVersion;
if (version.isCompatible(compatibilityVersion) == false) {
final Version minCompatibilityVersion = isHandshake ? compatibilityVersion : compatibilityVersion.minimumCompatibilityVersion();
String msg = "Received " + (isHandshake? "handshake " : "") + "message from unsupported version: [";
throw new IllegalStateException(msg + version + "] minimal compatible version is: [" + minCompatibilityVersion + "]");
}
}
private void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream, final TransportResponseHandler handler) {
final TransportResponse response = handler.newInstance();
response.remoteAddress(new TransportAddress(remoteAddress));

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@ -150,6 +151,26 @@ public class TCPTransportTests extends ESTestCase {
assertEquals(102, addresses[2].getPort());
}
public void testEnsureVersionCompatibility() {
TcpTransport.ensureVersionCompatibility(VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(),
Version.CURRENT), Version.CURRENT, randomBoolean());
TcpTransport.ensureVersionCompatibility(Version.fromString("5.0.0"), Version.fromString("6.0.0"), true);
IllegalStateException ise = expectThrows(IllegalStateException.class, () ->
TcpTransport.ensureVersionCompatibility(Version.fromString("5.0.0"), Version.fromString("6.0.0"), false));
assertEquals("Received message from unsupported version: [5.0.0] minimal compatible version is: [5.4.0]", ise.getMessage());
ise = expectThrows(IllegalStateException.class, () ->
TcpTransport.ensureVersionCompatibility(Version.fromString("2.3.0"), Version.fromString("6.0.0"), true));
assertEquals("Received handshake message from unsupported version: [2.3.0] minimal compatible version is: [5.4.0]",
ise.getMessage());
ise = expectThrows(IllegalStateException.class, () ->
TcpTransport.ensureVersionCompatibility(Version.fromString("2.3.0"), Version.fromString("6.0.0"), false));
assertEquals("Received message from unsupported version: [2.3.0] minimal compatible version is: [5.4.0]",
ise.getMessage());
}
public void testCompressRequest() throws IOException {
final boolean compressed = randomBoolean();
final AtomicBoolean called = new AtomicBoolean(false);