diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index 989dc22ee0a..b00cacd013f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -25,9 +25,11 @@ import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; @@ -54,6 +56,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -212,14 +215,26 @@ public class PrimaryAllocationIT extends ESIntegTestCase { rerouteBuilder.add(new AllocateEmptyPrimaryAllocationCommand(idxName, shardId, storeStatus.getNode().getId(), true)); } } - rerouteBuilder.get(); - ClusterState state = client().admin().cluster().prepareState().get().getState(); - - Set expectedAllocationIds = useStaleReplica + final Set expectedAllocationIds = useStaleReplica ? Collections.singleton(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) : Collections.emptySet(); - assertEquals(expectedAllocationIds, state.metaData().index(idxName).inSyncAllocationIds(0)); + + final CountDownLatch clusterStateChangeLatch = new CountDownLatch(1); + final ClusterStateListener clusterStateListener = event -> { + final Set allocationIds = event.state().metaData().index(idxName).inSyncAllocationIds(0); + if (expectedAllocationIds.equals(allocationIds)) { + clusterStateChangeLatch.countDown(); + } + logger.info("expected allocation ids: {} actual allocation ids: {}", expectedAllocationIds, allocationIds); + }; + final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, master); + clusterService.addListener(clusterStateListener); + + rerouteBuilder.get(); + + assertTrue(clusterStateChangeLatch.await(30, TimeUnit.SECONDS)); + clusterService.removeListener(clusterStateListener); logger.info("--> check that the stale primary shard gets allocated and that documents are available"); ensureYellow(idxName); @@ -235,7 +250,7 @@ public class PrimaryAllocationIT extends ESIntegTestCase { assertHitCount(client().prepareSearch(idxName).setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1L : 0L); // allocation id of old primary was cleaned from the in-sync set - state = client().admin().cluster().prepareState().get().getState(); + final ClusterState state = client().admin().cluster().prepareState().get().getState(); assertEquals(Collections.singleton(state.routingTable().index(idxName).shard(0).primary.allocationId().getId()), state.metaData().index(idxName).inSyncAllocationIds(0));