Add test to verify force primary allocation on closed indices (#42458)

This change adds a test verifying that we can force primary allocation on closed indices.
Nhat Nguyen 2019-05-24 11:19:41 -04:00 committed by Yannick Welsch
parent 075fd2a0ac
commit 329d1307a5
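For context: "forcing primary allocation" here means using the cluster reroute API to promote a chosen shard copy to primary. A minimal sketch of the two command variants, assuming an integration-test client, an index named "test", shard 0, and a data node name held in nodeName (all illustrative, not taken from this diff):

    // Promote a stale (out-of-date) copy on the given node to primary.
    client().admin().cluster().prepareReroute()
        .add(new AllocateStalePrimaryAllocationCommand("test", 0, nodeName, true))
        .get();

    // Or allocate a fresh, empty primary on the node instead.
    client().admin().cluster().prepareReroute()
        .add(new AllocateEmptyPrimaryAllocationCommand("test", 0, nodeName, true))
        .get();

The trailing boolean is acceptDataLoss; both commands refuse to run unless it is true, since either path can discard acknowledged writes.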

@@ -23,9 +23,11 @@ import com.carrotsearch.hppc.cursors.IntObjectCursor;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder;
 import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.ActiveShardCount;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -65,6 +67,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
@@ -231,7 +234,9 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
         Set<String> historyUUIDs = Arrays.stream(client().admin().indices().prepareStats("test").clear().get().getShards())
             .map(shard -> shard.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY)).collect(Collectors.toSet());
         createStaleReplicaScenario(master);
-
+        if (randomBoolean()) {
+            assertAcked(client().admin().indices().prepareClose("test").setWaitForActiveShards(0));
+        }
         boolean useStaleReplica = randomBoolean(); // if true, use stale replica, otherwise a completely empty copy
         logger.info("--> explicitly promote old primary shard");
         final String idxName = "test";
@@ -281,15 +286,18 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
             assertBusy(() -> assertTrue(client().admin().cluster().prepareState().get()
                 .getState().routingTable().index(idxName).allPrimaryShardsActive()));
         }
-        assertHitCount(client().prepareSearch(idxName).setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1L : 0L);
+        ShardStats[] shardStats = client().admin().indices().prepareStats("test")
+            .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED).get().getShards();
+        for (ShardStats shardStat : shardStats) {
+            assertThat(shardStat.getCommitStats().getNumDocs(), equalTo(useStaleReplica ? 1 : 0));
+        }
         // allocation id of old primary was cleaned from the in-sync set
         final ClusterState state = client().admin().cluster().prepareState().get().getState();
         assertEquals(Collections.singleton(state.routingTable().index(idxName).shard(0).primary.allocationId().getId()),
             state.metaData().index(idxName).inSyncAllocationIds(0));
-        Set<String> newHistoryUUIds = Arrays.stream(client().admin().indices().prepareStats("test").clear().get().getShards())
+        Set<String> newHistoryUUIds = Stream.of(shardStats)
             .map(shard -> shard.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY)).collect(Collectors.toSet());
         assertThat(newHistoryUUIds, everyItem(not(isIn(historyUUIDs))));
         assertThat(newHistoryUUIds, hasSize(1));
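A note on the assertion rewrite in the last hunk: once the index may be closed, the old assertHitCount check no longer works, because closed indices cannot be searched. The test instead reads the per-shard document count from the Lucene commit stats, and the stats request must opt in to closed indices via IndicesOptions. A standalone sketch of the same pattern (the index name "test" is illustrative):

    // Stats requests normally skip closed indices; this option set expands to them too.
    ShardStats[] stats = client().admin().indices().prepareStats("test")
        .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED)
        .get().getShards();
    for (ShardStats stat : stats) {
        // Doc count as recorded in the shard's last Lucene commit point.
        System.out.println(stat.getShardRouting().shardId() + " -> " + stat.getCommitStats().getNumDocs());
    }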