Merge remote-tracking branch 'dakrone/avoid-extra-reroutes-FP'

This commit is contained in:
Lee Hinman 2015-08-06 10:07:41 -06:00
commit 0a1c9de075
8 changed files with 146 additions and 29 deletions

View File

@ -272,13 +272,13 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
} catch (IndexNotFoundException e) {
// one of the specified indices is not there - treat it as RED.
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), Strings.EMPTY_ARRAY, clusterState,
numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(settings, clusterState),
numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(), settings, clusterState),
pendingTaskTimeInQueue);
response.status = ClusterHealthStatus.RED;
return response;
}
return new ClusterHealthResponse(clusterName.value(), concreteIndices, clusterState, numberOfPendingTasks,
numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(settings, clusterState), pendingTaskTimeInQueue);
numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(), settings, clusterState), pendingTaskTimeInQueue);
}
}

View File

@ -57,6 +57,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
private AtomicBoolean rerouting = new AtomicBoolean();
private volatile long registeredNextDelaySetting = Long.MAX_VALUE;
private volatile ScheduledFuture registeredNextDelayFuture;
private volatile long unassignedShardsAllocatedTimestamp = 0;
@Inject
public RoutingService(Settings settings, ThreadPool threadPool, ClusterService clusterService, AllocationService allocationService) {
@ -87,6 +88,19 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
return this.allocationService;
}
/**
* Update the last time the allocator tried to assign unassigned shards
*
* This is used so that both the GatewayAllocator and RoutingService use a
* consistent timestamp for comparing which shards have been delayed to
* avoid a race condition where GatewayAllocator thinks the shard should
* be delayed and the RoutingService thinks it has already passed the delay
* and that the GatewayAllocator has/will handle it.
*/
public void setUnassignedShardsAllocatedTimestamp(long timeInMillis) {
this.unassignedShardsAllocatedTimestamp = timeInMillis;
}
/**
* Initiates a reroute.
*/
@ -108,8 +122,15 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
if (nextDelaySetting > 0 && nextDelaySetting < registeredNextDelaySetting) {
FutureUtils.cancel(registeredNextDelayFuture);
registeredNextDelaySetting = nextDelaySetting;
TimeValue nextDelay = TimeValue.timeValueMillis(UnassignedInfo.findNextDelayedAllocationIn(settings, event.state()));
logger.info("delaying allocation for [{}] unassigned shards, next check in [{}]", UnassignedInfo.getNumberOfDelayedUnassigned(settings, event.state()), nextDelay);
// We use System.currentTimeMillis here because we want the
// next delay from the "now" perspective, rather than the
// delay from the last time the GatewayAllocator tried to
// assign/delay the shard
TimeValue nextDelay = TimeValue.timeValueMillis(UnassignedInfo.findNextDelayedAllocationIn(System.currentTimeMillis(), settings, event.state()));
int unassignedDelayedShards = UnassignedInfo.getNumberOfDelayedUnassigned(unassignedShardsAllocatedTimestamp, settings, event.state());
if (unassignedDelayedShards > 0) {
logger.info("delaying allocation for [{}] unassigned shards, next check in [{}]",
unassignedDelayedShards, nextDelay);
registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
@ -120,8 +141,10 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
@Override
public void onFailure(Throwable t) {
logger.warn("failed to schedule/execute reroute post unassigned shard", t);
registeredNextDelaySetting = Long.MAX_VALUE;
}
});
}
} else {
logger.trace("no need to schedule reroute due to delayed unassigned, next_delay_setting [{}], registered [{}]", nextDelaySetting, registeredNextDelaySetting);
}

View File

