Allow sharing a multicast socket within a JVM
Due to bugs in the JVM (specifically on OS X), running the zen discovery tests causes "socket close" failures when receiving on the multicast socket and, under some JVM versions, even crashes. This happens because multiple multicast sockets are created within the same VM. In practice, our tests use the same settings, so the same multicast socket can be shared across multiple channels. This change creates an abstraction called MulticastChannel that can be shared, with ref counting. Today, the shared option is enabled by default only on OS X. closes #5410
This commit is contained in:
parent
b7de1becf4
commit
3755f8e4df
|
@ -0,0 +1,355 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.common.network;
|
||||||
|
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import org.apache.lucene.util.IOUtils;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
|
import org.elasticsearch.common.logging.ESLogger;
|
||||||
|
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||||
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.net.*;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A multicast channel that supports registering for receive events, and sending datagram packets. Allows
|
||||||
|
* to easily share the same multicast socket if it holds the same config.
|
||||||
|
*/
|
||||||
|
public abstract class MulticastChannel implements Closeable {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Builds a channel based on the provided config, allowing to control if sharing a channel that uses
|
||||||
|
* the same config is allowed or not.
|
||||||
|
*/
|
||||||
|
public static MulticastChannel getChannel(String name, boolean shared, Config config, Listener listener) throws Exception {
|
||||||
|
if (!shared) {
|
||||||
|
return new Plain(listener, name, config);
|
||||||
|
}
|
||||||
|
return Shared.getSharedChannel(listener, config);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Config of multicast channel.
|
||||||
|
*/
|
||||||
|
public static final class Config {
|
||||||
|
public final int port;
|
||||||
|
public final String group;
|
||||||
|
public final int bufferSize;
|
||||||
|
public final int ttl;
|
||||||
|
public final InetAddress multicastInterface;
|
||||||
|
|
||||||
|
public Config(int port, String group, int bufferSize, int ttl, InetAddress multicastInterface) {
|
||||||
|
this.port = port;
|
||||||
|
this.group = group;
|
||||||
|
this.bufferSize = bufferSize;
|
||||||
|
this.ttl = ttl;
|
||||||
|
this.multicastInterface = multicastInterface;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) return true;
|
||||||
|
if (o == null || getClass() != o.getClass()) return false;
|
||||||
|
|
||||||
|
Config config = (Config) o;
|
||||||
|
|
||||||
|
if (bufferSize != config.bufferSize) return false;
|
||||||
|
if (port != config.port) return false;
|
||||||
|
if (ttl != config.ttl) return false;
|
||||||
|
if (group != null ? !group.equals(config.group) : config.group != null) return false;
|
||||||
|
if (multicastInterface != null ? !multicastInterface.equals(config.multicastInterface) : config.multicastInterface != null)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
int result = port;
|
||||||
|
result = 31 * result + (group != null ? group.hashCode() : 0);
|
||||||
|
result = 31 * result + bufferSize;
|
||||||
|
result = 31 * result + ttl;
|
||||||
|
result = 31 * result + (multicastInterface != null ? multicastInterface.hashCode() : 0);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Listener that gets called when data is received on the multicast channel.
|
||||||
|
*/
|
||||||
|
public static interface Listener {
|
||||||
|
void onMessage(BytesReference data, SocketAddress address);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple listener that wraps multiple listeners into one.
|
||||||
|
*/
|
||||||
|
public static class MultiListener implements Listener {
|
||||||
|
|
||||||
|
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();
|
||||||
|
|
||||||
|
public void add(Listener listener) {
|
||||||
|
this.listeners.add(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean remove(Listener listener) {
|
||||||
|
return this.listeners.remove(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(BytesReference data, SocketAddress address) {
|
||||||
|
for (Listener listener : listeners) {
|
||||||
|
listener.onMessage(data, address);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** The listener this channel was created with; it is handed to {@link #close(Listener)} on close. */
protected final Listener listener;

/** Guards {@link #close()} so that the channel specific close logic runs at most once. */
private final AtomicBoolean closed = new AtomicBoolean();

protected MulticastChannel(Listener listener) {
    this.listener = listener;
}

/**
 * Send the data over the multicast channel.
 *
 * @param data the bytes to send
 * @throws Exception if sending fails
 */
public abstract void send(BytesReference data) throws Exception;

/**
 * Close the channel. Only the first call has an effect; subsequent calls are no-ops.
 */
@Override
public void close() {
    if (!closed.compareAndSet(false, true)) {
        return; // already closed
    }
    close(listener);
}

/**
 * Channel specific close logic, invoked by {@link #close()} with this channel's listener.
 *
 * @param listener the listener on whose behalf the channel is being closed
 */
protected abstract void close(Listener listener);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A shared channel that keeps a static map of Config -> Shared channels, and closes shared
|
||||||
|
* channel once their reference count has reached 0. It also handles de-registering relevant
|
||||||
|
* listener from the shared list of listeners.
|
||||||
|
*/
|
||||||
|
private static class Shared extends MulticastChannel {
|
||||||
|
|
||||||
|
private static final Map<Config, Shared> sharedChannels = Maps.newHashMap();
|
||||||
|
private static final Object mutex = new Object(); // global mutex so we don't sync on static methods (.class)
|
||||||
|
|
||||||
|
static MulticastChannel getSharedChannel(Listener listener, Config config) throws Exception {
|
||||||
|
synchronized (mutex) {
|
||||||
|
Shared shared = sharedChannels.get(config);
|
||||||
|
if (shared != null) {
|
||||||
|
shared.incRef();
|
||||||
|
((MultiListener) shared.listener).add(listener);
|
||||||
|
return new Delegate(listener, shared);
|
||||||
|
}
|
||||||
|
MultiListener multiListener = new MultiListener();
|
||||||
|
multiListener.add(listener);
|
||||||
|
shared = new Shared(multiListener, new Plain(multiListener, "#shared#", config));
|
||||||
|
sharedChannels.put(config, shared);
|
||||||
|
return shared;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void close(Shared shared, Listener listener) {
|
||||||
|
synchronized (mutex) {
|
||||||
|
// remove this
|
||||||
|
boolean removed = ((MultiListener) shared.listener).remove(listener);
|
||||||
|
assert removed : "a listener should be removed";
|
||||||
|
if (shared.decRef() == 0) {
|
||||||
|
sharedChannels.remove(shared.channel.getConfig());
|
||||||
|
shared.channel.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final Plain channel;
|
||||||
|
private int refCount = 1;
|
||||||
|
|
||||||
|
Shared(MultiListener listener, Plain channel) {
|
||||||
|
super(listener);
|
||||||
|
this.channel = channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void incRef() {
|
||||||
|
refCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int decRef() {
|
||||||
|
--refCount;
|
||||||
|
assert refCount >= 0 : "illegal ref counting, close called multiple times";
|
||||||
|
return refCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void send(BytesReference data) throws Exception {
|
||||||
|
channel.send(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void close(Listener listener) {
|
||||||
|
close(this, listener);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A light weight delegate that wraps another channel, mainly to support delegating
|
||||||
|
* the close method with the provided listener and not holding existing listener.
|
||||||
|
*/
|
||||||
|
private static class Delegate extends MulticastChannel {
|
||||||
|
|
||||||
|
private final MulticastChannel channel;
|
||||||
|
|
||||||
|
Delegate(Listener listener, MulticastChannel channel) {
|
||||||
|
super(listener);
|
||||||
|
this.channel = channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void send(BytesReference data) throws Exception {
|
||||||
|
channel.send(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void close(Listener listener) {
|
||||||
|
channel.close(listener); // we delegate here to the close with our listener, not with the delegate listener
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple implementation of a channel.
|
||||||
|
*/
|
||||||
|
private static class Plain extends MulticastChannel {
|
||||||
|
private final ESLogger logger;
|
||||||
|
private final Config config;
|
||||||
|
|
||||||
|
private volatile MulticastSocket multicastSocket;
|
||||||
|
private final DatagramPacket datagramPacketSend;
|
||||||
|
private final DatagramPacket datagramPacketReceive;
|
||||||
|
|
||||||
|
private final Object sendMutex = new Object();
|
||||||
|
private final Object receiveMutex = new Object();
|
||||||
|
|
||||||
|
private final Receiver receiver;
|
||||||
|
private final Thread receiverThread;
|
||||||
|
|
||||||
|
Plain(Listener listener, String name, Config config) throws Exception {
|
||||||
|
super(listener);
|
||||||
|
this.logger = ESLoggerFactory.getLogger(name);
|
||||||
|
this.config = config;
|
||||||
|
this.datagramPacketReceive = new DatagramPacket(new byte[config.bufferSize], config.bufferSize);
|
||||||
|
this.datagramPacketSend = new DatagramPacket(new byte[config.bufferSize], config.bufferSize, InetAddress.getByName(config.group), config.port);
|
||||||
|
this.multicastSocket = buildMulticastSocket(config);
|
||||||
|
this.receiver = new Receiver();
|
||||||
|
this.receiverThread = daemonThreadFactory(ImmutableSettings.builder().put("name", name).build(), "discovery#multicast#receiver").newThread(receiver);
|
||||||
|
this.receiverThread.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private MulticastSocket buildMulticastSocket(Config config) throws Exception {
|
||||||
|
MulticastSocket multicastSocket = new MulticastSocket(config.port);
|
||||||
|
try {
|
||||||
|
multicastSocket.setTimeToLive(config.ttl);
|
||||||
|
// set the send interface
|
||||||
|
multicastSocket.setInterface(config.multicastInterface);
|
||||||
|
multicastSocket.joinGroup(InetAddress.getByName(config.group));
|
||||||
|
multicastSocket.setReceiveBufferSize(config.bufferSize);
|
||||||
|
multicastSocket.setSendBufferSize(config.bufferSize);
|
||||||
|
multicastSocket.setSoTimeout(60000);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
IOUtils.closeWhileHandlingException(multicastSocket);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
return multicastSocket;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Config getConfig() {
|
||||||
|
return this.config;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void send(BytesReference data) throws Exception {
|
||||||
|
synchronized (sendMutex) {
|
||||||
|
datagramPacketSend.setData(data.toBytes());
|
||||||
|
multicastSocket.send(datagramPacketSend);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void close(Listener listener) {
|
||||||
|
receiver.stop();
|
||||||
|
receiverThread.interrupt();
|
||||||
|
if (multicastSocket != null) {
|
||||||
|
IOUtils.closeWhileHandlingException(multicastSocket);
|
||||||
|
multicastSocket = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class Receiver implements Runnable {
|
||||||
|
|
||||||
|
private volatile boolean running = true;
|
||||||
|
|
||||||
|
public void stop() {
|
||||||
|
running = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (running) {
|
||||||
|
try {
|
||||||
|
synchronized (receiveMutex) {
|
||||||
|
try {
|
||||||
|
multicastSocket.receive(datagramPacketReceive);
|
||||||
|
} catch (SocketTimeoutException ignore) {
|
||||||
|
continue;
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (running) {
|
||||||
|
if (multicastSocket.isClosed()) {
|
||||||
|
logger.warn("multicast socket closed while running, restarting...");
|
||||||
|
multicastSocket = buildMulticastSocket(config);
|
||||||
|
} else {
|
||||||
|
logger.warn("failed to receive packet, throttling...", e);
|
||||||
|
Thread.sleep(500);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (datagramPacketReceive.getData().length > 0) {
|
||||||
|
listener.onMessage(new BytesArray(datagramPacketReceive.getData()), datagramPacketReceive.getSocketAddress());
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
if (running) {
|
||||||
|
logger.warn("unexpected exception in multicast receiver", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.elasticsearch.discovery.zen.ping.multicast;
|
package org.elasticsearch.discovery.zen.ping.multicast;
|
||||||
|
|
||||||
|
import org.apache.lucene.util.Constants;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
|
@ -26,9 +27,11 @@ import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||||
import org.elasticsearch.common.io.stream.*;
|
import org.elasticsearch.common.io.stream.*;
|
||||||
|
import org.elasticsearch.common.network.MulticastChannel;
|
||||||
import org.elasticsearch.common.network.NetworkService;
|
import org.elasticsearch.common.network.NetworkService;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
@ -43,7 +46,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.*;
|
import org.elasticsearch.transport.*;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.*;
|
import java.net.SocketAddress;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
@ -53,7 +56,6 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
import static org.elasticsearch.cluster.node.DiscoveryNode.readNode;
|
import static org.elasticsearch.cluster.node.DiscoveryNode.readNode;
|
||||||
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
|
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
|
||||||
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
|
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
|
||||||
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -77,18 +79,11 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
||||||
|
|
||||||
private final boolean pingEnabled;
|
private final boolean pingEnabled;
|
||||||
|
|
||||||
private volatile Receiver receiver;
|
private volatile MulticastChannel multicastChannel;
|
||||||
private volatile Thread receiverThread;
|
|
||||||
private volatile MulticastSocket multicastSocket;
|
|
||||||
private DatagramPacket datagramPacketSend;
|
|
||||||
private DatagramPacket datagramPacketReceive;
|
|
||||||
|
|
||||||
private final AtomicInteger pingIdGenerator = new AtomicInteger();
|
private final AtomicInteger pingIdGenerator = new AtomicInteger();
|
||||||
private final Map<Integer, ConcurrentMap<DiscoveryNode, PingResponse>> receivedResponses = newConcurrentMap();
|
private final Map<Integer, ConcurrentMap<DiscoveryNode, PingResponse>> receivedResponses = newConcurrentMap();
|
||||||
|
|
||||||
private final Object sendMutex = new Object();
|
|
||||||
private final Object receiveMutex = new Object();
|
|
||||||
|
|
||||||
public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version) {
|
public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version) {
|
||||||
this(EMPTY_SETTINGS, threadPool, transportService, clusterName, new NetworkService(EMPTY_SETTINGS), version);
|
this(EMPTY_SETTINGS, threadPool, transportService, clusterName, new NetworkService(EMPTY_SETTINGS), version);
|
||||||
}
|
}
|
||||||
|
@ -125,71 +120,26 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
||||||
@Override
|
@Override
|
||||||
protected void doStart() throws ElasticsearchException {
|
protected void doStart() throws ElasticsearchException {
|
||||||
try {
|
try {
|
||||||
this.datagramPacketReceive = new DatagramPacket(new byte[bufferSize], bufferSize);
|
// we know OSX has bugs in the JVM when creating multiple instances of multicast sockets
|
||||||
this.datagramPacketSend = new DatagramPacket(new byte[bufferSize], bufferSize, InetAddress.getByName(group), port);
|
// causing for "socket close" exceptions when receive and/or crashes
|
||||||
} catch (Exception e) {
|
boolean shared = componentSettings.getAsBoolean("shared", Constants.MAC_OS_X);
|
||||||
logger.warn("disabled, failed to setup multicast (datagram) discovery : {}", e.getMessage());
|
multicastChannel = MulticastChannel.getChannel(nodeName(), shared,
|
||||||
|
new MulticastChannel.Config(port, group, bufferSize, ttl, networkService.resolvePublishHostAddress(address)),
|
||||||
|
new Receiver());
|
||||||
|
} catch (Throwable t) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("disabled, failed to setup multicast (datagram) discovery", e);
|
logger.debug("multicast failed to start [{}], disabling", t, ExceptionsHelper.detailedMessage(t));
|
||||||
}
|
} else {
|
||||||
return;
|
logger.info("multicast failed to start [{}], disabling", ExceptionsHelper.detailedMessage(t));
|
||||||
}
|
|
||||||
|
|
||||||
InetAddress multicastInterface = null;
|
|
||||||
try {
|
|
||||||
MulticastSocket multicastSocket;
|
|
||||||
// if (NetworkUtils.canBindToMcastAddress()) {
|
|
||||||
// try {
|
|
||||||
// multicastSocket = new MulticastSocket(new InetSocketAddress(group, port));
|
|
||||||
// } catch (Exception e) {
|
|
||||||
// logger.debug("Failed to create multicast socket by binding to group address, binding to port", e);
|
|
||||||
// multicastSocket = new MulticastSocket(port);
|
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
multicastSocket = new MulticastSocket(port);
|
|
||||||
// }
|
|
||||||
|
|
||||||
multicastSocket.setTimeToLive(ttl);
|
|
||||||
|
|
||||||
// set the send interface
|
|
||||||
multicastInterface = networkService.resolvePublishHostAddress(address);
|
|
||||||
multicastSocket.setInterface(multicastInterface);
|
|
||||||
multicastSocket.joinGroup(InetAddress.getByName(group));
|
|
||||||
|
|
||||||
multicastSocket.setReceiveBufferSize(bufferSize);
|
|
||||||
multicastSocket.setSendBufferSize(bufferSize);
|
|
||||||
multicastSocket.setSoTimeout(60000);
|
|
||||||
|
|
||||||
this.multicastSocket = multicastSocket;
|
|
||||||
|
|
||||||
this.receiver = new Receiver();
|
|
||||||
this.receiverThread = daemonThreadFactory(settings, "discovery#multicast#receiver").newThread(receiver);
|
|
||||||
this.receiverThread.start();
|
|
||||||
} catch (Exception e) {
|
|
||||||
datagramPacketReceive = null;
|
|
||||||
datagramPacketSend = null;
|
|
||||||
if (multicastSocket != null) {
|
|
||||||
multicastSocket.close();
|
|
||||||
multicastSocket = null;
|
|
||||||
}
|
|
||||||
logger.warn("disabled, failed to setup multicast discovery on port [{}], [{}]: {}", port, multicastInterface, e.getMessage());
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("disabled, failed to setup multicast discovery on {}", e, multicastInterface);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doStop() throws ElasticsearchException {
|
protected void doStop() throws ElasticsearchException {
|
||||||
if (receiver != null) {
|
if (multicastChannel != null) {
|
||||||
receiver.stop();
|
multicastChannel.close();
|
||||||
}
|
multicastChannel = null;
|
||||||
if (receiverThread != null) {
|
|
||||||
receiverThread.interrupt();
|
|
||||||
}
|
|
||||||
if (multicastSocket != null) {
|
|
||||||
multicastSocket.close();
|
|
||||||
multicastSocket = null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -257,33 +207,27 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendPingRequest(int id) {
|
private void sendPingRequest(int id) {
|
||||||
if (multicastSocket == null) {
|
try {
|
||||||
return;
|
BytesStreamOutput bStream = new BytesStreamOutput();
|
||||||
}
|
StreamOutput out = new HandlesStreamOutput(bStream);
|
||||||
synchronized (sendMutex) {
|
out.writeBytes(INTERNAL_HEADER);
|
||||||
try {
|
Version.writeVersion(version, out);
|
||||||
BytesStreamOutput bStream = new BytesStreamOutput();
|
out.writeInt(id);
|
||||||
StreamOutput out = new HandlesStreamOutput(bStream);
|
clusterName.writeTo(out);
|
||||||
out.writeBytes(INTERNAL_HEADER);
|
nodesProvider.nodes().localNode().writeTo(out);
|
||||||
Version.writeVersion(version, out);
|
out.close();
|
||||||
out.writeInt(id);
|
multicastChannel.send(bStream.bytes());
|
||||||
clusterName.writeTo(out);
|
if (logger.isTraceEnabled()) {
|
||||||
nodesProvider.nodes().localNode().writeTo(out);
|
logger.trace("[{}] sending ping request", id);
|
||||||
out.close();
|
}
|
||||||
datagramPacketSend.setData(bStream.bytes().toBytes());
|
} catch (Exception e) {
|
||||||
multicastSocket.send(datagramPacketSend);
|
if (lifecycle.stoppedOrClosed()) {
|
||||||
if (logger.isTraceEnabled()) {
|
return;
|
||||||
logger.trace("[{}] sending ping request", id);
|
}
|
||||||
}
|
if (logger.isDebugEnabled()) {
|
||||||
} catch (Exception e) {
|
logger.debug("failed to send multicast ping request", e);
|
||||||
if (lifecycle.stoppedOrClosed()) {
|
} else {
|
||||||
return;
|
logger.warn("failed to send multicast ping request: {}", ExceptionsHelper.detailedMessage(e));
|
||||||
}
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("failed to send multicast ping request", e);
|
|
||||||
} else {
|
|
||||||
logger.warn("failed to send multicast ping request: {}", ExceptionsHelper.detailedMessage(e));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -342,98 +286,59 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private class Receiver implements Runnable {
|
private class Receiver implements MulticastChannel.Listener {
|
||||||
|
|
||||||
private volatile boolean running = true;
|
|
||||||
|
|
||||||
public void stop() {
|
|
||||||
running = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void onMessage(BytesReference data, SocketAddress address) {
|
||||||
while (running) {
|
int id = -1;
|
||||||
try {
|
DiscoveryNode requestingNodeX = null;
|
||||||
int id = -1;
|
ClusterName clusterName = null;
|
||||||
DiscoveryNode requestingNodeX = null;
|
|
||||||
ClusterName clusterName = null;
|
|
||||||
|
|
||||||
Map<String, Object> externalPingData = null;
|
Map<String, Object> externalPingData = null;
|
||||||
XContentType xContentType = null;
|
XContentType xContentType = null;
|
||||||
|
|
||||||
synchronized (receiveMutex) {
|
try {
|
||||||
try {
|
boolean internal = false;
|
||||||
multicastSocket.receive(datagramPacketReceive);
|
if (data.length() > 4) {
|
||||||
} catch (SocketTimeoutException ignore) {
|
int counter = 0;
|
||||||
continue;
|
for (; counter < INTERNAL_HEADER.length; counter++) {
|
||||||
} catch (Exception e) {
|
if (data.get(counter) != INTERNAL_HEADER[counter]) {
|
||||||
if (running) {
|
break;
|
||||||
if (multicastSocket.isClosed()) {
|
|
||||||
logger.warn("multicast socket closed while running, restarting...");
|
|
||||||
// for some reason, the socket got closed on us while we are still running
|
|
||||||
// make a best effort in trying to start the multicast socket again...
|
|
||||||
threadPool.generic().execute(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
MulticastZenPing.this.stop();
|
|
||||||
MulticastZenPing.this.start();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
running = false;
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
logger.warn("failed to receive packet, throttling...", e);
|
|
||||||
Thread.sleep(500);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
boolean internal = false;
|
|
||||||
if (datagramPacketReceive.getLength() > 4) {
|
|
||||||
int counter = 0;
|
|
||||||
for (; counter < INTERNAL_HEADER.length; counter++) {
|
|
||||||
if (datagramPacketReceive.getData()[datagramPacketReceive.getOffset() + counter] != INTERNAL_HEADER[counter]) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (counter == INTERNAL_HEADER.length) {
|
|
||||||
internal = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (internal) {
|
|
||||||
StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset() + INTERNAL_HEADER.length, datagramPacketReceive.getLength(), true));
|
|
||||||
Version version = Version.readVersion(input);
|
|
||||||
input.setVersion(version);
|
|
||||||
id = input.readInt();
|
|
||||||
clusterName = ClusterName.readClusterName(input);
|
|
||||||
requestingNodeX = readNode(input);
|
|
||||||
} else {
|
|
||||||
xContentType = XContentFactory.xContentType(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength());
|
|
||||||
if (xContentType != null) {
|
|
||||||
// an external ping
|
|
||||||
externalPingData = XContentFactory.xContent(xContentType)
|
|
||||||
.createParser(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength())
|
|
||||||
.mapAndClose();
|
|
||||||
} else {
|
|
||||||
throw new ElasticsearchIllegalStateException("failed multicast message, probably message from previous version");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.warn("failed to read requesting data from {}", e, datagramPacketReceive.getSocketAddress());
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (externalPingData != null) {
|
if (counter == INTERNAL_HEADER.length) {
|
||||||
handleExternalPingRequest(externalPingData, xContentType, datagramPacketReceive.getSocketAddress());
|
internal = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (internal) {
|
||||||
|
StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(new BytesArray(data.toBytes(), INTERNAL_HEADER.length, data.length() - INTERNAL_HEADER.length)));
|
||||||
|
Version version = Version.readVersion(input);
|
||||||
|
input.setVersion(version);
|
||||||
|
id = input.readInt();
|
||||||
|
clusterName = ClusterName.readClusterName(input);
|
||||||
|
requestingNodeX = readNode(input);
|
||||||
|
} else {
|
||||||
|
xContentType = XContentFactory.xContentType(data);
|
||||||
|
if (xContentType != null) {
|
||||||
|
// an external ping
|
||||||
|
externalPingData = XContentFactory.xContent(xContentType)
|
||||||
|
.createParser(data)
|
||||||
|
.mapAndClose();
|
||||||
} else {
|
} else {
|
||||||
handleNodePingRequest(id, requestingNodeX, clusterName);
|
throw new ElasticsearchIllegalStateException("failed multicast message, probably message from previous version");
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
if (running) {
|
|
||||||
logger.warn("unexpected exception in multicast receiver", e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (externalPingData != null) {
|
||||||
|
handleExternalPingRequest(externalPingData, xContentType, address);
|
||||||
|
} else {
|
||||||
|
handleNodePingRequest(id, requestingNodeX, clusterName);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (!lifecycle.started() || (e instanceof EsRejectedExecutionException)) {
|
||||||
|
logger.debug("failed to read requesting data from {}", e, address);
|
||||||
|
} else {
|
||||||
|
logger.warn("failed to read requesting data from {}", e, address);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -445,7 +350,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (multicastSocket == null) {
|
if (multicastChannel == null) {
|
||||||
logger.debug("can't send ping response, no socket, from {}, content {}", remoteAddress, externalPingData);
|
logger.debug("can't send ping response, no socket, from {}, content {}", remoteAddress, externalPingData);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -492,13 +397,9 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
|
|
||||||
builder.endObject().endObject();
|
builder.endObject().endObject();
|
||||||
synchronized (sendMutex) {
|
multicastChannel.send(builder.bytes());
|
||||||
BytesReference bytes = builder.bytes();
|
if (logger.isTraceEnabled()) {
|
||||||
datagramPacketSend.setData(bytes.array(), bytes.arrayOffset(), bytes.length());
|
logger.trace("sending external ping response {}", builder.string());
|
||||||
multicastSocket.send(datagramPacketSend);
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("sending external ping response {}", builder.string());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn("failed to send external multicast response", e);
|
logger.warn("failed to send external multicast response", e);
|
||||||
|
@ -551,7 +452,9 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn("failed to connect to requesting node {}", e, requestingNode);
|
if (lifecycle.started()) {
|
||||||
|
logger.warn("failed to connect to requesting node {}", e, requestingNode);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -559,7 +462,9 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
||||||
transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||||
@Override
|
@Override
|
||||||
public void handleException(TransportException exp) {
|
public void handleException(TransportException exp) {
|
||||||
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
|
if (lifecycle.started()) {
|
||||||
|
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,6 +53,9 @@ public class MulticastZenPingTests extends ElasticsearchTestCase {
|
||||||
ImmutableSettings.Builder builder = ImmutableSettings.builder().put(settings);
|
ImmutableSettings.Builder builder = ImmutableSettings.builder().put(settings);
|
||||||
builder.put("discovery.zen.ping.multicast.group", "224.2.3." + randomIntBetween(0, 255));
|
builder.put("discovery.zen.ping.multicast.group", "224.2.3." + randomIntBetween(0, 255));
|
||||||
builder.put("discovery.zen.ping.multicast.port", randomIntBetween(55000, 56000));
|
builder.put("discovery.zen.ping.multicast.port", randomIntBetween(55000, 56000));
|
||||||
|
if (randomBoolean()) {
|
||||||
|
builder.put("discovery.zen.ping.multicast.shared", randomBoolean());
|
||||||
|
}
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue