mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-24 22:09:24 +00:00
Log warning when application of an updated cluster state fails with an exception
This commit is contained in:
parent
e7baf30bd2
commit
ed8c289dec
@ -212,87 +212,95 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
|
||||
return;
|
||||
}
|
||||
if (previousClusterState != clusterState) {
|
||||
if (clusterState.nodes().localNodeMaster()) {
|
||||
// only the master controls the version numbers
|
||||
Builder builder = ClusterState.builder().state(clusterState).version(clusterState.version() + 1);
|
||||
if (previousClusterState.routingTable() != clusterState.routingTable()) {
|
||||
builder.routingTable(RoutingTable.builder().routingTable(clusterState.routingTable()).version(clusterState.routingTable().version() + 1));
|
||||
try {
|
||||
if (clusterState.nodes().localNodeMaster()) {
|
||||
// only the master controls the version numbers
|
||||
Builder builder = ClusterState.builder().state(clusterState).version(clusterState.version() + 1);
|
||||
if (previousClusterState.routingTable() != clusterState.routingTable()) {
|
||||
builder.routingTable(RoutingTable.builder().routingTable(clusterState.routingTable()).version(clusterState.routingTable().version() + 1));
|
||||
}
|
||||
if (previousClusterState.metaData() != clusterState.metaData()) {
|
||||
builder.metaData(MetaData.builder().metaData(clusterState.metaData()).version(clusterState.metaData().version() + 1));
|
||||
}
|
||||
clusterState = builder.build();
|
||||
} else {
|
||||
// we got this cluster state from the master, filter out based on versions (don't call listeners)
|
||||
if (clusterState.version() < previousClusterState.version()) {
|
||||
logger.debug("got old cluster state [" + clusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "], ignoring");
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (previousClusterState.metaData() != clusterState.metaData()) {
|
||||
builder.metaData(MetaData.builder().metaData(clusterState.metaData()).version(clusterState.metaData().version() + 1));
|
||||
}
|
||||
clusterState = builder.build();
|
||||
} else {
|
||||
// we got this cluster state from the master, filter out based on versions (don't call listeners)
|
||||
if (clusterState.version() < previousClusterState.version()) {
|
||||
logger.debug("got old cluster state [" + clusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "], ignoring");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
StringBuilder sb = new StringBuilder("cluster state updated:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n");
|
||||
if (logger.isTraceEnabled()) {
|
||||
StringBuilder sb = new StringBuilder("cluster state updated:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n");
|
||||
sb.append(clusterState.nodes().prettyPrint());
|
||||
sb.append(clusterState.routingTable().prettyPrint());
|
||||
sb.append(clusterState.readOnlyRoutingNodes().prettyPrint());
|
||||
logger.trace(sb.toString());
|
||||
} else if (logger.isDebugEnabled()) {
|
||||
logger.debug("cluster state updated, version [{}], source [{}]", clusterState.version(), source);
|
||||
}
|
||||
|
||||
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, clusterState, previousClusterState);
|
||||
// new cluster state, notify all listeners
|
||||
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
|
||||
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
|
||||
String summary = nodesDelta.shortSummary();
|
||||
if (summary.length() > 0) {
|
||||
logger.info("{}, reason: {}", summary, source);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO, do this in parallel (and wait)
|
||||
for (DiscoveryNode node : nodesDelta.addedNodes()) {
|
||||
if (!nodeRequiresConnection(node)) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
transportService.connectToNode(node);
|
||||
} catch (Exception e) {
|
||||
// the fault detection will detect it as failed as well
|
||||
logger.warn("failed to connect to node [" + node + "]", e);
|
||||
}
|
||||
}
|
||||
|
||||
for (ClusterStateListener listener : priorityClusterStateListeners) {
|
||||
listener.clusterChanged(clusterChangedEvent);
|
||||
}
|
||||
for (ClusterStateListener listener : clusterStateListeners) {
|
||||
listener.clusterChanged(clusterChangedEvent);
|
||||
}
|
||||
for (ClusterStateListener listener : lastClusterStateListeners) {
|
||||
listener.clusterChanged(clusterChangedEvent);
|
||||
}
|
||||
|
||||
if (!nodesDelta.removedNodes().isEmpty()) {
|
||||
threadPool.cached().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
for (DiscoveryNode node : nodesDelta.removedNodes()) {
|
||||
transportService.disconnectFromNode(node);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// if we are the master, publish the new state to all nodes
|
||||
if (clusterState.nodes().localNodeMaster()) {
|
||||
discoveryService.publish(clusterState);
|
||||
}
|
||||
|
||||
if (updateTask instanceof ProcessedClusterStateUpdateTask) {
|
||||
((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(clusterState);
|
||||
}
|
||||
|
||||
logger.debug("processing [{}]: done applying updated cluster_state", source);
|
||||
} catch (Exception e) {
|
||||
StringBuilder sb = new StringBuilder("failed to apply updated cluster state:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n");
|
||||
sb.append(clusterState.nodes().prettyPrint());
|
||||
sb.append(clusterState.routingTable().prettyPrint());
|
||||
sb.append(clusterState.readOnlyRoutingNodes().prettyPrint());
|
||||
logger.trace(sb.toString());
|
||||
} else if (logger.isDebugEnabled()) {
|
||||
logger.debug("cluster state updated, version [{}], source [{}]", clusterState.version(), source);
|
||||
logger.warn(sb.toString(), e);
|
||||
}
|
||||
|
||||
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, clusterState, previousClusterState);
|
||||
// new cluster state, notify all listeners
|
||||
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
|
||||
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
|
||||
String summary = nodesDelta.shortSummary();
|
||||
if (summary.length() > 0) {
|
||||
logger.info("{}, reason: {}", summary, source);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO, do this in parallel (and wait)
|
||||
for (DiscoveryNode node : nodesDelta.addedNodes()) {
|
||||
if (!nodeRequiresConnection(node)) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
transportService.connectToNode(node);
|
||||
} catch (Exception e) {
|
||||
// the fault detection will detect it as failed as well
|
||||
logger.warn("failed to connect to node [" + node + "]", e);
|
||||
}
|
||||
}
|
||||
|
||||
for (ClusterStateListener listener : priorityClusterStateListeners) {
|
||||
listener.clusterChanged(clusterChangedEvent);
|
||||
}
|
||||
for (ClusterStateListener listener : clusterStateListeners) {
|
||||
listener.clusterChanged(clusterChangedEvent);
|
||||
}
|
||||
for (ClusterStateListener listener : lastClusterStateListeners) {
|
||||
listener.clusterChanged(clusterChangedEvent);
|
||||
}
|
||||
|
||||
if (!nodesDelta.removedNodes().isEmpty()) {
|
||||
threadPool.cached().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
for (DiscoveryNode node : nodesDelta.removedNodes()) {
|
||||
transportService.disconnectFromNode(node);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// if we are the master, publish the new state to all nodes
|
||||
if (clusterState.nodes().localNodeMaster()) {
|
||||
discoveryService.publish(clusterState);
|
||||
}
|
||||
|
||||
if (updateTask instanceof ProcessedClusterStateUpdateTask) {
|
||||
((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(clusterState);
|
||||
}
|
||||
|
||||
logger.debug("processing [{}]: done applying updated cluster_state", source);
|
||||
} else {
|
||||
logger.debug("processing [{}]: no change in cluster_state", source);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user