diff --git a/src/main/java/org/elasticsearch/Version.java b/src/main/java/org/elasticsearch/Version.java index 4df0bded3f6..26481821c69 100644 --- a/src/main/java/org/elasticsearch/Version.java +++ b/src/main/java/org/elasticsearch/Version.java @@ -58,7 +58,7 @@ public class Version { return fromId(in.readVInt()); } - private static Version fromId(int id) { + public static Version fromId(int id) { switch (id) { case V_0_18_0_ID: return V_0_18_0; diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java index 356c60cc7f8..9c042216cf4 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java @@ -58,6 +58,8 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF */ public class MulticastZenPing extends AbstractLifecycleComponent implements ZenPing { + private static final byte[] INTERNAL_HEADER = new byte[]{1, 9, 8, 4}; + private final String address; private final int port; @@ -267,6 +269,8 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); try { HandlesStreamOutput out = cachedEntry.cachedHandlesBytes(); + out.writeBytes(INTERNAL_HEADER); + Version.writeVersion(Version.CURRENT, out); out.writeInt(id); clusterName.writeTo(out); nodesProvider.nodes().localNode().writeTo(out); @@ -365,7 +369,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem ClusterName clusterName = null; Map externalPingData = null; - XContentType xContentType; + XContentType xContentType = null; synchronized (receiveMutex) { try { @@ -379,17 +383,34 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem continue; } try { - xContentType = XContentFactory.xContentType(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength()); - if (xContentType != null) { - // an external ping - externalPingData = XContentFactory.xContent(xContentType) - .createParser(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength()) - .mapAndClose(); - } else { - StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength())); + boolean internal = false; + if (datagramPacketReceive.getLength() > 4) { + int counter = 0; + for (; counter < INTERNAL_HEADER.length; counter++) { + if (datagramPacketReceive.getData()[datagramPacketReceive.getOffset() + counter] != INTERNAL_HEADER[counter]) { + break; + } + } + if (counter == INTERNAL_HEADER.length) { + internal = true; + } + } + if (internal) { + StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset() + INTERNAL_HEADER.length, datagramPacketReceive.getLength())); + Version version = Version.readVersion(input); id = input.readInt(); clusterName = ClusterName.readClusterName(input); requestingNodeX = readNode(input); + } else { + xContentType = XContentFactory.xContentType(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength()); + if (xContentType != null) { + // an external ping + externalPingData = XContentFactory.xContent(xContentType) + .createParser(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength()) + .mapAndClose(); + } else { + throw new ElasticSearchIllegalStateException("failed multicast message, probably message from previous version"); + } } } catch (Exception e) { logger.warn("failed to read requesting data from {}", e, datagramPacketReceive.getSocketAddress());