Execute watcher lifecycle changes in order (elastic/x-pack-elasticsearch#4437)

This change gives the WatcherLifeCycleService its own single-thread
executor for lifecycle changes, which guarantees that the changes are
executed in the order in which they are submitted.

Previously, a runnable would be submitted to the generic threadpool for
each lifecycle change that is needed. There was no guarantee of
ordering for these changes and no checks to see if a state change was
already in flight.

Relates elastic/x-pack-elasticsearch#4429

Original commit: elastic/x-pack-elasticsearch@14b73381db
This commit is contained in:
Jay Modi 2018-04-20 12:59:18 -06:00 committed by GitHub
parent 91ab88e86e
commit b57318c1a3
3 changed files with 36 additions and 7 deletions

View File

@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.upgrade.UpgradeField;
@ -35,12 +36,14 @@ import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener {
@ -51,6 +54,8 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
public static final Setting<Boolean> SETTING_REQUIRE_MANUAL_START =
Setting.boolSetting("xpack.watcher.require_manual_start", false, Property.NodeScope);
private static final String LIFECYCLE_THREADPOOL_NAME = "watcher-lifecycle";
private final WatcherService watcherService;
private final ExecutorService executor;
private AtomicReference<List<String>> previousAllocationIds = new AtomicReference<>(Collections.emptyList());
@ -59,8 +64,20 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
/**
 * Creates the lifecycle service with its own executor for lifecycle changes and delegates to the
 * constructor that accepts an {@link ExecutorService}.
 *
 * @param settings       node settings; also handed to the daemon thread factory
 * @param threadPool     used by this constructor only to obtain the thread context
 * @param clusterService forwarded unchanged to the delegate constructor
 * @param watcherService forwarded unchanged to the delegate constructor
 */
WatcherLifeCycleService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
WatcherService watcherService) {
// use a single thread executor so that lifecycle changes are handled in the order they
// are submitted in
this(settings, clusterService, watcherService, EsExecutors.newFixed(
LIFECYCLE_THREADPOOL_NAME,
1,
// NOTE(review): presumably the task queue capacity of the fixed executor -- confirm against EsExecutors.newFixed
1000,
daemonThreadFactory(settings, LIFECYCLE_THREADPOOL_NAME),
threadPool.getThreadContext()));
}
WatcherLifeCycleService(Settings settings, ClusterService clusterService,
WatcherService watcherService, ExecutorService executorService) {
super(settings);
this.executor = threadPool.executor(ThreadPool.Names.GENERIC);
this.executor = executorService;
this.watcherService = watcherService;
this.requireManualStart = SETTING_REQUIRE_MANUAL_START.get(settings);
clusterService.addListener(this);
@ -81,6 +98,11 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
/**
 * Flags this service as shut down, calls {@code stop} with the reason "shutdown initiated" and
 * then calls {@link #stopExecutor()}. Runs while holding this instance's monitor.
 */
synchronized void shutDown() {
shutDown = true;
stop("shutdown initiated");
// NOTE(review): the executor is terminated only after stop(...) has been called; presumably so that
// any work stop(...) hands to the executor is still accepted -- confirm against stop(...)
stopExecutor();
}
/**
 * Terminates the lifecycle executor through {@code ThreadPool.terminate}, passing a timeout of
 * 10 seconds. Non-private so that a subclass can override it, e.g. with a no-op when the executor
 * it supplied cannot be terminated.
 */
void stopExecutor() {
ThreadPool.terminate(executor, 10L, TimeUnit.SECONDS);
}
private synchronized void start(ClusterState state) {

View File

@ -139,6 +139,8 @@ public class WatcherService extends AbstractComponent {
state.set(WatcherState.STOPPED);
throw e;
}
} else {
logger.debug("could not transition state from stopped to starting, current state [{}]", state.get());
}
}
@ -157,6 +159,8 @@ public class WatcherService extends AbstractComponent {
executionService.stop();
state.set(WatcherState.STOPPED);
logger.debug("watch service has stopped");
} else {
logger.debug("could not transition state from started to stopping, current state [{}]", state.get());
}
} catch (Exception e) {
state.set(WatcherState.STOPPED);

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
@ -27,12 +26,12 @@ import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.watcher.WatcherState;
import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
@ -43,7 +42,6 @@ import org.mockito.stubbing.Answer;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -73,8 +71,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
@Before
public void prepareServices() {
ThreadPool threadPool = mock(ThreadPool.class);
final ExecutorService executorService = EsExecutors.newDirectExecutorService();
when(threadPool.executor(anyString())).thenReturn(executorService);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
ClusterService clusterService = mock(ClusterService.class);
Answer<Object> answer = invocationOnMock -> {
AckedClusterStateUpdateTask updateTask = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1];
@ -83,7 +80,13 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
};
doAnswer(answer).when(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class));
watcherService = mock(WatcherService.class);
lifeCycleService = new WatcherLifeCycleService(Settings.EMPTY, threadPool, clusterService, watcherService);
lifeCycleService = new WatcherLifeCycleService(Settings.EMPTY, clusterService, watcherService,
EsExecutors.newDirectExecutorService()) {
@Override
void stopExecutor() {
// direct executor cannot be terminated
}
};
}
public void testStartAndStopCausedByClusterState() throws Exception {