From 3755f8e4dfec8debcae2ab876eda77a811ef4dbb Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Wed, 12 Mar 2014 22:45:49 +0100 Subject: [PATCH] Allow to share multicast socket within jvm Due to bugs in jvm (specifically OSX), running zen discovery tests causes for "socket close" failure on receive on multicast socket, and under some jvm versions, even crashes. This happens because of the creation of multiple multicast sockets within the same VM. In practice, in our tests, we use the same settings, so we can share the same multicast socket across multiple channels. This change creates an abstraction called MulticastChannel, that can be shared, with ref counting. Today, the shared option is only enabled under OSX. closes #5410 --- .../common/network/MulticastChannel.java | 355 ++++++++++++++++++ .../zen/ping/multicast/MulticastZenPing.java | 281 +++++--------- .../ping/multicast/MulticastZenPingTests.java | 3 + 3 files changed, 451 insertions(+), 188 deletions(-) create mode 100644 src/main/java/org/elasticsearch/common/network/MulticastChannel.java diff --git a/src/main/java/org/elasticsearch/common/network/MulticastChannel.java b/src/main/java/org/elasticsearch/common/network/MulticastChannel.java new file mode 100644 index 00000000000..2d08cb9a781 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/network/MulticastChannel.java @@ -0,0 +1,355 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.network; + +import com.google.common.collect.Maps; +import org.apache.lucene.util.IOUtils; + +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.ESLoggerFactory; +import org.elasticsearch.common.settings.ImmutableSettings; + +import java.io.Closeable; +import java.net.*; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; + +/** + * A multicast channel that supports registering for receive events, and sending datagram packets. Allows + * to easily share the same multicast socket if it holds the same config. + */ +public abstract class MulticastChannel implements Closeable { + + /** + * Builds a channel based on the provided config, allowing to control if sharing a channel that uses + * the same config is allowed or not. + */ + public static MulticastChannel getChannel(String name, boolean shared, Config config, Listener listener) throws Exception { + if (!shared) { + return new Plain(listener, name, config); + } + return Shared.getSharedChannel(listener, config); + } + + /** + * Config of multicast channel. + */ + public static final class Config { + public final int port; + public final String group; + public final int bufferSize; + public final int ttl; + public final InetAddress multicastInterface; + + public Config(int port, String group, int bufferSize, int ttl, InetAddress multicastInterface) { + this.port = port; + this.group = group; + this.bufferSize = bufferSize; + this.ttl = ttl; + this.multicastInterface = multicastInterface; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Config config = (Config) o; + + if (bufferSize != config.bufferSize) return false; + if (port != config.port) return false; + if (ttl != config.ttl) return false; + if (group != null ? !group.equals(config.group) : config.group != null) return false; + if (multicastInterface != null ? !multicastInterface.equals(config.multicastInterface) : config.multicastInterface != null) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = port; + result = 31 * result + (group != null ? group.hashCode() : 0); + result = 31 * result + bufferSize; + result = 31 * result + ttl; + result = 31 * result + (multicastInterface != null ? multicastInterface.hashCode() : 0); + return result; + } + } + + /** + * Listener that gets called when data is received on the multicast channel. + */ + public static interface Listener { + void onMessage(BytesReference data, SocketAddress address); + } + + /** + * Simple listener that wraps multiple listeners into one. + */ + public static class MultiListener implements Listener { + + private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList(); + + public void add(Listener listener) { + this.listeners.add(listener); + } + + public boolean remove(Listener listener) { + return this.listeners.remove(listener); + } + + @Override + public void onMessage(BytesReference data, SocketAddress address) { + for (Listener listener : listeners) { + listener.onMessage(data, address); + } + } + } + + protected final Listener listener; + private AtomicBoolean closed = new AtomicBoolean(); + + protected MulticastChannel(Listener listener) { + this.listener = listener; + } + + /** + * Send the data over the multicast channel. + */ + public abstract void send(BytesReference data) throws Exception; + + /** + * Close the channel. + */ + public void close() { + if (closed.compareAndSet(false, true)) { + close(listener); + } + } + + protected abstract void close(Listener listener); + + /** + * A shared channel that keeps a static map of Config -> Shared channels, and closes shared + * channel once their reference count has reached 0. It also handles de-registering relevant + * listener from the shared list of listeners. + */ + private static class Shared extends MulticastChannel { + + private static final Map sharedChannels = Maps.newHashMap(); + private static final Object mutex = new Object(); // global mutex so we don't sync on static methods (.class) + + static MulticastChannel getSharedChannel(Listener listener, Config config) throws Exception { + synchronized (mutex) { + Shared shared = sharedChannels.get(config); + if (shared != null) { + shared.incRef(); + ((MultiListener) shared.listener).add(listener); + return new Delegate(listener, shared); + } + MultiListener multiListener = new MultiListener(); + multiListener.add(listener); + shared = new Shared(multiListener, new Plain(multiListener, "#shared#", config)); + sharedChannels.put(config, shared); + return shared; + } + } + + static void close(Shared shared, Listener listener) { + synchronized (mutex) { + // remove this + boolean removed = ((MultiListener) shared.listener).remove(listener); + assert removed : "a listener should be removed"; + if (shared.decRef() == 0) { + sharedChannels.remove(shared.channel.getConfig()); + shared.channel.close(); + } + } + } + + final Plain channel; + private int refCount = 1; + + Shared(MultiListener listener, Plain channel) { + super(listener); + this.channel = channel; + } + + private void incRef() { + refCount++; + } + + private int decRef() { + --refCount; + assert refCount >= 0 : "illegal ref counting, close called multiple times"; + return refCount; + } + + @Override + public void send(BytesReference data) throws Exception { + channel.send(data); + } + + @Override + protected void close(Listener listener) { + close(this, listener); + } + } + + /** + * A light weight delegate that wraps another channel, mainly to support delegating + * the close method with the provided listener and not holding existing listener. + */ + private static class Delegate extends MulticastChannel { + + private final MulticastChannel channel; + + Delegate(Listener listener, MulticastChannel channel) { + super(listener); + this.channel = channel; + } + + @Override + public void send(BytesReference data) throws Exception { + channel.send(data); + } + + @Override + protected void close(Listener listener) { + channel.close(listener); // we delegate here to the close with our listener, not with the delegate listener + } + } + + /** + * Simple implementation of a channel. + */ + private static class Plain extends MulticastChannel { + private final ESLogger logger; + private final Config config; + + private volatile MulticastSocket multicastSocket; + private final DatagramPacket datagramPacketSend; + private final DatagramPacket datagramPacketReceive; + + private final Object sendMutex = new Object(); + private final Object receiveMutex = new Object(); + + private final Receiver receiver; + private final Thread receiverThread; + + Plain(Listener listener, String name, Config config) throws Exception { + super(listener); + this.logger = ESLoggerFactory.getLogger(name); + this.config = config; + this.datagramPacketReceive = new DatagramPacket(new byte[config.bufferSize], config.bufferSize); + this.datagramPacketSend = new DatagramPacket(new byte[config.bufferSize], config.bufferSize, InetAddress.getByName(config.group), config.port); + this.multicastSocket = buildMulticastSocket(config); + this.receiver = new Receiver(); + this.receiverThread = daemonThreadFactory(ImmutableSettings.builder().put("name", name).build(), "discovery#multicast#receiver").newThread(receiver); + this.receiverThread.start(); + } + + private MulticastSocket buildMulticastSocket(Config config) throws Exception { + MulticastSocket multicastSocket = new MulticastSocket(config.port); + try { + multicastSocket.setTimeToLive(config.ttl); + // set the send interface + multicastSocket.setInterface(config.multicastInterface); + multicastSocket.joinGroup(InetAddress.getByName(config.group)); + multicastSocket.setReceiveBufferSize(config.bufferSize); + multicastSocket.setSendBufferSize(config.bufferSize); + multicastSocket.setSoTimeout(60000); + } catch (Throwable e) { + IOUtils.closeWhileHandlingException(multicastSocket); + throw e; + } + return multicastSocket; + } + + public Config getConfig() { + return this.config; + } + + public void send(BytesReference data) throws Exception { + synchronized (sendMutex) { + datagramPacketSend.setData(data.toBytes()); + multicastSocket.send(datagramPacketSend); + } + } + + @Override + protected void close(Listener listener) { + receiver.stop(); + receiverThread.interrupt(); + if (multicastSocket != null) { + IOUtils.closeWhileHandlingException(multicastSocket); + multicastSocket = null; + } + } + + private class Receiver implements Runnable { + + private volatile boolean running = true; + + public void stop() { + running = false; + } + + @Override + public void run() { + while (running) { + try { + synchronized (receiveMutex) { + try { + multicastSocket.receive(datagramPacketReceive); + } catch (SocketTimeoutException ignore) { + continue; + } catch (Exception e) { + if (running) { + if (multicastSocket.isClosed()) { + logger.warn("multicast socket closed while running, restarting..."); + multicastSocket = buildMulticastSocket(config); + } else { + logger.warn("failed to receive packet, throttling...", e); + Thread.sleep(500); + } + } + continue; + } + } + if (datagramPacketReceive.getData().length > 0) { + listener.onMessage(new BytesArray(datagramPacketReceive.getData()), datagramPacketReceive.getSocketAddress()); + } + } catch (Throwable e) { + if (running) { + logger.warn("unexpected exception in multicast receiver", e); + } + } + } + } + } + } +} diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java index e9fe57a8da8..954db7a263c 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java @@ -19,6 +19,7 @@ package org.elasticsearch.discovery.zen.ping.multicast; +import org.apache.lucene.util.Constants; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ExceptionsHelper; @@ -26,9 +27,11 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.*; +import org.elasticsearch.common.network.MulticastChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -43,7 +46,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.io.IOException; -import java.net.*; +import java.net.SocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -53,7 +56,6 @@ import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.cluster.node.DiscoveryNode.readNode; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; -import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; /** * @@ -77,18 +79,11 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem private final boolean pingEnabled; - private volatile Receiver receiver; - private volatile Thread receiverThread; - private volatile MulticastSocket multicastSocket; - private DatagramPacket datagramPacketSend; - private DatagramPacket datagramPacketReceive; + private volatile MulticastChannel multicastChannel; private final AtomicInteger pingIdGenerator = new AtomicInteger(); private final Map> receivedResponses = newConcurrentMap(); - private final Object sendMutex = new Object(); - private final Object receiveMutex = new Object(); - public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version) { this(EMPTY_SETTINGS, threadPool, transportService, clusterName, new NetworkService(EMPTY_SETTINGS), version); } @@ -125,71 +120,26 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem @Override protected void doStart() throws ElasticsearchException { try { - this.datagramPacketReceive = new DatagramPacket(new byte[bufferSize], bufferSize); - this.datagramPacketSend = new DatagramPacket(new byte[bufferSize], bufferSize, InetAddress.getByName(group), port); - } catch (Exception e) { - logger.warn("disabled, failed to setup multicast (datagram) discovery : {}", e.getMessage()); + // we know OSX has bugs in the JVM when creating multiple instances of multicast sockets + // causing for "socket close" exceptions when receive and/or crashes + boolean shared = componentSettings.getAsBoolean("shared", Constants.MAC_OS_X); + multicastChannel = MulticastChannel.getChannel(nodeName(), shared, + new MulticastChannel.Config(port, group, bufferSize, ttl, networkService.resolvePublishHostAddress(address)), + new Receiver()); + } catch (Throwable t) { if (logger.isDebugEnabled()) { - logger.debug("disabled, failed to setup multicast (datagram) discovery", e); - } - return; - } - - InetAddress multicastInterface = null; - try { - MulticastSocket multicastSocket; -// if (NetworkUtils.canBindToMcastAddress()) { -// try { -// multicastSocket = new MulticastSocket(new InetSocketAddress(group, port)); -// } catch (Exception e) { -// logger.debug("Failed to create multicast socket by binding to group address, binding to port", e); -// multicastSocket = new MulticastSocket(port); -// } -// } else { - multicastSocket = new MulticastSocket(port); -// } - - multicastSocket.setTimeToLive(ttl); - - // set the send interface - multicastInterface = networkService.resolvePublishHostAddress(address); - multicastSocket.setInterface(multicastInterface); - multicastSocket.joinGroup(InetAddress.getByName(group)); - - multicastSocket.setReceiveBufferSize(bufferSize); - multicastSocket.setSendBufferSize(bufferSize); - multicastSocket.setSoTimeout(60000); - - this.multicastSocket = multicastSocket; - - this.receiver = new Receiver(); - this.receiverThread = daemonThreadFactory(settings, "discovery#multicast#receiver").newThread(receiver); - this.receiverThread.start(); - } catch (Exception e) { - datagramPacketReceive = null; - datagramPacketSend = null; - if (multicastSocket != null) { - multicastSocket.close(); - multicastSocket = null; - } - logger.warn("disabled, failed to setup multicast discovery on port [{}], [{}]: {}", port, multicastInterface, e.getMessage()); - if (logger.isDebugEnabled()) { - logger.debug("disabled, failed to setup multicast discovery on {}", e, multicastInterface); + logger.debug("multicast failed to start [{}], disabling", t, ExceptionsHelper.detailedMessage(t)); + } else { + logger.info("multicast failed to start [{}], disabling", ExceptionsHelper.detailedMessage(t)); } } } @Override protected void doStop() throws ElasticsearchException { - if (receiver != null) { - receiver.stop(); - } - if (receiverThread != null) { - receiverThread.interrupt(); - } - if (multicastSocket != null) { - multicastSocket.close(); - multicastSocket = null; + if (multicastChannel != null) { + multicastChannel.close(); + multicastChannel = null; } } @@ -257,33 +207,27 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem } private void sendPingRequest(int id) { - if (multicastSocket == null) { - return; - } - synchronized (sendMutex) { - try { - BytesStreamOutput bStream = new BytesStreamOutput(); - StreamOutput out = new HandlesStreamOutput(bStream); - out.writeBytes(INTERNAL_HEADER); - Version.writeVersion(version, out); - out.writeInt(id); - clusterName.writeTo(out); - nodesProvider.nodes().localNode().writeTo(out); - out.close(); - datagramPacketSend.setData(bStream.bytes().toBytes()); - multicastSocket.send(datagramPacketSend); - if (logger.isTraceEnabled()) { - logger.trace("[{}] sending ping request", id); - } - } catch (Exception e) { - if (lifecycle.stoppedOrClosed()) { - return; - } - if (logger.isDebugEnabled()) { - logger.debug("failed to send multicast ping request", e); - } else { - logger.warn("failed to send multicast ping request: {}", ExceptionsHelper.detailedMessage(e)); - } + try { + BytesStreamOutput bStream = new BytesStreamOutput(); + StreamOutput out = new HandlesStreamOutput(bStream); + out.writeBytes(INTERNAL_HEADER); + Version.writeVersion(version, out); + out.writeInt(id); + clusterName.writeTo(out); + nodesProvider.nodes().localNode().writeTo(out); + out.close(); + multicastChannel.send(bStream.bytes()); + if (logger.isTraceEnabled()) { + logger.trace("[{}] sending ping request", id); + } + } catch (Exception e) { + if (lifecycle.stoppedOrClosed()) { + return; + } + if (logger.isDebugEnabled()) { + logger.debug("failed to send multicast ping request", e); + } else { + logger.warn("failed to send multicast ping request: {}", ExceptionsHelper.detailedMessage(e)); } } } @@ -342,98 +286,59 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem } - private class Receiver implements Runnable { - - private volatile boolean running = true; - - public void stop() { - running = false; - } + private class Receiver implements MulticastChannel.Listener { @Override - public void run() { - while (running) { - try { - int id = -1; - DiscoveryNode requestingNodeX = null; - ClusterName clusterName = null; + public void onMessage(BytesReference data, SocketAddress address) { + int id = -1; + DiscoveryNode requestingNodeX = null; + ClusterName clusterName = null; - Map externalPingData = null; - XContentType xContentType = null; + Map externalPingData = null; + XContentType xContentType = null; - synchronized (receiveMutex) { - try { - multicastSocket.receive(datagramPacketReceive); - } catch (SocketTimeoutException ignore) { - continue; - } catch (Exception e) { - if (running) { - if (multicastSocket.isClosed()) { - logger.warn("multicast socket closed while running, restarting..."); - // for some reason, the socket got closed on us while we are still running - // make a best effort in trying to start the multicast socket again... - threadPool.generic().execute(new Runnable() { - @Override - public void run() { - MulticastZenPing.this.stop(); - MulticastZenPing.this.start(); - } - }); - running = false; - return; - } else { - logger.warn("failed to receive packet, throttling...", e); - Thread.sleep(500); - } - } - continue; - } - try { - boolean internal = false; - if (datagramPacketReceive.getLength() > 4) { - int counter = 0; - for (; counter < INTERNAL_HEADER.length; counter++) { - if (datagramPacketReceive.getData()[datagramPacketReceive.getOffset() + counter] != INTERNAL_HEADER[counter]) { - break; - } - } - if (counter == INTERNAL_HEADER.length) { - internal = true; - } - } - if (internal) { - StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset() + INTERNAL_HEADER.length, datagramPacketReceive.getLength(), true)); - Version version = Version.readVersion(input); - input.setVersion(version); - id = input.readInt(); - clusterName = ClusterName.readClusterName(input); - requestingNodeX = readNode(input); - } else { - xContentType = XContentFactory.xContentType(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength()); - if (xContentType != null) { - // an external ping - externalPingData = XContentFactory.xContent(xContentType) - .createParser(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength()) - .mapAndClose(); - } else { - throw new ElasticsearchIllegalStateException("failed multicast message, probably message from previous version"); - } - } - } catch (Exception e) { - logger.warn("failed to read requesting data from {}", e, datagramPacketReceive.getSocketAddress()); - continue; + try { + boolean internal = false; + if (data.length() > 4) { + int counter = 0; + for (; counter < INTERNAL_HEADER.length; counter++) { + if (data.get(counter) != INTERNAL_HEADER[counter]) { + break; } } - if (externalPingData != null) { - handleExternalPingRequest(externalPingData, xContentType, datagramPacketReceive.getSocketAddress()); + if (counter == INTERNAL_HEADER.length) { + internal = true; + } + } + if (internal) { + StreamInput input = CachedStreamInput.cachedHandles(new BytesStreamInput(new BytesArray(data.toBytes(), INTERNAL_HEADER.length, data.length() - INTERNAL_HEADER.length))); + Version version = Version.readVersion(input); + input.setVersion(version); + id = input.readInt(); + clusterName = ClusterName.readClusterName(input); + requestingNodeX = readNode(input); + } else { + xContentType = XContentFactory.xContentType(data); + if (xContentType != null) { + // an external ping + externalPingData = XContentFactory.xContent(xContentType) + .createParser(data) + .mapAndClose(); } else { - handleNodePingRequest(id, requestingNodeX, clusterName); - } - } catch (Exception e) { - if (running) { - logger.warn("unexpected exception in multicast receiver", e); + throw new ElasticsearchIllegalStateException("failed multicast message, probably message from previous version"); } } + if (externalPingData != null) { + handleExternalPingRequest(externalPingData, xContentType, address); + } else { + handleNodePingRequest(id, requestingNodeX, clusterName); + } + } catch (Exception e) { + if (!lifecycle.started() || (e instanceof EsRejectedExecutionException)) { + logger.debug("failed to read requesting data from {}", e, address); + } else { + logger.warn("failed to read requesting data from {}", e, address); + } } } @@ -445,7 +350,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem return; } - if (multicastSocket == null) { + if (multicastChannel == null) { logger.debug("can't send ping response, no socket, from {}, content {}", remoteAddress, externalPingData); return; } @@ -492,13 +397,9 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem builder.endObject(); builder.endObject().endObject(); - synchronized (sendMutex) { - BytesReference bytes = builder.bytes(); - datagramPacketSend.setData(bytes.array(), bytes.arrayOffset(), bytes.length()); - multicastSocket.send(datagramPacketSend); - if (logger.isTraceEnabled()) { - logger.trace("sending external ping response {}", builder.string()); - } + multicastChannel.send(builder.bytes()); + if (logger.isTraceEnabled()) { + logger.trace("sending external ping response {}", builder.string()); } } catch (Exception e) { logger.warn("failed to send external multicast response", e); @@ -551,7 +452,9 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem } }); } catch (Exception e) { - logger.warn("failed to connect to requesting node {}", e, requestingNode); + if (lifecycle.started()) { + logger.warn("failed to connect to requesting node {}", e, requestingNode); + } } } }); @@ -559,7 +462,9 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { - logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode); + if (lifecycle.started()) { + logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode); + } } }); } diff --git a/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingTests.java b/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingTests.java index ad987547700..f87455958db 100644 --- a/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingTests.java +++ b/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingTests.java @@ -53,6 +53,9 @@ public class MulticastZenPingTests extends ElasticsearchTestCase { ImmutableSettings.Builder builder = ImmutableSettings.builder().put(settings); builder.put("discovery.zen.ping.multicast.group", "224.2.3." + randomIntBetween(0, 255)); builder.put("discovery.zen.ping.multicast.port", randomIntBetween(55000, 56000)); + if (randomBoolean()) { + builder.put("discovery.zen.ping.multicast.shared", randomBoolean()); + } return builder.build(); }