improve multicast message to have an internal header for node to node discovery

This commit is contained in:
Shay Banon 2011-12-16 18:29:10 +02:00
parent 3ddbc6469a
commit f74256afcd
2 changed files with 31 additions and 10 deletions

View File

@ -58,7 +58,7 @@ public class Version {
return fromId(in.readVInt());
}
private static Version fromId(int id) {
public static Version fromId(int id) {
switch (id) {
case V_0_18_0_ID:
return V_0_18_0;

View File

@ -58,6 +58,8 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF
*/
public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implements ZenPing {
private static final byte[] INTERNAL_HEADER = new byte[]{1, 9, 8, 4};
private final String address;
private final int port;
@ -267,6 +269,8 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
HandlesStreamOutput out = cachedEntry.cachedHandlesBytes();
out.writeBytes(INTERNAL_HEADER);
Version.writeVersion(Version.CURRENT, out);
out.writeInt(id);
clusterName.writeTo(out);
nodesProvider.nodes().localNode().writeTo(out);
@ -365,7 +369,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
ClusterName clusterName = null;
Map<String, Object> externalPingData = null;
XContentType xContentType;
XContentType xContentType = null;
synchronized (receiveMutex) {
try {
@ -379,17 +383,34 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
continue;
}
try {
xContentType = XContentFactory.xContentType(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength());
if (xContentType != null) {
// an external ping
externalPingData = XContentFactory.xContent(xContentType)
.createParser(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength())
.mapAndClose();
} else {
StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength()));
boolean internal = false;
if (datagramPacketReceive.getLength() > 4) {
int counter = 0;
for (; counter < INTERNAL_HEADER.length; counter++) {
if (datagramPacketReceive.getData()[datagramPacketReceive.getOffset() + counter] != INTERNAL_HEADER[counter]) {
break;
}
}
if (counter == INTERNAL_HEADER.length) {
internal = true;
}
}
if (internal) {
StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset() + INTERNAL_HEADER.length, datagramPacketReceive.getLength()));
Version version = Version.readVersion(input);
id = input.readInt();
clusterName = ClusterName.readClusterName(input);
requestingNodeX = readNode(input);
} else {
xContentType = XContentFactory.xContentType(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength());
if (xContentType != null) {
// an external ping
externalPingData = XContentFactory.xContent(xContentType)
.createParser(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength())
.mapAndClose();
} else {
throw new ElasticSearchIllegalStateException("failed multicast message, probably message from previous version");
}
}
} catch (Exception e) {
logger.warn("failed to read requesting data from {}", e, datagramPacketReceive.getSocketAddress());