Merge pull request #16326 from rjernst/remove_multicast
Remove multicast plugin
This commit is contained in:
commit
6f12048cda
|
@ -16,7 +16,6 @@
|
|||
-/plugins/discovery-azure/target
|
||||
-/plugins/discovery-ec2/target
|
||||
-/plugins/discovery-gce/target
|
||||
-/plugins/discovery-multicast/target
|
||||
-/plugins/jvm-example/target
|
||||
-/plugins/lang-expression/target
|
||||
-/plugins/lang-groovy/target
|
||||
|
|
|
@ -107,7 +107,6 @@ class InstallPluginCommand extends CliTool.Command {
|
|||
"discovery-azure",
|
||||
"discovery-ec2",
|
||||
"discovery-gce",
|
||||
"discovery-multicast",
|
||||
"lang-javascript",
|
||||
"lang-painless",
|
||||
"lang-python",
|
||||
|
|
|
@ -37,7 +37,6 @@ OFFICIAL PLUGINS
|
|||
- discovery-azure
|
||||
- discovery-ec2
|
||||
- discovery-gce
|
||||
- discovery-multicast
|
||||
- ingest-geoip
|
||||
- lang-javascript
|
||||
- lang-painless
|
||||
|
|
|
@ -66,7 +66,6 @@ DEFAULT_PLUGINS = ["analysis-icu",
|
|||
"discovery-azure",
|
||||
"discovery-ec2",
|
||||
"discovery-gce",
|
||||
"discovery-multicast",
|
||||
"lang-javascript",
|
||||
"lang-painless",
|
||||
"lang-python",
|
||||
|
|
|
@ -1,55 +0,0 @@
|
|||
[[discovery-multicast]]
|
||||
=== Multicast Discovery Plugin
|
||||
|
||||
The Multicast Discovery plugin provides the ability to form a cluster using
|
||||
TCP/IP multicast messages.
|
||||
|
||||
[[discovery-multicast-install]]
|
||||
[float]
|
||||
==== Installation
|
||||
|
||||
This plugin can be installed using the plugin manager:
|
||||
|
||||
[source,sh]
|
||||
----------------------------------------------------------------
|
||||
sudo bin/plugin install discovery-multicast
|
||||
----------------------------------------------------------------
|
||||
|
||||
The plugin must be installed on every node in the cluster, and each node must
|
||||
be restarted after installation.
|
||||
|
||||
[[discovery-multicast-remove]]
|
||||
[float]
|
||||
==== Removal
|
||||
|
||||
The plugin can be removed with the following command:
|
||||
|
||||
[source,sh]
|
||||
----------------------------------------------------------------
|
||||
sudo bin/plugin remove discovery-multicast
|
||||
----------------------------------------------------------------
|
||||
|
||||
The node must be stopped before removing the plugin.
|
||||
|
||||
[[discovery-multicast-usage]]
|
||||
==== Configuring multicast discovery
|
||||
|
||||
Multicast ping discovery of other nodes is done by sending one or more
|
||||
multicast requests which existing nodes will receive and
|
||||
respond to. It provides the following settings with the
|
||||
`discovery.zen.ping.multicast` prefix:
|
||||
|
||||
[cols="<,<",options="header",]
|
||||
|=======================================================================
|
||||
|Setting |Description
|
||||
|`group` |The group address to use. Defaults to `224.2.2.4`.
|
||||
|
||||
|`port` |The port to use. Defaults to `54328`.
|
||||
|
||||
|`ttl` |The ttl of the multicast message. Defaults to `3`.
|
||||
|
||||
|`address` |The address to bind to, defaults to `null` which means it
|
||||
will bind `network.bind_host`
|
||||
|
||||
|`enabled` |Whether multicast ping discovery is enabled. Defaults to `false`.
|
||||
|=======================================================================
|
|
@ -21,10 +21,6 @@ The Azure discovery plugin uses the Azure API for unicast discovery.
|
|||
|
||||
The Google Compute Engine discovery plugin uses the GCE API for unicast discovery.
|
||||
|
||||
<<discovery-multicast,Multicast>>::
|
||||
|
||||
The multicast plugin sends multicast messages to discover other nodes in the cluster.
|
||||
|
||||
[float]
|
||||
==== Community contributed discovery plugins
|
||||
|
||||
|
@ -41,5 +37,3 @@ include::discovery-azure.asciidoc[]
|
|||
|
||||
include::discovery-gce.asciidoc[]
|
||||
|
||||
include::discovery-multicast.asciidoc[]
|
||||
|
||||
|
|
|
@ -19,15 +19,11 @@ bin/elasticsearch --network.host _non_loopback_
|
|||
|
||||
The full list of options that network.host accepts can be found in the <<modules-network>>.
|
||||
|
||||
==== Multicast removed
|
||||
==== Unicast discovery
|
||||
|
||||
Multicast has been removed (although it is still
|
||||
{plugins}/discovery-multicast.html[provided as a plugin] for now). Instead,
|
||||
and only when bound to localhost, Elasticsearch will use unicast to contact
|
||||
When bound to localhost, Elasticsearch will use unicast to contact
|
||||
the first 5 ports in the `transport.tcp.port` range, which defaults to
|
||||
`9300-9400`.
|
||||
|
||||
This preserves the zero-config auto-clustering experience for the developer,
|
||||
`9300-9400`. This preserves the zero-config auto-clustering experience for the developer,
|
||||
but it means that you will have to provide a list of <<unicast,unicast hosts>>
|
||||
when moving to production, for instance:
|
||||
|
||||
|
|
|
@ -319,6 +319,10 @@ disable doc values is by using the `doc_values` property of mappings.
|
|||
|
||||
Site plugins have been removed. It is recommended to migrate site plugins to Kibana plugins.
|
||||
|
||||
==== Multicast plugin removed
|
||||
|
||||
Multicast has been removed. Use unicast discovery, or one of the cloud discovery plugins.
|
||||
|
||||
==== Plugins with custom query implementations
|
||||
|
||||
Plugins implementing custom queries need to implement the `fromXContent(QueryParseContext)` method in their
|
||||
|
|
|
@ -1,23 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
esplugin {
|
||||
description 'The Multicast Discovery plugin allows discovery other nodes using multicast requests'
|
||||
classname 'org.elasticsearch.plugin.discovery.multicast.MulticastDiscoveryPlugin'
|
||||
}
|
|
@ -1,390 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.discovery.multicast;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.net.DatagramPacket;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.MulticastSocket;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
|
||||
|
||||
/**
|
||||
* A multicast channel that supports registering for receive events, and sending datagram packets. Allows
|
||||
* to easily share the same multicast socket if it holds the same config.
|
||||
*/
|
||||
abstract class MulticastChannel implements Closeable {
|
||||
|
||||
/**
|
||||
* Builds a channel based on the provided config, allowing to control if sharing a channel that uses
|
||||
* the same config is allowed or not.
|
||||
*/
|
||||
public static MulticastChannel getChannel(String name, boolean shared, Config config, Listener listener) throws Exception {
|
||||
if (!shared) {
|
||||
return new Plain(listener, name, config);
|
||||
}
|
||||
return Shared.getSharedChannel(listener, config);
|
||||
}
|
||||
|
||||
/**
|
||||
* Config of multicast channel.
|
||||
*/
|
||||
public static final class Config {
|
||||
public final int port;
|
||||
public final String group;
|
||||
public final int bufferSize;
|
||||
public final int ttl;
|
||||
public final InetAddress multicastInterface;
|
||||
public final boolean deferToInterface;
|
||||
|
||||
public Config(int port, String group, int bufferSize, int ttl,
|
||||
InetAddress multicastInterface, boolean deferToInterface) {
|
||||
this.port = port;
|
||||
this.group = group;
|
||||
this.bufferSize = bufferSize;
|
||||
this.ttl = ttl;
|
||||
this.multicastInterface = multicastInterface;
|
||||
this.deferToInterface = deferToInterface;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
Config config = (Config) o;
|
||||
|
||||
if (bufferSize != config.bufferSize) return false;
|
||||
if (port != config.port) return false;
|
||||
if (ttl != config.ttl) return false;
|
||||
if (group != null ? !group.equals(config.group) : config.group != null) return false;
|
||||
if (multicastInterface != null ? !multicastInterface.equals(config.multicastInterface) : config.multicastInterface != null)
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = port;
|
||||
result = 31 * result + (group != null ? group.hashCode() : 0);
|
||||
result = 31 * result + bufferSize;
|
||||
result = 31 * result + ttl;
|
||||
result = 31 * result + (multicastInterface != null ? multicastInterface.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Listener that gets called when data is received on the multicast channel.
|
||||
*/
|
||||
public static interface Listener {
|
||||
void onMessage(BytesReference data, SocketAddress address);
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple listener that wraps multiple listeners into one.
|
||||
*/
|
||||
public static class MultiListener implements Listener {
|
||||
|
||||
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();
|
||||
|
||||
public void add(Listener listener) {
|
||||
this.listeners.add(listener);
|
||||
}
|
||||
|
||||
public boolean remove(Listener listener) {
|
||||
return this.listeners.remove(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(BytesReference data, SocketAddress address) {
|
||||
for (Listener listener : listeners) {
|
||||
listener.onMessage(data, address);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected final Listener listener;
|
||||
private AtomicBoolean closed = new AtomicBoolean();
|
||||
|
||||
protected MulticastChannel(Listener listener) {
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the data over the multicast channel.
|
||||
*/
|
||||
public abstract void send(BytesReference data) throws Exception;
|
||||
|
||||
/**
|
||||
* Close the channel.
|
||||
*/
|
||||
@Override
|
||||
public void close() {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
close(listener);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void close(Listener listener);
|
||||
|
||||
public static final String SHARED_CHANNEL_NAME = "#shared#";
|
||||
/**
|
||||
* A shared channel that keeps a static map of Config -> Shared channels, and closes shared
|
||||
* channel once their reference count has reached 0. It also handles de-registering relevant
|
||||
* listener from the shared list of listeners.
|
||||
*/
|
||||
private final static class Shared extends MulticastChannel {
|
||||
|
||||
private static final Map<Config, Shared> sharedChannels = new HashMap<>();
|
||||
private static final Object mutex = new Object(); // global mutex so we don't sync on static methods (.class)
|
||||
|
||||
static MulticastChannel getSharedChannel(Listener listener, Config config) throws Exception {
|
||||
|
||||
synchronized (mutex) {
|
||||
Shared shared = sharedChannels.get(config);
|
||||
if (shared != null) {
|
||||
shared.incRef();
|
||||
((MultiListener) shared.listener).add(listener);
|
||||
} else {
|
||||
MultiListener multiListener = new MultiListener();
|
||||
multiListener.add(listener);
|
||||
shared = new Shared(multiListener, new Plain(multiListener, SHARED_CHANNEL_NAME, config));
|
||||
sharedChannels.put(config, shared);
|
||||
}
|
||||
return new Delegate(listener, shared);
|
||||
}
|
||||
}
|
||||
|
||||
static void close(Shared shared, Listener listener) {
|
||||
synchronized (mutex) {
|
||||
// remove this
|
||||
boolean removed = ((MultiListener) shared.listener).remove(listener);
|
||||
assert removed : "a listener should be removed";
|
||||
if (shared.decRef() == 0) {
|
||||
assert ((MultiListener) shared.listener).listeners.isEmpty();
|
||||
sharedChannels.remove(shared.channel.getConfig());
|
||||
shared.channel.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final Plain channel;
|
||||
private int refCount = 1;
|
||||
|
||||
Shared(MultiListener listener, Plain channel) {
|
||||
super(listener);
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
private void incRef() {
|
||||
refCount++;
|
||||
}
|
||||
|
||||
private int decRef() {
|
||||
--refCount;
|
||||
assert refCount >= 0 : "illegal ref counting, close called multiple times";
|
||||
return refCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(BytesReference data) throws Exception {
|
||||
channel.send(data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
assert false : "Shared references should never be closed directly, only via Delegate";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void close(Listener listener) {
|
||||
close(this, listener);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A light weight delegate that wraps another channel, mainly to support delegating
|
||||
* the close method with the provided listener and not holding existing listener.
|
||||
*/
|
||||
private final static class Delegate extends MulticastChannel {
|
||||
|
||||
private final MulticastChannel channel;
|
||||
|
||||
Delegate(Listener listener, MulticastChannel channel) {
|
||||
super(listener);
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(BytesReference data) throws Exception {
|
||||
channel.send(data);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void close(Listener listener) {
|
||||
channel.close(listener); // we delegate here to the close with our listener, not with the delegate listener
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple implementation of a channel.
|
||||
*/
|
||||
@SuppressForbidden(reason = "I bind to wildcard addresses. I am a total nightmare")
|
||||
private static class Plain extends MulticastChannel {
|
||||
private final ESLogger logger;
|
||||
private final Config config;
|
||||
|
||||
private volatile MulticastSocket multicastSocket;
|
||||
private final DatagramPacket datagramPacketSend;
|
||||
private final DatagramPacket datagramPacketReceive;
|
||||
|
||||
private final Object sendMutex = new Object();
|
||||
private final Object receiveMutex = new Object();
|
||||
|
||||
private final Receiver receiver;
|
||||
private final Thread receiverThread;
|
||||
|
||||
Plain(Listener listener, String name, Config config) throws Exception {
|
||||
super(listener);
|
||||
this.logger = ESLoggerFactory.getLogger(name);
|
||||
this.config = config;
|
||||
this.datagramPacketReceive = new DatagramPacket(new byte[config.bufferSize], config.bufferSize);
|
||||
this.datagramPacketSend = new DatagramPacket(new byte[config.bufferSize], config.bufferSize, InetAddress.getByName(config.group), config.port);
|
||||
this.multicastSocket = buildMulticastSocket(config);
|
||||
this.receiver = new Receiver();
|
||||
this.receiverThread = daemonThreadFactory(Settings.builder().put("name", name).build(), "discovery#multicast#receiver").newThread(receiver);
|
||||
this.receiverThread.start();
|
||||
}
|
||||
|
||||
private MulticastSocket buildMulticastSocket(Config config) throws Exception {
|
||||
SocketAddress addr = new InetSocketAddress(InetAddress.getByName(config.group), config.port);
|
||||
MulticastSocket multicastSocket = new MulticastSocket(config.port);
|
||||
try {
|
||||
multicastSocket.setTimeToLive(config.ttl);
|
||||
// OSX is not smart enough to tell that a socket bound to the
|
||||
// 'lo0' interface needs to make sure to send the UDP packet
|
||||
// out of the lo0 interface, so we need to do some special
|
||||
// workarounds to fix it.
|
||||
if (config.deferToInterface) {
|
||||
// 'null' here tells the socket to deter to the interface set
|
||||
// with .setInterface
|
||||
multicastSocket.joinGroup(addr, null);
|
||||
multicastSocket.setInterface(config.multicastInterface);
|
||||
} else {
|
||||
multicastSocket.setInterface(config.multicastInterface);
|
||||
multicastSocket.joinGroup(InetAddress.getByName(config.group));
|
||||
}
|
||||
multicastSocket.setReceiveBufferSize(config.bufferSize);
|
||||
multicastSocket.setSendBufferSize(config.bufferSize);
|
||||
multicastSocket.setSoTimeout(60000);
|
||||
} catch (Throwable e) {
|
||||
IOUtils.closeWhileHandlingException(multicastSocket);
|
||||
throw e;
|
||||
}
|
||||
return multicastSocket;
|
||||
}
|
||||
|
||||
public Config getConfig() {
|
||||
return this.config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(BytesReference data) throws Exception {
|
||||
synchronized (sendMutex) {
|
||||
datagramPacketSend.setData(data.toBytes());
|
||||
multicastSocket.send(datagramPacketSend);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void close(Listener listener) {
|
||||
receiver.stop();
|
||||
receiverThread.interrupt();
|
||||
if (multicastSocket != null) {
|
||||
IOUtils.closeWhileHandlingException(multicastSocket);
|
||||
multicastSocket = null;
|
||||
}
|
||||
try {
|
||||
receiverThread.join(10000);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
private class Receiver implements Runnable {
|
||||
|
||||
private volatile boolean running = true;
|
||||
|
||||
public void stop() {
|
||||
running = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (running) {
|
||||
try {
|
||||
synchronized (receiveMutex) {
|
||||
try {
|
||||
multicastSocket.receive(datagramPacketReceive);
|
||||
} catch (SocketTimeoutException ignore) {
|
||||
continue;
|
||||
} catch (Exception e) {
|
||||
if (running) {
|
||||
if (multicastSocket.isClosed()) {
|
||||
logger.warn("multicast socket closed while running, restarting...");
|
||||
multicastSocket = buildMulticastSocket(config);
|
||||
} else {
|
||||
logger.warn("failed to receive packet, throttling...", e);
|
||||
Thread.sleep(500);
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (datagramPacketReceive.getData().length > 0) {
|
||||
listener.onMessage(new BytesArray(datagramPacketReceive.getData()), datagramPacketReceive.getSocketAddress());
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
if (running) {
|
||||
logger.warn("unexpected exception in multicast receiver", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,61 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.discovery.multicast;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsModule;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
||||
public class MulticastDiscoveryPlugin extends Plugin {
|
||||
|
||||
private final Settings settings;
|
||||
|
||||
public MulticastDiscoveryPlugin(Settings settings) {
|
||||
this.settings = settings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return "discovery-multicast";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return "Multicast Discovery Plugin";
|
||||
}
|
||||
|
||||
public void onModule(DiscoveryModule module) {
|
||||
if (settings.getAsBoolean("discovery.zen.ping.multicast.enabled", false)) {
|
||||
module.addZenPing(MulticastZenPing.class);
|
||||
}
|
||||
}
|
||||
|
||||
public void onModule(SettingsModule module) {
|
||||
module.registerSetting(MulticastZenPing.ADDRESS_SETTING);
|
||||
module.registerSetting(MulticastZenPing.GROUP_SETTING);
|
||||
module.registerSetting(MulticastZenPing.PORT_SETTING);
|
||||
module.registerSetting(MulticastZenPing.SHARED_SETTING);
|
||||
module.registerSetting(MulticastZenPing.TTL_SETTING);
|
||||
module.registerSetting(MulticastZenPing.BUFFER_SIZE_SETTING);
|
||||
module.registerSetting(MulticastZenPing.PING_ENABLED_SETTING);
|
||||
module.registerSetting(MulticastZenPing.DEFERE_TO_INTERFACE_SETTING);
|
||||
}
|
||||
}
|
|
@ -1,604 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.discovery.multicast;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.SpecialPermission;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.network.NetworkUtils;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.discovery.zen.ping.PingContextProvider;
|
||||
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportRequestHandler;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
|
||||
import static org.elasticsearch.cluster.node.DiscoveryNode.readNode;
|
||||
import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS;
|
||||
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implements ZenPing {
|
||||
|
||||
public static final String ACTION_NAME = "internal:discovery/zen/multicast";
|
||||
|
||||
private static final byte[] INTERNAL_HEADER = new byte[]{1, 9, 8, 4};
|
||||
|
||||
private static final int PING_SIZE_ESTIMATE = 150;
|
||||
|
||||
private final String address;
|
||||
private final int port;
|
||||
private final String group;
|
||||
private final int bufferSize;
|
||||
private final int ttl;
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
private final TransportService transportService;
|
||||
private final ClusterName clusterName;
|
||||
private final NetworkService networkService;
|
||||
private final Version version;
|
||||
private volatile PingContextProvider contextProvider;
|
||||
|
||||
private final boolean pingEnabled;
|
||||
|
||||
private volatile MulticastChannel multicastChannel;
|
||||
|
||||
private final AtomicInteger pingIdGenerator = new AtomicInteger();
|
||||
private final Map<Integer, PingCollection> receivedResponses = newConcurrentMap();
|
||||
public static final Setting<String> ADDRESS_SETTING = Setting.simpleString("discovery.zen.ping.multicast.address", false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Integer> PORT_SETTING = Setting.intSetting("discovery.zen.ping.multicast.port", 54328, 0, (1<<16)-1, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<String> GROUP_SETTING = new Setting<>("discovery.zen.ping.multicast.group", "224.2.2.4", Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<ByteSizeValue> BUFFER_SIZE_SETTING = Setting.byteSizeSetting("discovery.zen.ping.multicast.buffer_size", new ByteSizeValue(2048, ByteSizeUnit.BYTES), false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Integer> TTL_SETTING = Setting.intSetting("discovery.zen.ping.multicast.ttl", 3, 0, 255, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> PING_ENABLED_SETTING = Setting.boolSetting("discovery.zen.ping.multicast.ping.enabled", true, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> SHARED_SETTING = Setting.boolSetting("discovery.zen.ping.multicast.shared", Constants.MAC_OS_X, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> DEFERE_TO_INTERFACE_SETTING = Setting.boolSetting("discovery.zen.ping.multicast.defer_group_to_set_interface", Constants.MAC_OS_X, false, Setting.Scope.CLUSTER);
|
||||
|
||||
public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version) {
|
||||
this(EMPTY_SETTINGS, threadPool, transportService, clusterName, new NetworkService(EMPTY_SETTINGS), version);
|
||||
}
|
||||
|
||||
@Inject
|
||||
public MulticastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService, Version version) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.transportService = transportService;
|
||||
this.clusterName = clusterName;
|
||||
this.networkService = networkService;
|
||||
this.version = version;
|
||||
|
||||
this.address = ADDRESS_SETTING.exists(settings) ? ADDRESS_SETTING.get(settings) : null;
|
||||
this.port = PORT_SETTING.get(settings);
|
||||
this.group = GROUP_SETTING.get(settings);
|
||||
this.bufferSize = BUFFER_SIZE_SETTING.get(settings).bytesAsInt();
|
||||
this.ttl = TTL_SETTING.get(settings);
|
||||
this.pingEnabled = PING_ENABLED_SETTING.get(settings);
|
||||
|
||||
logger.debug("using group [{}], with port [{}], ttl [{}], and address [{}]", group, port, ttl, address);
|
||||
|
||||
this.transportService.registerRequestHandler(ACTION_NAME, MulticastPingResponse::new, ThreadPool.Names.SAME, new MulticastPingResponseRequestHandler());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPingContextProvider(PingContextProvider nodesProvider) {
|
||||
if (lifecycle.started()) {
|
||||
throw new IllegalStateException("Can't set nodes provider when started");
|
||||
}
|
||||
this.contextProvider = nodesProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
try {
|
||||
// we know OSX has bugs in the JVM when creating multiple instances of multicast sockets
|
||||
// causing for "socket close" exceptions when receive and/or crashes
|
||||
boolean shared = SHARED_SETTING.get(settings);
|
||||
// OSX does not correctly send multicasts FROM the right interface
|
||||
boolean deferToInterface = DEFERE_TO_INTERFACE_SETTING.get(settings);
|
||||
|
||||
final MulticastChannel.Config config = new MulticastChannel.Config(port, group, bufferSize, ttl,
|
||||
getMulticastInterface(), deferToInterface);
|
||||
SecurityManager sm = System.getSecurityManager();
|
||||
if (sm != null) {
|
||||
sm.checkPermission(new SpecialPermission());
|
||||
}
|
||||
multicastChannel = AccessController.doPrivileged(new PrivilegedExceptionAction<MulticastChannel>() {
|
||||
@Override
|
||||
public MulticastChannel run() throws Exception {
|
||||
return MulticastChannel.getChannel(nodeName(), shared, config, new Receiver());
|
||||
}
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
String msg = "multicast failed to start [{}], disabling. Consider using IPv4 only (by defining env. variable `ES_USE_IPV4`)";
|
||||
logger.info(msg, t, ExceptionsHelper.detailedMessage(t));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("deprecation") // Used to support funky configuration options
|
||||
private InetAddress getMulticastInterface() throws IOException {
|
||||
// don't use publish address, the use case for that is e.g. a firewall or proxy and
|
||||
// may not even be bound to an interface on this machine! use the first bound address.
|
||||
List<InetAddress> addresses = Arrays.asList(networkService.resolveBindHostAddresses(address == null ? null : new String[] { address }));
|
||||
NetworkUtils.sortAddresses(addresses);
|
||||
return addresses.get(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
if (multicastChannel != null) {
|
||||
multicastChannel.close();
|
||||
multicastChannel = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
}
|
||||
|
||||
public PingResponse[] pingAndWait(TimeValue timeout) {
|
||||
final AtomicReference<PingResponse[]> response = new AtomicReference<>();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
try {
|
||||
ping(new PingListener() {
|
||||
@Override
|
||||
public void onPing(PingResponse[] pings) {
|
||||
response.set(pings);
|
||||
latch.countDown();
|
||||
}
|
||||
}, timeout);
|
||||
} catch (EsRejectedExecutionException ex) {
|
||||
logger.debug("Ping execution rejected", ex);
|
||||
return PingResponse.EMPTY;
|
||||
}
|
||||
try {
|
||||
latch.await();
|
||||
return response.get();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return PingResponse.EMPTY;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void ping(final PingListener listener, final TimeValue timeout) {
|
||||
if (!pingEnabled || multicastChannel == null) {
|
||||
threadPool.generic().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
listener.onPing(PingResponse.EMPTY);
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
final int id = pingIdGenerator.incrementAndGet();
|
||||
try {
|
||||
receivedResponses.put(id, new PingCollection());
|
||||
sendPingRequest(id);
|
||||
// try and send another ping request halfway through (just in case someone woke up during it...)
|
||||
// this can be a good trade-off to nailing the initial lookup or un-delivered messages
|
||||
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
logger.warn("[{}] failed to send second ping request", t, id);
|
||||
finalizePingCycle(id, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doRun() {
|
||||
sendPingRequest(id);
|
||||
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
logger.warn("[{}] failed to send third ping request", t, id);
|
||||
finalizePingCycle(id, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doRun() {
|
||||
// make one last ping, but finalize as soon as all nodes have responded or a timeout has past
|
||||
PingCollection collection = receivedResponses.get(id);
|
||||
FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
|
||||
receivedResponses.put(id, finalizingPingCollection);
|
||||
logger.trace("[{}] sending last pings", id);
|
||||
sendPingRequest(id);
|
||||
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
logger.warn("[{}] failed to finalize ping", t, id);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
finalizePingCycle(id, listener);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to ping", e);
|
||||
finalizePingCycle(id, listener);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* takes all pings collected for a given id and pass them to the given listener.
|
||||
* this method is safe to call multiple times as is guaranteed to only finalize once.
|
||||
*/
|
||||
private void finalizePingCycle(int id, final PingListener listener) {
|
||||
PingCollection responses = receivedResponses.remove(id);
|
||||
if (responses != null) {
|
||||
listener.onPing(responses.toArray());
|
||||
}
|
||||
}
|
||||
|
||||
private void sendPingRequest(int id) {
|
||||
try {
|
||||
BytesStreamOutput out = new BytesStreamOutput(PING_SIZE_ESTIMATE);
|
||||
out.writeBytes(INTERNAL_HEADER);
|
||||
// TODO: change to min_required version!
|
||||
Version.writeVersion(version, out);
|
||||
out.writeInt(id);
|
||||
clusterName.writeTo(out);
|
||||
contextProvider.nodes().localNode().writeTo(out);
|
||||
out.close();
|
||||
multicastChannel.send(out.bytes());
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[{}] sending ping request", id);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (lifecycle.stoppedOrClosed()) {
|
||||
return;
|
||||
}
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("failed to send multicast ping request", e);
|
||||
} else {
|
||||
logger.warn("failed to send multicast ping request: {}", ExceptionsHelper.detailedMessage(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class FinalizingPingCollection extends PingCollection {
|
||||
final private PingCollection internalCollection;
|
||||
final private int expectedResponses;
|
||||
final private AtomicInteger responseCount;
|
||||
final private PingListener listener;
|
||||
final private int id;
|
||||
|
||||
public FinalizingPingCollection(int id, PingCollection internalCollection, int expectedResponses, PingListener listener) {
|
||||
this.id = id;
|
||||
this.internalCollection = internalCollection;
|
||||
this.expectedResponses = expectedResponses;
|
||||
this.responseCount = new AtomicInteger();
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean addPing(PingResponse ping) {
|
||||
if (internalCollection.addPing(ping)) {
|
||||
if (responseCount.incrementAndGet() >= expectedResponses) {
|
||||
logger.trace("[{}] all nodes responded", id);
|
||||
finish();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void addPings(PingResponse[] pings) {
|
||||
internalCollection.addPings(pings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized PingResponse[] toArray() {
|
||||
return internalCollection.toArray();
|
||||
}
|
||||
|
||||
void finish() {
|
||||
// spawn another thread as we may be running on a network thread
|
||||
threadPool.generic().execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
logger.error("failed to call ping listener", t);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
finalizePingCycle(id, listener);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
class MulticastPingResponseRequestHandler implements TransportRequestHandler<MulticastPingResponse> {
|
||||
@Override
|
||||
public void messageReceived(MulticastPingResponse request, TransportChannel channel) throws Exception {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[{}] received {}", request.id, request.pingResponse);
|
||||
}
|
||||
PingCollection responses = receivedResponses.get(request.id);
|
||||
if (responses == null) {
|
||||
logger.warn("received ping response {} with no matching id [{}]", request.pingResponse, request.id);
|
||||
} else {
|
||||
responses.addPing(request.pingResponse);
|
||||
}
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
}
|
||||
}
|
||||
|
||||
public static class MulticastPingResponse extends TransportRequest {
|
||||
|
||||
int id;
|
||||
|
||||
PingResponse pingResponse;
|
||||
|
||||
public MulticastPingResponse() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
id = in.readInt();
|
||||
pingResponse = PingResponse.readPingResponse(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeInt(id);
|
||||
pingResponse.writeTo(out);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private class Receiver implements MulticastChannel.Listener {
|
||||
|
||||
@Override
|
||||
public void onMessage(BytesReference data, SocketAddress address) {
|
||||
int id = -1;
|
||||
DiscoveryNode requestingNodeX = null;
|
||||
ClusterName clusterName = null;
|
||||
|
||||
Map<String, Object> externalPingData = null;
|
||||
XContentType xContentType = null;
|
||||
|
||||
try {
|
||||
boolean internal = false;
|
||||
if (data.length() > 4) {
|
||||
int counter = 0;
|
||||
for (; counter < INTERNAL_HEADER.length; counter++) {
|
||||
if (data.get(counter) != INTERNAL_HEADER[counter]) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (counter == INTERNAL_HEADER.length) {
|
||||
internal = true;
|
||||
}
|
||||
}
|
||||
if (internal) {
|
||||
StreamInput input = StreamInput.wrap(new BytesArray(data.toBytes(), INTERNAL_HEADER.length, data.length() - INTERNAL_HEADER.length));
|
||||
Version version = Version.readVersion(input);
|
||||
input.setVersion(version);
|
||||
id = input.readInt();
|
||||
clusterName = ClusterName.readClusterName(input);
|
||||
requestingNodeX = readNode(input);
|
||||
} else {
|
||||
xContentType = XContentFactory.xContentType(data);
|
||||
if (xContentType != null) {
|
||||
// an external ping
|
||||
try (XContentParser parser = XContentFactory.xContent(xContentType).createParser(data)) {
|
||||
externalPingData = parser.map();
|
||||
}
|
||||
} else {
|
||||
throw new IllegalStateException("failed multicast message, probably message from previous version");
|
||||
}
|
||||
}
|
||||
if (externalPingData != null) {
|
||||
handleExternalPingRequest(externalPingData, xContentType, address);
|
||||
} else {
|
||||
handleNodePingRequest(id, requestingNodeX, clusterName);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (!lifecycle.started() || (e instanceof EsRejectedExecutionException)) {
|
||||
logger.debug("failed to read requesting data from {}", e, address);
|
||||
} else {
|
||||
logger.warn("failed to read requesting data from {}", e, address);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void handleExternalPingRequest(Map<String, Object> externalPingData, XContentType contentType, SocketAddress remoteAddress) {
|
||||
if (externalPingData.containsKey("response")) {
|
||||
// ignoring responses sent over the multicast channel
|
||||
logger.trace("got an external ping response (ignoring) from {}, content {}", remoteAddress, externalPingData);
|
||||
return;
|
||||
}
|
||||
|
||||
if (multicastChannel == null) {
|
||||
logger.debug("can't send ping response, no socket, from {}, content {}", remoteAddress, externalPingData);
|
||||
return;
|
||||
}
|
||||
|
||||
Map<String, Object> request = (Map<String, Object>) externalPingData.get("request");
|
||||
if (request == null) {
|
||||
logger.warn("malformed external ping request, no 'request' element from {}, content {}", remoteAddress, externalPingData);
|
||||
return;
|
||||
}
|
||||
|
||||
final String requestClusterName = request.containsKey("cluster_name") ? request.get("cluster_name").toString() : request.containsKey("clusterName") ? request.get("clusterName").toString() : null;
|
||||
if (requestClusterName == null) {
|
||||
logger.warn("malformed external ping request, missing 'cluster_name' element within request, from {}, content {}", remoteAddress, externalPingData);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!requestClusterName.equals(clusterName.value())) {
|
||||
logger.trace("got request for cluster_name {}, but our cluster_name is {}, from {}, content {}",
|
||||
requestClusterName, clusterName.value(), remoteAddress, externalPingData);
|
||||
return;
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("got external ping request from {}, content {}", remoteAddress, externalPingData);
|
||||
}
|
||||
|
||||
try {
|
||||
DiscoveryNode localNode = contextProvider.nodes().localNode();
|
||||
|
||||
XContentBuilder builder = XContentFactory.contentBuilder(contentType);
|
||||
builder.startObject().startObject("response");
|
||||
builder.field("cluster_name", clusterName.value());
|
||||
builder.startObject("version").field("number", version.number()).field("snapshot_build", version.snapshot).endObject();
|
||||
builder.field("transport_address", localNode.address().toString());
|
||||
|
||||
if (contextProvider.nodeService() != null) {
|
||||
for (Map.Entry<String, String> attr : contextProvider.nodeService().attributes().entrySet()) {
|
||||
builder.field(attr.getKey(), attr.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
builder.startObject("attributes");
|
||||
for (ObjectObjectCursor<String, String> attr : localNode.attributes()) {
|
||||
builder.field(attr.key, attr.value);
|
||||
}
|
||||
builder.endObject();
|
||||
|
||||
builder.endObject().endObject();
|
||||
multicastChannel.send(builder.bytes());
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("sending external ping response {}", builder.string());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to send external multicast response", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) {
|
||||
if (!pingEnabled || multicastChannel == null) {
|
||||
return;
|
||||
}
|
||||
final DiscoveryNodes discoveryNodes = contextProvider.nodes();
|
||||
final DiscoveryNode requestingNode = requestingNodeX;
|
||||
if (requestingNode.id().equals(discoveryNodes.localNodeId())) {
|
||||
// that's me, ignore
|
||||
return;
|
||||
}
|
||||
if (!requestClusterName.equals(clusterName)) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[{}] received ping_request from [{}], but wrong cluster_name [{}], expected [{}], ignoring",
|
||||
id, requestingNode, requestClusterName.value(), clusterName.value());
|
||||
}
|
||||
return;
|
||||
}
|
||||
// don't connect between two client nodes, no need for that...
|
||||
if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[{}] received ping_request from [{}], both are client nodes, ignoring", id, requestingNode, requestClusterName);
|
||||
}
|
||||
return;
|
||||
}
|
||||
final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();
|
||||
multicastPingResponse.id = id;
|
||||
multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce());
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[{}] received ping_request from [{}], sending {}", id, requestingNode, multicastPingResponse.pingResponse);
|
||||
}
|
||||
|
||||
if (!transportService.nodeConnected(requestingNode)) {
|
||||
// do the connect and send on a thread pool
|
||||
threadPool.generic().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// connect to the node if possible
|
||||
try {
|
||||
transportService.connectToNode(requestingNode);
|
||||
transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
if (lifecycle.started()) {
|
||||
logger.warn("failed to connect to requesting node {}", e, requestingNode);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
if (lifecycle.started()) {
|
||||
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,23 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
grant {
|
||||
// needed to bind multicast to arbitrary port
|
||||
permission java.net.SocketPermission "localhost:1024-", "listen,resolve";
|
||||
};
|
|
@ -1,41 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.discovery.multicast;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Name;
|
||||
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
|
||||
import org.elasticsearch.test.rest.ESRestTestCase;
|
||||
import org.elasticsearch.test.rest.RestTestCandidate;
|
||||
import org.elasticsearch.test.rest.parser.RestTestParseException;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class MulticastDiscoveryRestIT extends ESRestTestCase {
|
||||
|
||||
public MulticastDiscoveryRestIT(@Name("yaml") RestTestCandidate testCandidate) {
|
||||
super(testCandidate);
|
||||
}
|
||||
|
||||
@ParametersFactory
|
||||
public static Iterable<Object[]> parameters() throws IOException, RestTestParseException {
|
||||
return ESRestTestCase.createParameters(0, 1);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,192 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.discovery.multicast;
|
||||
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.discovery.zen.ping.PingContextProvider;
|
||||
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
||||
import org.elasticsearch.node.service.NodeService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.local.LocalTransport;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Assert;
|
||||
|
||||
import java.net.DatagramPacket;
|
||||
import java.net.InetAddress;
|
||||
import java.net.MulticastSocket;
|
||||
|
||||
public class MulticastZenPingTests extends ESTestCase {
|
||||
|
||||
private Settings buildRandomMulticast(Settings settings) {
|
||||
Settings.Builder builder = Settings.builder().put(settings);
|
||||
builder.put("discovery.zen.ping.multicast.group", "224.2.3." + randomIntBetween(0, 255));
|
||||
builder.put("discovery.zen.ping.multicast.port", randomIntBetween(55000, 56000));
|
||||
builder.put("discovery.zen.ping.multicast.enabled", true);
|
||||
if (randomBoolean()) {
|
||||
builder.put("discovery.zen.ping.multicast.shared", randomBoolean());
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public void testSimplePings() throws InterruptedException {
|
||||
assumeTrue("https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=193246", Constants.FREE_BSD == false);
|
||||
Settings settings = Settings.EMPTY;
|
||||
settings = buildRandomMulticast(settings);
|
||||
Thread.sleep(30000);
|
||||
|
||||
ThreadPool threadPool = new ThreadPool("testSimplePings");
|
||||
final ClusterName clusterName = new ClusterName("test");
|
||||
final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()), threadPool).start();
|
||||
final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
|
||||
|
||||
final TransportService transportServiceB = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()), threadPool).start();
|
||||
final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceB.boundAddress().publishAddress(), Version.CURRENT);
|
||||
|
||||
MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName, Version.CURRENT);
|
||||
zenPingA.setPingContextProvider(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
return DiscoveryNodes.builder().put(nodeA).localNodeId("A").build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeService nodeService() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nodeHasJoinedClusterOnce() {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
zenPingA.start();
|
||||
|
||||
MulticastZenPing zenPingB = new MulticastZenPing(threadPool, transportServiceB, clusterName, Version.CURRENT);
|
||||
zenPingB.setPingContextProvider(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
return DiscoveryNodes.builder().put(nodeB).localNodeId("B").build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeService nodeService() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nodeHasJoinedClusterOnce() {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
zenPingB.start();
|
||||
|
||||
try {
|
||||
logger.info("ping from A");
|
||||
ZenPing.PingResponse[] pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1));
|
||||
Assert.assertThat(pingResponses.length, Matchers.equalTo(1));
|
||||
Assert.assertThat(pingResponses[0].node().id(), Matchers.equalTo("B"));
|
||||
Assert.assertTrue(pingResponses[0].hasJoinedOnce());
|
||||
|
||||
logger.info("ping from B");
|
||||
pingResponses = zenPingB.pingAndWait(TimeValue.timeValueSeconds(1));
|
||||
Assert.assertThat(pingResponses.length, Matchers.equalTo(1));
|
||||
Assert.assertThat(pingResponses[0].node().id(), Matchers.equalTo("A"));
|
||||
Assert.assertFalse(pingResponses[0].hasJoinedOnce());
|
||||
|
||||
} finally {
|
||||
zenPingA.close();
|
||||
zenPingB.close();
|
||||
transportServiceA.close();
|
||||
transportServiceB.close();
|
||||
terminate(threadPool);
|
||||
}
|
||||
}
|
||||
|
||||
// This test is here because when running on FreeBSD, if no tests are
|
||||
// executed for the 'multicast' project it will assume everything
|
||||
// failed, so we need to have at least one test that runs.
|
||||
public void testAlwaysRun() throws Exception {
|
||||
assertTrue(true);
|
||||
}
|
||||
|
||||
@SuppressForbidden(reason = "I bind to wildcard addresses. I am a total nightmare")
|
||||
public void testExternalPing() throws Exception {
|
||||
assumeTrue("https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=193246", Constants.FREE_BSD == false);
|
||||
Settings settings = Settings.EMPTY;
|
||||
settings = buildRandomMulticast(settings);
|
||||
|
||||
final ThreadPool threadPool = new ThreadPool("testExternalPing");
|
||||
final ClusterName clusterName = new ClusterName("test");
|
||||
final TransportService transportServiceA = new TransportService(new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()), threadPool).start();
|
||||
final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
|
||||
|
||||
MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName, Version.CURRENT);
|
||||
zenPingA.setPingContextProvider(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
return DiscoveryNodes.builder().put(nodeA).localNodeId("A").build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeService nodeService() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean nodeHasJoinedClusterOnce() {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
zenPingA.start();
|
||||
|
||||
MulticastSocket multicastSocket = null;
|
||||
try {
|
||||
Loggers.getLogger(MulticastZenPing.class).setLevel("TRACE");
|
||||
multicastSocket = new MulticastSocket();
|
||||
multicastSocket.setReceiveBufferSize(2048);
|
||||
multicastSocket.setSendBufferSize(2048);
|
||||
multicastSocket.setSoTimeout(60000);
|
||||
|
||||
DatagramPacket datagramPacket = new DatagramPacket(new byte[2048], 2048, InetAddress.getByName("224.2.2.4"), 54328);
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder().startObject().startObject("request").field("cluster_name", "test").endObject().endObject();
|
||||
datagramPacket.setData(builder.bytes().toBytes());
|
||||
multicastSocket.send(datagramPacket);
|
||||
Thread.sleep(100);
|
||||
} finally {
|
||||
Loggers.getLogger(MulticastZenPing.class).setLevel("INFO");
|
||||
if (multicastSocket != null) multicastSocket.close();
|
||||
zenPingA.close();
|
||||
terminate(threadPool);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,13 +0,0 @@
|
|||
# Integration tests for multicast discovery
|
||||
#
|
||||
"Multicast discovery loaded":
|
||||
- do:
|
||||
cluster.state: {}
|
||||
|
||||
# Get master node id
|
||||
- set: { master_node: master }
|
||||
|
||||
- do:
|
||||
nodes.info: {}
|
||||
|
||||
- match: { nodes.$master.plugins.0.name: discovery-multicast }
|
|
@ -219,10 +219,6 @@ fi
|
|||
install_and_check_plugin discovery ec2 aws-java-sdk-core-*.jar
|
||||
}
|
||||
|
||||
@test "[$GROUP] install multicast discovery plugin" {
|
||||
install_and_check_plugin discovery multicast
|
||||
}
|
||||
|
||||
@test "[$GROUP] install lang-expression plugin" {
|
||||
install_and_check_plugin lang expression
|
||||
}
|
||||
|
@ -325,10 +321,6 @@ fi
|
|||
remove_plugin discovery-ec2
|
||||
}
|
||||
|
||||
@test "[$GROUP] remove multicast discovery plugin" {
|
||||
remove_plugin discovery-multicast
|
||||
}
|
||||
|
||||
@test "[$GROUP] remove lang-expression plugin" {
|
||||
remove_plugin lang-expression
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ List projects = [
|
|||
'plugins:discovery-azure',
|
||||
'plugins:discovery-ec2',
|
||||
'plugins:discovery-gce',
|
||||
'plugins:discovery-multicast',
|
||||
'plugins:ingest-geoip',
|
||||
'plugins:lang-javascript',
|
||||
'plugins:lang-painless',
|
||||
|
|
Loading…
Reference in New Issue