Watcher: Ensure reloading happens based on watch index instead of alias (elastic/x-pack-elasticsearch#1544)

The cluster state listener to decide if watcher should be reloaded was
assuming that no aliases could be used and thus wrongly could trigger
a reload, which could have lead to wrong test results.

During debugging I also added a reason for reloading and fixed another
wrong test assumption.

Also the listener does not rely on previous cluster state, but stores this
in instance variable, as we need to compare with local state and not the
previous cluster state.

Original commit: elastic/x-pack-elasticsearch@582783a66d
This commit is contained in:
Alexander Reelsen 2017-06-06 09:39:11 +02:00 committed by GitHub
parent fe37109c3f
commit 376c9be6fa
5 changed files with 37 additions and 34 deletions

View File

@ -25,6 +25,7 @@ import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
@ -35,6 +36,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
private final WatcherService watcherService; private final WatcherService watcherService;
private final ClusterService clusterService; private final ClusterService clusterService;
private final ExecutorService executor; private final ExecutorService executor;
private AtomicReference<List<String>> previousAllocationIds = new AtomicReference<>(Collections.emptyList());
private volatile WatcherMetaData watcherMetaData; private volatile WatcherMetaData watcherMetaData;
public WatcherLifeCycleService(Settings settings, ThreadPool threadPool, ClusterService clusterService, public WatcherLifeCycleService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
@ -119,11 +121,10 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
if (watcherService.state() == WatcherState.STARTED && event.state().nodes().getLocalNode().isDataNode()) { if (watcherService.state() == WatcherState.STARTED && event.state().nodes().getLocalNode().isDataNode()) {
DiscoveryNode localNode = event.state().nodes().getLocalNode(); DiscoveryNode localNode = event.state().nodes().getLocalNode();
RoutingNode routingNode = event.state().getRoutingNodes().node(localNode.getId()); RoutingNode routingNode = event.state().getRoutingNodes().node(localNode.getId());
IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
event.state().metaData());
// no watcher index, time to pause // no watcher index, time to pause
if (watcherIndexMetaData == null) { if (watcherIndexMetaData == null) {
executor.execute(watcherService::pauseExecution); executor.execute(() -> watcherService.pauseExecution("no watcher index found"));
return; return;
} }
@ -132,30 +133,20 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
// no local shards, empty out watcher and not waste resources! // no local shards, empty out watcher and not waste resources!
if (localShards.isEmpty()) { if (localShards.isEmpty()) {
executor.execute(watcherService::pauseExecution); executor.execute(() -> watcherService.pauseExecution("no local watcher shards"));
previousAllocationIds.set(Collections.emptyList());
return; return;
} }
List<String> previousAllocationIds = event.previousState().getRoutingNodes().node(localNode.getId())
.shardsWithState(watchIndex, RELOCATING, STARTED).stream()
.map(ShardRouting::allocationId)
.map(AllocationId::getId)
.collect(Collectors.toList());
List<String> currentAllocationIds = localShards.stream() List<String> currentAllocationIds = localShards.stream()
.map(ShardRouting::allocationId) .map(ShardRouting::allocationId)
.map(AllocationId::getId) .map(AllocationId::getId)
.collect(Collectors.toList()); .collect(Collectors.toList());
if (previousAllocationIds.size() == currentAllocationIds.size()) {
// make sure we can compare the created list of allocation ids
Collections.sort(previousAllocationIds);
Collections.sort(currentAllocationIds); Collections.sort(currentAllocationIds);
if (previousAllocationIds.equals(currentAllocationIds) == false) {
executor.execute(() -> watcherService.reload(event.state())); if (previousAllocationIds.get().equals(currentAllocationIds) == false) {
} previousAllocationIds.set(currentAllocationIds);
} else { executor.execute(() -> watcherService.reload(event.state(), "different shard allocation ids"));
executor.execute(() -> watcherService.reload(event.state()));
} }
} else if (watcherService.state() != WatcherState.STARTED && watcherService.state() != WatcherState.STARTING) { } else if (watcherService.state() != WatcherState.STARTED && watcherService.state() != WatcherState.STARTING) {
executor.execute(() -> start(event.state(), false)); executor.execute(() -> start(event.state(), false));

View File

@ -165,8 +165,8 @@ public class WatcherService extends AbstractComponent {
* Reload the watcher service, does not switch the state from stopped to started, just keep going * Reload the watcher service, does not switch the state from stopped to started, just keep going
* @param clusterState cluster state, which is needed to find out about local shards * @param clusterState cluster state, which is needed to find out about local shards
*/ */
public void reload(ClusterState clusterState) { public void reload(ClusterState clusterState, String reason) {
pauseExecution(); pauseExecution(reason);
// load watches // load watches
Collection<Watch> watches = loadWatches(clusterState); Collection<Watch> watches = loadWatches(clusterState);
@ -183,9 +183,10 @@ public class WatcherService extends AbstractComponent {
* Stop execution of watches on this node, do not try to reload anything, but still allow * Stop execution of watches on this node, do not try to reload anything, but still allow
* manual watch execution, i.e. via the execute watch API * manual watch execution, i.e. via the execute watch API
*/ */
public void pauseExecution() { public void pauseExecution(String reason) {
executionService.pauseExecution(); int cancelledTaskCount = executionService.pauseExecution();
triggerService.pauseExecution(); triggerService.pauseExecution();
logger.debug("paused execution service, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);
} }
/** /**

View File

@ -134,10 +134,14 @@ public class ExecutionService extends AbstractComponent {
} }
} }
public synchronized void pauseExecution() { /**
* Pause the execution of the watcher executor
* @return the number of tasks that have been removed
*/
public synchronized int pauseExecution() {
int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>()); int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>());
this.clearExecutions(); this.clearExecutions();
logger.debug("paused execution service, cancelled [{}] queued tasks", cancelledTaskCount); return cancelledTaskCount;
} }
public TimeValue defaultThrottlePeriod() { public TimeValue defaultThrottlePeriod() {

View File

@ -40,6 +40,7 @@ import static java.util.Arrays.asList;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
@ -187,15 +188,24 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
DiscoveryNodes nodes = new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1") DiscoveryNodes nodes = new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1")
.add(newNode("node_1")).add(newNode("node_2")) .add(newNode("node_1")).add(newNode("node_2"))
.build(); .build();
IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX)
.settings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
).build();
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes) .nodes(nodes)
.routingTable(RoutingTable.builder().add(watchRoutingTable).build()) .routingTable(RoutingTable.builder().add(watchRoutingTable).build())
.metaData(MetaData.builder().put(indexMetaData, false))
.build(); .build();
when(watcherService.state()).thenReturn(WatcherState.STARTED); when(watcherService.state()).thenReturn(WatcherState.STARTED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
verify(watcherService).pauseExecution(); verify(watcherService).pauseExecution(eq("no local watcher shards"));
} }
public void testReplicaWasAddedOrRemoved() throws Exception { public void testReplicaWasAddedOrRemoved() throws Exception {
@ -251,7 +261,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
when(watcherService.state()).thenReturn(WatcherState.STARTED); when(watcherService.state()).thenReturn(WatcherState.STARTED);
lifeCycleService.clusterChanged(event); lifeCycleService.clusterChanged(event);
verify(watcherService).reload(eq(usedClusterState)); verify(watcherService).reload(eq(usedClusterState), anyString());
} }
// make sure that cluster state changes can be processed on nodes that do not hold data // make sure that cluster state changes can be processed on nodes that do not hold data
@ -300,8 +310,8 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
when(watcherService.state()).thenReturn(WatcherState.STARTED); when(watcherService.state()).thenReturn(WatcherState.STARTED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", currentState, previousState)); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", currentState, previousState));
verify(watcherService, times(0)).pauseExecution(); verify(watcherService, times(0)).pauseExecution(anyObject());
verify(watcherService, times(0)).reload(any()); verify(watcherService, times(0)).reload(any(), any());
} }
private static DiscoveryNode newNode(String nodeName) { private static DiscoveryNode newNode(String nodeName) {

View File

@ -231,7 +231,6 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
assertSingleExecutionAndCompleteWatchHistory(numWatches, numRecords); assertSingleExecutionAndCompleteWatchHistory(numWatches, numRecords);
} }
@AwaitsFix(bugUrl = "https://github.com/elastic/x-pack-elasticsearch/issues/1309")
public void testTriggeredWatchLoading() throws Exception { public void testTriggeredWatchLoading() throws Exception {
createIndex("output"); createIndex("output");
client().prepareIndex("my-index", "foo", "bar") client().prepareIndex("my-index", "foo", "bar")
@ -255,8 +254,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
stopWatcher(); stopWatcher();
DateTime now = DateTime.now(UTC); DateTime now = DateTime.now(UTC);
// final int numRecords = scaledRandomIntBetween(2, 12); final int numRecords = scaledRandomIntBetween(2, 12);
final int numRecords = 10;
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int i = 0; i < numRecords; i++) { for (int i = 0; i < numRecords; i++) {
now = now.plusMinutes(1); now = now.plusMinutes(1);
@ -291,8 +289,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
long successfulWatchExecutions = searchResponse.getHits().getTotalHits(); long successfulWatchExecutions = searchResponse.getHits().getTotalHits();
// the watch history should contain entries for each triggered watch, which a few have been marked as not executed // the watch history should contain entries for each triggered watch, which a few have been marked as not executed
SearchResponse historySearchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*") SearchResponse historySearchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*").setSize(10000).get();
.setSize(10000).get();
assertHitCount(historySearchResponse, expectedWatchHistoryCount); assertHitCount(historySearchResponse, expectedWatchHistoryCount);
long notExecutedCount = Arrays.stream(historySearchResponse.getHits().getHits()) long notExecutedCount = Arrays.stream(historySearchResponse.getHits().getHits())
.filter(hit -> hit.getSourceAsMap().get("state").equals(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED.id())) .filter(hit -> hit.getSourceAsMap().get("state").equals(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED.id()))