diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java index fc4567a5b53..a87df801a63 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.cluster.health; import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -58,6 +59,7 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable validationFailures; @@ -68,14 +70,15 @@ public class ClusterHealthResponse extends ActionResponse implements Iterabletrue if the waitForXXX has timeout out and did not match. */ @@ -229,6 +241,9 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable i private volatile boolean routingTableDirty = false; private volatile Future scheduledRoutingTableFuture; + private AtomicBoolean rerouting = new AtomicBoolean(); + + private volatile long registeredNextDelaySetting = Long.MAX_VALUE; + private volatile ScheduledFuture registeredNextDelayFuture; @Inject public RoutingService(Settings settings, ThreadPool threadPool, ClusterService clusterService, AllocationService allocationService) { @@ -125,6 +132,31 @@ public class RoutingService extends AbstractLifecycleComponent i } } } + + // figure out when the next unassigned allocation need to happen from now. 
If this is larger or equal + // than the last time we checked and scheduled, we are guaranteed to have a reroute until then, so no need + // to schedule again + long nextDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(settings, event.state()); + if (nextDelaySetting > 0 && nextDelaySetting < registeredNextDelaySetting) { + FutureUtils.cancel(registeredNextDelayFuture); + registeredNextDelaySetting = nextDelaySetting; + TimeValue nextDelay = TimeValue.timeValueMillis(UnassignedInfo.findNextDelayedAllocationIn(settings, event.state())); + logger.info("delaying allocation for [{}] unassigned shards, next check in [{}]", UnassignedInfo.getNumberOfDelayedUnassigned(settings, event.state()), nextDelay); + registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + /* the scheduled check is firing now: forget the registered delay, otherwise a later node-left event with the same (or a larger) delay setting would never get its own reroute scheduled */ + registeredNextDelaySetting = Long.MAX_VALUE; + routingTableDirty = true; + reroute(); + } + + @Override + public void onFailure(Throwable t) { + logger.warn("failed to schedule/execute reroute post unassigned shard", t); + } + }); + } else { + logger.trace("no need to schedule reroute due to delayed unassigned, next_delay_setting [{}], registered [{}]", nextDelaySetting, registeredNextDelaySetting); + } } else { FutureUtils.cancel(scheduledRoutingTableFuture); scheduledRoutingTableFuture = null; @@ -139,9 +171,14 @@ public class RoutingService extends AbstractLifecycleComponent i if (lifecycle.stopped()) { return; } + if (rerouting.compareAndSet(false, true) == false) { + logger.trace("already has pending reroute, ignoring"); + return; + } clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, Priority.HIGH, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { + rerouting.set(false); RoutingAllocation.Result routingResult = allocationService.reroute(currentState); if (!routingResult.changed()) { // no state changed @@ -152,11 +189,13 @@ public class RoutingService extends 
AbstractLifecycleComponent i @Override public void onNoLongerMaster(String source) { + rerouting.set(false); // no biggie } @Override public void onFailure(String source, Throwable t) { + rerouting.set(false); ClusterState state = clusterService.state(); if (logger.isTraceEnabled()) { logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint()); @@ -166,7 +205,8 @@ public class RoutingService extends AbstractLifecycleComponent i } }); routingTableDirty = false; - } catch (Exception e) { + } catch (Throwable e) { + rerouting.set(false); ClusterState state = clusterService.state(); logger.warn("Failed to reroute routing table, current state:\n{}", e, state.prettyPrint()); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index b20f1363e30..c2a8eb4ed25 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -19,12 +19,16 @@ package org.elasticsearch.cluster.routing; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.joda.FormatDateTimeFormatter; import org.elasticsearch.common.joda.Joda; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -37,6 +41,9 @@ public class UnassignedInfo implements ToXContent, Writeable { public static final FormatDateTimeFormatter DATE_TIME_FORMATTER = Joda.forPattern("dateOptionalTime"); + public static final String 
DELAYED_NODE_LEFT_TIMEOUT = "index.unassigned.node_left.delayed_timeout"; + public static final TimeValue DEFAULT_DELAYED_NODE_LEFT_TIMEOUT = TimeValue.timeValueMillis(0); + /** * Reason why the shard is in unassigned state. *

@@ -141,6 +148,86 @@ public class UnassignedInfo implements ToXContent, Writeable { return this.details; } + /** + * The allocation delay value, in milliseconds, associated with the index (defaulting to node settings if not set). Always 0 unless the reason is NODE_LEFT; never negative. + */ + public long getAllocationDelayTimeoutSetting(Settings settings, Settings indexSettings) { + if (reason != Reason.NODE_LEFT) { + return 0; + } + TimeValue delayTimeout = indexSettings.getAsTime(DELAYED_NODE_LEFT_TIMEOUT, settings.getAsTime(DELAYED_NODE_LEFT_TIMEOUT, DEFAULT_DELAYED_NODE_LEFT_TIMEOUT)); + return Math.max(0L, delayTimeout.millis()); + } + + /** + * The time in milliseconds until this unassigned shard can be reassigned. Returns 0 when no delay applies, and a negative value once the delay has already expired; callers should treat only values greater than 0 as "still delayed". + */ + public long getDelayAllocationExpirationIn(Settings settings, Settings indexSettings) { + long delayTimeout = getAllocationDelayTimeoutSetting(settings, indexSettings); + if (delayTimeout == 0) { + return 0; + } + long delta = System.currentTimeMillis() - timestamp; + // account for time drift, treat it as no timeout + if (delta < 0) { + return 0; + } + return delayTimeout - delta; + } + + + /** + * Returns the number of replica shards that are unassigned and currently being delayed (primaries are never counted). + */ + public static int getNumberOfDelayedUnassigned(Settings settings, ClusterState state) { + int count = 0; + for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) { + if (shard.primary() == false) { + IndexMetaData indexMetaData = state.metaData().index(shard.getIndex()); + long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings()); + if (delay > 0) { + count++; + } + } + } + return count; + } + + /** + * Finds the smallest delay expiration setting of an unassigned shard. Returns 0 if there are none. 
+ */ + public static long findSmallestDelayedAllocationSetting(Settings settings, ClusterState state) { + long nextDelaySetting = Long.MAX_VALUE; + for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) { + if (shard.primary() == false) { /* only replicas are considered; primaries are skipped */ + IndexMetaData indexMetaData = state.metaData().index(shard.getIndex()); + long delayTimeoutSetting = shard.unassignedInfo().getAllocationDelayTimeoutSetting(settings, indexMetaData.getSettings()); + if (delayTimeoutSetting > 0 && delayTimeoutSetting < nextDelaySetting) { + nextDelaySetting = delayTimeoutSetting; + } + } + } + return nextDelaySetting == Long.MAX_VALUE ? 0l : nextDelaySetting; + } + + + /** + * Finds the next (closest) delay expiration, in milliseconds from now, among unassigned replica shards whose delay has not yet expired. Returns 0 if there are none. + */ + public static long findNextDelayedAllocationIn(Settings settings, ClusterState state) { + long nextDelay = Long.MAX_VALUE; + for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) { + if (shard.primary() == false) { + IndexMetaData indexMetaData = state.metaData().index(shard.getIndex()); + long nextShardDelay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings()); + if (nextShardDelay > 0 && nextShardDelay < nextDelay) { + nextDelay = nextShardDelay; + } + } + } + return nextDelay == Long.MAX_VALUE ? 
0l : nextDelay; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 6e5f6ab23f0..a108489d9f5 100644 --- a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -48,6 +48,7 @@ import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.ShardId; @@ -419,6 +420,8 @@ public class GatewayAllocator extends AbstractComponent { long lastSizeMatched = 0; DiscoveryNode lastDiscoNodeMatched = null; RoutingNode lastNodeMatched = null; + boolean hasReplicaData = false; + IndexMetaData indexMetaData = metaData.index(shard.getIndex()); for (Map.Entry nodeStoreEntry : shardStores.getData().entrySet()) { DiscoveryNode discoNode = nodeStoreEntry.getKey(); @@ -449,6 +452,7 @@ public class GatewayAllocator extends AbstractComponent { } if (!shard.primary()) { + hasReplicaData |= storeFilesMetaData.iterator().hasNext(); MutableShardRouting primaryShard = routingNodes.activePrimary(shard); if (primaryShard != null) { assert primaryShard.active(); @@ -509,6 +513,23 @@ public class GatewayAllocator extends AbstractComponent { allocation.routingNodes().assign(shard, lastNodeMatched.nodeId()); unassignedIterator.remove(); } + } else if (hasReplicaData == false) { + // if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation + // of the replica shard needs to be delayed, and if so, add it to the ignore unassigned list + // note: we 
only care about replica in delayed allocation, since if we have an unassigned primary it + // will anyhow wait to find an existing copy of the shard to be allocated + // note: the other side of the equation is scheduling a reroute in a timely manner, which happens in the RoutingService + long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings()); + if (delay > 0) { + logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueMillis(delay)); + /** + * mark it as changed, since we want to kick a publishing to schedule future allocation, + * see {@link org.elasticsearch.cluster.routing.RoutingService#clusterChanged(ClusterChangedEvent)}). + */ + changed = true; + unassignedIterator.remove(); + routingNodes.ignoredUnassigned().add(shard); + } } } return changed; diff --git a/core/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java b/core/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java index ffbc50ef314..6fb91609e74 100644 --- a/core/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java +++ b/core/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.settings; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider; @@ -110,6 +111,7 @@ public class IndexDynamicSettingsModule extends AbstractModule { indexDynamicSettings.addDynamicSetting(TranslogConfig.INDEX_TRANSLOG_DURABILITY); indexDynamicSettings.addDynamicSetting(IndicesWarmer.INDEX_WARMER_ENABLED); indexDynamicSettings.addDynamicSetting(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, Validator.BOOLEAN); + 
indexDynamicSettings.addDynamicSetting(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, Validator.TIME); } public void addDynamicSettings(String... settings) { diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterHealthResponsesTests.java b/core/src/test/java/org/elasticsearch/cluster/ClusterHealthResponsesTests.java index 78477acf3b4..345f3960342 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterHealthResponsesTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterHealthResponsesTests.java @@ -192,12 +192,14 @@ public class ClusterHealthResponsesTests extends ElasticsearchTestCase { ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); int pendingTasks = randomIntBetween(0, 200); int inFlight = randomIntBetween(0, 200); - ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), (String[]) null), clusterState, pendingTasks, inFlight); + int delayedUnassigned = randomIntBetween(0, 200); + ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), (String[]) null), clusterState, pendingTasks, inFlight, delayedUnassigned); logger.info("cluster status: {}, expected {}", clusterHealth.getStatus(), counter.status()); clusterHealth = maybeSerialize(clusterHealth); assertClusterHealth(clusterHealth, counter); assertThat(clusterHealth.getNumberOfPendingTasks(), Matchers.equalTo(pendingTasks)); assertThat(clusterHealth.getNumberOfInFlightFetch(), Matchers.equalTo(inFlight)); + assertThat(clusterHealth.getDelayedUnassignedShards(), Matchers.equalTo(delayedUnassigned)); } ClusterHealthResponse maybeSerialize(ClusterHealthResponse clusterHealth) throws IOException { @@ -225,7 +227,7 @@ public class ClusterHealthResponsesTests extends ElasticsearchTestCase { metaData.put(indexMetaData, true); 
routingTable.add(indexRoutingTable); ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); - ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), (String[]) null), clusterState, 0, 0); + ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), (String[]) null), clusterState, 0, 0, 0); clusterHealth = maybeSerialize(clusterHealth); // currently we have no cluster level validation failures as index validation issues are reported per index. assertThat(clusterHealth.getValidationFailures(), Matchers.hasSize(0)); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationTests.java new file mode 100644 index 00000000000..b59a81dd5e3 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationTests.java @@ -0,0 +1,162 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.cluster.routing; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.InternalTestCluster; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +/** Integration tests for delaying the allocation of replica shards after the node hosting them leaves the cluster. + */ +@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0) +public class DelayedAllocationTests extends ElasticsearchIntegrationTest { + + /** + * Verifies that when the delay timeout is explicitly set to 0, a 1/1 index shard will immediately + * get allocated to a free node when the node hosting it leaves the cluster. + */ + @Test + public void testNoDelayedTimeout() throws Exception { + internalCluster().startNodesAsync(3).get(); + prepareCreate("test").setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, 0)).get(); + ensureGreen("test"); + indexRandomData(); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard())); + assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(0)); + ensureGreen("test"); + } + + /** + * When we do have delayed allocation set, verifies that even though there is a node + * free to allocate the unassigned shard when the node hosting it leaves, it doesn't + * get allocated. Once we bring the node back, it gets allocated since it existed + * on it before. 
+ */ + @Test + public void testDelayedAllocationNodeLeavesAndComesBack() throws Exception { + internalCluster().startNodesAsync(3).get(); + prepareCreate("test").setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, TimeValue.timeValueHours(1))).get(); + ensureGreen("test"); + indexRandomData(); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard())); + assertThat(client().admin().cluster().prepareState().all().get().getState().routingNodes().hasUnassigned(), equalTo(true)); + assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1)); + internalCluster().startNode(); // this will use the same data location as the stopped node + ensureGreen("test"); + } + + /** + * With a very small delay timeout, verify that it expires and we get to green even + * though the node hosting the shard is not coming back. + */ + @Test + public void testDelayedAllocationTimesOut() throws Exception { + internalCluster().startNodesAsync(3).get(); + prepareCreate("test").setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, TimeValue.timeValueMillis(100))).get(); + ensureGreen("test"); + indexRandomData(); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard())); + ensureGreen("test"); + } + + /** + * Verify that when explicitly changing the value of the index setting for the delayed + * allocation to a very small value, it kicks the allocation of the unassigned shard + * even though the node it was hosted on will not come back. 
+ */ + @Test + public void testDelayedAllocationChangeWithSettingTo100ms() throws Exception { + internalCluster().startNodesAsync(3).get(); + prepareCreate("test").setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, TimeValue.timeValueHours(1))).get(); + ensureGreen("test"); + indexRandomData(); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard())); + assertThat(client().admin().cluster().prepareState().all().get().getState().routingNodes().hasUnassigned(), equalTo(true)); + assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1)); + assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, TimeValue.timeValueMillis(100))).get()); + ensureGreen("test"); + assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(0)); + } + + /** + * Verify that when explicitly changing the value of the index setting for the delayed + * allocation to 0, it kicks the allocation of the unassigned shard + * even though the node it was hosted on will not come back. 
+ */ + @Test + public void testDelayedAllocationChangeWithSettingTo0() throws Exception { + internalCluster().startNodesAsync(3).get(); + prepareCreate("test").setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, TimeValue.timeValueHours(1))).get(); + ensureGreen("test"); + indexRandomData(); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard())); + assertThat(client().admin().cluster().prepareState().all().get().getState().routingNodes().hasUnassigned(), equalTo(true)); + assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1)); + assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, TimeValue.timeValueMillis(0))).get()); + ensureGreen("test"); + assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(0)); + } + + + private void indexRandomData() throws Exception { + int numDocs = scaledRandomIntBetween(100, 1000); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex("test", "type").setSource("field", "value"); + } + // we want to test both full divergent copies of the shard in terms of segments, and + // a case where they are the same (using sync flush), index Random does all this goodness + // already + indexRandom(true, builders); + } + + private String findNodeWithShard() { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + List startedShards = state.routingTable().shardsWithState(ShardRoutingState.STARTED); + Collections.shuffle(startedShards, getRandom()); + return state.nodes().get(startedShards.get(0).currentNodeId()).getName(); + } +} diff --git 
a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index ede68df61ed..82d8d043302 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -33,9 +33,13 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ElasticsearchAllocationTestCase; import org.junit.Test; +import java.util.EnumSet; + import static org.elasticsearch.cluster.routing.ShardRoutingState.*; import static org.hamcrest.Matchers.*; @@ -253,4 +257,90 @@ public class UnassignedInfoTests extends ElasticsearchAllocationTestCase { assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getDetails(), equalTo("test fail")); assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getTimestampInMillis(), greaterThan(0l)); } + + /** + * Verifies that delayed allocation calculation are correct. 
+ */ + @Test + public void testUnassignedDelayedOnlyOnNodeLeft() throws Exception { + final UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, null); + long delay = unassignedInfo.getAllocationDelayTimeoutSetting(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, "10h").build(), Settings.EMPTY); + assertThat(delay, equalTo(TimeValue.timeValueHours(10).millis())); + assertBusy(new Runnable() { + @Override + public void run() { + long delay = unassignedInfo.getDelayAllocationExpirationIn(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, "10h").build(), Settings.EMPTY); + assertThat(delay, greaterThan(0l)); + assertThat(delay, lessThan(TimeValue.timeValueHours(10).millis())); + } + }); + } + + /** + * Verifies that delayed allocation is only computed when the reason is NODE_LEFT. + */ + @Test + public void testUnassignedDelayOnlyNodeLeftNonNodeLeftReason() throws Exception { + EnumSet reasons = EnumSet.allOf(UnassignedInfo.Reason.class); + reasons.remove(UnassignedInfo.Reason.NODE_LEFT); + UnassignedInfo unassignedInfo = new UnassignedInfo(RandomPicks.randomFrom(getRandom(), reasons), null); + long delay = unassignedInfo.getAllocationDelayTimeoutSetting(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, "10h").build(), Settings.EMPTY); + assertThat(delay, equalTo(0l)); + delay = unassignedInfo.getDelayAllocationExpirationIn(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, "10h").build(), Settings.EMPTY); + assertThat(delay, equalTo(0l)); + } + + @Test + public void testNumberOfDelayedUnassigned() throws Exception { + AllocationService allocation = createAllocationService(); + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + ClusterState clusterState = 
ClusterState.builder(ClusterName.DEFAULT) + .metaData(metaData) + .routingTable(RoutingTable.builder().addAsNew(metaData.index("test1")).addAsNew(metaData.index("test2"))).build(); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build(); + clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build(); + assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, "10h").build(), clusterState), equalTo(0)); + // starting primaries + clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build(); + // starting replicas + clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build(); + assertThat(clusterState.routingNodes().hasUnassigned(), equalTo(false)); + // remove node2 and reroute + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build(); + clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build(); + assertThat(clusterState.prettyPrint(), UnassignedInfo.getNumberOfDelayedUnassigned(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, "10h").build(), clusterState), equalTo(2)); + } + + @Test + public void testFindNextDelayedAllocation() { + AllocationService allocation = createAllocationService(); + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + ClusterState clusterState = 
ClusterState.builder(ClusterName.DEFAULT) + .metaData(metaData) + .routingTable(RoutingTable.builder().addAsNew(metaData.index("test1")).addAsNew(metaData.index("test2"))).build(); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build(); + clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build(); + assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, "10h").build(), clusterState), equalTo(0)); + // starting primaries + clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build(); + // starting replicas + clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build(); + assertThat(clusterState.routingNodes().hasUnassigned(), equalTo(false)); + // remove node2 and reroute + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build(); + clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build(); + + long nextDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, "10h").build(), clusterState); + assertThat(nextDelaySetting, equalTo(TimeValue.timeValueHours(10).millis())); + + long nextDelay = UnassignedInfo.findNextDelayedAllocationIn(Settings.builder().put(UnassignedInfo.DELAYED_NODE_LEFT_TIMEOUT, "10h").build(), clusterState); + assertThat(nextDelay, greaterThan(TimeValue.timeValueHours(9).millis())); + assertThat(nextDelay, lessThanOrEqualTo(TimeValue.timeValueHours(10).millis())); + } }