Support Multicast discovery for external clients, closes #1532.

This commit is contained in:
Shay Banon 2011-12-11 18:54:07 +02:00
parent 5258b505eb
commit de861d6f43
8 changed files with 283 additions and 89 deletions

View File

@ -39,12 +39,10 @@ import java.util.Map;
/**
* Node information (static, does not change over time).
*
*
*/
public class NodeInfo extends NodeOperationResponse {
private ImmutableMap<String, String> attributes;
private ImmutableMap<String, String> serviceAttributes;
private Settings settings;
@ -63,11 +61,11 @@ public class NodeInfo extends NodeOperationResponse {
NodeInfo() {
}
public NodeInfo(DiscoveryNode node, ImmutableMap<String, String> attributes, Settings settings,
public NodeInfo(DiscoveryNode node, ImmutableMap<String, String> serviceAttributes, Settings settings,
OsInfo os, ProcessInfo process, JvmInfo jvm, NetworkInfo network,
TransportInfo transport, @Nullable HttpInfo http) {
super(node);
this.attributes = attributes;
this.serviceAttributes = serviceAttributes;
this.settings = settings;
this.os = os;
this.process = process;
@ -78,17 +76,17 @@ public class NodeInfo extends NodeOperationResponse {
}
/**
* The attributes of the node.
* The service attributes of the node.
*/
public ImmutableMap<String, String> attributes() {
return this.attributes;
public ImmutableMap<String, String> serviceAttributes() {
return this.serviceAttributes;
}
/**
* The attributes of the node.
*/
public ImmutableMap<String, String> getAttributes() {
return attributes();
public ImmutableMap<String, String> getServiceAttributes() {
return serviceAttributes();
}
/**
@ -191,7 +189,7 @@ public class NodeInfo extends NodeOperationResponse {
for (int i = 0; i < size; i++) {
builder.put(in.readUTF(), in.readUTF());
}
attributes = builder.build();
serviceAttributes = builder.build();
settings = ImmutableSettings.readSettingsFromStream(in);
if (in.readBoolean()) {
os = OsInfo.readOsInfo(in);
@ -216,8 +214,8 @@ public class NodeInfo extends NodeOperationResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(attributes.size());
for (Map.Entry<String, String> entry : attributes.entrySet()) {
out.writeVInt(serviceAttributes.size());
for (Map.Entry<String, String> entry : serviceAttributes.entrySet()) {
out.writeUTF(entry.getKey());
out.writeUTF(entry.getValue());
}

View File

@ -20,6 +20,8 @@
package org.elasticsearch.discovery.zen;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.node.service.NodeService;
/**
*
@ -27,4 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
public interface DiscoveryNodesProvider {
DiscoveryNodes nodes();
@Nullable
NodeService nodeService();
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.UUID;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
@ -45,6 +46,7 @@ import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -106,6 +108,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private final AtomicBoolean initialStateSent = new AtomicBoolean();
@Nullable
private NodeService nodeService;
@Inject
public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
@ -137,6 +142,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());
}
public void setNodeService(@Nullable NodeService nodeService) {
this.nodeService = nodeService;
}
@Override
protected void doStart() throws ElasticSearchException {
Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
@ -227,6 +236,11 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return newNodesBuilder().put(localNode).localNodeId(localNode.id()).build();
}
@Override
public NodeService nodeService() {
return this.nodeService;
}
@Override
public void publish(ClusterState clusterState) {
if (!master) {

View File

@ -29,6 +29,9 @@ import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingException;
@ -36,10 +39,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.SocketTimeoutException;
import java.net.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -75,7 +75,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
private final NetworkService networkService;
private final boolean sendPing;
private final boolean pingEnabled;
private volatile DiscoveryNodesProvider nodesProvider;
@ -115,7 +115,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
this.bufferSize = componentSettings.getAsInt("buffer_size", 2048);
this.ttl = componentSettings.getAsInt("ttl", 3);
this.sendPing = componentSettings.getAsBoolean("send_ping", true);
this.pingEnabled = componentSettings.getAsBoolean("ping.enabled", true);
logger.debug("using group [{}], with port [{}], ttl [{}], and address [{}]", group, port, ttl, address);
@ -225,7 +225,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
@Override
public void ping(final PingListener listener, final TimeValue timeout) {
if (!sendPing) {
if (!pingEnabled) {
threadPool.cached().execute(new Runnable() {
@Override
public void run() {
@ -359,9 +359,13 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
public void run() {
while (running) {
try {
int id;
DiscoveryNode requestingNodeX;
ClusterName clusterName;
int id = -1;
DiscoveryNode requestingNodeX = null;
ClusterName clusterName = null;
Map<String, Object> externalPingData = null;
XContentType xContentType;
synchronized (receiveMutex) {
try {
multicastSocket.receive(datagramPacketReceive);
@ -374,73 +378,158 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
continue;
}
try {
StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength()));
id = input.readInt();
clusterName = ClusterName.readClusterName(input);
requestingNodeX = readNode(input);
xContentType = XContentFactory.xContentType(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength());
if (xContentType != null) {
// an external ping
externalPingData = XContentFactory.xContent(xContentType)
.createParser(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength())
.mapAndClose();
} else {
StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength()));
id = input.readInt();
clusterName = ClusterName.readClusterName(input);
requestingNodeX = readNode(input);
}
} catch (Exception e) {
logger.warn("failed to read requesting node from {}", e, datagramPacketReceive.getSocketAddress());
logger.warn("failed to read requesting data from {}", e, datagramPacketReceive.getSocketAddress());
continue;
}
}
DiscoveryNodes discoveryNodes = nodesProvider.nodes();
final DiscoveryNode requestingNode = requestingNodeX;
if (requestingNode.id().equals(discoveryNodes.localNodeId())) {
// that's me, ignore
continue;
}
if (!clusterName.equals(MulticastZenPing.this.clusterName)) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] received ping_request from [{}], but wrong cluster_name [{}], expected [{}], ignoring", id, requestingNode, clusterName, MulticastZenPing.this.clusterName);
}
continue;
}
// don't connect between two client nodes, no need for that...
if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] received ping_request from [{}], both are client nodes, ignoring", id, requestingNode, clusterName);
}
continue;
}
final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();
multicastPingResponse.id = id;
multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName);
if (logger.isTraceEnabled()) {
logger.trace("[{}] received ping_request from [{}], sending {}", id, requestingNode, multicastPingResponse.pingResponse);
}
if (!transportService.nodeConnected(requestingNode)) {
// do the connect and send on a thread pool
threadPool.cached().execute(new Runnable() {
@Override
public void run() {
// connect to the node if possible
try {
transportService.connectToNode(requestingNode);
transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
});
} catch (Exception e) {
logger.warn("failed to connect to requesting node {}", e, requestingNode);
}
}
});
if (externalPingData != null) {
handleExternalPingRequest(externalPingData, xContentType, datagramPacketReceive.getSocketAddress());
} else {
transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
});
handleNodePingRequest(id, requestingNodeX, clusterName);
}
} catch (Exception e) {
logger.warn("unexpected exception in multicast receiver", e);
}
}
}
@SuppressWarnings("unchecked")
private void handleExternalPingRequest(Map<String, Object> externalPingData, XContentType contentType, SocketAddress remoteAddress) {
if (externalPingData.containsKey("response")) {
// ignoring responses sent over the multicast channel
logger.trace("got an external ping response (ignoring) from {}, content {}", remoteAddress, externalPingData);
return;
}
if (multicastSocket == null) {
logger.debug("can't send ping response, no socket, from {}, content {}", remoteAddress, externalPingData);
return;
}
Map<String, Object> request = (Map<String, Object>) externalPingData.get("request");
if (request == null) {
logger.warn("malformed external ping request, no 'request' element from {}, content {}", remoteAddress, externalPingData);
return;
}
String clusterName = request.containsKey("cluster_name") ? request.get("cluster_name").toString() : request.containsKey("clusterName") ? request.get("clusterName").toString() : null;
if (clusterName == null) {
logger.warn("malformed external ping request, missing 'cluster_name' element within request, from {}, content {}", remoteAddress, externalPingData);
return;
}
if (!clusterName.equals(MulticastZenPing.this.clusterName.value())) {
logger.trace("got request for cluster_name {}, but our cluster_name is {}, from {}, content {}", clusterName, MulticastZenPing.this.clusterName.value(), remoteAddress, externalPingData);
return;
}
if (logger.isTraceEnabled()) {
logger.trace("got external ping request from {}, content {}", remoteAddress, externalPingData);
}
try {
DiscoveryNode localNode = nodesProvider.nodes().localNode();
XContentBuilder builder = XContentFactory.contentBuilder(contentType);
builder.startObject().startObject("response");
builder.field("cluster_name", MulticastZenPing.this.clusterName.value());
builder.field("transport_address", localNode.address().toString());
if (nodesProvider.nodeService() != null) {
for (Map.Entry<String, String> attr : nodesProvider.nodeService().attributes().entrySet()) {
builder.field(attr.getKey(), attr.getValue());
}
}
builder.startObject("attributes");
for (Map.Entry<String, String> attr : localNode.attributes().entrySet()) {
builder.field(attr.getKey(), attr.getValue());
}
builder.endObject();
builder.endObject().endObject();
synchronized (sendMutex) {
datagramPacketSend.setData(builder.underlyingBytes(), 0, builder.underlyingBytesLength());
multicastSocket.send(datagramPacketSend);
if (logger.isTraceEnabled()) {
logger.trace("sending external ping response {}", builder.string());
}
}
} catch (Exception e) {
logger.warn("failed to send external multicast response", e);
}
}
private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName clusterName) {
if (!pingEnabled) {
return;
}
DiscoveryNodes discoveryNodes = nodesProvider.nodes();
final DiscoveryNode requestingNode = requestingNodeX;
if (requestingNode.id().equals(discoveryNodes.localNodeId())) {
// that's me, ignore
return;
}
if (!clusterName.equals(MulticastZenPing.this.clusterName)) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] received ping_request from [{}], but wrong cluster_name [{}], expected [{}], ignoring", id, requestingNode, clusterName, MulticastZenPing.this.clusterName);
}
return;
}
// don't connect between two client nodes, no need for that...
if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] received ping_request from [{}], both are client nodes, ignoring", id, requestingNode, clusterName);
}
return;
}
final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();
multicastPingResponse.id = id;
multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName);
if (logger.isTraceEnabled()) {
logger.trace("[{}] received ping_request from [{}], sending {}", id, requestingNode, multicastPingResponse.pingResponse);
}
if (!transportService.nodeConnected(requestingNode)) {
// do the connect and send on a thread pool
threadPool.cached().execute(new Runnable() {
@Override
public void run() {
// connect to the node if possible
try {
transportService.connectToNode(requestingNode);
transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
});
} catch (Exception e) {
logger.warn("failed to connect to requesting node {}", e, requestingNode);
}
}
});
} else {
transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
});
}
}
}
}

View File

@ -48,7 +48,7 @@ public class NodeService extends AbstractComponent {
@Nullable
private HttpServer httpServer;
private volatile ImmutableMap<String, String> nodeAttributes = ImmutableMap.of();
private volatile ImmutableMap<String, String> serviceAttributes = ImmutableMap.of();
@Inject
public NodeService(Settings settings, MonitorService monitorService, ClusterService clusterService, TransportService transportService, IndicesService indicesService) {
@ -63,16 +63,33 @@ public class NodeService extends AbstractComponent {
this.httpServer = httpServer;
}
public synchronized void putNodeAttribute(String key, String value) {
nodeAttributes = new MapBuilder<String, String>().putAll(nodeAttributes).put(key, value).immutableMap();
@Deprecated
public void putNodeAttribute(String key, String value) {
putAttribute(key, value);
}
public synchronized void removeNodeAttribute(String key) {
nodeAttributes = new MapBuilder<String, String>().putAll(nodeAttributes).remove(key).immutableMap();
@Deprecated
public void removeNodeAttribute(String key) {
removeAttribute(key);
}
public synchronized void putAttribute(String key, String value) {
serviceAttributes = new MapBuilder<String, String>().putAll(serviceAttributes).put(key, value).immutableMap();
}
public synchronized void removeAttribute(String key) {
serviceAttributes = new MapBuilder<String, String>().putAll(serviceAttributes).remove(key).immutableMap();
}
/**
* Attributes different services in the node can add to be reported as part of the node info (for example).
*/
public ImmutableMap<String, String> attributes() {
return this.serviceAttributes;
}
public NodeInfo info() {
return new NodeInfo(clusterService.state().nodes().localNode(), nodeAttributes, settings,
return new NodeInfo(clusterService.state().nodes().localNode(), serviceAttributes, settings,
monitorService.osService().info(), monitorService.processService().info(),
monitorService.jvmService().info(), monitorService.networkService().info(),
transportService.info(), httpServer == null ? null : httpServer.info());

View File

@ -73,15 +73,16 @@ public class RestNodesInfoAction extends BaseRestHandler {
builder.field("name", nodeInfo.node().name(), XContentBuilder.FieldCaseConversion.NONE);
builder.field("transport_address", nodeInfo.node().address().toString());
for (Map.Entry<String, String> nodeAttribute : nodeInfo.serviceAttributes().entrySet()) {
builder.field(nodeAttribute.getKey(), nodeAttribute.getValue());
}
builder.startObject("attributes");
for (Map.Entry<String, String> attr : nodeInfo.node().attributes().entrySet()) {
builder.field(attr.getKey(), attr.getValue());
}
builder.endObject();
for (Map.Entry<String, String> nodeAttribute : nodeInfo.attributes().entrySet()) {
builder.field(nodeAttribute.getKey(), nodeAttribute.getValue());
}
if (includeSettings) {
builder.startObject("settings");

View File

@ -22,15 +22,23 @@ package org.elasticsearch.test.unit.discovery.zen.ping.multicast;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.local.LocalTransport;
import org.testng.annotations.Test;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@ -56,6 +64,11 @@ public class MulticastZenPingTests {
public DiscoveryNodes nodes() {
return DiscoveryNodes.newNodesBuilder().put(nodeA).localNodeId("A").build();
}
@Override
public NodeService nodeService() {
return null;
}
});
zenPingA.start();
@ -65,6 +78,11 @@ public class MulticastZenPingTests {
public DiscoveryNodes nodes() {
return DiscoveryNodes.newNodesBuilder().put(nodeB).localNodeId("B").build();
}
@Override
public NodeService nodeService() {
return null;
}
});
zenPingB.start();
@ -80,4 +98,45 @@ public class MulticastZenPingTests {
threadPool.shutdown();
}
}
@Test
public void testExternalPing() throws Exception {
ThreadPool threadPool = new ThreadPool();
ClusterName clusterName = new ClusterName("test");
final TransportService transportServiceA = new TransportService(new LocalTransport(threadPool), threadPool).start();
final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress());
MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName);
zenPingA.setNodesProvider(new DiscoveryNodesProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.newNodesBuilder().put(nodeA).localNodeId("A").build();
}
@Override
public NodeService nodeService() {
return null;
}
});
zenPingA.start();
try {
Loggers.getLogger(MulticastZenPing.class).setLevel("TRACE");
MulticastSocket multicastSocket = new MulticastSocket(54328);
multicastSocket.setReceiveBufferSize(2048);
multicastSocket.setSendBufferSize(2048);
multicastSocket.setSoTimeout(60000);
DatagramPacket datagramPacket = new DatagramPacket(new byte[2048], 2048, InetAddress.getByName("224.2.2.4"), 54328);
XContentBuilder builder = XContentFactory.jsonBuilder().startObject().startObject("request").field("cluster_name", "test").endObject().endObject();
datagramPacket.setData(builder.copiedBytes());
multicastSocket.send(datagramPacket);
Thread.sleep(100);
} finally {
Loggers.getLogger(MulticastZenPing.class).setLevel("INFO");
zenPingA.close();
threadPool.shutdown();
}
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty.NettyTransport;
@ -69,6 +70,11 @@ public class UnicastZenPingTests {
public DiscoveryNodes nodes() {
return DiscoveryNodes.newNodesBuilder().put(nodeA).localNodeId("A").build();
}
@Override
public NodeService nodeService() {
return null;
}
});
zenPingA.start();
@ -78,6 +84,11 @@ public class UnicastZenPingTests {
public DiscoveryNodes nodes() {
return DiscoveryNodes.newNodesBuilder().put(nodeB).localNodeId("B").build();
}
@Override
public NodeService nodeService() {
return null;
}
});
zenPingB.start();