Fix missing reroute in case of multiple delayed shards

After a delayed reroute of a shard, RoutingService misses to schedule a new delayed reroute of other delayed shards.

Closes #14494
Closes #14010
Closes #14445
This commit is contained in:
Yannick Welsch 2015-11-03 19:43:12 +01:00
parent 7e6008f0b9
commit 0220e45e2f
3 changed files with 156 additions and 6 deletions

View File

@ -110,10 +110,6 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.source().startsWith(CLUSTER_UPDATE_TASK_SOURCE)) {
// that's us, ignore this event
return;
}
if (event.state().nodes().localNodeMaster()) {
// figure out when the next unassigned allocation need to happen from now. If this is larger or equal
// then the last time we checked and scheduled, we are guaranteed to have a reroute until then, so no need

View File

@ -23,22 +23,35 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.ESAllocationTestCase;
import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.hamcrest.Matchers.equalTo;
/**
@ -138,6 +151,105 @@ public class RoutingServiceTests extends ESAllocationTestCase {
assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(Long.MAX_VALUE));
}
/**
* This tests that a new delayed reroute is scheduled right after a delayed reroute was run
*/
public void testDelayedUnassignedScheduleRerouteAfterDelayedReroute() throws Exception {
final ThreadPool testThreadPool = new ThreadPool(getTestName());
try {
DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
AllocationService allocation = new AllocationService(Settings.Builder.EMPTY_SETTINGS,
randomAllocationDeciders(Settings.Builder.EMPTY_SETTINGS, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()),
new ShardsAllocators(Settings.Builder.EMPTY_SETTINGS, mockGatewayAllocator), EmptyClusterInfoService.INSTANCE);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("short_delay").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms"))
.numberOfShards(1).numberOfReplicas(1))
.put(IndexMetaData.builder("long_delay").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10s"))
.numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("short_delay")).addAsNew(metaData.index("long_delay")).build())
.nodes(DiscoveryNodes.builder()
.put(newNode("node0", singletonMap("data", Boolean.FALSE.toString()))).localNodeId("node0").masterNodeId("node0")
.put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).put(newNode("node4"))).build();
// allocate shards
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// start primaries
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
// start replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
assertThat("all shards should be started", clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4));
// find replica of short_delay
ShardRouting shortDelayReplica = null;
for (ShardRouting shardRouting : clusterState.getRoutingNodes().routingTable().allShards("short_delay")) {
if (shardRouting.primary() == false) {
shortDelayReplica = shardRouting;
break;
}
}
assertNotNull(shortDelayReplica);
// find replica of long_delay
ShardRouting longDelayReplica = null;
for (ShardRouting shardRouting : clusterState.getRoutingNodes().routingTable().allShards("long_delay")) {
if (shardRouting.primary() == false) {
longDelayReplica = shardRouting;
break;
}
}
assertNotNull(longDelayReplica);
// remove node of shortDelayReplica and node of longDelayReplica and reroute
ClusterState prevState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(shortDelayReplica.currentNodeId()).remove(longDelayReplica.currentNodeId())).build();
// make sure both replicas are marked as delayed (i.e. not reallocated)
mockGatewayAllocator.setShardsToDelay(Arrays.asList(shortDelayReplica, longDelayReplica));
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// check that shortDelayReplica and longDelayReplica have been marked unassigned
RoutingNodes.UnassignedShards unassigned = clusterState.getRoutingNodes().unassigned();
assertEquals(2, unassigned.size());
// update shortDelayReplica and longDelayReplica variables with new shard routing
ShardRouting shortDelayUnassignedReplica = null;
ShardRouting longDelayUnassignedReplica = null;
for (ShardRouting shr : unassigned) {
if (shr.getIndex().equals("short_delay")) {
shortDelayUnassignedReplica = shr;
} else {
longDelayUnassignedReplica = shr;
}
}
assertTrue(shortDelayReplica.isSameShard(shortDelayUnassignedReplica));
assertTrue(longDelayReplica.isSameShard(longDelayUnassignedReplica));
// manually trigger a clusterChanged event on routingService
ClusterState newState = clusterState;
// create fake cluster service
TestClusterService clusterService = new TestClusterService(newState, testThreadPool);
// create routing service, also registers listener on cluster service
RoutingService routingService = new RoutingService(Settings.EMPTY, testThreadPool, clusterService, allocation);
routingService.start(); // just so performReroute does not prematurely return
// ensure routing service has proper timestamp before triggering
routingService.setUnassignedShardsAllocatedTimestamp(shortDelayUnassignedReplica.unassignedInfo().getTimestampInMillis() + randomIntBetween(0, 50));
// next (delayed) reroute should only delay longDelayReplica/longDelayUnassignedReplica
mockGatewayAllocator.setShardsToDelay(Arrays.asList(longDelayUnassignedReplica));
// register listener on cluster state so we know when cluster state has been changed
CountDownLatch latch = new CountDownLatch(1);
clusterService.addLast(event -> latch.countDown());
// instead of clusterService calling clusterChanged, we call it directly here
routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
// cluster service should have updated state and called routingService with clusterChanged
latch.await();
// verify the registration has been set to the delay of longDelayReplica/longDelayUnassignedReplica
assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(10000L));
} finally {
terminate(testThreadPool);
}
}
public void testDelayedUnassignedDoesNotRerouteForNegativeDelays() throws Exception {
AllocationService allocation = createAllocationService();
MetaData metaData = MetaData.builder()
@ -197,4 +309,46 @@ public class RoutingServiceTests extends ESAllocationTestCase {
rerouted.set(true);
}
}
/**
* Mocks behavior in ReplicaShardAllocator to remove delayed shards from list of unassigned shards so they don't get reassigned yet.
* It does not implement the full logic but shards that are to be delayed need to be explicitly set using the method setShardsToDelay(...).
*/
private static class DelayedShardsMockGatewayAllocator extends GatewayAllocator {
volatile List<ShardRouting> delayedShards = Collections.emptyList();
public DelayedShardsMockGatewayAllocator() {
super(Settings.EMPTY, null, null);
}
@Override
public void applyStartedShards(StartedRerouteAllocation allocation) {}
@Override
public void applyFailedShards(FailedRerouteAllocation allocation) {}
/**
* Explicitly set which shards should be delayed in the next allocateUnassigned calls
*/
public void setShardsToDelay(List<ShardRouting> delayedShards) {
this.delayedShards = delayedShards;
}
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
final RoutingNodes routingNodes = allocation.routingNodes();
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
boolean changed = false;
while (unassignedIterator.hasNext()) {
ShardRouting shard = unassignedIterator.next();
for (ShardRouting shardToDelay : delayedShards) {
if (shard.isSameShard(shardToDelay)) {
changed = true;
unassignedIterator.removeAndIgnore();
}
}
}
return changed;
}
}
}

View File

@ -51,7 +51,7 @@ import java.util.concurrent.ScheduledFuture;
public class TestClusterService implements ClusterService {
volatile ClusterState state;
private final Collection<ClusterStateListener> listeners = new CopyOnWriteArrayList<>();
private final List<ClusterStateListener> listeners = new CopyOnWriteArrayList<>();
private final Queue<NotifyTimeout> onGoingTimeouts = ConcurrentCollections.newQueue();
private final ThreadPool threadPool;
private final ESLogger logger = Loggers.getLogger(getClass(), Settings.EMPTY);
@ -135,7 +135,7 @@ public class TestClusterService implements ClusterService {
@Override
public void addFirst(ClusterStateListener listener) {
throw new UnsupportedOperationException();
listeners.add(0, listener);
}
@Override