Remove scheduled routing

Today, we have scheduled reroute that kicks every 10 seconds and checks if a
reroute is needed. We use it when adding nodes, since we don't reroute right
away once its added, and give it a time window to add additional nodes.

We do have recover after nodes setting and such in order to wait for enough
nodes to be added, and also, it really depends at what part of the 10s window
you end up, sometimes, it might not be effective at all. In general, its historic
from the times before we had recover after nodes and such.

This change removes the 10s scheduling, simplifies RoutingService, and adds
explicit reroute when a node is added to the system. It also adds unit tests
to RoutingService.

closes #11776
This commit is contained in:
Shay Banon 2015-06-18 19:22:52 +02:00
parent 49bbc42ac8
commit 435ce7f251
12 changed files with 191 additions and 117 deletions

View File

@ -183,7 +183,7 @@ public class ShardStateAction extends AbstractComponent {
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (oldState != newState && newState.getRoutingNodes().hasUnassigned()) {
logger.trace("unassigned shards after shard failures. scheduling a reroute.");
routingService.scheduleReroute();
routingService.reroute("unassigned shards after shard failures, scheduling a reroute");
}
}
});

View File

@ -20,7 +20,6 @@
package org.elasticsearch.cluster.routing;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
@ -32,12 +31,9 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
/**
* A {@link RoutingService} listens to clusters state. When this service
* receives a {@link ClusterChangedEvent} the cluster state will be verified and
@ -52,21 +48,13 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
*/
public class RoutingService extends AbstractLifecycleComponent<RoutingService> implements ClusterStateListener {
private static final String CLUSTER_UPDATE_TASK_SOURCE = "routing-table-updater";
private final ThreadPool threadPool;
private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute";
final ThreadPool threadPool;
private final ClusterService clusterService;
private final AllocationService allocationService;
private final TimeValue schedule;
private volatile boolean routingTableDirty = false;
private volatile Future scheduledRoutingTableFuture;
private AtomicBoolean rerouting = new AtomicBoolean();
private volatile long registeredNextDelaySetting = Long.MAX_VALUE;
private volatile ScheduledFuture registeredNextDelayFuture;
@ -76,8 +64,9 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
this.threadPool = threadPool;
this.clusterService = clusterService;
this.allocationService = allocationService;
this.schedule = settings.getAsTime("cluster.routing.schedule", timeValueSeconds(10));
clusterService.addFirst(this);
if (clusterService != null) {
clusterService.addFirst(this);
}
}
@Override
@ -90,52 +79,27 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
@Override
protected void doClose() {
FutureUtils.cancel(scheduledRoutingTableFuture);
scheduledRoutingTableFuture = null;
FutureUtils.cancel(registeredNextDelayFuture);
clusterService.remove(this);
}
/** make sure that a reroute will be done by the next scheduled check */
public void scheduleReroute() {
routingTableDirty = true;
/**
* Initiates a reroute.
*/
public final void reroute(String reason) {
performReroute(reason);
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.source().equals(CLUSTER_UPDATE_TASK_SOURCE)) {
if (event.source().startsWith(CLUSTER_UPDATE_TASK_SOURCE)) {
// that's us, ignore this event
return;
}
if (event.state().nodes().localNodeMaster()) {
// we are master, schedule the routing table updater
if (scheduledRoutingTableFuture == null) {
// a new master (us), make sure we reroute shards
routingTableDirty = true;
scheduledRoutingTableFuture = threadPool.scheduleWithFixedDelay(new RoutingTableUpdater(), schedule);
}
if (event.nodesRemoved()) {
// if nodes were removed, we don't want to wait for the scheduled task
// since we want to get primary election as fast as possible
routingTableDirty = true;
reroute();
// Commented out since we make sure to reroute whenever shards changes state or metadata changes state
// } else if (event.routingTableChanged()) {
// routingTableDirty = true;
// reroute();
} else {
if (event.nodesAdded()) {
for (DiscoveryNode node : event.nodesDelta().addedNodes()) {
if (node.dataNode()) {
routingTableDirty = true;
break;
}
}
}
}
// figure out when the next unassigned allocation need to happen from now. If this is larger or equal
// then the last time we checked and scheduled, we are guaranteed to have a reroute until then, so no need
// to schedule again
// figure out when the next unassigned allocation need to happen from now. If this is larger or equal
// then the last time we checked and scheduled, we are guaranteed to have a reroute until then, so no need
// to schedule again
long nextDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(settings, event.state());
if (nextDelaySetting > 0 && nextDelaySetting < registeredNextDelaySetting) {
FutureUtils.cancel(registeredNextDelayFuture);
@ -145,9 +109,8 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
routingTableDirty = true;
registeredNextDelaySetting = Long.MAX_VALUE;
reroute();
reroute("assign delayed unassigned shards");
}
@Override
@ -158,25 +121,26 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
} else {
logger.trace("no need to schedule reroute due to delayed unassigned, next_delay_setting [{}], registered [{}]", nextDelaySetting, registeredNextDelaySetting);
}
} else {
FutureUtils.cancel(scheduledRoutingTableFuture);
scheduledRoutingTableFuture = null;
}
}
private void reroute() {
// visible for testing
long getRegisteredNextDelaySetting() {
return this.registeredNextDelaySetting;
}
// visible for testing
void performReroute(String reason) {
try {
if (!routingTableDirty) {
return;
}
if (lifecycle.stopped()) {
return;
}
if (rerouting.compareAndSet(false, true) == false) {
logger.trace("already has pending reroute, ignoring");
logger.trace("already has pending reroute, ignoring {}", reason);
return;
}
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, Priority.HIGH, new ClusterStateUpdateTask() {
logger.trace("rerouting {}", reason);
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
rerouting.set(false);
@ -205,19 +169,10 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
}
}
});
routingTableDirty = false;
} catch (Throwable e) {
rerouting.set(false);
ClusterState state = clusterService.state();
logger.warn("Failed to reroute routing table, current state:\n{}", e, state.prettyPrint());
}
}
private class RoutingTableUpdater implements Runnable {
@Override
public void run() {
reroute();
logger.warn("failed to reroute routing table, current state:\n{}", e, state.prettyPrint());
}
}
}

