Merge pull request #14494 from ywelsch/fix/delayed-allocation-reroute
Delayed allocation can miss a reroute
This commit is contained in:
commit
4729386347
|
@ -110,10 +110,6 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
|
|||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (event.source().startsWith(CLUSTER_UPDATE_TASK_SOURCE)) {
|
||||
// that's us, ignore this event
|
||||
return;
|
||||
}
|
||||
if (event.state().nodes().localNodeMaster()) {
|
||||
// figure out when the next unassigned allocation need to happen from now. If this is larger or equal
|
||||
// then the last time we checked and scheduled, we are guaranteed to have a reroute until then, so no need
|
||||
|
|
|
@ -23,22 +23,35 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.EmptyClusterInfoService;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.gateway.GatewayAllocator;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
import org.elasticsearch.test.ESAllocationTestCase;
|
||||
import org.elasticsearch.test.cluster.TestClusterService;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
/**
|
||||
|
@ -138,6 +151,105 @@ public class RoutingServiceTests extends ESAllocationTestCase {
|
|||
assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(Long.MAX_VALUE));
|
||||
}
|
||||
|
||||
/**
|
||||
* This tests that a new delayed reroute is scheduled right after a delayed reroute was run
|
||||
*/
|
||||
public void testDelayedUnassignedScheduleRerouteAfterDelayedReroute() throws Exception {
|
||||
final ThreadPool testThreadPool = new ThreadPool(getTestName());
|
||||
|
||||
try {
|
||||
DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
|
||||
AllocationService allocation = new AllocationService(Settings.Builder.EMPTY_SETTINGS,
|
||||
randomAllocationDeciders(Settings.Builder.EMPTY_SETTINGS, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()),
|
||||
new ShardsAllocators(Settings.Builder.EMPTY_SETTINGS, mockGatewayAllocator), EmptyClusterInfoService.INSTANCE);
|
||||
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder("short_delay").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms"))
|
||||
.numberOfShards(1).numberOfReplicas(1))
|
||||
.put(IndexMetaData.builder("long_delay").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10s"))
|
||||
.numberOfShards(1).numberOfReplicas(1))
|
||||
.build();
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData)
|
||||
.routingTable(RoutingTable.builder().addAsNew(metaData.index("short_delay")).addAsNew(metaData.index("long_delay")).build())
|
||||
.nodes(DiscoveryNodes.builder()
|
||||
.put(newNode("node0", singletonMap("data", Boolean.FALSE.toString()))).localNodeId("node0").masterNodeId("node0")
|
||||
.put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).put(newNode("node4"))).build();
|
||||
// allocate shards
|
||||
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
|
||||
// start primaries
|
||||
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
|
||||
// start replicas
|
||||
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
|
||||
assertThat("all shards should be started", clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4));
|
||||
|
||||
// find replica of short_delay
|
||||
ShardRouting shortDelayReplica = null;
|
||||
for (ShardRouting shardRouting : clusterState.getRoutingNodes().routingTable().allShards("short_delay")) {
|
||||
if (shardRouting.primary() == false) {
|
||||
shortDelayReplica = shardRouting;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertNotNull(shortDelayReplica);
|
||||
|
||||
// find replica of long_delay
|
||||
ShardRouting longDelayReplica = null;
|
||||
for (ShardRouting shardRouting : clusterState.getRoutingNodes().routingTable().allShards("long_delay")) {
|
||||
if (shardRouting.primary() == false) {
|
||||
longDelayReplica = shardRouting;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertNotNull(longDelayReplica);
|
||||
|
||||
// remove node of shortDelayReplica and node of longDelayReplica and reroute
|
||||
ClusterState prevState = clusterState;
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(shortDelayReplica.currentNodeId()).remove(longDelayReplica.currentNodeId())).build();
|
||||
// make sure both replicas are marked as delayed (i.e. not reallocated)
|
||||
mockGatewayAllocator.setShardsToDelay(Arrays.asList(shortDelayReplica, longDelayReplica));
|
||||
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
|
||||
|
||||
// check that shortDelayReplica and longDelayReplica have been marked unassigned
|
||||
RoutingNodes.UnassignedShards unassigned = clusterState.getRoutingNodes().unassigned();
|
||||
assertEquals(2, unassigned.size());
|
||||
// update shortDelayReplica and longDelayReplica variables with new shard routing
|
||||
ShardRouting shortDelayUnassignedReplica = null;
|
||||
ShardRouting longDelayUnassignedReplica = null;
|
||||
for (ShardRouting shr : unassigned) {
|
||||
if (shr.getIndex().equals("short_delay")) {
|
||||
shortDelayUnassignedReplica = shr;
|
||||
} else {
|
||||
longDelayUnassignedReplica = shr;
|
||||
}
|
||||
}
|
||||
assertTrue(shortDelayReplica.isSameShard(shortDelayUnassignedReplica));
|
||||
assertTrue(longDelayReplica.isSameShard(longDelayUnassignedReplica));
|
||||
|
||||
// manually trigger a clusterChanged event on routingService
|
||||
ClusterState newState = clusterState;
|
||||
// create fake cluster service
|
||||
TestClusterService clusterService = new TestClusterService(newState, testThreadPool);
|
||||
// create routing service, also registers listener on cluster service
|
||||
RoutingService routingService = new RoutingService(Settings.EMPTY, testThreadPool, clusterService, allocation);
|
||||
routingService.start(); // just so performReroute does not prematurely return
|
||||
// ensure routing service has proper timestamp before triggering
|
||||
routingService.setUnassignedShardsAllocatedTimestamp(shortDelayUnassignedReplica.unassignedInfo().getTimestampInMillis() + randomIntBetween(0, 50));
|
||||
// next (delayed) reroute should only delay longDelayReplica/longDelayUnassignedReplica
|
||||
mockGatewayAllocator.setShardsToDelay(Arrays.asList(longDelayUnassignedReplica));
|
||||
// register listener on cluster state so we know when cluster state has been changed
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
clusterService.addLast(event -> latch.countDown());
|
||||
// instead of clusterService calling clusterChanged, we call it directly here
|
||||
routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
|
||||
// cluster service should have updated state and called routingService with clusterChanged
|
||||
latch.await();
|
||||
// verify the registration has been set to the delay of longDelayReplica/longDelayUnassignedReplica
|
||||
assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(10000L));
|
||||
} finally {
|
||||
terminate(testThreadPool);
|
||||
}
|
||||
}
|
||||
|
||||
public void testDelayedUnassignedDoesNotRerouteForNegativeDelays() throws Exception {
|
||||
AllocationService allocation = createAllocationService();
|
||||
MetaData metaData = MetaData.builder()
|
||||
|
@ -197,4 +309,46 @@ public class RoutingServiceTests extends ESAllocationTestCase {
|
|||
rerouted.set(true);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mocks behavior in ReplicaShardAllocator to remove delayed shards from list of unassigned shards so they don't get reassigned yet.
|
||||
* It does not implement the full logic but shards that are to be delayed need to be explicitly set using the method setShardsToDelay(...).
|
||||
*/
|
||||
private static class DelayedShardsMockGatewayAllocator extends GatewayAllocator {
|
||||
volatile List<ShardRouting> delayedShards = Collections.emptyList();
|
||||
|
||||
public DelayedShardsMockGatewayAllocator() {
|
||||
super(Settings.EMPTY, null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void applyStartedShards(StartedRerouteAllocation allocation) {}
|
||||
|
||||
@Override
|
||||
public void applyFailedShards(FailedRerouteAllocation allocation) {}
|
||||
|
||||
/**
|
||||
* Explicitly set which shards should be delayed in the next allocateUnassigned calls
|
||||
*/
|
||||
public void setShardsToDelay(List<ShardRouting> delayedShards) {
|
||||
this.delayedShards = delayedShards;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean allocateUnassigned(RoutingAllocation allocation) {
|
||||
final RoutingNodes routingNodes = allocation.routingNodes();
|
||||
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
|
||||
boolean changed = false;
|
||||
while (unassignedIterator.hasNext()) {
|
||||
ShardRouting shard = unassignedIterator.next();
|
||||
for (ShardRouting shardToDelay : delayedShards) {
|
||||
if (shard.isSameShard(shardToDelay)) {
|
||||
changed = true;
|
||||
unassignedIterator.removeAndIgnore();
|
||||
}
|
||||
}
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ import java.util.concurrent.ScheduledFuture;
|
|||
public class TestClusterService implements ClusterService {
|
||||
|
||||
volatile ClusterState state;
|
||||
private final Collection<ClusterStateListener> listeners = new CopyOnWriteArrayList<>();
|
||||
private final List<ClusterStateListener> listeners = new CopyOnWriteArrayList<>();
|
||||
private final Queue<NotifyTimeout> onGoingTimeouts = ConcurrentCollections.newQueue();
|
||||
private final ThreadPool threadPool;
|
||||
private final ESLogger logger = Loggers.getLogger(getClass(), Settings.EMPTY);
|
||||
|
@ -135,7 +135,7 @@ public class TestClusterService implements ClusterService {
|
|||
|
||||
@Override
|
||||
public void addFirst(ClusterStateListener listener) {
|
||||
throw new UnsupportedOperationException();
|
||||
listeners.add(0, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue