Hack around cluster service and logging race
When a cluster update task executes, there can be log messages after the update task has finished processing and the new cluster state becomes visible. The visibility of the cluster state allows the test thread in UpdateSettingsIT#testUpdateAutoThrottleSettings and UpdateSettingsIT#testUpdateMergeMaxThreadCount to proceed. The test thread will remove and stop a mock appender set up at the beginning of the test. The log messages in the cluster state update task that occur after processing has finished can race with the removal of the appender. Log4j will grab a reference to the appenders when processing these log messages, and this races with the removal and stopping of the appenders. If Log4j grabs a reference to the appenders before the mock appender has been removed, and the test thread subsequently removes and stops the appender before Log4j has appended the log message, Log4j will complain that we are appending to a stopped appender, causing the test to fail. This commit addresses this race by waiting for the cluster state update task to have finished processing before freeing the test thread to make its assertions and finally remove and stop the appender. Yes, this is a hack. Relates #21518
This commit is contained in:
parent
d273419d00
commit
19decd7552
|
@ -32,6 +32,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
|
|||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
|
@ -405,7 +406,7 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
|
||||
}
|
||||
|
||||
public void testUpdateAutoThrottleSettings() throws IllegalAccessException {
|
||||
public void testUpdateAutoThrottleSettings() throws Exception {
|
||||
MockAppender mockAppender = new MockAppender("testUpdateAutoThrottleSettings");
|
||||
mockAppender.start();
|
||||
Logger rootLogger = LogManager.getRootLogger();
|
||||
|
@ -423,8 +424,7 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
|
||||
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1")
|
||||
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "2")
|
||||
.put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), "true")
|
||||
));
|
||||
.put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), "true")));
|
||||
|
||||
// Disable auto throttle:
|
||||
client()
|
||||
|
@ -434,6 +434,14 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
.setSettings(Settings.builder().put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), "false"))
|
||||
.get();
|
||||
|
||||
// if a node has processed the cluster state update but not yet returned from the update task, it might still log messages;
|
||||
// these log messages will race with the stopping of the appender so we wait to ensure these tasks are done processing
|
||||
assertBusy(() -> {
|
||||
for (final ClusterService service : internalCluster().getInstances(ClusterService.class)) {
|
||||
assertThat(service.numberOfPendingTasks(), equalTo(0));
|
||||
}
|
||||
});
|
||||
|
||||
// Make sure we log the change:
|
||||
assertTrue(mockAppender.sawUpdateAutoThrottle);
|
||||
|
||||
|
@ -497,7 +505,7 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
// #6882: make sure we can change index.merge.scheduler.max_thread_count live
|
||||
public void testUpdateMergeMaxThreadCount() throws IllegalAccessException {
|
||||
public void testUpdateMergeMaxThreadCount() throws Exception {
|
||||
MockAppender mockAppender = new MockAppender("testUpdateMergeMaxThreadCount");
|
||||
mockAppender.start();
|
||||
Logger rootLogger = LogManager.getRootLogger();
|
||||
|
@ -514,8 +522,7 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
|
||||
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
|
||||
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "10000")
|
||||
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "10000")
|
||||
));
|
||||
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "10000")));
|
||||
|
||||
assertFalse(mockAppender.sawUpdateMaxThreadCount);
|
||||
// Now make a live change to reduce allowed merge threads:
|
||||
|
@ -526,6 +533,14 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
.setSettings(Settings.builder().put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1"))
|
||||
.get();
|
||||
|
||||
// if a node has processed the cluster state update but not yet returned from the update task, it might still log messages;
|
||||
// these log messages will race with the stopping of the appender so we wait to ensure these tasks are done processing
|
||||
assertBusy(() -> {
|
||||
for (final ClusterService service : internalCluster().getInstances(ClusterService.class)) {
|
||||
assertThat(service.numberOfPendingTasks(), equalTo(0));
|
||||
}
|
||||
});
|
||||
|
||||
// Make sure we log the change:
|
||||
assertTrue(mockAppender.sawUpdateMaxThreadCount);
|
||||
|
||||
|
|
Loading…
Reference in New Issue