Watcher: Reload properly on remote shard change (#33167)

When a node that carries a watcher shard dies, or such a shard is relocated to
another node, watcher needs to trigger a reload not only on the node where the
shard change happened, but also on the other nodes that hold copies of that
shard, because different watches may need to be loaded there.

This commit takes changes on remote nodes into account: instead of storing only
the local shard allocation ids in the WatcherLifeCycleService, it now stores a
list of ShardRoutings based on the local active shards.
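
In essence, the new check compares the routings of every active shard whose shard id is also allocated locally against the previously stored list, and reloads when they differ. Below is a minimal, self-contained sketch of that idea; `ReloadCheckSketch`, `Routing`, `shouldReload`, `localShardIds` and `activeShards` are hypothetical stand-ins for the real Elasticsearch classes, not the actual implementation (see the diff below for that):

    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    class ReloadCheckSketch {
        // stand-in for org.elasticsearch.cluster.routing.ShardRouting: shard id, node, primary flag
        record Routing(int shardId, String nodeId, boolean primary) {}

        private List<Routing> previousShardRoutings = Collections.emptyList();

        // localShardIds: ids of the shards allocated on this node
        // activeShards:  all STARTED or RELOCATING shards of the watch index, cluster-wide
        boolean shouldReload(Set<Integer> localShardIds, List<Routing> activeShards) {
            List<Routing> localAffected = activeShards.stream()
                .filter(routing -> localShardIds.contains(routing.shardId()))
                // Routing is not Comparable, so impose a deterministic order before comparing lists
                .sorted(Comparator.comparing(Routing::hashCode))
                .collect(Collectors.toList());
            boolean changed = previousShardRoutings.equals(localAffected) == false;
            if (changed) {
                previousShardRoutings = localAffected; // remember what this node last reloaded for
            }
            return changed;
        }
    }

Because routings of remote copies of locally held shards take part in the comparison, a node outage or a new replica elsewhere in the cluster now also triggers a reload, which is what the new test below exercises.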

This also fixes some tests that relied on a wrong assumption: using
`TestShardRouting.newShardRouting` for cluster state creation in our tests
always created new allocation ids, which implicitly led to a reload.
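
To illustrate that assumption, here is a hypothetical fragment (not standalone code; it reuses `TestShardRouting` and `shardId` from the tests in the diff below): the helper produces a fresh allocation id on every call, so two otherwise identical routing entries never compared equal under the old allocation-id check. The tests therefore now reuse the same ShardRouting instances across cluster states.

    // hypothetical fragment: same shard, same node, same primary flag ...
    ShardRouting first = TestShardRouting.newShardRouting(shardId, "node_1", true, STARTED);
    ShardRouting second = TestShardRouting.newShardRouting(shardId, "node_1", true, STARTED);

    assert first.shardId().equals(second.shardId());
    // ... but every call generates a new allocation id, so the old comparison of
    // allocation ids always saw a difference and implicitly forced a reload
    assert first.allocationId().getId().equals(second.allocationId().getId()) == false;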
Alexander Reelsen authored on 2018-08-29 22:42:08 +02:00 (committed by GitHub)
commit 13880bd8c1, parent 0f22dbb1cc
2 changed files with 98 additions and 21 deletions

File: WatcherLifeCycleService.java

@@ -11,7 +11,6 @@ import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.AllocationId;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -22,13 +21,16 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.gateway.GatewayService;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
 import org.elasticsearch.xpack.core.watcher.WatcherState;
 import org.elasticsearch.xpack.core.watcher.watch.Watch;
 import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;

 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -45,7 +47,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener {
         Setting.boolSetting("xpack.watcher.require_manual_start", false, Property.NodeScope);

     private final AtomicReference<WatcherState> state = new AtomicReference<>(WatcherState.STARTED);
-    private final AtomicReference<List<String>> previousAllocationIds = new AtomicReference<>(Collections.emptyList());
+    private final AtomicReference<List<ShardRouting>> previousShardRoutings = new AtomicReference<>(Collections.emptyList());
     private final boolean requireManualStart;
     private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this.
     private volatile WatcherService watcherService;
@@ -144,15 +146,20 @@ public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener {
             return;
         }

-        List<String> currentAllocationIds = localShards.stream()
-            .map(ShardRouting::allocationId)
-            .map(AllocationId::getId)
-            .sorted()
+        // also check if non-local shards have changed, as losing a shard on a
+        // remote node or adding a replica on a remote node needs to trigger a reload too
+        Set<ShardId> localShardIds = localShards.stream().map(ShardRouting::shardId).collect(Collectors.toSet());
+        List<ShardRouting> allShards = event.state().routingTable().index(watchIndex).shardsWithState(STARTED);
+        allShards.addAll(event.state().routingTable().index(watchIndex).shardsWithState(RELOCATING));
+        List<ShardRouting> localAffectedShardRoutings = allShards.stream()
+            .filter(shardRouting -> localShardIds.contains(shardRouting.shardId()))
+            // ShardRouting is not Comparable, so we need some ordering mechanism
+            .sorted(Comparator.comparing(ShardRouting::hashCode))
             .collect(Collectors.toList());

-        if (previousAllocationIds.get().equals(currentAllocationIds) == false) {
+        if (previousShardRoutings.get().equals(localAffectedShardRoutings) == false) {
             if (watcherService.validate(event.state())) {
-                previousAllocationIds.set(Collections.unmodifiableList(currentAllocationIds));
+                previousShardRoutings.set(localAffectedShardRoutings);
                 if (state.get() == WatcherState.STARTED) {
                     watcherService.reload(event.state(), "new local watcher shard allocation ids");
                 } else if (state.get() == WatcherState.STOPPED) {
@@ -187,13 +194,13 @@ public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener {
      * @return true, if existing allocation ids were cleaned out, false otherwise
      */
     private boolean clearAllocationIds() {
-        List<String> previousIds = previousAllocationIds.getAndSet(Collections.emptyList());
-        return previousIds.equals(Collections.emptyList()) == false;
+        List<ShardRouting> previousIds = previousShardRoutings.getAndSet(Collections.emptyList());
+        return previousIds.isEmpty() == false;
     }

     // for testing purposes only
-    List<String> allocationIds() {
-        return previousAllocationIds.get();
+    List<ShardRouting> shardRoutings() {
+        return previousShardRoutings.get();
     }

     public WatcherState getState() {

File: WatcherLifeCycleServiceTests.java

@@ -254,9 +254,12 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
             .add(newNode("node_2"))
             .build();

+        ShardRouting firstShardOnSecondNode = TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED);
+        ShardRouting secondShardOnFirstNode = TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED);
+
         IndexRoutingTable previousWatchRoutingTable = IndexRoutingTable.builder(watchIndex)
-            .addShard(TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED))
-            .addShard(TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED))
+            .addShard(secondShardOnFirstNode)
+            .addShard(firstShardOnSecondNode)
             .build();

         IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX)
@@ -273,10 +276,19 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
             .metaData(MetaData.builder().put(indexMetaData, false))
             .build();

+        // randomly add a replica, either on the local node or on the other node
+        boolean addShardOnLocalNode = randomBoolean();
+        final ShardRouting addedShardRouting;
+        if (addShardOnLocalNode) {
+            addedShardRouting = TestShardRouting.newShardRouting(shardId, "node_1", false, STARTED);
+        } else {
+            addedShardRouting = TestShardRouting.newShardRouting(secondShardId, "node_2", false, STARTED);
+        }
+
         IndexRoutingTable currentWatchRoutingTable = IndexRoutingTable.builder(watchIndex)
-            .addShard(TestShardRouting.newShardRouting(shardId, "node_1", false, STARTED))
-            .addShard(TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED))
-            .addShard(TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED))
+            .addShard(secondShardOnFirstNode)
+            .addShard(firstShardOnSecondNode)
+            .addShard(addedShardRouting)
             .build();

         ClusterState stateWithReplicaAdded = ClusterState.builder(new ClusterName("my-cluster"))
@@ -477,7 +489,67 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
         assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
     }

-    private ClusterState startWatcher() {
+    // this emulates a node outage somewhere in the cluster that carried a watcher shard
+    // the number of shards remains the same, but we need to ensure that watcher properly reloads
+    // previously we only checked the local shard allocations, but we also need to check if shards in the cluster have changed
+    public void testWatcherReloadsOnNodeOutageWithWatcherShard() {
+        Index watchIndex = new Index(Watch.INDEX, "foo");
+        ShardId shardId = new ShardId(watchIndex, 0);
+        String localNodeId = randomFrom("node_1", "node_2");
+        String outageNodeId = localNodeId.equals("node_1") ? "node_2" : "node_1";
+
+        DiscoveryNodes previousDiscoveryNodes = new DiscoveryNodes.Builder().masterNodeId(localNodeId).localNodeId(localNodeId)
+            .add(newNode(localNodeId))
+            .add(newNode(outageNodeId))
+            .build();
+
+        ShardRouting replicaShardRouting = TestShardRouting.newShardRouting(shardId, localNodeId, false, STARTED);
+        ShardRouting primaryShardRouting = TestShardRouting.newShardRouting(shardId, outageNodeId, true, STARTED);
+        IndexRoutingTable previousWatchRoutingTable = IndexRoutingTable.builder(watchIndex)
+            .addShard(replicaShardRouting)
+            .addShard(primaryShardRouting)
+            .build();
+
+        IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX)
+            .settings(Settings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+                .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6)
+            ).build();
+
+        ClusterState previousState = ClusterState.builder(new ClusterName("my-cluster"))
+            .nodes(previousDiscoveryNodes)
+            .routingTable(RoutingTable.builder().add(previousWatchRoutingTable).build())
+            .metaData(MetaData.builder().put(indexMetaData, false))
+            .build();
+
+        ShardRouting nowPrimaryShardRouting = replicaShardRouting.moveActiveReplicaToPrimary();
+        IndexRoutingTable currentWatchRoutingTable = IndexRoutingTable.builder(watchIndex)
+            .addShard(nowPrimaryShardRouting)
+            .build();
+
+        DiscoveryNodes currentDiscoveryNodes = new DiscoveryNodes.Builder().masterNodeId(localNodeId).localNodeId(localNodeId)
+            .add(newNode(localNodeId))
+            .build();
+
+        ClusterState currentState = ClusterState.builder(new ClusterName("my-cluster"))
+            .nodes(currentDiscoveryNodes)
+            .routingTable(RoutingTable.builder().add(currentWatchRoutingTable).build())
+            .metaData(MetaData.builder().put(indexMetaData, false))
+            .build();
+
+        // initialize the previous state, so all the allocation ids are loaded
+        when(watcherService.validate(anyObject())).thenReturn(true);
+        lifeCycleService.clusterChanged(new ClusterChangedEvent("whatever", previousState, currentState));
+
+        reset(watcherService);
+        when(watcherService.validate(anyObject())).thenReturn(true);
+        ClusterChangedEvent event = new ClusterChangedEvent("whatever", currentState, previousState);
+        lifeCycleService.clusterChanged(event);
+        verify(watcherService).reload(eq(event.state()), anyString());
+    }
+
+    private void startWatcher() {
         Index index = new Index(Watch.INDEX, "uuid");
         IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
         indexRoutingTableBuilder.addShard(
@@ -506,12 +578,10 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
         lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, emptyState));
         assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
         verify(watcherService, times(1)).reload(eq(state), anyString());
-        assertThat(lifeCycleService.allocationIds(), hasSize(1));
+        assertThat(lifeCycleService.shardRoutings(), hasSize(1));

         // reset the mock, the user has to mock everything themselves again
         reset(watcherService);
-
-        return state;
     }

     private List<String> randomIndexPatterns() {