View File

@ -176,7 +176,9 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
nodesBuilder.put(discovery.localNode);
}
nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id());
return ClusterState.builder(currentState).nodes(nodesBuilder).build();
ClusterState updatedState = ClusterState.builder(currentState).nodes(nodesBuilder).build();
RoutingAllocation.Result routingResult = master.allocationService.reroute(ClusterState.builder(updatedState).build());
return ClusterState.builder(updatedState).routingResult(routingResult).build();
}
@Override

View File

@ -932,7 +932,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
if (modified) {
stateBuilder.nodes(nodesBuilder);
}
return stateBuilder.build();
currentState = stateBuilder.build();
// eagerly run reroute to apply the node addition
RoutingAllocation.Result result = allocationService.reroute(currentState);
return ClusterState.builder(currentState).routingResult(result).build();
}
@Override

View File

@ -32,10 +32,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -70,8 +67,7 @@ public class GatewayAllocator extends AbstractComponent {
private final TransportNodesListGatewayStartedShards startedAction;
private final TransportNodesListShardStoreMetaData storeAction;
private ClusterService clusterService;
private AllocationService allocationService;
private RoutingService routingService;
private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>> asyncFetchStarted = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData>> asyncFetchStore = ConcurrentCollections.newConcurrentMap();
@ -87,9 +83,8 @@ public class GatewayAllocator extends AbstractComponent {
logger.debug("using initial_shards [{}]", initialShards);
}
public void setReallocation(final ClusterService clusterService, final AllocationService allocationService) {
this.clusterService = clusterService;
this.allocationService = allocationService;
public void setReallocation(final ClusterService clusterService, final RoutingService routingService) {
this.routingService = routingService;
clusterService.add(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
@ -535,8 +530,6 @@ public class GatewayAllocator extends AbstractComponent {
return changed;
}
private final AtomicBoolean rerouting = new AtomicBoolean();
class InternalAsyncFetch<T extends BaseNodeResponse> extends AsyncShardFetch<T> {
public InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, List<? extends BaseNodesResponse<T>, T> action) {
@ -545,30 +538,8 @@ public class GatewayAllocator extends AbstractComponent {
@Override
protected void reroute(ShardId shardId, String reason) {
if (rerouting.compareAndSet(false, true) == false) {
logger.trace("{} already has pending reroute, ignoring {}", shardId, reason);
return;
}
clusterService.submitStateUpdateTask("async_shard_fetch", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
rerouting.set(false);
if (currentState.nodes().masterNode() == null) {
return currentState;
}
RoutingAllocation.Result routingResult = allocationService.reroute(currentState);
if (!routingResult.changed()) {
return currentState;
}
return ClusterState.builder(currentState).routingResult(routingResult).build();
}
@Override
public void onFailure(String source, Throwable t) {
rerouting.set(false);
logger.warn("failed to perform reroute post async fetch for {}", t, source);
}
});
logger.trace("{} scheduling reroute for {}", shardId, reason);
routingService.reroute("async_shard_fetch");
}
}
}

View File

@ -253,7 +253,7 @@ public class Node implements Releasable {
injector.getInstance(RestController.class).start();
// TODO hack around circular dependecncies problems
injector.getInstance(GatewayAllocator.class).setReallocation(injector.getInstance(ClusterService.class), injector.getInstance(AllocationService.class));
injector.getInstance(GatewayAllocator.class).setReallocation(injector.getInstance(ClusterService.class), injector.getInstance(RoutingService.class));
DiscoveryService discoService = injector.getInstance(DiscoveryService.class).start();
discoService.waitForInitialState();

View File

@ -47,7 +47,6 @@ public class PercolatorStressBenchmark {
public static void main(String[] args) throws Exception {
Settings settings = settingsBuilder()
.put("cluster.routing.schedule", 200, TimeUnit.MILLISECONDS)
.put(SETTING_NUMBER_OF_SHARDS, 4)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.build();

View File

@ -57,7 +57,6 @@ public class AwarenessAllocationTests extends ElasticsearchIntegrationTest {
@Test
public void testSimpleAwareness() throws Exception {
Settings commonSettings = Settings.settingsBuilder()
.put("cluster.routing.schedule", "10ms")
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build();

View File

@ -0,0 +1,149 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.hamcrest.Matchers.equalTo;
/**
*/
public class RoutingServiceTests extends ElasticsearchAllocationTestCase {
private TestRoutingService routingService;
@Before
public void createRoutingService() {
routingService = new TestRoutingService();
}
@After
public void shutdownRoutingService() throws Exception {
routingService.shutdown();
}
@Test
public void testReroute() {
assertThat(routingService.hasReroutedAndClear(), equalTo(false));
routingService.reroute("test");
assertThat(routingService.hasReroutedAndClear(), equalTo(true));
}
@Test
public void testNoDelayedUnassigned() throws Exception {
AllocationService allocation = createAllocationService();
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "0"))
.numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test"))).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// starting primaries
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
assertThat(clusterState.routingNodes().hasUnassigned(), equalTo(false));
// remove node2 and reroute
ClusterState prevState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
ClusterState newState = clusterState;
assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(Long.MAX_VALUE));
routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(Long.MAX_VALUE));
assertThat(routingService.hasReroutedAndClear(), equalTo(false));
}
@Test
public void testDelayedUnassignedScheduleReroute() throws Exception {
AllocationService allocation = createAllocationService();
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms"))
.numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test"))).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// starting primaries
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
assertThat(clusterState.routingNodes().hasUnassigned(), equalTo(false));
// remove node2 and reroute
ClusterState prevState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
ClusterState newState = clusterState;
routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(routingService.hasReroutedAndClear(), equalTo(true));
}
});
// verify the registration has been reset
assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(Long.MAX_VALUE));
}
private class TestRoutingService extends RoutingService {
private AtomicBoolean rerouted = new AtomicBoolean();
public TestRoutingService() {
super(Settings.EMPTY, new ThreadPool(getTestName()), null, null);
}
void shutdown() throws Exception {
terminate(threadPool);
}
public boolean hasReroutedAndClear() {
return rerouted.getAndSet(false);
}
@Override
void performReroute(String reason) {
rerouted.set(true);
}
}
}

