diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index a3aa9b2ed07..ed7c5c46aa3 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -110,10 +110,6 @@ public class RoutingService extends AbstractLifecycleComponent i @Override public void clusterChanged(ClusterChangedEvent event) { - if (event.source().startsWith(CLUSTER_UPDATE_TASK_SOURCE)) { - // that's us, ignore this event - return; - } if (event.state().nodes().localNodeMaster()) { // figure out when the next unassigned allocation need to happen from now. If this is larger or equal // then the last time we checked and scheduled, we are guaranteed to have a reroute until then, so no need diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java index f0df406fa78..8856e457a4e 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java @@ -23,22 +23,35 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.EmptyClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.gateway.GatewayAllocator; +import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.test.ESAllocationTestCase; +import org.elasticsearch.test.cluster.TestClusterService; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; +import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import static java.util.Collections.singletonMap; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.hamcrest.Matchers.equalTo; /** @@ -138,6 +151,105 @@ public class RoutingServiceTests extends ESAllocationTestCase { assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(Long.MAX_VALUE)); } + /** + * This tests that a new delayed reroute is scheduled right after a delayed reroute was run + */ + public void testDelayedUnassignedScheduleRerouteAfterDelayedReroute() throws Exception { + final ThreadPool testThreadPool = new ThreadPool(getTestName()); + + try { + DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator(); + AllocationService allocation = new AllocationService(Settings.Builder.EMPTY_SETTINGS, + randomAllocationDeciders(Settings.Builder.EMPTY_SETTINGS, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()), + new ShardsAllocators(Settings.Builder.EMPTY_SETTINGS, mockGatewayAllocator), EmptyClusterInfoService.INSTANCE); + + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("short_delay").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms")) + .numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetaData.builder("long_delay").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10s")) + .numberOfShards(1).numberOfReplicas(1)) + .build(); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData) + .routingTable(RoutingTable.builder().addAsNew(metaData.index("short_delay")).addAsNew(metaData.index("long_delay")).build()) + .nodes(DiscoveryNodes.builder() + .put(newNode("node0", singletonMap("data", Boolean.FALSE.toString()))).localNodeId("node0").masterNodeId("node0") + .put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).put(newNode("node4"))).build(); + // allocate shards + clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build(); + // start primaries + clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build(); + // start replicas + clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build(); + assertThat("all shards should be started", clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4)); + + // find replica of short_delay + ShardRouting shortDelayReplica = null; + for (ShardRouting shardRouting : clusterState.getRoutingNodes().routingTable().allShards("short_delay")) { + if (shardRouting.primary() == false) { + shortDelayReplica = shardRouting; + break; + } + } + assertNotNull(shortDelayReplica); + + // find replica of long_delay + ShardRouting longDelayReplica = null; + for (ShardRouting shardRouting : clusterState.getRoutingNodes().routingTable().allShards("long_delay")) { + if (shardRouting.primary() == false) { + longDelayReplica = shardRouting; + break; + } + } + assertNotNull(longDelayReplica); + + // remove node of shortDelayReplica and node of longDelayReplica and reroute + ClusterState prevState = clusterState; + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(shortDelayReplica.currentNodeId()).remove(longDelayReplica.currentNodeId())).build(); + // make sure both replicas are marked as delayed (i.e. not reallocated) + mockGatewayAllocator.setShardsToDelay(Arrays.asList(shortDelayReplica, longDelayReplica)); + clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build(); + + // check that shortDelayReplica and longDelayReplica have been marked unassigned + RoutingNodes.UnassignedShards unassigned = clusterState.getRoutingNodes().unassigned(); + assertEquals(2, unassigned.size()); + // update shortDelayReplica and longDelayReplica variables with new shard routing + ShardRouting shortDelayUnassignedReplica = null; + ShardRouting longDelayUnassignedReplica = null; + for (ShardRouting shr : unassigned) { + if (shr.getIndex().equals("short_delay")) { + shortDelayUnassignedReplica = shr; + } else { + longDelayUnassignedReplica = shr; + } + } + assertTrue(shortDelayReplica.isSameShard(shortDelayUnassignedReplica)); + assertTrue(longDelayReplica.isSameShard(longDelayUnassignedReplica)); + + // manually trigger a clusterChanged event on routingService + ClusterState newState = clusterState; + // create fake cluster service + TestClusterService clusterService = new TestClusterService(newState, testThreadPool); + // create routing service, also registers listener on cluster service + RoutingService routingService = new RoutingService(Settings.EMPTY, testThreadPool, clusterService, allocation); + routingService.start(); // just so performReroute does not prematurely return + // ensure routing service has proper timestamp before triggering + routingService.setUnassignedShardsAllocatedTimestamp(shortDelayUnassignedReplica.unassignedInfo().getTimestampInMillis() + randomIntBetween(0, 50)); + // next (delayed) reroute should only delay longDelayReplica/longDelayUnassignedReplica + mockGatewayAllocator.setShardsToDelay(Arrays.asList(longDelayUnassignedReplica)); + // register listener on cluster state so we know when cluster state has been changed + CountDownLatch latch = new CountDownLatch(1); + clusterService.addLast(event -> latch.countDown()); + // instead of clusterService calling clusterChanged, we call it directly here + routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState)); + // cluster service should have updated state and called routingService with clusterChanged + latch.await(); + // verify the registration has been set to the delay of longDelayReplica/longDelayUnassignedReplica + assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(10000L)); + } finally { + terminate(testThreadPool); + } + } + public void testDelayedUnassignedDoesNotRerouteForNegativeDelays() throws Exception { AllocationService allocation = createAllocationService(); MetaData metaData = MetaData.builder() @@ -197,4 +309,46 @@ public class RoutingServiceTests extends ESAllocationTestCase { rerouted.set(true); } } + + /** + * Mocks behavior in ReplicaShardAllocator to remove delayed shards from list of unassigned shards so they don't get reassigned yet. + * It does not implement the full logic but shards that are to be delayed need to be explicitly set using the method setShardsToDelay(...). + */ + private static class DelayedShardsMockGatewayAllocator extends GatewayAllocator { + volatile List delayedShards = Collections.emptyList(); + + public DelayedShardsMockGatewayAllocator() { + super(Settings.EMPTY, null, null); + } + + @Override + public void applyStartedShards(StartedRerouteAllocation allocation) {} + + @Override + public void applyFailedShards(FailedRerouteAllocation allocation) {} + + /** + * Explicitly set which shards should be delayed in the next allocateUnassigned calls + */ + public void setShardsToDelay(List delayedShards) { + this.delayedShards = delayedShards; + } + + @Override + public boolean allocateUnassigned(RoutingAllocation allocation) { + final RoutingNodes routingNodes = allocation.routingNodes(); + final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator(); + boolean changed = false; + while (unassignedIterator.hasNext()) { + ShardRouting shard = unassignedIterator.next(); + for (ShardRouting shardToDelay : delayedShards) { + if (shard.isSameShard(shardToDelay)) { + changed = true; + unassignedIterator.removeAndIgnore(); + } + } + } + return changed; + } + } } diff --git a/test-framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java b/test-framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java index fa62dd4b6ee..b13963961a0 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java +++ b/test-framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java @@ -51,7 +51,7 @@ import java.util.concurrent.ScheduledFuture; public class TestClusterService implements ClusterService { volatile ClusterState state; - private final Collection listeners = new CopyOnWriteArrayList<>(); + private final List listeners = new CopyOnWriteArrayList<>(); private final Queue onGoingTimeouts = ConcurrentCollections.newQueue(); private final ThreadPool threadPool; private final ESLogger logger = Loggers.getLogger(getClass(), Settings.EMPTY); @@ -135,7 +135,7 @@ public class TestClusterService implements ClusterService { @Override public void addFirst(ClusterStateListener listener) { - throw new UnsupportedOperationException(); + listeners.add(0, listener); } @Override