Improve logging for batched cluster state updates (#19255)

We've been slowly improving batch support in `ClusterService` so services won't need to implement this tricky logic themselves. These are good changes, but our logging infrastructure hasn't caught up, and we now log things like:

```
[2016-07-04 21:51:22,318][DEBUG][cluster.service          ] [node_sm0] processing [put-mapping [type1],put-mapping [type1]]:
```

Depending on the `source` string this can get quite ugly (mostly in the ZenDiscovery area). 

This PR adds some infrastructure to improve the logging, keeping the logging of non-batched tasks the same. As a result, the line above now looks like:

```
[2016-07-04 21:44:45,047][DEBUG][cluster.service          ] [node_s0] processing [put-mapping[type0, type0, type0]]: execute
```

ZenDiscovery's logging while waiting on joins moved from:

```
[2016-07-04 17:09:45,111][DEBUG][cluster.service          ] [node_t0] processing [elected_as_master, [1] nodes joined),elected_as_master, [1] nodes joined)]: execute
```

to:

```
[2016-07-04 22:03:30,142][DEBUG][cluster.service          ] [node_t3] processing [elected_as_master ([3] nodes joined)[{node_t2}{R3hu3uoSQee0B6bkuw8pjw}{p9n28HDJQdiDMdh3tjxA5g}{127.0.0.1}{127.0.0.1:30107}, {node_t1}{ynYQfk7uR8qR5wKIysFlQg}{wa_OKuJHSl-Oyl9Gis-GXg}{127.0.0.1}{127.0.0.1:30106}, {node_t0}{pweq-2T4TlKPrEVAVW6bJw}{NPBSLXSTTguT1So0JsZY8g}{127.0.0.1}{127.0.0.1:30105}]]: execute
```

As a bonus, I also cleaned up the `zen-disco`-prefixed sources in that area.
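To make the summary format concrete, here is a minimal, self-contained sketch (plain strings and invented sources, not the real Elasticsearch classes) of how a batched source's summary is put together: the per-task descriptions are joined and wrapped in the source name.

```
import java.util.Arrays;
import java.util.List;

// Standalone sketch of the summary string handling; not Elasticsearch code.
public class GroupedSourceSummarySketch {

    // Join the descriptions of the tasks submitted under a single source and
    // wrap them in the source name, e.g. put-mapping[type0, type0, type0].
    static String summarize(String source, List<String> taskDescriptions) {
        String tasks = String.join(", ", taskDescriptions);
        return tasks.isEmpty() ? source : source + "[" + tasks + "]";
    }

    public static void main(String[] args) {
        // Three batched put-mapping tasks for the same type, as in the log sample above.
        System.out.println(summarize("put-mapping", Arrays.asList("type0", "type0", "type0")));
        // prints: put-mapping[type0, type0, type0]

        // A non-batched task describes itself with an empty string, so only its source is logged.
        System.out.println(summarize("cluster_reroute", Arrays.asList("")));
        // prints: cluster_reroute
    }
}
```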
Boaz Leskes 2016-07-04 22:54:43 +02:00 committed by GitHub
parent 6861d3571e
commit 37c8c0fa03
8 changed files with 101 additions and 39 deletions

ClusterStateTaskExecutor.java

@ -46,6 +46,25 @@ public interface ClusterStateTaskExecutor<T> {
default void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
}
/**
* Builds a concise description of a list of tasks (to be used in logging etc.).
*
* Note that the tasks given are not necessarily the same as those that will be passed to {@link #execute(ClusterState, List)},
* but they are guaranteed to be a subset of them. This method can be called multiple times with different lists before execution.
* This allows the task descriptions to be grouped by their submitting source.
*/
default String describeTasks(List<T> tasks) {
return tasks.stream().map(T::toString).reduce((s1,s2) -> {
if (s1.isEmpty()) {
return s2;
} else if (s2.isEmpty()) {
return s1;
} else {
return s1 + ", " + s2;
}
}).orElse("");
}
/**
* Represents the result of a batched execution of cluster state update tasks
* @param <T> the type of the cluster state update task

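For reference, a standalone sketch of the default reduction above, specialized to `String` in place of `T`: descriptions are joined with `", "`, and empty descriptions drop out.

```
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Standalone copy of the default describeTasks reduction, specialized to String.
public class DescribeTasksSketch {

    static String describeTasks(List<String> tasks) {
        return tasks.stream().map(Object::toString).reduce((s1, s2) -> {
            if (s1.isEmpty()) {
                return s2;             // skip an empty description on the left
            } else if (s2.isEmpty()) {
                return s1;             // skip an empty description on the right
            } else {
                return s1 + ", " + s2; // join non-empty descriptions
            }
        }).orElse("");
    }

    public static void main(String[] args) {
        System.out.println(describeTasks(Arrays.asList("type0", "type0", "type0")));        // type0, type0, type0
        System.out.println(describeTasks(Arrays.asList("[index1][0]", "", "[index1][1]"))); // [index1][0], [index1][1]
        System.out.println(describeTasks(Collections.emptyList()));                         // empty description
    }
}
```

In the per-service executors below, some override `describeTasks` directly (the shard and put-mapping executors), while others rely on this default and give their task a descriptive `toString()` instead, as `RefreshTask` does.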
ClusterStateUpdateTask.java

@ -46,6 +46,11 @@ public abstract class ClusterStateUpdateTask implements ClusterStateTaskConfig,
return BatchResult.<ClusterStateUpdateTask>builder().successes(tasks).build(result);
}
@Override
public String describeTasks(List<ClusterStateUpdateTask> tasks) {
return ""; // one of task, source is enough
}
/**
* Update the cluster state based on the current state. Return the *same instance* if no state
* should be changed.

ShardStateAction.java

@ -183,7 +183,7 @@ public class ShardStateAction extends AbstractComponent {
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
logger.warn("{} received shard failed for {}", request.failure, request.shardRouting.shardId(), request);
clusterService.submitStateUpdateTask(
"shard-failed (" + request.shardRouting + "), message [" + request.message + "]",
"shard-failed",
request,
ClusterStateTaskConfig.build(Priority.HIGH),
shardFailedClusterStateTaskExecutor,
@ -233,6 +233,11 @@ public class ShardStateAction extends AbstractComponent {
this.logger = logger;
}
@Override
public String describeTasks(List<ShardRoutingEntry> tasks) {
return tasks.stream().map(entry -> entry.getShardRouting().toString()).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
}
@Override
public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
BatchResult.Builder<ShardRoutingEntry> batchResultBuilder = BatchResult.builder();
@ -346,7 +351,7 @@ public class ShardStateAction extends AbstractComponent {
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
logger.debug("{} received shard started for [{}]", request.shardRouting.shardId(), request);
clusterService.submitStateUpdateTask(
"shard-started (" + request.shardRouting + "), reason [" + request.message + "]",
"shard-started",
request,
ClusterStateTaskConfig.build(Priority.URGENT),
shardStartedClusterStateTaskExecutor,
@ -364,6 +369,11 @@ public class ShardStateAction extends AbstractComponent {
this.logger = logger;
}
@Override
public String describeTasks(List<ShardRoutingEntry> tasks) {
return tasks.stream().map(entry -> entry.getShardRouting().toString()).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
}
@Override
public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
BatchResult.Builder<ShardRoutingEntry> builder = BatchResult.builder();

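A hedged, self-contained sketch (invented, shortened shard identifiers, not real `ShardRoutingEntry` objects) of the summary fragment produced by the `describeTasks` override above; `Collectors.joining(", ")` is an equivalent way to express the same reduction:

```
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Standalone sketch with made-up shard identifiers.
public class ShardTaskSummarySketch {
    public static void main(String[] args) {
        List<String> shardRoutings = Arrays.asList("[index1][0]", "[index1][1]", "[index2][0]");

        // Same result as reduce((s1, s2) -> s1 + ", " + s2).orElse("") in the executor above.
        String description = shardRoutings.stream().collect(Collectors.joining(", "));

        System.out.println("shard-failed[" + description + "]");
        // prints: shard-failed[[index1][0], [index1][1], [index2][0]]
    }
}
```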
MetaDataMappingService.java

@ -82,6 +82,11 @@ public class MetaDataMappingService extends AbstractComponent {
this.index = index;
this.indexUUID = indexUUID;
}
@Override
public String toString() {
return "[" + index + "][" + indexUUID + "]";
}
}
class RefreshTaskExecutor implements ClusterStateTaskExecutor<RefreshTask> {
@ -198,7 +203,7 @@ public class MetaDataMappingService extends AbstractComponent {
*/
public void refreshMapping(final String index, final String indexUUID) {
final RefreshTask refreshTask = new RefreshTask(index, indexUUID);
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "]",
clusterService.submitStateUpdateTask("refresh-mapping",
refreshTask,
ClusterStateTaskConfig.build(Priority.HIGH),
refreshExecutor,
@ -347,10 +352,15 @@ public class MetaDataMappingService extends AbstractComponent {
return ClusterState.builder(currentState).metaData(builder).build();
}
@Override
public String describeTasks(List<PutMappingClusterStateUpdateRequest> tasks) {
return tasks.stream().map(PutMappingClusterStateUpdateRequest::type).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
}
}
public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
clusterService.submitStateUpdateTask("put-mapping [" + request.type() + "]",
clusterService.submitStateUpdateTask("put-mapping",
request,
ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()),
putMappingExecutor,

ClusterService.java

@ -42,7 +42,6 @@ import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
@ -512,17 +511,17 @@ public class ClusterService extends AbstractLifecycleComponent {
<T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
final ArrayList<UpdateTask<T>> toExecute = new ArrayList<>();
final ArrayList<String> sources = new ArrayList<>();
final Map<String, ArrayList<T>> processTasksBySource = new HashMap<>();
synchronized (updateTasksPerExecutor) {
List<UpdateTask> pending = updateTasksPerExecutor.remove(executor);
if (pending != null) {
for (UpdateTask<T> task : pending) {
if (task.processed.getAndSet(true) == false) {
logger.trace("will process [{}]", task.source);
logger.trace("will process [{}[{}]]", task.source, task.task);
toExecute.add(task);
sources.add(task.source);
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task.task);
} else {
logger.trace("skipping [{}], already processed", task.source);
logger.trace("skipping [{}[{}]], already processed", task.source, task.task);
}
}
}
@ -530,15 +529,19 @@ public class ClusterService extends AbstractLifecycleComponent {
if (toExecute.isEmpty()) {
return;
}
final String source = Strings.collectionToCommaDelimitedString(sources);
final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
String tasks = executor.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster_service not started", source);
logger.debug("processing [{}]: ignoring, cluster_service not started", tasksSummary);
return;
}
logger.debug("processing [{}]: execute", source);
logger.debug("processing [{}]: execute", tasksSummary);
ClusterState previousClusterState = clusterState;
if (!previousClusterState.nodes().isLocalNodeElectedMaster() && executor.runOnlyOnMaster()) {
logger.debug("failing [{}]: local node is no longer master", source);
logger.debug("failing [{}]: local node is no longer master", tasksSummary);
toExecute.stream().forEach(task -> task.listener.onNoLongerMaster(task.source));
return;
}
@ -551,10 +554,10 @@ public class ClusterService extends AbstractLifecycleComponent {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
if (logger.isTraceEnabled()) {
logger.trace("failed to execute cluster state update in [{}], state:\nversion [{}], source [{}]\n{}{}{}", e, executionTime,
previousClusterState.version(), source, previousClusterState.nodes().prettyPrint(),
previousClusterState.version(), tasksSummary, previousClusterState.nodes().prettyPrint(),
previousClusterState.routingTable().prettyPrint(), previousClusterState.getRoutingNodes().prettyPrint());
}
warnAboutSlowTaskIfNeeded(executionTime, source);
warnAboutSlowTaskIfNeeded(executionTime, tasksSummary);
batchResult = ClusterStateTaskExecutor.BatchResult.<T>builder()
.failures(toExecute.stream().map(updateTask -> updateTask.task)::iterator, e)
.build(previousClusterState);
@ -597,8 +600,8 @@ public class ClusterService extends AbstractLifecycleComponent {
task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
}
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
logger.debug("processing [{}]: took [{}] no change in cluster_state", source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, source);
logger.debug("processing [{}]: took [{}] no change in cluster_state", tasksSummary, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, tasksSummary);
return;
}
@ -640,18 +643,18 @@ public class ClusterService extends AbstractLifecycleComponent {
newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED);
if (logger.isTraceEnabled()) {
logger.trace("cluster state updated, source [{}]\n{}", source, newClusterState.prettyPrint());
logger.trace("cluster state updated, source [{}]\n{}", tasksSummary, newClusterState.prettyPrint());
} else if (logger.isDebugEnabled()) {
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), source);
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), tasksSummary);
}
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState);
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(tasksSummary, newClusterState, previousClusterState);
// new cluster state, notify all listeners
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
String summary = nodesDelta.shortSummary();
if (summary.length() > 0) {
logger.info("{}, reason: {}", summary, source);
logger.info("{}, reason: {}", summary, tasksSummary);
}
}
@ -665,7 +668,7 @@ public class ClusterService extends AbstractLifecycleComponent {
try {
clusterStatePublisher.accept(clusterChangedEvent, ackListener);
} catch (Discovery.FailedToCommitClusterStateException t) {
logger.warn("failing [{}]: failed to commit cluster state version [{}]", t, source, newClusterState.version());
logger.warn("failing [{}]: failed to commit cluster state version [{}]", t, tasksSummary, newClusterState.version());
proccessedListeners.forEach(task -> task.listener.onFailure(task.source, t));
return;
}
@ -719,17 +722,17 @@ public class ClusterService extends AbstractLifecycleComponent {
try {
executor.clusterStatePublished(clusterChangedEvent);
} catch (Exception e) {
logger.error("exception thrown while notifying executor of new cluster state publication [{}]", e, source);
logger.error("exception thrown while notifying executor of new cluster state publication [{}]", e, tasksSummary);
}
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
logger.debug("processing [{}]: took [{}] done applying updated cluster_state (version: {}, uuid: {})", source, executionTime,
newClusterState.version(), newClusterState.stateUUID());
warnAboutSlowTaskIfNeeded(executionTime, source);
logger.debug("processing [{}]: took [{}] done applying updated cluster_state (version: {}, uuid: {})", tasksSummary,
executionTime, newClusterState.version(), newClusterState.stateUUID());
warnAboutSlowTaskIfNeeded(executionTime, tasksSummary);
} catch (Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
logger.warn("failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}", e, executionTime,
newClusterState.version(), newClusterState.stateUUID(), source, newClusterState.prettyPrint());
newClusterState.version(), newClusterState.stateUUID(), tasksSummary, newClusterState.prettyPrint());
// TODO: do we want to call updateTask.onFailure here?
}

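A self-contained sketch (plain strings, invented sources) of how `tasksSummary` is assembled: tasks are grouped by their submitting source, each group is passed through the executor's `describeTasks`, and a source whose description comes back empty is logged on its own.

```
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Standalone sketch of the tasksSummary construction; not the real ClusterService.
public class TasksSummarySketch {
    public static void main(String[] args) {
        // Tasks grouped by submitting source, as processTasksBySource does with computeIfAbsent.
        Map<String, List<String>> tasksBySource = new LinkedHashMap<>();
        tasksBySource.computeIfAbsent("put-mapping", s -> new ArrayList<>()).add("type0");
        tasksBySource.computeIfAbsent("put-mapping", s -> new ArrayList<>()).add("type0");
        tasksBySource.computeIfAbsent("shard-started", s -> new ArrayList<>()).add("[index1][0]");
        tasksBySource.computeIfAbsent("cluster_reroute", s -> new ArrayList<>()).add(""); // non-batched task

        // Stand-in for executor.describeTasks(entry.getValue()).
        Function<List<String>, String> describeTasks =
            tasks -> tasks.stream().filter(s -> s.isEmpty() == false)
                          .reduce((s1, s2) -> s1 + ", " + s2).orElse("");

        String tasksSummary = tasksBySource.entrySet().stream().map(entry -> {
            String tasks = describeTasks.apply(entry.getValue());
            return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
        }).reduce((s1, s2) -> s1 + ", " + s2).orElse("");

        System.out.println("processing [" + tasksSummary + "]: execute");
        // prints: processing [put-mapping[type0, type0], shard-started[[index1][0]], cluster_reroute]: execute
    }
}
```

A source whose tasks all describe themselves as `""` (for example a plain `ClusterStateUpdateTask`) therefore still logs exactly as before.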
NodeJoinController.java

@ -180,7 +180,7 @@ public class NodeJoinController extends AbstractComponent {
electionContext.addIncomingJoin(node, callback);
checkPendingJoinsAndElectIfNeeded();
} else {
clusterService.submitStateUpdateTask("zen-disco-join(node " + node + "])",
clusterService.submitStateUpdateTask("zen-disco-node-join",
node, ClusterStateTaskConfig.build(Priority.URGENT),
joinTaskExecutor, new JoinTaskListener(callback, logger));
}
@ -279,7 +279,7 @@ public class NodeJoinController extends AbstractComponent {
innerClose();
Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
final String source = "zen-disco-join(elected_as_master, [" + tasks.size() + "] nodes joined)";
final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)";
tasks.put(BECOME_MASTER_TASK, joinProcessedListener);
clusterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
@ -288,7 +288,7 @@ public class NodeJoinController extends AbstractComponent {
public synchronized void closeAndProcessPending(String reason) {
innerClose();
Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
final String source = "zen-disco-join(election stopped [" + reason + "] nodes joined";
final String source = "zen-disco-process-pending-joins [" + reason + "]";
tasks.put(FINISH_ELECTION_NOT_MASTER_TASK, joinProcessedListener);
clusterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
@ -381,12 +381,22 @@ public class NodeJoinController extends AbstractComponent {
// a task indicated that the current node should become master, if no current master is known
private static final DiscoveryNode BECOME_MASTER_TASK = new DiscoveryNode("_BECOME_MASTER_TASK_", LocalTransportAddress.buildUnique(),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) {
@Override
public String toString() {
return ""; // this is not really task , so don't log anything about it...
}
};
// a task that is used to process pending joins without explicitly becoming a master and listening to the results
// this task is used when the election is stopped without the local node becoming a master per se (though it might)
private static final DiscoveryNode FINISH_ELECTION_NOT_MASTER_TASK = new DiscoveryNode("_NOT_MASTER_TASK_",
LocalTransportAddress.buildUnique(), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
LocalTransportAddress.buildUnique(), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) {
@Override
public String toString() {
return ""; // this is not really task , so don't log anything about it...
}
};
class JoinTaskExecutor implements ClusterStateTaskExecutor<DiscoveryNode> {

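To show why the two marker tasks return an empty string from `toString()`, here is a hedged, standalone sketch (shortened, invented node strings): the empty description is skipped by the default `describeTasks` reduction, so only the nodes that actually joined show up, producing the `elected_as_master ... nodes joined` line quoted in the description above.

```
import java.util.Arrays;
import java.util.List;

// Standalone sketch; "" stands for BECOME_MASTER_TASK, the other entries for joining nodes.
public class JoinSummarySketch {

    // Same reduction as the default describeTasks: skip empty descriptions, join the rest.
    static String describeTasks(List<String> tasks) {
        return tasks.stream().reduce((s1, s2) ->
            s1.isEmpty() ? s2 : s2.isEmpty() ? s1 : s1 + ", " + s2).orElse("");
    }

    public static void main(String[] args) {
        List<String> tasks = Arrays.asList("", "{node_t1}{...}", "{node_t2}{...}");
        String source = "zen-disco-elected-as-master ([" + 2 + "] nodes joined)";
        System.out.println("processing [" + source + "[" + describeTasks(tasks) + "]]: execute");
        // prints: processing [zen-disco-elected-as-master ([2] nodes joined)[{node_t1}{...}, {node_t2}{...}]]: execute
    }
}
```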
ZenDiscovery.java

@ -406,7 +406,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
);
} else {
// process any incoming joins (they will fail because we are not the master)
nodeJoinController.stopElectionContext("not master");
nodeJoinController.stopElectionContext(masterNode + " elected");
// send join request
final boolean success = joinElectedMaster(masterNode);
@ -506,7 +506,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
return;
}
if (localNodeMaster()) {
clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
clusterService.submitStateUpdateTask("zen-disco-node-left(" + node + ")", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder(currentState.nodes()).remove(node.getId());
@ -545,7 +545,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
// nothing to do here...
return;
}
clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, new ClusterStateUpdateTask(Priority.IMMEDIATE) {
clusterService.submitStateUpdateTask("zen-disco-node-failed(" + node + "), reason " + reason,
new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public ClusterState execute(ClusterState currentState) {
if (currentState.nodes().nodeExists(node) == false) {
@ -592,7 +593,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
// We only set the new value. If the master doesn't see enough nodes it will revoke its mastership.
return;
}
clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
clusterService.submitStateUpdateTask("zen-disco-mini-master-nodes-changed", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public ClusterState execute(ClusterState currentState) {
// check if we have enough master nodes, if not, we need to move into joining the cluster again
@ -632,7 +633,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
logger.info("master_left [{}], reason [{}]", cause, masterNode, reason);
clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
clusterService.submitStateUpdateTask("master_failed (" + masterNode + ")", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public boolean runOnlyOnMaster() {

TribeService.java

@ -313,7 +313,7 @@ public class TribeService extends AbstractLifecycleComponent {
public void clusterChanged(final ClusterChangedEvent event) {
logger.debug("[{}] received cluster event, [{}]", tribeName, event.source());
clusterService.submitStateUpdateTask(
"cluster event from " + tribeName + ", " + event.source(),
"cluster event from " + tribeName,
event,
ClusterStateTaskConfig.build(Priority.NORMAL),
executor,
@ -328,12 +328,16 @@ public class TribeService extends AbstractLifecycleComponent {
this.tribeName = tribeName;
}
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public String describeTasks(List<ClusterChangedEvent> tasks) {
return tasks.stream().map(ClusterChangedEvent::source).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
}
@Override
public BatchResult<ClusterChangedEvent> execute(ClusterState currentState, List<ClusterChangedEvent> tasks) throws Exception {
ClusterState accumulator = ClusterState.builder(currentState).build();