diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 16bcb84d4ed..8938d42757c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -212,87 +212,95 @@ public class InternalClusterService extends AbstractLifecycleComponent 0) { + logger.info("{}, reason: {}", summary, source); + } + } + + // TODO, do this in parallel (and wait) + for (DiscoveryNode node : nodesDelta.addedNodes()) { + if (!nodeRequiresConnection(node)) { + continue; + } + try { + transportService.connectToNode(node); + } catch (Exception e) { + // the fault detection will detect it as failed as well + logger.warn("failed to connect to node [" + node + "]", e); + } + } + + for (ClusterStateListener listener : priorityClusterStateListeners) { + listener.clusterChanged(clusterChangedEvent); + } + for (ClusterStateListener listener : clusterStateListeners) { + listener.clusterChanged(clusterChangedEvent); + } + for (ClusterStateListener listener : lastClusterStateListeners) { + listener.clusterChanged(clusterChangedEvent); + } + + if (!nodesDelta.removedNodes().isEmpty()) { + threadPool.cached().execute(new Runnable() { + @Override public void run() { + for (DiscoveryNode node : nodesDelta.removedNodes()) { + transportService.disconnectFromNode(node); + } + } + }); + } + + // if we are the master, publish the new state to all nodes + if (clusterState.nodes().localNodeMaster()) { + discoveryService.publish(clusterState); + } + + if (updateTask instanceof ProcessedClusterStateUpdateTask) { + ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(clusterState); + } + + logger.debug("processing [{}]: done applying updated cluster_state", source); + } catch (Exception e) { + StringBuilder sb = new StringBuilder("failed to apply updated cluster state:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n"); sb.append(clusterState.nodes().prettyPrint()); sb.append(clusterState.routingTable().prettyPrint()); sb.append(clusterState.readOnlyRoutingNodes().prettyPrint()); - logger.trace(sb.toString()); - } else if (logger.isDebugEnabled()) { - logger.debug("cluster state updated, version [{}], source [{}]", clusterState.version(), source); + logger.warn(sb.toString(), e); } - - ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, clusterState, previousClusterState); - // new cluster state, notify all listeners - final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta(); - if (nodesDelta.hasChanges() && logger.isInfoEnabled()) { - String summary = nodesDelta.shortSummary(); - if (summary.length() > 0) { - logger.info("{}, reason: {}", summary, source); - } - } - - // TODO, do this in parallel (and wait) - for (DiscoveryNode node : nodesDelta.addedNodes()) { - if (!nodeRequiresConnection(node)) { - continue; - } - try { - transportService.connectToNode(node); - } catch (Exception e) { - // the fault detection will detect it as failed as well - logger.warn("failed to connect to node [" + node + "]", e); - } - } - - for (ClusterStateListener listener : priorityClusterStateListeners) { - listener.clusterChanged(clusterChangedEvent); - } - for (ClusterStateListener listener : clusterStateListeners) { - listener.clusterChanged(clusterChangedEvent); - } - for (ClusterStateListener listener : lastClusterStateListeners) { - listener.clusterChanged(clusterChangedEvent); - } - - if (!nodesDelta.removedNodes().isEmpty()) { - threadPool.cached().execute(new Runnable() { - @Override public void run() { - for (DiscoveryNode node : nodesDelta.removedNodes()) { - transportService.disconnectFromNode(node); - } - } - }); - } - - // if we are the master, publish the new state to all nodes - if (clusterState.nodes().localNodeMaster()) { - discoveryService.publish(clusterState); - } - - if (updateTask instanceof ProcessedClusterStateUpdateTask) { - ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(clusterState); - } - - logger.debug("processing [{}]: done applying updated cluster_state", source); } else { logger.debug("processing [{}]: no change in cluster_state", source); }