Logging: Add logging of slow cluster state tasks

Closes #10874
This commit is contained in:
Igor Motov 2015-04-30 13:47:19 -04:00
parent aa968f6b65
commit c165afb4d5
4 changed files with 392 additions and 7 deletions

View File

@ -20,7 +20,6 @@
package org.elasticsearch.cluster.service;
import com.google.common.collect.Iterables;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ClusterState.Builder;
@ -59,6 +58,9 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF
*/
public class InternalClusterService extends AbstractLifecycleComponent<ClusterService> implements ClusterService {
public static final String SETTING_CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD = "cluster.service.slow_task_logging_threshold";
public static final String SETTING_CLUSTER_SERVICE_RECONNECT_INTERVAL = "cluster.service.reconnect_interval";
public static final String UPDATE_THREAD_NAME = "clusterService#updateTask";
private final ThreadPool threadPool;
@ -74,6 +76,8 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private final TimeValue reconnectInterval;
private TimeValue slowTaskLoggingThreshold;
private volatile PrioritizedEsThreadPoolExecutor updateTasksExecutor;
/**
@ -115,8 +119,11 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
this.clusterState = ClusterState.builder(clusterName).build();
this.nodeSettingsService.setClusterService(this);
this.nodeSettingsService.addListener(new ApplySettings());
this.reconnectInterval = this.settings.getAsTime("cluster.service.reconnect_interval", TimeValue.timeValueSeconds(10));
this.reconnectInterval = this.settings.getAsTime(SETTING_CLUSTER_SERVICE_RECONNECT_INTERVAL, TimeValue.timeValueSeconds(10));
this.slowTaskLoggingThreshold = this.settings.getAsTime(SETTING_CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD, TimeValue.timeValueSeconds(30));
localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
@ -371,22 +378,24 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
return;
}
ClusterState newClusterState;
long startTime = System.currentTimeMillis();
try {
newClusterState = updateTask.execute(previousClusterState);
} catch (Throwable e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, System.currentTimeMillis() - startTime));
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("failed to execute cluster state update, state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n");
StringBuilder sb = new StringBuilder("failed to execute cluster state update in ").append(executionTime).append(", state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n");
sb.append(previousClusterState.nodes().prettyPrint());
sb.append(previousClusterState.routingTable().prettyPrint());
sb.append(previousClusterState.readOnlyRoutingNodes().prettyPrint());
logger.trace(sb.toString(), e);
}
warnAboutSlowTaskIfNeeded(executionTime, source);
updateTask.onFailure(source, e);
return;
}
if (previousClusterState == newClusterState) {
logger.debug("processing [{}]: no change in cluster_state", source);
if (updateTask instanceof AckedClusterStateUpdateTask) {
//no need to wait for ack if nothing changed, the update can be counted as acknowledged
((AckedClusterStateUpdateTask) updateTask).onAllNodesAcked(null);
@ -394,6 +403,9 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
if (updateTask instanceof ProcessedClusterStateUpdateTask) {
((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
}
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, System.currentTimeMillis() - startTime));
logger.debug("processing [{}]: took {} no change in cluster_state", source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, source);
return;
}
@ -511,9 +523,12 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
}
logger.debug("processing [{}]: done applying updated cluster_state (version: {}, uuid: {})", source, newClusterState.version(), newClusterState.uuid());
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, System.currentTimeMillis() - startTime));
logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {}, uuid: {})", source, executionTime, newClusterState.version(), newClusterState.uuid());
warnAboutSlowTaskIfNeeded(executionTime, source);
} catch (Throwable t) {
StringBuilder sb = new StringBuilder("failed to apply updated cluster state:\nversion [").append(newClusterState.version()).append("], uuid [").append(newClusterState.uuid()).append("], source [").append(source).append("]\n");
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, System.currentTimeMillis() - startTime));
StringBuilder sb = new StringBuilder("failed to apply updated cluster state in ").append(executionTime).append(":\nversion [").append(newClusterState.version()).append("], uuid [").append(newClusterState.uuid()).append("], source [").append(source).append("]\n");
sb.append(newClusterState.nodes().prettyPrint());
sb.append(newClusterState.routingTable().prettyPrint());
sb.append(newClusterState.readOnlyRoutingNodes().prettyPrint());
@ -523,6 +538,12 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
}
private void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source) {
if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) {
logger.warn("cluster state update task [{}] took {} above the warn threshold of {}", source, executionTime, slowTaskLoggingThreshold);
}
}
class NotifyTimeout implements Runnable {
final TimeoutClusterStateListener listener;
final TimeValue timeout;
@ -755,4 +776,13 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
}
}
class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
final TimeValue slowTaskLoggingThreshold = settings.getAsTime(SETTING_CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD, InternalClusterService.this.slowTaskLoggingThreshold);
InternalClusterService.this.slowTaskLoggingThreshold = slowTaskLoggingThreshold;
}
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.*;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ZenDiscovery;
@ -101,6 +102,7 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
clusterDynamicSettings.addDynamicSetting(InternalClusterService.SETTING_CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD, Validator.TIME_NON_NEGATIVE);
}
public void addDynamicSettings(String... settings) {

View File

@ -21,10 +21,12 @@ package org.elasticsearch.cluster;
import com.google.common.base.Predicate;
import com.google.common.util.concurrent.ListenableFuture;
import org.elasticsearch.ElasticsearchException;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
@ -38,6 +40,8 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Test;
@ -48,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.*;
/**
@ -721,6 +726,215 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest {
}
}
@Test
@TestLogging("cluster:TRACE") // To ensure that we log cluster state events on TRACE level
public void testClusterStateUpdateLogging() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "local")
.build();
internalCluster().startNode(settings);
ClusterService clusterService1 = internalCluster().getInstance(ClusterService.class);
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test1", "cluster.service", Level.DEBUG, "*processing [test1]: took * no change in cluster_state"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", "cluster.service", Level.TRACE, "*failed to execute cluster state update in *"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test3", "cluster.service", Level.DEBUG, "*processing [test3]: took * done applying updated cluster_state (version: *, uuid: *)"));
Logger rootLogger = Logger.getRootLogger();
rootLogger.addAppender(mockAppender);
try {
final CountDownLatch latch = new CountDownLatch(4);
clusterService1.submitStateUpdateTask("test1", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
clusterService1.submitStateUpdateTask("test2", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
fail();
}
@Override
public void onFailure(String source, Throwable t) {
latch.countDown();
}
});
clusterService1.submitStateUpdateTask("test3", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).incrementVersion().build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
// Additional update task to make sure all previous logging made it to the logger
// We don't check logging for this on since there is no guarantee that it will occur before our check
clusterService1.submitStateUpdateTask("test4", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
assertThat(latch.await(1, TimeUnit.SECONDS), equalTo(true));
} finally {
rootLogger.removeAppender(mockAppender);
}
mockAppender.assertAllExpectationsMatched();
}
@Test
@TestLogging("cluster:WARN") // To ensure that we log cluster state events on WARN level
public void testLongClusterStateUpdateLogging() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "local")
.put(InternalClusterService.SETTING_CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD, "10s")
.build();
internalCluster().startNode(settings);
ClusterService clusterService1 = internalCluster().getInstance(ClusterService.class);
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation("test1 shouldn't see because setting is too low", "cluster.service", Level.WARN, "*cluster state update task [test1] took * above the warn threshold of *"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", "cluster.service", Level.WARN, "*cluster state update task [test2] took * above the warn threshold of 10ms"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test3", "cluster.service", Level.WARN, "*cluster state update task [test3] took * above the warn threshold of 10ms"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test4", "cluster.service", Level.WARN, "*cluster state update task [test4] took * above the warn threshold of 10ms"));
Logger rootLogger = Logger.getRootLogger();
rootLogger.addAppender(mockAppender);
try {
final CountDownLatch latch = new CountDownLatch(5);
final CountDownLatch processedFirstTask = new CountDownLatch(1);
clusterService1.submitStateUpdateTask("test1", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Thread.sleep(100);
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
processedFirstTask.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
processedFirstTask.await(1, TimeUnit.SECONDS);
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder()
.put(InternalClusterService.SETTING_CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD, "10ms")));
clusterService1.submitStateUpdateTask("test2", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Thread.sleep(100);
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
fail();
}
@Override
public void onFailure(String source, Throwable t) {
latch.countDown();
}
});
clusterService1.submitStateUpdateTask("test3", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Thread.sleep(100);
return ClusterState.builder(currentState).incrementVersion().build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
clusterService1.submitStateUpdateTask("test4", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Thread.sleep(100);
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
// Additional update task to make sure all previous logging made it to the logger
// We don't check logging for this on since there is no guarantee that it will occur before our check
clusterService1.submitStateUpdateTask("test5", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
assertThat(latch.await(5, TimeUnit.SECONDS), equalTo(true));
} finally {
rootLogger.removeAppender(mockAppender);
}
mockAppender.assertAllExpectationsMatched();
}
private static class BlockingTask extends ClusterStateUpdateTask {
private final CountDownLatch latch = new CountDownLatch(1);

View File

@ -0,0 +1,139 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.elasticsearch.common.regex.Regex;
import java.util.List;
import static com.google.common.collect.Lists.newArrayList;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
/**
* Test appender that can be used to verify that certain events were logged correctly
*/
public class MockLogAppender extends AppenderSkeleton {
private final static String COMMON_PREFIX = System.getProperty("es.logger.prefix", "org.elasticsearch.");
private List<LoggingExpectation> expectations;
public MockLogAppender() {
expectations = newArrayList();
}
public void addExpectation(LoggingExpectation expectation) {
expectations.add(expectation);
}
@Override
protected void append(LoggingEvent loggingEvent) {
for (LoggingExpectation expectation : expectations) {
expectation.match(loggingEvent);
}
}
@Override
public void close() {
}
@Override
public boolean requiresLayout() {
return false;
}
public void assertAllExpectationsMatched() {
for (LoggingExpectation expectation : expectations) {
expectation.assertMatched();
}
}
public interface LoggingExpectation {
void match(LoggingEvent loggingEvent);
void assertMatched();
}
public static abstract class AbstractEventExpectation implements LoggingExpectation {
protected final String name;
protected final String logger;
protected final Level level;
protected final String message;
protected boolean saw;
public AbstractEventExpectation(String name, String logger, Level level, String message) {
this.name = name;
this.logger = getLoggerName(logger);
this.level = level;
this.message = message;
this.saw = false;
}
@Override
public void match(LoggingEvent event) {
if (event.getLevel() == level && event.getLoggerName().equals(logger)) {
if (Regex.isSimpleMatchPattern(message)) {
if (Regex.simpleMatch(message, event.getMessage().toString())) {
saw = true;
}
} else {
if (event.getMessage().toString().contains(message)) {
saw = true;
}
}
}
}
}
public static class UnseenEventExpectation extends AbstractEventExpectation {
public UnseenEventExpectation(String name, String logger, Level level, String message) {
super(name, logger, level, message);
}
@Override
public void assertMatched() {
assertThat(name, saw, equalTo(false));
}
}
public static class SeenEventExpectation extends AbstractEventExpectation {
public SeenEventExpectation(String name, String logger, Level level, String message) {
super(name, logger, level, message);
}
@Override
public void assertMatched() {
assertThat(name, saw, equalTo(true));
}
}
private static String getLoggerName(String name) {
if (name.startsWith("org.elasticsearch.")) {
name = name.substring("org.elasticsearch.".length());
}
return COMMON_PREFIX + name;
}
}