diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java
index 526dde505ef..cf1235c8f21 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java
@@ -140,12 +140,19 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
                 }
 
                 if (shardRouting.primary() && shardRouting.initializing() &&
-                    shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE &&
-                    inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false)
-                    throw new IllegalStateException("a primary shard routing " + shardRouting + " is a primary that is recovering from " +
-                        "a known allocation id but has no corresponding entry in the in-sync " +
-                        "allocation set " + inSyncAllocationIds);
-
+                    shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE) {
+                    if (inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID)) {
+                        if (inSyncAllocationIds.size() != 1) {
+                            throw new IllegalStateException("a primary shard routing " + shardRouting
+                                + " is a primary that is recovering from a stale primary but has unexpected entries in the in-sync "
+                                + "allocation set " + inSyncAllocationIds);
+                        }
+                    } else if (inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false) {
+                        throw new IllegalStateException("a primary shard routing " + shardRouting
+                            + " is a primary that is recovering from a known allocation id but has no corresponding entry in the in-sync "
+                            + "allocation set " + inSyncAllocationIds);
+                    }
+                }
             }
         }
         return true;
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java
index b7cc95298c4..c72c25a0578 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java
@@ -132,6 +132,11 @@ public abstract class RecoverySource implements Writeable, ToXContentObject {
      * Recovery from an existing on-disk store
      */
     public static final class ExistingStoreRecoverySource extends RecoverySource {
+        /**
+         * Special allocation id that a shard has during initialization on allocate_stale_primary
+         */
+        public static final String FORCED_ALLOCATION_ID = "_forced_allocation_";
+
         public static final ExistingStoreRecoverySource INSTANCE = new ExistingStoreRecoverySource(false);
         public static final ExistingStoreRecoverySource FORCE_STALE_PRIMARY_INSTANCE = new ExistingStoreRecoverySource(true);
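The new constant is a sentinel: real allocation ids are randomly generated UUIDs, so a fixed string such as "_forced_allocation_" can never collide with one. A minimal standalone sketch of the idea in plain Java (these are illustrative classes, not the actual Elasticsearch ones):

    import java.util.HashSet;
    import java.util.Set;
    import java.util.UUID;

    class SentinelIdSketch {
        // fixed marker that no generated allocation id can ever equal
        static final String FORCED_ALLOCATION_ID = "_forced_allocation_";

        public static void main(String[] args) {
            Set<String> inSync = new HashSet<>();
            // force-allocating a stale primary records only the sentinel
            inSync.add(FORCED_ALLOCATION_ID);

            // a real allocation id is a random UUID, so it cannot collide with the sentinel
            String realId = UUID.randomUUID().toString();
            assert realId.equals(FORCED_ALLOCATION_ID) == false;

            // until the shard actually starts, no on-disk copy matches the in-sync set,
            // which is exactly what forces a second allocate_stale_primary after a failed recovery
            System.out.println("in-sync before start: " + inSync);
        }
    }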
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java
index 6d4ca7dc775..54625a15e8d 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java
@@ -39,6 +39,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -68,7 +69,16 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting
 
     @Override
     public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) {
-        addAllocationId(startedShard);
+        assert Objects.equals(initializingShard.allocationId().getId(), startedShard.allocationId().getId())
+            : "initializingShard.allocationId [" + initializingShard.allocationId().getId()
+            + "] and startedShard.allocationId [" + startedShard.allocationId().getId() + "] have to be the same";
+        Updates updates = changes(startedShard.shardId());
+        updates.addedAllocationIds.add(startedShard.allocationId().getId());
+        if (startedShard.primary()
+            // a started shard always has a null recoverySource; we have to pick up the recoverySource from its initializing state
+            && (initializingShard.recoverySource() == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE)) {
+            updates.removedAllocationIds.add(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID);
+        }
     }
 
     @Override
@@ -144,7 +154,8 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting
             oldInSyncAllocationIds.contains(updates.initializedPrimary.allocationId().getId()) == false) {
             // we're not reusing an existing in-sync allocation id to initialize a primary, which means that we're either force-allocating
             // an empty or a stale primary (see AllocateEmptyPrimaryAllocationCommand or AllocateStalePrimaryAllocationCommand).
-            RecoverySource.Type recoverySourceType = updates.initializedPrimary.recoverySource().getType();
+            RecoverySource recoverySource = updates.initializedPrimary.recoverySource();
+            RecoverySource.Type recoverySourceType = recoverySource.getType();
             boolean emptyPrimary = recoverySourceType == RecoverySource.Type.EMPTY_STORE;
             assert updates.addedAllocationIds.isEmpty() : (emptyPrimary ? "empty" : "stale") +
                 " primary is not force-initialized in same allocation round where shards are started";
@@ -156,9 +167,15 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting
                 // forcing an empty primary resets the in-sync allocations to the empty set (ShardRouting.allocatedPostIndexCreate)
                 indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(), Collections.emptySet());
             } else {
+                final String allocationId;
+                if (recoverySource == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE) {
+                    allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID;
+                } else {
+                    assert recoverySource instanceof RecoverySource.SnapshotRecoverySource : recoverySource;
+                    allocationId = updates.initializedPrimary.allocationId().getId();
+                }
                 // forcing a stale primary resets the in-sync allocations to the singleton set with the stale id
-                indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(),
-                    Collections.singleton(updates.initializedPrimary.allocationId().getId()));
+                indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(), Collections.singleton(allocationId));
             }
         } else {
             // standard path for updating in-sync ids
@@ -166,6 +183,10 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting
             inSyncAllocationIds.addAll(updates.addedAllocationIds);
             inSyncAllocationIds.removeAll(updates.removedAllocationIds);
 
+            assert oldInSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false
+                || inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false :
+                "fake allocation id has to be removed, inSyncAllocationIds: " + inSyncAllocationIds;
+
             // Prevent the set of inSyncAllocationIds from growing unboundedly. This can happen for example if we don't write to a primary
             // but repeatedly shut down nodes that have active replicas.
             // We use number_of_replicas + 1 (= possible active shard copies) to bound the inSyncAllocationIds set
@@ -287,13 +308,6 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting
         }
     }
 
-    /**
-     * Add allocation id of this shard to the set of in-sync shard copies
-     */
-    private void addAllocationId(ShardRouting shardRouting) {
-        changes(shardRouting.shardId()).addedAllocationIds.add(shardRouting.allocationId().getId());
-    }
-
     /**
     * Increase primary term for this shard id
     */
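Taken together, the IndexMetaDataUpdater changes implement a swap: when a force-allocated stale primary starts, shardStarted adds the shard's real allocation id and removes the sentinel in the same update round, and the new assert in the standard path guards that the sentinel never survives an update. A rough sketch of that swap, assuming a plain Set<String> stands in for the in-sync allocation set (illustrative class, not the actual Elasticsearch one):

    import java.util.HashSet;
    import java.util.Set;

    class InSyncSwapSketch {
        static final String FORCED_ALLOCATION_ID = "_forced_allocation_";

        // mirrors the shardStarted/applyChanges flow: add the real id, drop the sentinel
        static Set<String> onForcedStalePrimaryStarted(Set<String> oldInSync, String startedAllocationId) {
            Set<String> inSync = new HashSet<>(oldInSync);
            inSync.add(startedAllocationId);      // updates.addedAllocationIds
            inSync.remove(FORCED_ALLOCATION_ID);  // updates.removedAllocationIds
            // the fake allocation id has to be gone once the swap is applied
            assert inSync.contains(FORCED_ALLOCATION_ID) == false : inSync;
            return inSync;
        }

        public static void main(String[] args) {
            Set<String> before = new HashSet<>();
            before.add(FORCED_ALLOCATION_ID);
            System.out.println(onForcedStalePrimaryStarted(before, "someRealUUID")); // [someRealUUID]
        }
    }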
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java
new file mode 100644
index 00000000000..f9da7a1aa8e
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing;
+
+import org.apache.lucene.store.SimpleFSDirectory;
+import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanation;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.routing.allocation.AllocationDecision;
+import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
+import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.MockEngineFactoryPlugin;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.shard.RemoveCorruptedShardDataCommandIT;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardPath;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.DummyShardLock;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.InternalSettingsPlugin;
+import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.test.transport.MockTransportService;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
+public class AllocationIdIT extends ESIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Arrays.asList(MockTransportService.TestPlugin.class, MockEngineFactoryPlugin.class, InternalSettingsPlugin.class);
+    }
+
+    public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStalePrimary() throws Exception {
+        /*
+         * The allocation id is put on the shard when it starts, while the historyUUID is adjusted only after recovery is done.
+         *
+         * If, during the execution of AllocateStalePrimary, a proper allocation id is stored in the in-sync allocation set and
+         * recovery then fails, the restarted shard skips the stage where the historyUUID is changed.
+         *
+         * That leads to a situation where the allocated stale primary and its replica belong to the same historyUUID, so the
+         * replica would receive operations after the local checkpoint although the documents before the checkpoint could be
+         * significantly different.
+         *
+         * Therefore, on AllocateStalePrimary we put a fake allocation id (no real one could ever be generated like that) and
+         * any failure during recovery requires an extra AllocateStalePrimary command to be executed.
+         */
+
+        // initial set up
+        final String indexName = "index42";
+        final String master = internalCluster().startMasterOnlyNode();
+        String node1 = internalCluster().startNode();
+        createIndex(indexName, Settings.builder()
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+            .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum").build());
+        final int numDocs = indexDocs(indexName, "foo", "bar");
+        final IndexSettings indexSettings = getIndexSettings(indexName, node1);
+        final Set<String> allocationIds = getAllocationIds(indexName);
+        final ShardId shardId = new ShardId(resolveIndex(indexName), 0);
+        final Path indexPath = getIndexPath(node1, shardId);
+        assertThat(allocationIds, hasSize(1));
+        final String historyUUID = historyUUID(node1, indexName);
+        String node2 = internalCluster().startNode();
+        ensureGreen(indexName);
+        internalCluster().assertSameDocIdsOnShards();
+        // initial set up is done
+
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1));
+
+        // index more docs on node2; that marks the copy on node1 as stale
+        int numExtraDocs = indexDocs(indexName, "foo", "bar2");
+        assertHitCount(client(node2).prepareSearch(indexName).setQuery(matchAllQuery()).get(), numDocs + numExtraDocs);
+
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node2));
+
+        // create a fake corruption marker on node1
+        putFakeCorruptionMarker(indexSettings, shardId, indexPath);
+
+        // thanks to the dedicated master, node1's copy is known to be stale
+        node1 = internalCluster().startNode();
+
+        // there is only a _stale_ primary copy
+        checkNoValidShardCopy(indexName, shardId);
+
+        // allocate stale primary
+        client(node1).admin().cluster().prepareReroute()
+            .add(new AllocateStalePrimaryAllocationCommand(indexName, 0, node1, true))
+            .get();
+
+        // allocation fails due to the corruption marker
+        assertBusy(() -> {
+            final ClusterState state = client().admin().cluster().prepareState().get().getState();
+            final ShardRouting shardRouting = state.routingTable().index(indexName).shard(shardId.id()).primaryShard();
+            assertThat(shardRouting.state(), equalTo(ShardRoutingState.UNASSIGNED));
+            assertThat(shardRouting.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
+        });
+
+        try (Store store = new Store(shardId, indexSettings, new SimpleFSDirectory(indexPath), new DummyShardLock(shardId))) {
+            store.removeCorruptionMarker();
+        }
+
+        // the index is red: no shard is allocated (the in-sync allocation id is a fake id that matches no real copy)
+        checkHealthStatus(indexName, ClusterHealthStatus.RED);
+        checkNoValidShardCopy(indexName, shardId);
+
+        internalCluster().restartNode(node1, InternalTestCluster.EMPTY_CALLBACK);
+
+        // the index is still red due to the allocation id mismatch
+        checkHealthStatus(indexName, ClusterHealthStatus.RED);
+        checkNoValidShardCopy(indexName, shardId);
+
+        // no valid shard copy is there; we have to invoke AllocateStalePrimary again
+        client().admin().cluster().prepareReroute()
+            .add(new AllocateStalePrimaryAllocationCommand(indexName, 0, node1, true))
+            .get();
+
+        ensureYellow(indexName);
+
+        // bring node2 back
+        node2 = internalCluster().startNode();
+        ensureGreen(indexName);
+
+        assertThat(historyUUID(node1, indexName), not(equalTo(historyUUID)));
+        assertThat(historyUUID(node1, indexName), equalTo(historyUUID(node2, indexName)));
+
+        internalCluster().assertSameDocIdsOnShards();
+    }
+
+    public void checkHealthStatus(String indexName, ClusterHealthStatus healthStatus) {
+        final ClusterHealthStatus indexHealthStatus = client().admin().cluster()
+            .health(Requests.clusterHealthRequest(indexName)).actionGet().getStatus();
+        assertThat(indexHealthStatus, is(healthStatus));
+    }
+
+    private int indexDocs(String indexName, Object... source) throws InterruptedException, ExecutionException {
+        // index some docs in several segments
+        int numDocs = 0;
+        for (int k = 0, attempts = randomIntBetween(5, 10); k < attempts; k++) {
+            final int numExtraDocs = between(10, 100);
+            IndexRequestBuilder[] builders = new IndexRequestBuilder[numExtraDocs];
+            for (int i = 0; i < builders.length; i++) {
+                builders[i] = client().prepareIndex(indexName, "type").setSource(source);
+            }
+
+            indexRandom(true, false, true, Arrays.asList(builders));
+            numDocs += numExtraDocs;
+        }
+
+        return numDocs;
+    }
+
+    private Path getIndexPath(String nodeName, ShardId shardId) {
+        final Set<Path> indexDirs = RemoveCorruptedShardDataCommandIT.getDirs(nodeName, shardId, ShardPath.INDEX_FOLDER_NAME);
+        assertThat(indexDirs, hasSize(1));
+        return indexDirs.iterator().next();
+    }
+
+    private Set<String> getAllocationIds(String indexName) {
+        final ClusterState state = client().admin().cluster().prepareState().get().getState();
+        final Set<String> allocationIds = state.metaData().index(indexName).inSyncAllocationIds(0);
+        return allocationIds;
+    }
+
+    private IndexSettings getIndexSettings(String indexName, String nodeName) {
+        final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
+        final IndexService indexService = indicesService.indexService(resolveIndex(indexName));
+        return indexService.getIndexSettings();
+    }
+
+    private String historyUUID(String node, String indexName) {
+        final ShardStats[] shards = client(node).admin().indices().prepareStats(indexName).clear().get().getShards();
+        assertThat(shards.length, greaterThan(0));
+        final Set<String> historyUUIDs = Arrays.stream(shards)
+            .map(shard -> shard.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY))
+            .collect(Collectors.toSet());
+        assertThat(historyUUIDs, hasSize(1));
+        return historyUUIDs.iterator().next();
+    }
+
+    private void putFakeCorruptionMarker(IndexSettings indexSettings, ShardId shardId, Path indexPath) throws IOException {
+        try (Store store = new Store(shardId, indexSettings, new SimpleFSDirectory(indexPath), new DummyShardLock(shardId))) {
+            store.markStoreCorrupted(new IOException("fake ioexception"));
+        }
+    }
+
+    private void checkNoValidShardCopy(String indexName, ShardId shardId) throws Exception {
+        final ClusterAllocationExplanation explanation =
+            client().admin().cluster().prepareAllocationExplain()
+                .setIndex(indexName).setShard(shardId.id()).setPrimary(true)
+                .get().getExplanation();
+
+        final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision();
+        assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true));
+        assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(),
+            equalTo(AllocationDecision.NO_VALID_SHARD_COPY));
+    }
+
+}
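The lifecycle the integration test above walks through can be summarized as a sequence of in-sync states for the one shard. A hedged sketch as Java comments (the id values are illustrative; real allocation ids are random UUIDs):

    // in-sync allocation ids for shard 0 of "index42", as the test observes them:
    //
    //   initial healthy state            : { "realIdOfNode1Copy", ... }
    //   after first AllocateStalePrimary : { "_forced_allocation_" }   <- sentinel; no real copy matches
    //   recovery fails, node restarts    : { "_forced_allocation_" }   <- index stays red
    //   after second AllocateStalePrimary,
    //   recovery succeeds, shard starts  : { "newRealIdOfNode1Copy" }  <- sentinel swapped for the real id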
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java
index f1876eab2ae..989dc22ee0a 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java
@@ -214,6 +214,13 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
         }
         rerouteBuilder.get();
 
+        ClusterState state = client().admin().cluster().prepareState().get().getState();
+
+        Set<String> expectedAllocationIds = useStaleReplica
+            ? Collections.singleton(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID)
+            : Collections.emptySet();
+        assertEquals(expectedAllocationIds, state.metaData().index(idxName).inSyncAllocationIds(0));
+
         logger.info("--> check that the stale primary shard gets allocated and that documents are available");
         ensureYellow(idxName);
 
@@ -228,7 +235,8 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
         assertHitCount(client().prepareSearch(idxName).setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1L : 0L);
 
         // allocation id of old primary was cleaned from the in-sync set
-        ClusterState state = client().admin().cluster().prepareState().get().getState();
+        state = client().admin().cluster().prepareState().get().getState();
+
         assertEquals(Collections.singleton(state.routingTable().index(idxName).shard(0).primary.allocationId().getId()),
             state.metaData().index(idxName).inSyncAllocationIds(0));
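Both integration tests trigger the new code path through the reroute API. For reference, a minimal sketch of the invocation as it appears in the tests (index name and node name are placeholders):

    // force-allocate shard 0 of "my-index" on "node-1", explicitly accepting data loss
    client().admin().cluster().prepareReroute()
        .add(new AllocateStalePrimaryAllocationCommand("my-index", 0, "node-1", true))
        .get();

    // immediately afterwards the in-sync set for the shard holds only the sentinel
    // RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID, until the shard starts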
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java
index 1ea0a7f8501..c966e3cac27 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java
@@ -23,14 +23,18 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterInfo;
+import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ESAllocationTestCase;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.allocation.command.AbstractAllocateAllocationCommand;
 import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
@@ -58,14 +62,18 @@ import org.elasticsearch.index.shard.ShardNotFoundException;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singleton;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
+import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 
@@ -259,6 +267,56 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
         }
     }
 
+    public void testAllocateStalePrimaryCommand() {
+        AllocationService allocation = createAllocationService(Settings.builder()
+            .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")
+            .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none")
+            .build());
+        final String index = "test";
+
+        logger.info("--> building initial routing table");
+        MetaData metaData = MetaData.builder()
+            .put(IndexMetaData.builder(index).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)
+                .putInSyncAllocationIds(0, Collections.singleton("asdf")).putInSyncAllocationIds(1, Collections.singleton("qwertz")))
+            .build();
+        // the shard routing is added as "from recovery" instead of "new index creation" so that the in-sync allocation ids
+        // are non-empty and a stale primary can be force-allocated below
+        RoutingTable routingTable = RoutingTable.builder()
+            .addAsRecovery(metaData.index(index))
+            .build();
+        ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .metaData(metaData).routingTable(routingTable).build();
+
+        final String node1 = "node1";
+        final String node2 = "node2";
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
+            .add(newNode(node1))
+            .add(newNode(node2))
+        ).build();
+        clusterState = allocation.reroute(clusterState, "reroute");
+
+        // both shard copies (primary and replica) are unassigned
+        final List<ShardRouting> shardRoutings = clusterState.getRoutingNodes().shardsWithState(UNASSIGNED);
+        assertThat(shardRoutings, hasSize(2));
+
+        logger.info("--> allocating stale primary with acceptDataLoss flag set to true");
+        clusterState = allocation.reroute(clusterState,
+            new AllocationCommands(new AllocateStalePrimaryAllocationCommand(index, 0, node1, true)), false, false).getClusterState();
+        RoutingNode routingNode1 = clusterState.getRoutingNodes().node(node1);
+        assertThat(routingNode1.size(), equalTo(1));
+        assertThat(routingNode1.shardsWithState(INITIALIZING).size(), equalTo(1));
+        Set<String> inSyncAllocationIds = clusterState.metaData().index(index).inSyncAllocationIds(0);
+        assertThat(inSyncAllocationIds, equalTo(Collections.singleton(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID)));
+
+        clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+        routingNode1 = clusterState.getRoutingNodes().node(node1);
+        assertThat(routingNode1.size(), equalTo(1));
+        assertThat(routingNode1.shardsWithState(STARTED).size(), equalTo(1));
+        inSyncAllocationIds = clusterState.metaData().index(index).inSyncAllocationIds(0);
+        assertThat(inSyncAllocationIds, hasSize(1));
+        assertThat(inSyncAllocationIds, not(Collections.singleton(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID)));
+    }
+
     public void testCancelCommand() {
         AllocationService allocation = createAllocationService(Settings.builder()
             .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java
index 0ef64e15ce7..2e559c64678 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java
@@ -42,12 +42,9 @@ import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
 import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.gateway.GatewayAllocator;
 import org.elasticsearch.test.gateway.TestGatewayAllocator;
 import org.hamcrest.Matchers;
 
-import java.util.List;
-
 import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
 
@@ -73,7 +70,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
         settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance);
         settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceTreshold);
 
-        AllocationService strategy = createAllocationService(settings.build(), new NoopGatewayAllocator());
+        AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator());
 
         ClusterState clusterState = initCluster(strategy);
         assertIndexBalance(clusterState.getRoutingTable(), clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices,
@@ -101,7 +98,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
         settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance);
         settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceTreshold);
 
-        AllocationService strategy = createAllocationService(settings.build(), new NoopGatewayAllocator());
+        AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator());
 
         ClusterState clusterState = initCluster(strategy);
         assertReplicaBalance(logger, clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices,
@@ -366,7 +363,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
                 assertThat(shardRouting.state(), Matchers.equalTo(ShardRoutingState.INITIALIZING));
             }
         }
-        strategy = createAllocationService(settings.build(), new NoopGatewayAllocator());
+        strategy = createAllocationService(settings.build(), new TestGatewayAllocator());
 
         logger.info("use the new allocator and check if it moves shards");
         routingNodes = clusterState.getRoutingNodes();
@@ -402,19 +399,4 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
         }
     }
 
-    private class NoopGatewayAllocator extends GatewayAllocator {
-        @Override
-        public void applyStartedShards(RoutingAllocation allocation, List<ShardRouting> startedShards) {
-            // noop
-        }
-
-        @Override
-        public void applyFailedShards(RoutingAllocation allocation, List<FailedShard> failedShards) {
-            // noop
-        }
-        @Override
-        public void allocateUnassigned(RoutingAllocation allocation) {
-            // noop
-        }
-    }
 }
diff --git a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java
index 3c52497539e..26f04319a25 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java
@@ -610,7 +610,7 @@ public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
         return getDirs(nodeId, shardId, dirSuffix);
     }
 
-    private Set<Path> getDirs(String nodeId, ShardId shardId, String dirSuffix) {
+    public static Set<Path> getDirs(String nodeId, ShardId shardId, String dirSuffix) {
         final NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get();
         final Set<Path> translogDirs = new TreeSet<>();
         final NodeStats nodeStats = nodeStatses.getNodes().get(0);