diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java b/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java index 526675f8cfb..58fde8e2e5d 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java @@ -39,12 +39,10 @@ import java.util.Map; /** * Node information (static, does not change over time). - * - * */ public class NodeInfo extends NodeOperationResponse { - private ImmutableMap attributes; + private ImmutableMap serviceAttributes; private Settings settings; @@ -63,11 +61,11 @@ public class NodeInfo extends NodeOperationResponse { NodeInfo() { } - public NodeInfo(DiscoveryNode node, ImmutableMap attributes, Settings settings, + public NodeInfo(DiscoveryNode node, ImmutableMap serviceAttributes, Settings settings, OsInfo os, ProcessInfo process, JvmInfo jvm, NetworkInfo network, TransportInfo transport, @Nullable HttpInfo http) { super(node); - this.attributes = attributes; + this.serviceAttributes = serviceAttributes; this.settings = settings; this.os = os; this.process = process; @@ -78,17 +76,17 @@ public class NodeInfo extends NodeOperationResponse { } /** - * The attributes of the node. + * The service attributes of the node. */ - public ImmutableMap attributes() { - return this.attributes; + public ImmutableMap serviceAttributes() { + return this.serviceAttributes; } /** * The attributes of the node. */ - public ImmutableMap getAttributes() { - return attributes(); + public ImmutableMap getServiceAttributes() { + return serviceAttributes(); } /** @@ -191,7 +189,7 @@ public class NodeInfo extends NodeOperationResponse { for (int i = 0; i < size; i++) { builder.put(in.readUTF(), in.readUTF()); } - attributes = builder.build(); + serviceAttributes = builder.build(); settings = ImmutableSettings.readSettingsFromStream(in); if (in.readBoolean()) { os = OsInfo.readOsInfo(in); @@ -216,8 +214,8 @@ public class NodeInfo extends NodeOperationResponse { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeVInt(attributes.size()); - for (Map.Entry entry : attributes.entrySet()) { + out.writeVInt(serviceAttributes.size()); + for (Map.Entry entry : serviceAttributes.entrySet()) { out.writeUTF(entry.getKey()); out.writeUTF(entry.getValue()); } diff --git a/src/main/java/org/elasticsearch/discovery/zen/DiscoveryNodesProvider.java b/src/main/java/org/elasticsearch/discovery/zen/DiscoveryNodesProvider.java index d2487b65f6d..465d2f6647f 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/DiscoveryNodesProvider.java +++ b/src/main/java/org/elasticsearch/discovery/zen/DiscoveryNodesProvider.java @@ -20,6 +20,8 @@ package org.elasticsearch.discovery.zen; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.node.service.NodeService; /** * @@ -27,4 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; public interface DiscoveryNodesProvider { DiscoveryNodes nodes(); + + @Nullable + NodeService nodeService(); } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index a85b14f282b..a07f3e69b37 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.UUID; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery; @@ -45,6 +46,7 @@ import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.discovery.zen.ping.ZenPingService; import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.node.service.NodeService; import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -106,6 +108,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen private final AtomicBoolean initialStateSent = new AtomicBoolean(); + @Nullable + private NodeService nodeService; + @Inject public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService, @@ -137,6 +142,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen this.membership = new MembershipAction(settings, transportService, this, new MembershipListener()); } + public void setNodeService(@Nullable NodeService nodeService) { + this.nodeService = nodeService; + } + @Override protected void doStart() throws ElasticSearchException { Map nodeAttributes = discoveryNodeService.buildAttributes(); @@ -227,6 +236,11 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen return newNodesBuilder().put(localNode).localNodeId(localNode.id()).build(); } + @Override + public NodeService nodeService() { + return this.nodeService; + } + @Override public void publish(ClusterState clusterState) { if (!master) { diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java index 7878e979bff..61b7518f878 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java @@ -29,6 +29,9 @@ import org.elasticsearch.common.io.stream.*; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.discovery.zen.ping.ZenPingException; @@ -36,10 +39,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.io.IOException; -import java.net.DatagramPacket; -import java.net.InetAddress; -import java.net.MulticastSocket; -import java.net.SocketTimeoutException; +import java.net.*; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -75,7 +75,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem private final NetworkService networkService; - private final boolean sendPing; + private final boolean pingEnabled; private volatile DiscoveryNodesProvider nodesProvider; @@ -115,7 +115,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem this.bufferSize = componentSettings.getAsInt("buffer_size", 2048); this.ttl = componentSettings.getAsInt("ttl", 3); - this.sendPing = componentSettings.getAsBoolean("send_ping", true); + this.pingEnabled = componentSettings.getAsBoolean("ping.enabled", true); logger.debug("using group [{}], with port [{}], ttl [{}], and address [{}]", group, port, ttl, address); @@ -225,7 +225,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem @Override public void ping(final PingListener listener, final TimeValue timeout) { - if (!sendPing) { + if (!pingEnabled) { threadPool.cached().execute(new Runnable() { @Override public void run() { @@ -359,9 +359,13 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem public void run() { while (running) { try { - int id; - DiscoveryNode requestingNodeX; - ClusterName clusterName; + int id = -1; + DiscoveryNode requestingNodeX = null; + ClusterName clusterName = null; + + Map externalPingData = null; + XContentType xContentType; + synchronized (receiveMutex) { try { multicastSocket.receive(datagramPacketReceive); @@ -374,73 +378,158 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem continue; } try { - StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength())); - id = input.readInt(); - clusterName = ClusterName.readClusterName(input); - requestingNodeX = readNode(input); + xContentType = XContentFactory.xContentType(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength()); + if (xContentType != null) { + // an external ping + externalPingData = XContentFactory.xContent(xContentType) + .createParser(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength()) + .mapAndClose(); + } else { + StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength())); + id = input.readInt(); + clusterName = ClusterName.readClusterName(input); + requestingNodeX = readNode(input); + } } catch (Exception e) { - logger.warn("failed to read requesting node from {}", e, datagramPacketReceive.getSocketAddress()); + logger.warn("failed to read requesting data from {}", e, datagramPacketReceive.getSocketAddress()); continue; } } - DiscoveryNodes discoveryNodes = nodesProvider.nodes(); - final DiscoveryNode requestingNode = requestingNodeX; - if (requestingNode.id().equals(discoveryNodes.localNodeId())) { - // that's me, ignore - continue; - } - if (!clusterName.equals(MulticastZenPing.this.clusterName)) { - if (logger.isTraceEnabled()) { - logger.trace("[{}] received ping_request from [{}], but wrong cluster_name [{}], expected [{}], ignoring", id, requestingNode, clusterName, MulticastZenPing.this.clusterName); - } - continue; - } - // don't connect between two client nodes, no need for that... - if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) { - if (logger.isTraceEnabled()) { - logger.trace("[{}] received ping_request from [{}], both are client nodes, ignoring", id, requestingNode, clusterName); - } - continue; - } - final MulticastPingResponse multicastPingResponse = new MulticastPingResponse(); - multicastPingResponse.id = id; - multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName); - - if (logger.isTraceEnabled()) { - logger.trace("[{}] received ping_request from [{}], sending {}", id, requestingNode, multicastPingResponse.pingResponse); - } - - if (!transportService.nodeConnected(requestingNode)) { - // do the connect and send on a thread pool - threadPool.cached().execute(new Runnable() { - @Override - public void run() { - // connect to the node if possible - try { - transportService.connectToNode(requestingNode); - transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleException(TransportException exp) { - logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode); - } - }); - } catch (Exception e) { - logger.warn("failed to connect to requesting node {}", e, requestingNode); - } - } - }); + if (externalPingData != null) { + handleExternalPingRequest(externalPingData, xContentType, datagramPacketReceive.getSocketAddress()); } else { - transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleException(TransportException exp) { - logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode); - } - }); + handleNodePingRequest(id, requestingNodeX, clusterName); } } catch (Exception e) { logger.warn("unexpected exception in multicast receiver", e); } } } + + @SuppressWarnings("unchecked") + private void handleExternalPingRequest(Map externalPingData, XContentType contentType, SocketAddress remoteAddress) { + if (externalPingData.containsKey("response")) { + // ignoring responses sent over the multicast channel + logger.trace("got an external ping response (ignoring) from {}, content {}", remoteAddress, externalPingData); + return; + } + + if (multicastSocket == null) { + logger.debug("can't send ping response, no socket, from {}, content {}", remoteAddress, externalPingData); + return; + } + + Map request = (Map) externalPingData.get("request"); + if (request == null) { + logger.warn("malformed external ping request, no 'request' element from {}, content {}", remoteAddress, externalPingData); + return; + } + + String clusterName = request.containsKey("cluster_name") ? request.get("cluster_name").toString() : request.containsKey("clusterName") ? request.get("clusterName").toString() : null; + if (clusterName == null) { + logger.warn("malformed external ping request, missing 'cluster_name' element within request, from {}, content {}", remoteAddress, externalPingData); + return; + } + + if (!clusterName.equals(MulticastZenPing.this.clusterName.value())) { + logger.trace("got request for cluster_name {}, but our cluster_name is {}, from {}, content {}", clusterName, MulticastZenPing.this.clusterName.value(), remoteAddress, externalPingData); + return; + } + if (logger.isTraceEnabled()) { + logger.trace("got external ping request from {}, content {}", remoteAddress, externalPingData); + } + + try { + DiscoveryNode localNode = nodesProvider.nodes().localNode(); + + XContentBuilder builder = XContentFactory.contentBuilder(contentType); + builder.startObject().startObject("response"); + builder.field("cluster_name", MulticastZenPing.this.clusterName.value()); + builder.field("transport_address", localNode.address().toString()); + + if (nodesProvider.nodeService() != null) { + for (Map.Entry attr : nodesProvider.nodeService().attributes().entrySet()) { + builder.field(attr.getKey(), attr.getValue()); + } + } + + builder.startObject("attributes"); + for (Map.Entry attr : localNode.attributes().entrySet()) { + builder.field(attr.getKey(), attr.getValue()); + } + builder.endObject(); + + builder.endObject().endObject(); + synchronized (sendMutex) { + datagramPacketSend.setData(builder.underlyingBytes(), 0, builder.underlyingBytesLength()); + multicastSocket.send(datagramPacketSend); + if (logger.isTraceEnabled()) { + logger.trace("sending external ping response {}", builder.string()); + } + } + } catch (Exception e) { + logger.warn("failed to send external multicast response", e); + } + } + + private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName clusterName) { + if (!pingEnabled) { + return; + } + DiscoveryNodes discoveryNodes = nodesProvider.nodes(); + final DiscoveryNode requestingNode = requestingNodeX; + if (requestingNode.id().equals(discoveryNodes.localNodeId())) { + // that's me, ignore + return; + } + if (!clusterName.equals(MulticastZenPing.this.clusterName)) { + if (logger.isTraceEnabled()) { + logger.trace("[{}] received ping_request from [{}], but wrong cluster_name [{}], expected [{}], ignoring", id, requestingNode, clusterName, MulticastZenPing.this.clusterName); + } + return; + } + // don't connect between two client nodes, no need for that... + if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) { + if (logger.isTraceEnabled()) { + logger.trace("[{}] received ping_request from [{}], both are client nodes, ignoring", id, requestingNode, clusterName); + } + return; + } + final MulticastPingResponse multicastPingResponse = new MulticastPingResponse(); + multicastPingResponse.id = id; + multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName); + + if (logger.isTraceEnabled()) { + logger.trace("[{}] received ping_request from [{}], sending {}", id, requestingNode, multicastPingResponse.pingResponse); + } + + if (!transportService.nodeConnected(requestingNode)) { + // do the connect and send on a thread pool + threadPool.cached().execute(new Runnable() { + @Override + public void run() { + // connect to the node if possible + try { + transportService.connectToNode(requestingNode); + transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(ThreadPool.Names.SAME) { + @Override + public void handleException(TransportException exp) { + logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode); + } + }); + } catch (Exception e) { + logger.warn("failed to connect to requesting node {}", e, requestingNode); + } + } + }); + } else { + transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(ThreadPool.Names.SAME) { + @Override + public void handleException(TransportException exp) { + logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode); + } + }); + } + } } } diff --git a/src/main/java/org/elasticsearch/node/service/NodeService.java b/src/main/java/org/elasticsearch/node/service/NodeService.java index 28537bfed74..3728e450f9c 100644 --- a/src/main/java/org/elasticsearch/node/service/NodeService.java +++ b/src/main/java/org/elasticsearch/node/service/NodeService.java @@ -48,7 +48,7 @@ public class NodeService extends AbstractComponent { @Nullable private HttpServer httpServer; - private volatile ImmutableMap nodeAttributes = ImmutableMap.of(); + private volatile ImmutableMap serviceAttributes = ImmutableMap.of(); @Inject public NodeService(Settings settings, MonitorService monitorService, ClusterService clusterService, TransportService transportService, IndicesService indicesService) { @@ -63,16 +63,33 @@ public class NodeService extends AbstractComponent { this.httpServer = httpServer; } - public synchronized void putNodeAttribute(String key, String value) { - nodeAttributes = new MapBuilder().putAll(nodeAttributes).put(key, value).immutableMap(); + @Deprecated + public void putNodeAttribute(String key, String value) { + putAttribute(key, value); } - public synchronized void removeNodeAttribute(String key) { - nodeAttributes = new MapBuilder().putAll(nodeAttributes).remove(key).immutableMap(); + @Deprecated + public void removeNodeAttribute(String key) { + removeAttribute(key); + } + + public synchronized void putAttribute(String key, String value) { + serviceAttributes = new MapBuilder().putAll(serviceAttributes).put(key, value).immutableMap(); + } + + public synchronized void removeAttribute(String key) { + serviceAttributes = new MapBuilder().putAll(serviceAttributes).remove(key).immutableMap(); + } + + /** + * Attributes different services in the node can add to be reported as part of the node info (for example). + */ + public ImmutableMap attributes() { + return this.serviceAttributes; } public NodeInfo info() { - return new NodeInfo(clusterService.state().nodes().localNode(), nodeAttributes, settings, + return new NodeInfo(clusterService.state().nodes().localNode(), serviceAttributes, settings, monitorService.osService().info(), monitorService.processService().info(), monitorService.jvmService().info(), monitorService.networkService().info(), transportService.info(), httpServer == null ? null : httpServer.info()); diff --git a/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/info/RestNodesInfoAction.java b/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/info/RestNodesInfoAction.java index 20f46ba31f6..184e3ed9c6c 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/info/RestNodesInfoAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/info/RestNodesInfoAction.java @@ -73,15 +73,16 @@ public class RestNodesInfoAction extends BaseRestHandler { builder.field("name", nodeInfo.node().name(), XContentBuilder.FieldCaseConversion.NONE); builder.field("transport_address", nodeInfo.node().address().toString()); + for (Map.Entry nodeAttribute : nodeInfo.serviceAttributes().entrySet()) { + builder.field(nodeAttribute.getKey(), nodeAttribute.getValue()); + } + builder.startObject("attributes"); for (Map.Entry attr : nodeInfo.node().attributes().entrySet()) { builder.field(attr.getKey(), attr.getValue()); } builder.endObject(); - for (Map.Entry nodeAttribute : nodeInfo.attributes().entrySet()) { - builder.field(nodeAttribute.getKey(), nodeAttribute.getValue()); - } if (includeSettings) { builder.startObject("settings"); diff --git a/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/multicast/MulticastZenPingTests.java b/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/multicast/MulticastZenPingTests.java index 9c2cce4476d..3b3c47e9276 100644 --- a/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/multicast/MulticastZenPingTests.java +++ b/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/multicast/MulticastZenPingTests.java @@ -22,15 +22,23 @@ package org.elasticsearch.test.unit.discovery.zen.ping.multicast; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing; +import org.elasticsearch.node.service.NodeService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.local.LocalTransport; import org.testng.annotations.Test; +import java.net.DatagramPacket; +import java.net.InetAddress; +import java.net.MulticastSocket; + import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -56,6 +64,11 @@ public class MulticastZenPingTests { public DiscoveryNodes nodes() { return DiscoveryNodes.newNodesBuilder().put(nodeA).localNodeId("A").build(); } + + @Override + public NodeService nodeService() { + return null; + } }); zenPingA.start(); @@ -65,6 +78,11 @@ public class MulticastZenPingTests { public DiscoveryNodes nodes() { return DiscoveryNodes.newNodesBuilder().put(nodeB).localNodeId("B").build(); } + + @Override + public NodeService nodeService() { + return null; + } }); zenPingB.start(); @@ -80,4 +98,45 @@ public class MulticastZenPingTests { threadPool.shutdown(); } } + + @Test + public void testExternalPing() throws Exception { + ThreadPool threadPool = new ThreadPool(); + + ClusterName clusterName = new ClusterName("test"); + final TransportService transportServiceA = new TransportService(new LocalTransport(threadPool), threadPool).start(); + final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress()); + + MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName); + zenPingA.setNodesProvider(new DiscoveryNodesProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.newNodesBuilder().put(nodeA).localNodeId("A").build(); + } + + @Override + public NodeService nodeService() { + return null; + } + }); + zenPingA.start(); + + try { + Loggers.getLogger(MulticastZenPing.class).setLevel("TRACE"); + MulticastSocket multicastSocket = new MulticastSocket(54328); + multicastSocket.setReceiveBufferSize(2048); + multicastSocket.setSendBufferSize(2048); + multicastSocket.setSoTimeout(60000); + + DatagramPacket datagramPacket = new DatagramPacket(new byte[2048], 2048, InetAddress.getByName("224.2.2.4"), 54328); + XContentBuilder builder = XContentFactory.jsonBuilder().startObject().startObject("request").field("cluster_name", "test").endObject().endObject(); + datagramPacket.setData(builder.copiedBytes()); + multicastSocket.send(datagramPacket); + Thread.sleep(100); + } finally { + Loggers.getLogger(MulticastZenPing.class).setLevel("INFO"); + zenPingA.close(); + threadPool.shutdown(); + } + } } diff --git a/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/unicast/UnicastZenPingTests.java b/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/unicast/UnicastZenPingTests.java index eedf50d39d0..ebeb09e8776 100644 --- a/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/unicast/UnicastZenPingTests.java +++ b/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/unicast/UnicastZenPingTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing; +import org.elasticsearch.node.service.NodeService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.netty.NettyTransport; @@ -69,6 +70,11 @@ public class UnicastZenPingTests { public DiscoveryNodes nodes() { return DiscoveryNodes.newNodesBuilder().put(nodeA).localNodeId("A").build(); } + + @Override + public NodeService nodeService() { + return null; + } }); zenPingA.start(); @@ -78,6 +84,11 @@ public class UnicastZenPingTests { public DiscoveryNodes nodes() { return DiscoveryNodes.newNodesBuilder().put(nodeB).localNodeId("B").build(); } + + @Override + public NodeService nodeService() { + return null; + } }); zenPingB.start();