@ -199,12 +199,12 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
/**
* The time in millisecond until this unassigned shard can be reassigned.
*/
public long getDelayAllocationExpirationIn(Settings settings, Settings indexSettings) {
public long getDelayAllocationExpirationIn(long unassignedShardsAllocatedTimestamp, Settings settings, Settings indexSettings) {
long delayTimeout = getAllocationDelayTimeoutSetting(settings, indexSettings);
if (delayTimeout == 0) {
return 0;
}
long delta = System.currentTimeMillis() - timestamp;
long delta = unassignedShardsAllocatedTimestamp - timestamp;
// account for time drift, treat it as no timeout
if (delta < 0) {
return 0;
@ -216,12 +216,12 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
/**
* Returns the number of shards that are unassigned and currently being delayed.
*/
public static int getNumberOfDelayedUnassigned(Settings settings, ClusterState state) {
public static int getNumberOfDelayedUnassigned(long unassignedShardsAllocatedTimestamp, Settings settings, ClusterState state) {
int count = 0;
for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
if (shard.primary() == false) {
IndexMetaData indexMetaData = state.metaData().index(shard.getIndex());
long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings());
long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(unassignedShardsAllocatedTimestamp, settings, indexMetaData.getSettings());
if (delay > 0) {
count++;
}
@ -251,12 +251,12 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
/**
* Finds the next (closest) delay expiration of an unassigned shard. Returns 0 if there are none.
*/
public static long findNextDelayedAllocationIn(Settings settings, ClusterState state) {
public static long findNextDelayedAllocationIn(long unassignedShardsAllocatedTimestamp, Settings settings, ClusterState state) {
long nextDelay = Long.MAX_VALUE;
for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
if (shard.primary() == false) {
IndexMetaData indexMetaData = state.metaData().index(shard.getIndex());
long nextShardDelay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings());
long nextShardDelay = shard.unassignedInfo().getDelayAllocationExpirationIn(unassignedShardsAllocatedTimestamp, settings, indexMetaData.getSettings());
if (nextShardDelay > 0 && nextShardDelay < nextDelay) {
nextDelay = nextShardDelay;
}

View File

@ -113,6 +113,10 @@ public class GatewayAllocator extends AbstractComponent {
}
public boolean allocateUnassigned(final RoutingAllocation allocation) {
// Take a snapshot of the current time and tell the RoutingService
// about it, so it will use a consistent timestamp for delays
long lastAllocateUnassignedRun = System.currentTimeMillis();
this.routingService.setUnassignedShardsAllocatedTimestamp(lastAllocateUnassignedRun);
boolean changed = false;
RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
@ -127,7 +131,7 @@ public class GatewayAllocator extends AbstractComponent {
changed |= primaryShardAllocator.allocateUnassigned(allocation);
changed |= replicaShardAllocator.processExistingRecoveries(allocation);
changed |= replicaShardAllocator.allocateUnassigned(allocation);
changed |= replicaShardAllocator.allocateUnassigned(allocation, lastAllocateUnassignedRun);
return changed;
}

View File

@ -111,6 +111,10 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
}
public boolean allocateUnassigned(RoutingAllocation allocation) {
return allocateUnassigned(allocation, System.currentTimeMillis());
}
public boolean allocateUnassigned(RoutingAllocation allocation, long allocateUnassignedTimestapm) {
boolean changed = false;
final RoutingNodes routingNodes = allocation.routingNodes();
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
@ -174,7 +178,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
// will anyhow wait to find an existing copy of the shard to be allocated
// note: the other side of the equation is scheduling a reroute in a timely manner, which happens in the RoutingService
IndexMetaData indexMetaData = allocation.metaData().index(shard.getIndex());
long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings());
long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(allocateUnassignedTimestapm, settings, indexMetaData.getSettings());
if (delay > 0) {
logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueMillis(delay));
/**

View File

@ -34,15 +34,18 @@ import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationComman
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.Allocation;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
import org.junit.Test;
import java.nio.file.Path;
@ -160,6 +163,40 @@ public class ClusterRerouteIT extends ESIntegTestCase {
rerouteWithAllocateLocalGateway(commonSettings);
}
@Test
public void testDelayWithALargeAmountOfShards() throws Exception {
Settings commonSettings = settingsBuilder()
.put("gateway.type", "local")
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 1)
.build();
logger.info("--> starting 4 nodes");
String node_1 = internalCluster().startNode(commonSettings);
internalCluster().startNode(commonSettings);
internalCluster().startNode(commonSettings);
internalCluster().startNode(commonSettings);
assertThat(cluster().size(), equalTo(4));
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
logger.info("--> create indices");
for (int i = 0; i < 25; i++) {
client().admin().indices().prepareCreate("test" + i)
.setSettings(settingsBuilder()
.put("index.number_of_shards", 5).put("index.number_of_replicas", 1)
.put("index.unassigned.node_left.delayed_timeout", randomIntBetween(250, 1000) + "ms"))
.execute().actionGet();
}
ensureGreen(TimeValue.timeValueMinutes(1));
logger.info("--> stopping node1");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_1));
// This might run slowly on older hardware
ensureGreen(TimeValue.timeValueMinutes(2));
}
private void rerouteWithAllocateLocalGateway(Settings commonSettings) throws Exception {
logger.info("--> starting 2 nodes");
String node_1 = internalCluster().startNode(commonSettings);

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESAllocationTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
@ -112,6 +113,10 @@ public class RoutingServiceTests extends ESAllocationTestCase {
ClusterState prevState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// We need to update the routing service's last attempted run to
// signal that the GatewayAllocator tried to allocated it but
// it was delayed
routingService.setUnassignedShardsAllocatedTimestamp(System.currentTimeMillis());
ClusterState newState = clusterState;
routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
@ -125,6 +130,44 @@ public class RoutingServiceTests extends ESAllocationTestCase {
assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(Long.MAX_VALUE));
}
@Test
public void testDelayedUnassignedDoesNotRerouteForNegativeDelays() throws Exception {
AllocationService allocation = createAllocationService();
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms"))
.numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test"))).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// starting primaries
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
assertThat(clusterState.routingNodes().hasUnassigned(), equalTo(false));
// remove node2 and reroute
ClusterState prevState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// Set it in the future so the delay will be negative
routingService.setUnassignedShardsAllocatedTimestamp(System.currentTimeMillis() + TimeValue.timeValueMinutes(1).millis());
ClusterState newState = clusterState;
routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(routingService.hasReroutedAndClear(), equalTo(false));
// verify the registration has been updated
assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(100L));
}
});
}
private class TestRoutingService extends RoutingService {
private AtomicBoolean rerouted = new AtomicBoolean();

View File

@ -273,7 +273,8 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
assertBusy(new Runnable() {
@Override
public void run() {
long delay = unassignedInfo.getDelayAllocationExpirationIn(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY);
long delay = unassignedInfo.getDelayAllocationExpirationIn(System.currentTimeMillis(),
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY);
assertThat(delay, greaterThan(0l));
assertThat(delay, lessThan(TimeValue.timeValueHours(10).millis()));
}
@ -290,7 +291,8 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
UnassignedInfo unassignedInfo = new UnassignedInfo(RandomPicks.randomFrom(getRandom(), reasons), null);
long delay = unassignedInfo.getAllocationDelayTimeoutSetting(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY);
assertThat(delay, equalTo(0l));
delay = unassignedInfo.getDelayAllocationExpirationIn(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY);
delay = unassignedInfo.getDelayAllocationExpirationIn(System.currentTimeMillis(),
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY);
assertThat(delay, equalTo(0l));
}
@ -306,7 +308,8 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test1")).addAsNew(metaData.index("test2"))).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState), equalTo(0));
assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(),
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState), equalTo(0));
// starting primaries
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
@ -315,7 +318,8 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
// remove node2 and reroute
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
assertThat(clusterState.prettyPrint(), UnassignedInfo.getNumberOfDelayedUnassigned(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState), equalTo(2));
assertThat(clusterState.prettyPrint(), UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(),
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState), equalTo(2));
}
@Test
@ -330,7 +334,8 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test1")).addAsNew(metaData.index("test2"))).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState), equalTo(0));
assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(),
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState), equalTo(0));
// starting primaries
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
@ -343,7 +348,8 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
long nextDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState);
assertThat(nextDelaySetting, equalTo(TimeValue.timeValueHours(10).millis()));
long nextDelay = UnassignedInfo.findNextDelayedAllocationIn(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState);
long nextDelay = UnassignedInfo.findNextDelayedAllocationIn(System.currentTimeMillis(),
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState);
assertThat(nextDelay, greaterThan(TimeValue.timeValueHours(9).millis()));
assertThat(nextDelay, lessThanOrEqualTo(TimeValue.timeValueHours(10).millis()));
}