diff --git a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index 302f8296ad3..9eef4401144 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -127,6 +127,10 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { } streamIn = compressor.streamInput(streamIn); } + if (version.onOrAfter(Version.CURRENT.minimumCompatibilityVersion()) == false || version.major != Version.CURRENT.major) { + throw new IllegalStateException("Received message from unsupported version: [" + version + + "] minimal compatible version is: [" +Version.CURRENT.minimumCompatibilityVersion() + "]"); + } streamIn.setVersion(version); if (TransportStatus.isRequest(status)) { threadContext.readHeaders(streamIn); diff --git a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 454fa836b8e..2a25b86bc83 100644 --- a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -56,11 +56,11 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { protected ThreadPool threadPool; - protected static final Version version0 = Version.fromId(/*0*/99); + protected static final Version version0 = Version.CURRENT.minimumCompatibilityVersion(); protected DiscoveryNode nodeA; protected MockTransportService serviceA; - protected static final Version version1 = Version.fromId(199); + protected static final Version version1 = Version.fromId(Version.CURRENT.id+1); protected DiscoveryNode nodeB; protected MockTransportService serviceB; @@ -542,12 +542,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testTimeoutSendExceptionWithDelayedResponse() throws Exception { + CountDownLatch doneLatch = new CountDownLatch(1); serviceA.registerRequestHandler("sayHelloTimeoutDelayedResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { @Override public void messageReceived(StringMessageRequest request, TransportChannel channel) { TimeValue sleep = TimeValue.parseTimeValue(request.message, null, "sleep"); try { - Thread.sleep(sleep.millis()); + doneLatch.await(sleep.millis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { // ignore } @@ -625,6 +626,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } serviceA.removeHandler("sayHelloTimeoutDelayedResponse"); + doneLatch.countDown(); } @TestLogging(value = "test. transport.tracer:TRACE")