From ff8b7409f7cb7c96b9887a9c69841ce865c20ea7 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 16 Aug 2014 15:36:37 +0200 Subject: [PATCH] [Discovery] add a debug log if a node responds to a publish request after publishing timed out. --- .../publish/PublishClusterStateAction.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 554848422ba..6f7098c6311 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -39,6 +39,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; /** * @@ -82,12 +83,15 @@ public class PublishClusterStateAction extends AbstractComponent { publish(clusterState, new AckClusterStatePublishResponseHandler(clusterState.nodes().size() - 1, ackListener)); } - private void publish(ClusterState clusterState, final ClusterStatePublishResponseHandler publishResponseHandler) { + private void publish(final ClusterState clusterState, final ClusterStatePublishResponseHandler publishResponseHandler) { DiscoveryNode localNode = nodesProvider.nodes().localNode(); Map serializedStates = Maps.newHashMap(); + final AtomicBoolean timedOutWaitingForNodes = new AtomicBoolean(false); + final TimeValue publishTimeout = discoverySettings.getPublishTimeout(); + for (final DiscoveryNode node : clusterState.nodes()) { if (node.equals(localNode)) { continue; @@ -122,28 +126,30 @@ public class PublishClusterStateAction extends AbstractComponent { @Override public void handleResponse(TransportResponse.Empty response) { + if (timedOutWaitingForNodes.get()) { + logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node, clusterState.version(), publishTimeout); + } publishResponseHandler.onResponse(node); } @Override public void handleException(TransportException exp) { - logger.debug("failed to send cluster state to [{}]", exp, node); + logger.debug("failed to send cluster state to {}", exp, node); publishResponseHandler.onFailure(node, exp); } }); } catch (Throwable t) { - logger.debug("error sending cluster state to [{}]", t, node); + logger.debug("error sending cluster state to {}", t, node); publishResponseHandler.onFailure(node, t); } } - TimeValue publishTimeout = discoverySettings.getPublishTimeout(); if (publishTimeout.millis() > 0) { // only wait if the publish timeout is configured... try { - boolean awaited = publishResponseHandler.awaitAllNodes(publishTimeout); - if (!awaited) { - logger.debug("awaiting all nodes to process published state {} timed out, timeout {}", clusterState.version(), publishTimeout); + timedOutWaitingForNodes.set(!publishResponseHandler.awaitAllNodes(publishTimeout)); + if (timedOutWaitingForNodes.get()) { + logger.debug("timed out waiting for all nodes to process published state [{}] (timeout [{}])", clusterState.version(), publishTimeout); } } catch (InterruptedException e) { // ignore & restore interrupt