View File

@ -61,7 +61,6 @@ public class IndexLifecycleActionTests extends ElasticsearchIntegrationTest {
Settings settings = settingsBuilder()
.put(SETTING_NUMBER_OF_SHARDS, 11)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put("cluster.routing.schedule", "20ms") // reroute every 20ms so we identify new nodes fast
.build();
// start one server

View File

@ -519,7 +519,6 @@ public class IndexRecoveryTests extends ElasticsearchIntegrationTest {
final Settings nodeSettings = Settings.builder()
.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK, "100ms")
.put(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT, "1s")
.put("cluster.routing.schedule", "100ms") // aggressive reroute post shard failures
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())
.put(MockFSDirectoryService.RANDOM_PREVENT_DOUBLE_WRITE, false) // restarted recoveries will delete temp files and write them again
.build();

View File

@ -369,8 +369,6 @@ public final class InternalTestCluster extends TestCluster {
private static Settings getRandomNodeSettings(long seed) {
Random random = new Random(seed);
Builder builder = Settings.settingsBuilder()
// decrease the routing schedule so new nodes will be added quickly - some random value between 30 and 80 ms
.put("cluster.routing.schedule", (30 + random.nextInt(50)) + "ms")
.put(SETTING_CLUSTER_NODE_SEED, seed);
if (ENABLE_MOCK_MODULES && usually(random)) {
builder.put(IndexStoreModule.STORE_TYPE, MockFSIndexStoreModule.class.getName());