diff --git a/.projectile b/.projectile index d2a5e762a88..49e2b292c26 100644 --- a/.projectile +++ b/.projectile @@ -16,7 +16,6 @@ -/plugins/discovery-azure/target -/plugins/discovery-ec2/target -/plugins/discovery-gce/target --/plugins/discovery-multicast/target -/plugins/jvm-example/target -/plugins/lang-expression/target -/plugins/lang-groovy/target diff --git a/core/src/main/java/org/elasticsearch/plugins/PluginManager.java b/core/src/main/java/org/elasticsearch/plugins/PluginManager.java index a107c957bd4..044b4c9dbbd 100644 --- a/core/src/main/java/org/elasticsearch/plugins/PluginManager.java +++ b/core/src/main/java/org/elasticsearch/plugins/PluginManager.java @@ -100,7 +100,6 @@ public class PluginManager { "discovery-azure", "discovery-ec2", "discovery-gce", - "discovery-multicast", "ingest-geoip", "lang-javascript", "lang-painless", diff --git a/core/src/main/resources/org/elasticsearch/plugins/plugin-install.help b/core/src/main/resources/org/elasticsearch/plugins/plugin-install.help index 9d5b8b3d68d..15efac60308 100644 --- a/core/src/main/resources/org/elasticsearch/plugins/plugin-install.help +++ b/core/src/main/resources/org/elasticsearch/plugins/plugin-install.help @@ -42,7 +42,6 @@ OFFICIAL PLUGINS - discovery-azure - discovery-ec2 - discovery-gce - - discovery-multicast - ingest-geoip - lang-javascript - lang-painless diff --git a/dev-tools/smoke_test_rc.py b/dev-tools/smoke_test_rc.py index 8c8a6fb9fae..f32a6b80f52 100644 --- a/dev-tools/smoke_test_rc.py +++ b/dev-tools/smoke_test_rc.py @@ -66,7 +66,6 @@ DEFAULT_PLUGINS = ["analysis-icu", "discovery-azure", "discovery-ec2", "discovery-gce", - "discovery-multicast", "lang-javascript", "lang-painless", "lang-python", diff --git a/docs/plugins/discovery-multicast.asciidoc b/docs/plugins/discovery-multicast.asciidoc deleted file mode 100644 index 75acbd89577..00000000000 --- a/docs/plugins/discovery-multicast.asciidoc +++ /dev/null @@ -1,55 +0,0 @@ -[[discovery-multicast]] -=== Multicast Discovery Plugin - -The Multicast Discovery plugin provides the ability to form a cluster using -TCP/IP multicast messages. - -[[discovery-multicast-install]] -[float] -==== Installation - -This plugin can be installed using the plugin manager: - -[source,sh] ----------------------------------------------------------------- -sudo bin/plugin install discovery-multicast ----------------------------------------------------------------- - -The plugin must be installed on every node in the cluster, and each node must -be restarted after installation. - -[[discovery-multicast-remove]] -[float] -==== Removal - -The plugin can be removed with the following command: - -[source,sh] ----------------------------------------------------------------- -sudo bin/plugin remove discovery-multicast ----------------------------------------------------------------- - -The node must be stopped before removing the plugin. - -[[discovery-multicast-usage]] -==== Configuring multicast discovery - -Multicast ping discovery of other nodes is done by sending one or more -multicast requests which existing nodes will receive and -respond to. It provides the following settings with the -`discovery.zen.ping.multicast` prefix: - -[cols="<,<",options="header",] -|======================================================================= -|Setting |Description -|`group` |The group address to use. Defaults to `224.2.2.4`. - -|`port` |The port to use. Defaults to `54328`. - -|`ttl` |The ttl of the multicast message. Defaults to `3`. - -|`address` |The address to bind to, defaults to `null` which means it -will bind `network.bind_host` - -|`enabled` |Whether multicast ping discovery is enabled. Defaults to `false`. -|======================================================================= diff --git a/docs/plugins/discovery.asciidoc b/docs/plugins/discovery.asciidoc index cfc98e45dee..f6f181bc1f1 100644 --- a/docs/plugins/discovery.asciidoc +++ b/docs/plugins/discovery.asciidoc @@ -21,10 +21,6 @@ The Azure discovery plugin uses the Azure API for unicast discovery. The Google Compute Engine discovery plugin uses the GCE API for unicast discovery. -<>:: - -The multicast plugin sends multicast messages to discover other nodes in the cluster. - [float] ==== Community contributed discovery plugins diff --git a/docs/reference/migration/migrate_3_0.asciidoc b/docs/reference/migration/migrate_3_0.asciidoc index 78f8ff40307..1574dbd128e 100644 --- a/docs/reference/migration/migrate_3_0.asciidoc +++ b/docs/reference/migration/migrate_3_0.asciidoc @@ -319,6 +319,10 @@ disable doc values is by using the `doc_values` property of mappings. Site plugins have been removed. It is recommended to migrate site plugins to Kibana plugins. +==== Multicast plugin removed + +Multicast has been removed. Use unicast discovery, or one of the cloud discovery plugins. + ==== Plugins with custom query implementations Plugins implementing custom queries need to implement the `fromXContent(QueryParseContext)` method in their diff --git a/plugins/discovery-multicast/build.gradle b/plugins/discovery-multicast/build.gradle deleted file mode 100644 index 295f28c094b..00000000000 --- a/plugins/discovery-multicast/build.gradle +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -esplugin { - description 'The Multicast Discovery plugin allows discovery other nodes using multicast requests' - classname 'org.elasticsearch.plugin.discovery.multicast.MulticastDiscoveryPlugin' -} diff --git a/plugins/discovery-multicast/src/main/java/org/elasticsearch/plugin/discovery/multicast/MulticastChannel.java b/plugins/discovery-multicast/src/main/java/org/elasticsearch/plugin/discovery/multicast/MulticastChannel.java deleted file mode 100644 index dee74b9ddce..00000000000 --- a/plugins/discovery-multicast/src/main/java/org/elasticsearch/plugin/discovery/multicast/MulticastChannel.java +++ /dev/null @@ -1,390 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.plugin.discovery.multicast; - -import org.apache.lucene.util.IOUtils; -import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; -import org.elasticsearch.common.settings.Settings; - -import java.io.Closeable; -import java.net.DatagramPacket; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.MulticastSocket; -import java.net.SocketAddress; -import java.net.SocketTimeoutException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; - -/** - * A multicast channel that supports registering for receive events, and sending datagram packets. Allows - * to easily share the same multicast socket if it holds the same config. - */ -abstract class MulticastChannel implements Closeable { - - /** - * Builds a channel based on the provided config, allowing to control if sharing a channel that uses - * the same config is allowed or not. - */ - public static MulticastChannel getChannel(String name, boolean shared, Config config, Listener listener) throws Exception { - if (!shared) { - return new Plain(listener, name, config); - } - return Shared.getSharedChannel(listener, config); - } - - /** - * Config of multicast channel. - */ - public static final class Config { - public final int port; - public final String group; - public final int bufferSize; - public final int ttl; - public final InetAddress multicastInterface; - public final boolean deferToInterface; - - public Config(int port, String group, int bufferSize, int ttl, - InetAddress multicastInterface, boolean deferToInterface) { - this.port = port; - this.group = group; - this.bufferSize = bufferSize; - this.ttl = ttl; - this.multicastInterface = multicastInterface; - this.deferToInterface = deferToInterface; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - Config config = (Config) o; - - if (bufferSize != config.bufferSize) return false; - if (port != config.port) return false; - if (ttl != config.ttl) return false; - if (group != null ? !group.equals(config.group) : config.group != null) return false; - if (multicastInterface != null ? !multicastInterface.equals(config.multicastInterface) : config.multicastInterface != null) - return false; - - return true; - } - - @Override - public int hashCode() { - int result = port; - result = 31 * result + (group != null ? group.hashCode() : 0); - result = 31 * result + bufferSize; - result = 31 * result + ttl; - result = 31 * result + (multicastInterface != null ? multicastInterface.hashCode() : 0); - return result; - } - } - - /** - * Listener that gets called when data is received on the multicast channel. - */ - public static interface Listener { - void onMessage(BytesReference data, SocketAddress address); - } - - /** - * Simple listener that wraps multiple listeners into one. - */ - public static class MultiListener implements Listener { - - private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); - - public void add(Listener listener) { - this.listeners.add(listener); - } - - public boolean remove(Listener listener) { - return this.listeners.remove(listener); - } - - @Override - public void onMessage(BytesReference data, SocketAddress address) { - for (Listener listener : listeners) { - listener.onMessage(data, address); - } - } - } - - protected final Listener listener; - private AtomicBoolean closed = new AtomicBoolean(); - - protected MulticastChannel(Listener listener) { - this.listener = listener; - } - - /** - * Send the data over the multicast channel. - */ - public abstract void send(BytesReference data) throws Exception; - - /** - * Close the channel. - */ - @Override - public void close() { - if (closed.compareAndSet(false, true)) { - close(listener); - } - } - - protected abstract void close(Listener listener); - - public static final String SHARED_CHANNEL_NAME = "#shared#"; - /** - * A shared channel that keeps a static map of Config -> Shared channels, and closes shared - * channel once their reference count has reached 0. It also handles de-registering relevant - * listener from the shared list of listeners. - */ - private final static class Shared extends MulticastChannel { - - private static final Map sharedChannels = new HashMap<>(); - private static final Object mutex = new Object(); // global mutex so we don't sync on static methods (.class) - - static MulticastChannel getSharedChannel(Listener listener, Config config) throws Exception { - - synchronized (mutex) { - Shared shared = sharedChannels.get(config); - if (shared != null) { - shared.incRef(); - ((MultiListener) shared.listener).add(listener); - } else { - MultiListener multiListener = new MultiListener(); - multiListener.add(listener); - shared = new Shared(multiListener, new Plain(multiListener, SHARED_CHANNEL_NAME, config)); - sharedChannels.put(config, shared); - } - return new Delegate(listener, shared); - } - } - - static void close(Shared shared, Listener listener) { - synchronized (mutex) { - // remove this - boolean removed = ((MultiListener) shared.listener).remove(listener); - assert removed : "a listener should be removed"; - if (shared.decRef() == 0) { - assert ((MultiListener) shared.listener).listeners.isEmpty(); - sharedChannels.remove(shared.channel.getConfig()); - shared.channel.close(); - } - } - } - - final Plain channel; - private int refCount = 1; - - Shared(MultiListener listener, Plain channel) { - super(listener); - this.channel = channel; - } - - private void incRef() { - refCount++; - } - - private int decRef() { - --refCount; - assert refCount >= 0 : "illegal ref counting, close called multiple times"; - return refCount; - } - - @Override - public void send(BytesReference data) throws Exception { - channel.send(data); - } - - @Override - public void close() { - assert false : "Shared references should never be closed directly, only via Delegate"; - } - - @Override - protected void close(Listener listener) { - close(this, listener); - } - } - - /** - * A light weight delegate that wraps another channel, mainly to support delegating - * the close method with the provided listener and not holding existing listener. - */ - private final static class Delegate extends MulticastChannel { - - private final MulticastChannel channel; - - Delegate(Listener listener, MulticastChannel channel) { - super(listener); - this.channel = channel; - } - - @Override - public void send(BytesReference data) throws Exception { - channel.send(data); - } - - @Override - protected void close(Listener listener) { - channel.close(listener); // we delegate here to the close with our listener, not with the delegate listener - } - } - - /** - * Simple implementation of a channel. - */ - @SuppressForbidden(reason = "I bind to wildcard addresses. I am a total nightmare") - private static class Plain extends MulticastChannel { - private final ESLogger logger; - private final Config config; - - private volatile MulticastSocket multicastSocket; - private final DatagramPacket datagramPacketSend; - private final DatagramPacket datagramPacketReceive; - - private final Object sendMutex = new Object(); - private final Object receiveMutex = new Object(); - - private final Receiver receiver; - private final Thread receiverThread; - - Plain(Listener listener, String name, Config config) throws Exception { - super(listener); - this.logger = ESLoggerFactory.getLogger(name); - this.config = config; - this.datagramPacketReceive = new DatagramPacket(new byte[config.bufferSize], config.bufferSize); - this.datagramPacketSend = new DatagramPacket(new byte[config.bufferSize], config.bufferSize, InetAddress.getByName(config.group), config.port); - this.multicastSocket = buildMulticastSocket(config); - this.receiver = new Receiver(); - this.receiverThread = daemonThreadFactory(Settings.builder().put("name", name).build(), "discovery#multicast#receiver").newThread(receiver); - this.receiverThread.start(); - } - - private MulticastSocket buildMulticastSocket(Config config) throws Exception { - SocketAddress addr = new InetSocketAddress(InetAddress.getByName(config.group), config.port); - MulticastSocket multicastSocket = new MulticastSocket(config.port); - try { - multicastSocket.setTimeToLive(config.ttl); - // OSX is not smart enough to tell that a socket bound to the - // 'lo0' interface needs to make sure to send the UDP packet - // out of the lo0 interface, so we need to do some special - // workarounds to fix it. - if (config.deferToInterface) { - // 'null' here tells the socket to deter to the interface set - // with .setInterface - multicastSocket.joinGroup(addr, null); - multicastSocket.setInterface(config.multicastInterface); - } else { - multicastSocket.setInterface(config.multicastInterface); - multicastSocket.joinGroup(InetAddress.getByName(config.group)); - } - multicastSocket.setReceiveBufferSize(config.bufferSize); - multicastSocket.setSendBufferSize(config.bufferSize); - multicastSocket.setSoTimeout(60000); - } catch (Throwable e) { - IOUtils.closeWhileHandlingException(multicastSocket); - throw e; - } - return multicastSocket; - } - - public Config getConfig() { - return this.config; - } - - @Override - public void send(BytesReference data) throws Exception { - synchronized (sendMutex) { - datagramPacketSend.setData(data.toBytes()); - multicastSocket.send(datagramPacketSend); - } - } - - @Override - protected void close(Listener listener) { - receiver.stop(); - receiverThread.interrupt(); - if (multicastSocket != null) { - IOUtils.closeWhileHandlingException(multicastSocket); - multicastSocket = null; - } - try { - receiverThread.join(10000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - private class Receiver implements Runnable { - - private volatile boolean running = true; - - public void stop() { - running = false; - } - - @Override - public void run() { - while (running) { - try { - synchronized (receiveMutex) { - try { - multicastSocket.receive(datagramPacketReceive); - } catch (SocketTimeoutException ignore) { - continue; - } catch (Exception e) { - if (running) { - if (multicastSocket.isClosed()) { - logger.warn("multicast socket closed while running, restarting..."); - multicastSocket = buildMulticastSocket(config); - } else { - logger.warn("failed to receive packet, throttling...", e); - Thread.sleep(500); - } - } - continue; - } - } - if (datagramPacketReceive.getData().length > 0) { - listener.onMessage(new BytesArray(datagramPacketReceive.getData()), datagramPacketReceive.getSocketAddress()); - } - } catch (Throwable e) { - if (running) { - logger.warn("unexpected exception in multicast receiver", e); - } - } - } - } - } - } -} diff --git a/plugins/discovery-multicast/src/main/java/org/elasticsearch/plugin/discovery/multicast/MulticastDiscoveryPlugin.java b/plugins/discovery-multicast/src/main/java/org/elasticsearch/plugin/discovery/multicast/MulticastDiscoveryPlugin.java deleted file mode 100644 index da9c5ba3c89..00000000000 --- a/plugins/discovery-multicast/src/main/java/org/elasticsearch/plugin/discovery/multicast/MulticastDiscoveryPlugin.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.plugin.discovery.multicast; - -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.settings.SettingsModule; -import org.elasticsearch.discovery.DiscoveryModule; -import org.elasticsearch.plugins.Plugin; - -public class MulticastDiscoveryPlugin extends Plugin { - - private final Settings settings; - - public MulticastDiscoveryPlugin(Settings settings) { - this.settings = settings; - } - - @Override - public String name() { - return "discovery-multicast"; - } - - @Override - public String description() { - return "Multicast Discovery Plugin"; - } - - public void onModule(DiscoveryModule module) { - if (settings.getAsBoolean("discovery.zen.ping.multicast.enabled", false)) { - module.addZenPing(MulticastZenPing.class); - } - } - - public void onModule(SettingsModule module) { - module.registerSetting(MulticastZenPing.ADDRESS_SETTING); - module.registerSetting(MulticastZenPing.GROUP_SETTING); - module.registerSetting(MulticastZenPing.PORT_SETTING); - module.registerSetting(MulticastZenPing.SHARED_SETTING); - module.registerSetting(MulticastZenPing.TTL_SETTING); - module.registerSetting(MulticastZenPing.BUFFER_SIZE_SETTING); - module.registerSetting(MulticastZenPing.PING_ENABLED_SETTING); - module.registerSetting(MulticastZenPing.DEFERE_TO_INTERFACE_SETTING); - } -} diff --git a/plugins/discovery-multicast/src/main/java/org/elasticsearch/plugin/discovery/multicast/MulticastZenPing.java b/plugins/discovery-multicast/src/main/java/org/elasticsearch/plugin/discovery/multicast/MulticastZenPing.java deleted file mode 100644 index 46f50235b58..00000000000 --- a/plugins/discovery-multicast/src/main/java/org/elasticsearch/plugin/discovery/multicast/MulticastZenPing.java +++ /dev/null @@ -1,604 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.plugin.discovery.multicast; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.SocketAddress; -import java.security.AccessController; -import java.security.PrivilegedExceptionAction; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; - -import org.apache.lucene.util.Constants; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.SpecialPermission; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.network.NetworkService; -import org.elasticsearch.common.network.NetworkUtils; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.discovery.zen.ping.PingContextProvider; -import org.elasticsearch.discovery.zen.ping.ZenPing; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportService; - -import com.carrotsearch.hppc.cursors.ObjectObjectCursor; - -import static org.elasticsearch.cluster.node.DiscoveryNode.readNode; -import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS; -import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; - -/** - * - */ -public class MulticastZenPing extends AbstractLifecycleComponent implements ZenPing { - - public static final String ACTION_NAME = "internal:discovery/zen/multicast"; - - private static final byte[] INTERNAL_HEADER = new byte[]{1, 9, 8, 4}; - - private static final int PING_SIZE_ESTIMATE = 150; - - private final String address; - private final int port; - private final String group; - private final int bufferSize; - private final int ttl; - - private final ThreadPool threadPool; - private final TransportService transportService; - private final ClusterName clusterName; - private final NetworkService networkService; - private final Version version; - private volatile PingContextProvider contextProvider; - - private final boolean pingEnabled; - - private volatile MulticastChannel multicastChannel; - - private final AtomicInteger pingIdGenerator = new AtomicInteger(); - private final Map receivedResponses = newConcurrentMap(); - public static final Setting ADDRESS_SETTING = Setting.simpleString("discovery.zen.ping.multicast.address", false, Setting.Scope.CLUSTER); - public static final Setting PORT_SETTING = Setting.intSetting("discovery.zen.ping.multicast.port", 54328, 0, (1<<16)-1, false, Setting.Scope.CLUSTER); - public static final Setting GROUP_SETTING = new Setting<>("discovery.zen.ping.multicast.group", "224.2.2.4", Function.identity(), false, Setting.Scope.CLUSTER); - public static final Setting BUFFER_SIZE_SETTING = Setting.byteSizeSetting("discovery.zen.ping.multicast.buffer_size", new ByteSizeValue(2048, ByteSizeUnit.BYTES), false, Setting.Scope.CLUSTER); - public static final Setting TTL_SETTING = Setting.intSetting("discovery.zen.ping.multicast.ttl", 3, 0, 255, false, Setting.Scope.CLUSTER); - public static final Setting PING_ENABLED_SETTING = Setting.boolSetting("discovery.zen.ping.multicast.ping.enabled", true, false, Setting.Scope.CLUSTER); - public static final Setting SHARED_SETTING = Setting.boolSetting("discovery.zen.ping.multicast.shared", Constants.MAC_OS_X, false, Setting.Scope.CLUSTER); - public static final Setting DEFERE_TO_INTERFACE_SETTING = Setting.boolSetting("discovery.zen.ping.multicast.defer_group_to_set_interface", Constants.MAC_OS_X, false, Setting.Scope.CLUSTER); - - public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version) { - this(EMPTY_SETTINGS, threadPool, transportService, clusterName, new NetworkService(EMPTY_SETTINGS), version); - } - - @Inject - public MulticastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService, Version version) { - super(settings); - this.threadPool = threadPool; - this.transportService = transportService; - this.clusterName = clusterName; - this.networkService = networkService; - this.version = version; - - this.address = ADDRESS_SETTING.exists(settings) ? ADDRESS_SETTING.get(settings) : null; - this.port = PORT_SETTING.get(settings); - this.group = GROUP_SETTING.get(settings); - this.bufferSize = BUFFER_SIZE_SETTING.get(settings).bytesAsInt(); - this.ttl = TTL_SETTING.get(settings); - this.pingEnabled = PING_ENABLED_SETTING.get(settings); - - logger.debug("using group [{}], with port [{}], ttl [{}], and address [{}]", group, port, ttl, address); - - this.transportService.registerRequestHandler(ACTION_NAME, MulticastPingResponse::new, ThreadPool.Names.SAME, new MulticastPingResponseRequestHandler()); - } - - @Override - public void setPingContextProvider(PingContextProvider nodesProvider) { - if (lifecycle.started()) { - throw new IllegalStateException("Can't set nodes provider when started"); - } - this.contextProvider = nodesProvider; - } - - @Override - protected void doStart() { - try { - // we know OSX has bugs in the JVM when creating multiple instances of multicast sockets - // causing for "socket close" exceptions when receive and/or crashes - boolean shared = SHARED_SETTING.get(settings); - // OSX does not correctly send multicasts FROM the right interface - boolean deferToInterface = DEFERE_TO_INTERFACE_SETTING.get(settings); - - final MulticastChannel.Config config = new MulticastChannel.Config(port, group, bufferSize, ttl, - getMulticastInterface(), deferToInterface); - SecurityManager sm = System.getSecurityManager(); - if (sm != null) { - sm.checkPermission(new SpecialPermission()); - } - multicastChannel = AccessController.doPrivileged(new PrivilegedExceptionAction() { - @Override - public MulticastChannel run() throws Exception { - return MulticastChannel.getChannel(nodeName(), shared, config, new Receiver()); - } - }); - } catch (Throwable t) { - String msg = "multicast failed to start [{}], disabling. Consider using IPv4 only (by defining env. variable `ES_USE_IPV4`)"; - logger.info(msg, t, ExceptionsHelper.detailedMessage(t)); - } - } - - - @SuppressWarnings("deprecation") // Used to support funky configuration options - private InetAddress getMulticastInterface() throws IOException { - // don't use publish address, the use case for that is e.g. a firewall or proxy and - // may not even be bound to an interface on this machine! use the first bound address. - List addresses = Arrays.asList(networkService.resolveBindHostAddresses(address == null ? null : new String[] { address })); - NetworkUtils.sortAddresses(addresses); - return addresses.get(0); - } - - @Override - protected void doStop() { - if (multicastChannel != null) { - multicastChannel.close(); - multicastChannel = null; - } - } - - @Override - protected void doClose() { - } - - public PingResponse[] pingAndWait(TimeValue timeout) { - final AtomicReference response = new AtomicReference<>(); - final CountDownLatch latch = new CountDownLatch(1); - try { - ping(new PingListener() { - @Override - public void onPing(PingResponse[] pings) { - response.set(pings); - latch.countDown(); - } - }, timeout); - } catch (EsRejectedExecutionException ex) { - logger.debug("Ping execution rejected", ex); - return PingResponse.EMPTY; - } - try { - latch.await(); - return response.get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return PingResponse.EMPTY; - } - } - - @Override - public void ping(final PingListener listener, final TimeValue timeout) { - if (!pingEnabled || multicastChannel == null) { - threadPool.generic().execute(new Runnable() { - @Override - public void run() { - listener.onPing(PingResponse.EMPTY); - } - }); - return; - } - final int id = pingIdGenerator.incrementAndGet(); - try { - receivedResponses.put(id, new PingCollection()); - sendPingRequest(id); - // try and send another ping request halfway through (just in case someone woke up during it...) - // this can be a good trade-off to nailing the initial lookup or un-delivered messages - threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { - @Override - public void onFailure(Throwable t) { - logger.warn("[{}] failed to send second ping request", t, id); - finalizePingCycle(id, listener); - } - - @Override - public void doRun() { - sendPingRequest(id); - threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { - @Override - public void onFailure(Throwable t) { - logger.warn("[{}] failed to send third ping request", t, id); - finalizePingCycle(id, listener); - } - - @Override - public void doRun() { - // make one last ping, but finalize as soon as all nodes have responded or a timeout has past - PingCollection collection = receivedResponses.get(id); - FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener); - receivedResponses.put(id, finalizingPingCollection); - logger.trace("[{}] sending last pings", id); - sendPingRequest(id); - threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() { - @Override - public void onFailure(Throwable t) { - logger.warn("[{}] failed to finalize ping", t, id); - } - - @Override - protected void doRun() throws Exception { - finalizePingCycle(id, listener); - } - }); - } - }); - } - }); - } catch (Exception e) { - logger.warn("failed to ping", e); - finalizePingCycle(id, listener); - } - } - - /** - * takes all pings collected for a given id and pass them to the given listener. - * this method is safe to call multiple times as is guaranteed to only finalize once. - */ - private void finalizePingCycle(int id, final PingListener listener) { - PingCollection responses = receivedResponses.remove(id); - if (responses != null) { - listener.onPing(responses.toArray()); - } - } - - private void sendPingRequest(int id) { - try { - BytesStreamOutput out = new BytesStreamOutput(PING_SIZE_ESTIMATE); - out.writeBytes(INTERNAL_HEADER); - // TODO: change to min_required version! - Version.writeVersion(version, out); - out.writeInt(id); - clusterName.writeTo(out); - contextProvider.nodes().localNode().writeTo(out); - out.close(); - multicastChannel.send(out.bytes()); - if (logger.isTraceEnabled()) { - logger.trace("[{}] sending ping request", id); - } - } catch (Exception e) { - if (lifecycle.stoppedOrClosed()) { - return; - } - if (logger.isDebugEnabled()) { - logger.debug("failed to send multicast ping request", e); - } else { - logger.warn("failed to send multicast ping request: {}", ExceptionsHelper.detailedMessage(e)); - } - } - } - - class FinalizingPingCollection extends PingCollection { - final private PingCollection internalCollection; - final private int expectedResponses; - final private AtomicInteger responseCount; - final private PingListener listener; - final private int id; - - public FinalizingPingCollection(int id, PingCollection internalCollection, int expectedResponses, PingListener listener) { - this.id = id; - this.internalCollection = internalCollection; - this.expectedResponses = expectedResponses; - this.responseCount = new AtomicInteger(); - this.listener = listener; - } - - @Override - public synchronized boolean addPing(PingResponse ping) { - if (internalCollection.addPing(ping)) { - if (responseCount.incrementAndGet() >= expectedResponses) { - logger.trace("[{}] all nodes responded", id); - finish(); - } - return true; - } - return false; - } - - @Override - public synchronized void addPings(PingResponse[] pings) { - internalCollection.addPings(pings); - } - - @Override - public synchronized PingResponse[] toArray() { - return internalCollection.toArray(); - } - - void finish() { - // spawn another thread as we may be running on a network thread - threadPool.generic().execute(new AbstractRunnable() { - @Override - public void onFailure(Throwable t) { - logger.error("failed to call ping listener", t); - } - - @Override - protected void doRun() throws Exception { - finalizePingCycle(id, listener); - } - }); - } - } - - class MulticastPingResponseRequestHandler implements TransportRequestHandler { - @Override - public void messageReceived(MulticastPingResponse request, TransportChannel channel) throws Exception { - if (logger.isTraceEnabled()) { - logger.trace("[{}] received {}", request.id, request.pingResponse); - } - PingCollection responses = receivedResponses.get(request.id); - if (responses == null) { - logger.warn("received ping response {} with no matching id [{}]", request.pingResponse, request.id); - } else { - responses.addPing(request.pingResponse); - } - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } - } - - public static class MulticastPingResponse extends TransportRequest { - - int id; - - PingResponse pingResponse; - - public MulticastPingResponse() { - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - id = in.readInt(); - pingResponse = PingResponse.readPingResponse(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeInt(id); - pingResponse.writeTo(out); - } - } - - - private class Receiver implements MulticastChannel.Listener { - - @Override - public void onMessage(BytesReference data, SocketAddress address) { - int id = -1; - DiscoveryNode requestingNodeX = null; - ClusterName clusterName = null; - - Map externalPingData = null; - XContentType xContentType = null; - - try { - boolean internal = false; - if (data.length() > 4) { - int counter = 0; - for (; counter < INTERNAL_HEADER.length; counter++) { - if (data.get(counter) != INTERNAL_HEADER[counter]) { - break; - } - } - if (counter == INTERNAL_HEADER.length) { - internal = true; - } - } - if (internal) { - StreamInput input = StreamInput.wrap(new BytesArray(data.toBytes(), INTERNAL_HEADER.length, data.length() - INTERNAL_HEADER.length)); - Version version = Version.readVersion(input); - input.setVersion(version); - id = input.readInt(); - clusterName = ClusterName.readClusterName(input); - requestingNodeX = readNode(input); - } else { - xContentType = XContentFactory.xContentType(data); - if (xContentType != null) { - // an external ping - try (XContentParser parser = XContentFactory.xContent(xContentType).createParser(data)) { - externalPingData = parser.map(); - } - } else { - throw new IllegalStateException("failed multicast message, probably message from previous version"); - } - } - if (externalPingData != null) { - handleExternalPingRequest(externalPingData, xContentType, address); - } else { - handleNodePingRequest(id, requestingNodeX, clusterName); - } - } catch (Exception e) { - if (!lifecycle.started() || (e instanceof EsRejectedExecutionException)) { - logger.debug("failed to read requesting data from {}", e, address); - } else { - logger.warn("failed to read requesting data from {}", e, address); - } - } - } - - @SuppressWarnings("unchecked") - private void handleExternalPingRequest(Map externalPingData, XContentType contentType, SocketAddress remoteAddress) { - if (externalPingData.containsKey("response")) { - // ignoring responses sent over the multicast channel - logger.trace("got an external ping response (ignoring) from {}, content {}", remoteAddress, externalPingData); - return; - } - - if (multicastChannel == null) { - logger.debug("can't send ping response, no socket, from {}, content {}", remoteAddress, externalPingData); - return; - } - - Map request = (Map) externalPingData.get("request"); - if (request == null) { - logger.warn("malformed external ping request, no 'request' element from {}, content {}", remoteAddress, externalPingData); - return; - } - - final String requestClusterName = request.containsKey("cluster_name") ? request.get("cluster_name").toString() : request.containsKey("clusterName") ? request.get("clusterName").toString() : null; - if (requestClusterName == null) { - logger.warn("malformed external ping request, missing 'cluster_name' element within request, from {}, content {}", remoteAddress, externalPingData); - return; - } - - if (!requestClusterName.equals(clusterName.value())) { - logger.trace("got request for cluster_name {}, but our cluster_name is {}, from {}, content {}", - requestClusterName, clusterName.value(), remoteAddress, externalPingData); - return; - } - if (logger.isTraceEnabled()) { - logger.trace("got external ping request from {}, content {}", remoteAddress, externalPingData); - } - - try { - DiscoveryNode localNode = contextProvider.nodes().localNode(); - - XContentBuilder builder = XContentFactory.contentBuilder(contentType); - builder.startObject().startObject("response"); - builder.field("cluster_name", clusterName.value()); - builder.startObject("version").field("number", version.number()).field("snapshot_build", version.snapshot).endObject(); - builder.field("transport_address", localNode.address().toString()); - - if (contextProvider.nodeService() != null) { - for (Map.Entry attr : contextProvider.nodeService().attributes().entrySet()) { - builder.field(attr.getKey(), attr.getValue()); - } - } - - builder.startObject("attributes"); - for (ObjectObjectCursor attr : localNode.attributes()) { - builder.field(attr.key, attr.value); - } - builder.endObject(); - - builder.endObject().endObject(); - multicastChannel.send(builder.bytes()); - if (logger.isTraceEnabled()) { - logger.trace("sending external ping response {}", builder.string()); - } - } catch (Exception e) { - logger.warn("failed to send external multicast response", e); - } - } - - private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) { - if (!pingEnabled || multicastChannel == null) { - return; - } - final DiscoveryNodes discoveryNodes = contextProvider.nodes(); - final DiscoveryNode requestingNode = requestingNodeX; - if (requestingNode.id().equals(discoveryNodes.localNodeId())) { - // that's me, ignore - return; - } - if (!requestClusterName.equals(clusterName)) { - if (logger.isTraceEnabled()) { - logger.trace("[{}] received ping_request from [{}], but wrong cluster_name [{}], expected [{}], ignoring", - id, requestingNode, requestClusterName.value(), clusterName.value()); - } - return; - } - // don't connect between two client nodes, no need for that... - if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) { - if (logger.isTraceEnabled()) { - logger.trace("[{}] received ping_request from [{}], both are client nodes, ignoring", id, requestingNode, requestClusterName); - } - return; - } - final MulticastPingResponse multicastPingResponse = new MulticastPingResponse(); - multicastPingResponse.id = id; - multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce()); - - if (logger.isTraceEnabled()) { - logger.trace("[{}] received ping_request from [{}], sending {}", id, requestingNode, multicastPingResponse.pingResponse); - } - - if (!transportService.nodeConnected(requestingNode)) { - // do the connect and send on a thread pool - threadPool.generic().execute(new Runnable() { - @Override - public void run() { - // connect to the node if possible - try { - transportService.connectToNode(requestingNode); - transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleException(TransportException exp) { - logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode); - } - }); - } catch (Exception e) { - if (lifecycle.started()) { - logger.warn("failed to connect to requesting node {}", e, requestingNode); - } - } - } - }); - } else { - transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleException(TransportException exp) { - if (lifecycle.started()) { - logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode); - } - } - }); - } - } - } -} diff --git a/plugins/discovery-multicast/src/main/plugin-metadata/plugin-security.policy b/plugins/discovery-multicast/src/main/plugin-metadata/plugin-security.policy deleted file mode 100644 index 5752c86bb4f..00000000000 --- a/plugins/discovery-multicast/src/main/plugin-metadata/plugin-security.policy +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -grant { - // needed to bind multicast to arbitrary port - permission java.net.SocketPermission "localhost:1024-", "listen,resolve"; -}; diff --git a/plugins/discovery-multicast/src/test/java/org/elasticsearch/plugin/discovery/multicast/MulticastDiscoveryRestIT.java b/plugins/discovery-multicast/src/test/java/org/elasticsearch/plugin/discovery/multicast/MulticastDiscoveryRestIT.java deleted file mode 100644 index c6af20c011e..00000000000 --- a/plugins/discovery-multicast/src/test/java/org/elasticsearch/plugin/discovery/multicast/MulticastDiscoveryRestIT.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.plugin.discovery.multicast; - -import com.carrotsearch.randomizedtesting.annotations.Name; -import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; -import org.elasticsearch.test.rest.ESRestTestCase; -import org.elasticsearch.test.rest.RestTestCandidate; -import org.elasticsearch.test.rest.parser.RestTestParseException; - -import java.io.IOException; - -public class MulticastDiscoveryRestIT extends ESRestTestCase { - - public MulticastDiscoveryRestIT(@Name("yaml") RestTestCandidate testCandidate) { - super(testCandidate); - } - - @ParametersFactory - public static Iterable parameters() throws IOException, RestTestParseException { - return ESRestTestCase.createParameters(0, 1); - } -} - diff --git a/plugins/discovery-multicast/src/test/java/org/elasticsearch/plugin/discovery/multicast/MulticastZenPingTests.java b/plugins/discovery-multicast/src/test/java/org/elasticsearch/plugin/discovery/multicast/MulticastZenPingTests.java deleted file mode 100644 index 8c2d95ec799..00000000000 --- a/plugins/discovery-multicast/src/test/java/org/elasticsearch/plugin/discovery/multicast/MulticastZenPingTests.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.plugin.discovery.multicast; - -import org.apache.lucene.util.Constants; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.discovery.zen.ping.PingContextProvider; -import org.elasticsearch.discovery.zen.ping.ZenPing; -import org.elasticsearch.node.service.NodeService; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.transport.local.LocalTransport; -import org.hamcrest.Matchers; -import org.junit.Assert; - -import java.net.DatagramPacket; -import java.net.InetAddress; -import java.net.MulticastSocket; - -public class MulticastZenPingTests extends ESTestCase { - - private Settings buildRandomMulticast(Settings settings) { - Settings.Builder builder = Settings.builder().put(settings); - builder.put("discovery.zen.ping.multicast.group", "224.2.3." + randomIntBetween(0, 255)); - builder.put("discovery.zen.ping.multicast.port", randomIntBetween(55000, 56000)); - builder.put("discovery.zen.ping.multicast.enabled", true); - if (randomBoolean()) { - builder.put("discovery.zen.ping.multicast.shared", randomBoolean()); - } - return builder.build(); - } - - public void testSimplePings() throws InterruptedException { - assumeTrue("https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=193246", Constants.FREE_BSD == false); - Settings settings = Settings.EMPTY; - settings = buildRandomMulticast(settings); - Thread.sleep(30000); - - ThreadPool threadPool = new ThreadPool("testSimplePings"); - final ClusterName clusterName = new ClusterName("test"); - final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()), threadPool).start(); - final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT); - - final TransportService transportServiceB = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()), threadPool).start(); - final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceB.boundAddress().publishAddress(), Version.CURRENT); - - MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName, Version.CURRENT); - zenPingA.setPingContextProvider(new PingContextProvider() { - @Override - public DiscoveryNodes nodes() { - return DiscoveryNodes.builder().put(nodeA).localNodeId("A").build(); - } - - @Override - public NodeService nodeService() { - return null; - } - - @Override - public boolean nodeHasJoinedClusterOnce() { - return false; - } - }); - zenPingA.start(); - - MulticastZenPing zenPingB = new MulticastZenPing(threadPool, transportServiceB, clusterName, Version.CURRENT); - zenPingB.setPingContextProvider(new PingContextProvider() { - @Override - public DiscoveryNodes nodes() { - return DiscoveryNodes.builder().put(nodeB).localNodeId("B").build(); - } - - @Override - public NodeService nodeService() { - return null; - } - - @Override - public boolean nodeHasJoinedClusterOnce() { - return true; - } - }); - zenPingB.start(); - - try { - logger.info("ping from A"); - ZenPing.PingResponse[] pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1)); - Assert.assertThat(pingResponses.length, Matchers.equalTo(1)); - Assert.assertThat(pingResponses[0].node().id(), Matchers.equalTo("B")); - Assert.assertTrue(pingResponses[0].hasJoinedOnce()); - - logger.info("ping from B"); - pingResponses = zenPingB.pingAndWait(TimeValue.timeValueSeconds(1)); - Assert.assertThat(pingResponses.length, Matchers.equalTo(1)); - Assert.assertThat(pingResponses[0].node().id(), Matchers.equalTo("A")); - Assert.assertFalse(pingResponses[0].hasJoinedOnce()); - - } finally { - zenPingA.close(); - zenPingB.close(); - transportServiceA.close(); - transportServiceB.close(); - terminate(threadPool); - } - } - - // This test is here because when running on FreeBSD, if no tests are - // executed for the 'multicast' project it will assume everything - // failed, so we need to have at least one test that runs. - public void testAlwaysRun() throws Exception { - assertTrue(true); - } - - @SuppressForbidden(reason = "I bind to wildcard addresses. I am a total nightmare") - public void testExternalPing() throws Exception { - assumeTrue("https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=193246", Constants.FREE_BSD == false); - Settings settings = Settings.EMPTY; - settings = buildRandomMulticast(settings); - - final ThreadPool threadPool = new ThreadPool("testExternalPing"); - final ClusterName clusterName = new ClusterName("test"); - final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()), threadPool).start(); - final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT); - - MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName, Version.CURRENT); - zenPingA.setPingContextProvider(new PingContextProvider() { - @Override - public DiscoveryNodes nodes() { - return DiscoveryNodes.builder().put(nodeA).localNodeId("A").build(); - } - - @Override - public NodeService nodeService() { - return null; - } - - @Override - public boolean nodeHasJoinedClusterOnce() { - return false; - } - }); - zenPingA.start(); - - MulticastSocket multicastSocket = null; - try { - Loggers.getLogger(MulticastZenPing.class).setLevel("TRACE"); - multicastSocket = new MulticastSocket(); - multicastSocket.setReceiveBufferSize(2048); - multicastSocket.setSendBufferSize(2048); - multicastSocket.setSoTimeout(60000); - - DatagramPacket datagramPacket = new DatagramPacket(new byte[2048], 2048, InetAddress.getByName("224.2.2.4"), 54328); - XContentBuilder builder = XContentFactory.jsonBuilder().startObject().startObject("request").field("cluster_name", "test").endObject().endObject(); - datagramPacket.setData(builder.bytes().toBytes()); - multicastSocket.send(datagramPacket); - Thread.sleep(100); - } finally { - Loggers.getLogger(MulticastZenPing.class).setLevel("INFO"); - if (multicastSocket != null) multicastSocket.close(); - zenPingA.close(); - terminate(threadPool); - } - } -} diff --git a/plugins/discovery-multicast/src/test/resources/rest-api-spec/test/discovery_multicast/10_basic.yaml b/plugins/discovery-multicast/src/test/resources/rest-api-spec/test/discovery_multicast/10_basic.yaml deleted file mode 100644 index 36172fa2c33..00000000000 --- a/plugins/discovery-multicast/src/test/resources/rest-api-spec/test/discovery_multicast/10_basic.yaml +++ /dev/null @@ -1,13 +0,0 @@ -# Integration tests for multicast discovery -# -"Multicast discovery loaded": - - do: - cluster.state: {} - - # Get master node id - - set: { master_node: master } - - - do: - nodes.info: {} - - - match: { nodes.$master.plugins.0.name: discovery-multicast } diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/plugins/PluginManagerTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/plugins/PluginManagerTests.java index d997a167541..b8a8c4b39dd 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/plugins/PluginManagerTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/plugins/PluginManagerTests.java @@ -598,7 +598,6 @@ public class PluginManagerTests extends ESIntegTestCase { PluginManager.checkForOfficialPlugins("mapper-attachments"); PluginManager.checkForOfficialPlugins("mapper-murmur3"); PluginManager.checkForOfficialPlugins("mapper-size"); - PluginManager.checkForOfficialPlugins("discovery-multicast"); PluginManager.checkForOfficialPlugins("discovery-azure"); PluginManager.checkForOfficialPlugins("discovery-ec2"); PluginManager.checkForOfficialPlugins("discovery-gce"); diff --git a/qa/vagrant/src/test/resources/packaging/scripts/plugin_test_cases.bash b/qa/vagrant/src/test/resources/packaging/scripts/plugin_test_cases.bash index 9889048e973..18a93711ff9 100644 --- a/qa/vagrant/src/test/resources/packaging/scripts/plugin_test_cases.bash +++ b/qa/vagrant/src/test/resources/packaging/scripts/plugin_test_cases.bash @@ -219,10 +219,6 @@ fi install_and_check_plugin discovery ec2 aws-java-sdk-core-*.jar } -@test "[$GROUP] install multicast discovery plugin" { - install_and_check_plugin discovery multicast -} - @test "[$GROUP] install lang-expression plugin" { install_and_check_plugin lang expression } @@ -325,10 +321,6 @@ fi remove_plugin discovery-ec2 } -@test "[$GROUP] remove multicast discovery plugin" { - remove_plugin discovery-multicast -} - @test "[$GROUP] remove lang-expression plugin" { remove_plugin lang-expression } diff --git a/settings.gradle b/settings.gradle index c8616789569..f7a7fc71772 100644 --- a/settings.gradle +++ b/settings.gradle @@ -24,7 +24,6 @@ List projects = [ 'plugins:discovery-azure', 'plugins:discovery-ec2', 'plugins:discovery-gce', - 'plugins:discovery-multicast', 'plugins:ingest-geoip', 'plugins:lang-javascript', 'plugins:lang-painless',