Convert most awaitBusy calls to assertBusy (#45794) (#47112)

Backport of #45794 to 7.x. Convert most `awaitBusy` calls to
`assertBusy`, and use asserts where possible. Follows on from #28548 by
@liketic.

There were a small number of places where it didn't make sense to me to
call `assertBusy`, so I kept the existing calls but renamed the method to
`waitUntil`. This was partly to better reflect its usage, and partly so
that anyone trying to add a new call to awaitBusy wouldn't be able to find
it.

I also didn't change the usage in `TransportStopRollupAction` as the
comments state that the local awaitBusy method is a temporary
copy-and-paste.

Other changes:

  * Rework `waitForDocs` to scale its timeout. Instead of calling
    `assertBusy` in a loop, work out a reasonable overall timeout and await
    just once.
  * Some tests failed after switching to `assertBusy` and had to be fixed.
  * Correct the expect templates in AbstractUpgradeTestCase.  The ES
    Security team confirmed that they don't use templates any more, so
    remove this from the expected templates. Also rewrite how the setup
    code checks for templates, in order to give more information.
  * Remove an expected ML template from XPackRestTestConstants The ML team
    advised that the ML tests shouldn't be waiting for any
    `.ml-notifications*` templates, since such checks should happen in the
    production code instead.
  * Also rework the template checking code in `XPackRestTestHelper` to give
    more helpful failure messages.
  * Fix issue in `DataFrameSurvivesUpgradeIT` when upgrading from < 7.4
This commit is contained in:
Rory Hunter 2019-09-29 12:21:46 +01:00 committed by GitHub
parent 98989f7b37
commit 53a4d2176f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
60 changed files with 522 additions and 547 deletions

View File

@ -902,7 +902,7 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
// b) is slightly more efficient since we may not need to wait an entire second for the timestamp to increment
assertBusy(() -> {
long timeNow = System.currentTimeMillis() / 1000;
assertFalse(prevJobTimeStamp >= timeNow);
assertThat(prevJobTimeStamp, lessThan(timeNow));
});
// Update snapshot timestamp to force it out of snapshot retention window
@ -920,7 +920,8 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
waitForForecastToComplete(jobId, forecastJobResponse.getForecastId());
// Wait for the forecast to expire
awaitBusy(() -> false, 1, TimeUnit.SECONDS);
// FIXME: We should wait for something specific to change, rather than waiting for time to pass.
waitUntil(() -> false, 1, TimeUnit.SECONDS);
// Run up to now
startDatafeed(datafeedId, String.valueOf(0), String.valueOf(nowMillis));
@ -964,7 +965,9 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
assertTrue(response.getDeleted());
awaitBusy(() -> false, 1, TimeUnit.SECONDS);
// Wait for the forecast to expire
// FIXME: We should wait for something specific to change, rather than waiting for time to pass.
waitUntil(() -> false, 1, TimeUnit.SECONDS);
GetModelSnapshotsRequest getModelSnapshotsRequest1 = new GetModelSnapshotsRequest(jobId);
GetModelSnapshotsResponse getModelSnapshotsResponse1 = execute(getModelSnapshotsRequest1, machineLearningClient::getModelSnapshots,
@ -2079,8 +2082,6 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
highLevelClient().update(updateSnapshotRequest, RequestOptions.DEFAULT);
}
private String createAndPutDatafeed(String jobId, String indexName) throws IOException {
String datafeedId = jobId + "-feed";
DatafeedConfig datafeed = DatafeedConfig.builder(datafeedId, jobId)

View File

@ -200,7 +200,7 @@ public class ReindexDocumentationIT extends ESIntegTestCase {
}
}
public void testTasks() throws InterruptedException {
public void testTasks() throws Exception {
final Client client = client();
final ReindexRequestBuilder builder = reindexAndPartiallyBlock();
@ -284,7 +284,7 @@ public class ReindexDocumentationIT extends ESIntegTestCase {
* Similar to what CancelTests does: blocks some operations to be able to catch some tasks in running state
* @see CancelTests#testCancel(String, AbstractBulkByScrollRequestBuilder, CancelTests.CancelAssertion, Matcher)
*/
private ReindexRequestBuilder reindexAndPartiallyBlock() throws InterruptedException {
private ReindexRequestBuilder reindexAndPartiallyBlock() throws Exception {
final Client client = client();
final int numDocs = randomIntBetween(10, 100);
ALLOWED_OPERATIONS.release(numDocs);
@ -310,9 +310,12 @@ public class ReindexDocumentationIT extends ESIntegTestCase {
builder.execute();
// 10 seconds is usually fine but on heavily loaded machines this can take a while
assertTrue("updates blocked", awaitBusy(
() -> ALLOWED_OPERATIONS.hasQueuedThreads() && ALLOWED_OPERATIONS.availablePermits() == 0,
1, TimeUnit.MINUTES));
assertBusy(
() -> {
assertTrue("Expected some queued threads", ALLOWED_OPERATIONS.hasQueuedThreads());
assertEquals("Expected that no permits are available", 0, ALLOWED_OPERATIONS.availablePermits());
},
1, TimeUnit.MINUTES);
return builder;
}

View File

@ -118,10 +118,9 @@ public class CancelTests extends ReindexTestCase {
* exhausted their slice while others might have quite a bit left
* to work on. We can't control that. */
logger.debug("waiting for updates to be blocked");
boolean blocked = awaitBusy(
() -> ALLOWED_OPERATIONS.hasQueuedThreads() && ALLOWED_OPERATIONS.availablePermits() == 0,
assertBusy(
() -> assertTrue("updates blocked", ALLOWED_OPERATIONS.hasQueuedThreads() && ALLOWED_OPERATIONS.availablePermits() == 0),
1, TimeUnit.MINUTES); // 10 seconds is usually fine but on heavily loaded machines this can take a while
assertTrue("updates blocked", blocked);
// Status should show the task running
TaskInfo mainTask = findTaskToCancel(action, builder.request().getSlices());

View File

@ -240,7 +240,7 @@ public class SearchRestCancellationIT extends HttpSmokeTestCase {
LogManager.getLogger(SearchRestCancellationIT.class).info("Blocking on the document {}", fieldsLookup.get("_id"));
hits.incrementAndGet();
try {
awaitBusy(() -> shouldBlock.get() == false);
waitUntil(() -> shouldBlock.get() == false);
} catch (Exception e) {
throw new RuntimeException(e);
}

View File

@ -167,7 +167,7 @@ public class CancellableTasksTests extends TaskManagerTestCase {
// Simulate a job that takes forever to finish
// Using periodic checks method to identify that the task was cancelled
try {
awaitBusy(() -> {
waitUntil(() -> {
if (((CancellableTask) task).isCancelled()) {
throw new TaskCancelledException("Cancelled");
}

View File

@ -75,7 +75,7 @@ import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
import static org.elasticsearch.test.ESTestCase.awaitBusy;
import static org.elasticsearch.test.ESTestCase.waitUntil;
/**
* A plugin that adds a cancellable blocking test task of integration testing of the task manager.
@ -305,7 +305,7 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
logger.info("Test task started on the node {}", clusterService.localNode());
if (request.shouldBlock) {
try {
awaitBusy(() -> {
waitUntil(() -> {
if (((CancellableTask) task).isCancelled()) {
throw new RuntimeException("Cancelled!");
}

View File

@ -128,10 +128,11 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
Settings masterDataPathSettings = internalCluster().dataPathSettings(masterNode);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(masterNode));
awaitBusy(() -> {
assertBusy(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
return clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID);
assertTrue(clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
});
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true));
// verify that both nodes are still in the cluster state but there is no master

View File

@ -219,11 +219,10 @@ public class NoMasterNodeIT extends ESIntegTestCase {
final Client clientToMasterlessNode = client();
assertTrue(awaitBusy(() -> {
ClusterState state = clientToMasterlessNode.admin().cluster().prepareState().setLocal(true).get().getState();
return state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID);
}
));
assertBusy(() -> {
ClusterState state = clientToMasterlessNode.admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
});
GetResponse getResponse = clientToMasterlessNode.prepareGet("test1", "type1", "1").get();
assertExists(getResponse);

View File

@ -38,9 +38,11 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
@ClusterScope(scope= ESIntegTestCase.Scope.TEST, numDataNodes =0, minNumDataNodes = 2)
@ -78,40 +80,43 @@ public class AwarenessAllocationIT extends ESIntegTestCase {
final String node3 = internalCluster().startNode(Settings.builder().put(commonSettings).put("node.attr.rack_id", "rack_2").build());
// On slow machines the initial relocation might be delayed
assertThat(awaitBusy(
() -> {
logger.info("--> waiting for no relocation");
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth()
.setIndices("test1", "test2")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("3")
.setWaitForNoRelocatingShards(true)
.get();
if (clusterHealth.isTimedOut()) {
return false;
}
assertBusy(
() -> {
logger.info("--> waiting for no relocation");
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth()
.setIndices("test1", "test2")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("3")
.setWaitForNoRelocatingShards(true)
.get();
logger.info("--> checking current state");
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
// check that closed indices are effectively closed
if (indicesToClose.stream().anyMatch(index -> clusterState.metaData().index(index).getState() != State.CLOSE)) {
return false;
}
// verify that we have all the primaries on node3
ObjectIntHashMap<String> counts = new ObjectIntHashMap<>();
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
counts.addTo(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1);
}
assertThat("Cluster health request timed out", clusterHealth.isTimedOut(), equalTo(false));
logger.info("--> checking current state");
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
// check that closed indices are effectively closed
final List<String> notClosedIndices =
indicesToClose.stream()
.filter(index -> clusterState.metaData().index(index).getState() != State.CLOSE)
.collect(Collectors.toList());
assertThat("Some indices not closed", notClosedIndices, empty());
// verify that we have all the primaries on node3
ObjectIntHashMap<String> counts = new ObjectIntHashMap<>();
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
counts.addTo(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1);
}
}
return counts.get(node3) == totalPrimaries;
},
10,
TimeUnit.SECONDS
), equalTo(true));
}
assertThat(counts.get(node3), equalTo(totalPrimaries));
},
10,
TimeUnit.SECONDS
);
}
public void testAwarenessZones() {

View File

@ -400,8 +400,8 @@ public class ClusterServiceIT extends ESIntegTestCase {
block1.countDown();
invoked2.await();
// whenever we test for no tasks, we need to awaitBusy since this is a live node
assertTrue(awaitBusy(() -> clusterService.getMasterService().pendingTasks().isEmpty()));
// whenever we test for no tasks, we need to wait since this is a live node
assertBusy(() -> assertTrue("Pending tasks not empty", clusterService.getMasterService().pendingTasks().isEmpty()));
waitNoPendingTasksOnAll();
final CountDownLatch block2 = new CountDownLatch(1);

View File

@ -282,21 +282,17 @@ public class MasterDisruptionIT extends AbstractDisruptionTestCase {
}
private void assertDiscoveryCompleted(List<String> nodes) throws InterruptedException {
private void assertDiscoveryCompleted(List<String> nodes) throws Exception {
for (final String node : nodes) {
assertTrue(
"node [" + node + "] is still joining master",
awaitBusy(
() -> {
final Discovery discovery = internalCluster().getInstance(Discovery.class, node);
if (discovery instanceof ZenDiscovery) {
return !((ZenDiscovery) discovery).joiningCluster();
}
return true;
},
30,
TimeUnit.SECONDS
)
assertBusy(
() -> {
final Discovery discovery = internalCluster().getInstance(Discovery.class, node);
if (discovery instanceof ZenDiscovery) {
assertFalse("node [" + node + "] is still joining master", ((ZenDiscovery) discovery).joiningCluster());
}
},
30,
TimeUnit.SECONDS
);
}
}

View File

@ -328,7 +328,7 @@ public class AsyncShardFetchTests extends ESTestCase {
entry = simulations.get(nodeId);
if (entry == null) {
// we are simulating a master node switch, wait for it to not be null
awaitBusy(() -> simulations.containsKey(nodeId));
assertBusy(() -> assertTrue(simulations.containsKey(nodeId)));
}
assert entry != null;
entry.executeLatch.await();

View File

@ -68,13 +68,15 @@ public class QuorumGatewayIT extends ESIntegTestCase {
@Override
public void doAfterNodes(int numNodes, final Client activeClient) throws Exception {
if (numNodes == 1) {
assertTrue(awaitBusy(() -> {
assertBusy(() -> {
logger.info("--> running cluster_health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = activeClient.admin().cluster().health(clusterHealthRequest()
.waitForYellowStatus().waitForNodes("2").waitForActiveShards(test.numPrimaries * 2)).actionGet();
logger.info("--> done cluster_health, status {}", clusterHealth.getStatus());
return (!clusterHealth.isTimedOut()) && clusterHealth.getStatus() == ClusterHealthStatus.YELLOW;
}, 30, TimeUnit.SECONDS));
assertFalse(clusterHealth.isTimedOut());
assertEquals(ClusterHealthStatus.YELLOW, clusterHealth.getStatus());
}, 30, TimeUnit.SECONDS);
logger.info("--> one node is closed -- index 1 document into the remaining nodes");
activeClient.prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3")
.endObject()).get();

View File

@ -71,7 +71,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
return new CompressedXContent(Strings.toString(builder));
}
public void testBaseAsyncTask() throws InterruptedException, IOException {
public void testBaseAsyncTask() throws Exception {
IndexService indexService = createIndex("test", Settings.EMPTY);
AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
AtomicReference<CountDownLatch> latch2 = new AtomicReference<>(new CountDownLatch(1));
@ -127,7 +127,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
// now close the index
final Index index = indexService.index();
assertAcked(client().admin().indices().prepareClose(index.getName()));
awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
assertBusy(() -> assertTrue("Index not found: " + index.getName(), getInstanceFromNode(IndicesService.class).hasIndex(index)));
final IndexService closedIndexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
assertNotSame(indexService, closedIndexService);
@ -137,7 +137,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
// now reopen the index
assertAcked(client().admin().indices().prepareOpen(index.getName()));
awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
assertBusy(() -> assertTrue("Index not found: " + index.getName(), getInstanceFromNode(IndicesService.class).hasIndex(index)));
indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
assertNotSame(closedIndexService, indexService);
@ -205,7 +205,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
// now close the index
final Index index = indexService.index();
assertAcked(client().admin().indices().prepareClose(index.getName()));
awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
assertBusy(() -> assertTrue("Index not found: " + index.getName(), getInstanceFromNode(IndicesService.class).hasIndex(index)));
final IndexService closedIndexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
assertNotSame(indexService, closedIndexService);
@ -216,7 +216,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
// now reopen the index
assertAcked(client().admin().indices().prepareOpen(index.getName()));
awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
assertBusy(() -> assertTrue("Index not found: " + index.getName(), getInstanceFromNode(IndicesService.class).hasIndex(index)));
indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
assertNotSame(closedIndexService, indexService);
refreshTask = indexService.getRefreshTask();
@ -242,7 +242,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
// now close the index
final Index index = indexService.index();
assertAcked(client().admin().indices().prepareClose(index.getName()));
awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
assertBusy(() -> assertTrue("Index not found: " + index.getName(), getInstanceFromNode(IndicesService.class).hasIndex(index)));
final IndexService closedIndexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
assertNotSame(indexService, closedIndexService);
@ -253,7 +253,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
// now reopen the index
assertAcked(client().admin().indices().prepareOpen(index.getName()));
awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
assertBusy(() -> assertTrue("Index not found: " + index.getName(), getInstanceFromNode(IndicesService.class).hasIndex(index)));
indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
assertNotSame(closedIndexService, indexService);
fsyncTask = indexService.getFsyncTask();

View File

@ -27,19 +27,18 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ClusterScope(supportsDedicatedMasters = false, numDataNodes = 1, scope = Scope.SUITE)
public class InternalEngineMergeIT extends ESIntegTestCase {
public void testMergesHappening() throws InterruptedException, IOException, ExecutionException {
public void testMergesHappening() throws Exception {
final int numOfShards = randomIntBetween(1, 5);
// some settings to keep num segments low
assertAcked(prepareCreate("test").setSettings(Settings.builder()
@ -66,21 +65,24 @@ public class InternalEngineMergeIT extends ESIntegTestCase {
stats.getPrimaries().getMerge().getCurrent());
}
final long upperNumberSegments = 2 * numOfShards * 10;
awaitBusy(() -> {
assertBusy(() -> {
IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).setMerge(true).get();
logger.info("numshards {}, segments {}, total merges {}, current merge {}", numOfShards,
stats.getPrimaries().getSegments().getCount(), stats.getPrimaries().getMerge().getTotal(),
stats.getPrimaries().getMerge().getCurrent());
long current = stats.getPrimaries().getMerge().getCurrent();
long count = stats.getPrimaries().getSegments().getCount();
return count < upperNumberSegments && current == 0;
assertThat(count, lessThan(upperNumberSegments));
assertThat(current, equalTo(0L));
});
IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).setMerge(true).get();
logger.info("numshards {}, segments {}, total merges {}, current merge {}", numOfShards,
stats.getPrimaries().getSegments().getCount(), stats.getPrimaries().getMerge().getTotal(),
stats.getPrimaries().getMerge().getCurrent());
long count = stats.getPrimaries().getSegments().getCount();
assertThat(count, Matchers.lessThanOrEqualTo(upperNumberSegments));
assertThat(count, lessThanOrEqualTo(upperNumberSegments));
}
}

View File

@ -619,7 +619,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
phaseTwoStartLatch.countDown();
// wait for the translog phase to complete and the recovery to block global checkpoint advancement
awaitBusy(() -> shards.getPrimary().pendingInSync());
assertBusy(() -> assertTrue(shards.getPrimary().pendingInSync()));
{
shards.index(new IndexRequest(index.getName(), "type", "last").source("{}", XContentType.JSON));
final long expectedDocs = docs + 3L;

View File

@ -184,7 +184,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
assertThat(updatedGlobalCheckpoint.get(), equalTo(update));
}
public void testMarkAllocationIdAsInSync() throws BrokenBarrierException, InterruptedException {
public void testMarkAllocationIdAsInSync() throws Exception {
final long initialClusterStateVersion = randomNonNegativeLong();
Map<AllocationId, Long> activeWithCheckpoints = randomAllocationsWithLocalCheckpoints(1, 1);
Set<AllocationId> active = new HashSet<>(activeWithCheckpoints.keySet());
@ -212,7 +212,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
});
thread.start();
barrier.await();
awaitBusy(tracker::pendingInSync);
assertBusy(() -> assertTrue(tracker.pendingInSync()));
final long updatedLocalCheckpoint = randomLongBetween(1 + localCheckpoint, Long.MAX_VALUE);
// there is a shard copy pending in sync, the global checkpoint can not advance
updatedGlobalCheckpoint.set(UNASSIGNED_SEQ_NO);

View File

@ -495,13 +495,14 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
* permits to the semaphore. We wait here until all generic threads are idle as an indication that all permits have been returned to
* the semaphore.
*/
awaitBusy(() -> {
assertBusy(() -> {
for (final ThreadPoolStats.Stats stats : threadPool.stats()) {
if (ThreadPool.Names.GENERIC.equals(stats.getName())) {
return stats.getActive() == 0;
assertThat("Expected no active threads in GENERIC pool", stats.getActive(), equalTo(0));
return;
}
}
return false;
fail("Failed to find stats for the GENERIC thread pool");
});
}

View File

@ -3363,7 +3363,7 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(primary);
}
public void testScheduledRefresh() throws IOException, InterruptedException {
public void testScheduledRefresh() throws Exception {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
@ -3388,7 +3388,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertFalse(primary.scheduledRefresh());
assertEquals(lastSearchAccess, primary.getLastSearcherAccess());
// wait until the thread-pool has moved the timestamp otherwise we can't assert on this below
awaitBusy(() -> primary.getThreadPool().relativeTimeInMillis() > lastSearchAccess);
assertBusy(() -> assertThat(primary.getThreadPool().relativeTimeInMillis(), greaterThan(lastSearchAccess)));
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
primary.awaitShardSearchActive(refreshed -> {

View File

@ -282,17 +282,20 @@ public class CorruptedFileIT extends ESIntegTestCase {
client().admin().indices().prepareUpdateSettings("test").setSettings(build).get();
client().admin().cluster().prepareReroute().get();
boolean didClusterTurnRed = awaitBusy(() -> {
boolean didClusterTurnRed = waitUntil(() -> {
ClusterHealthStatus test = client().admin().cluster()
.health(Requests.clusterHealthRequest("test")).actionGet().getStatus();
return test == ClusterHealthStatus.RED;
}, 5, TimeUnit.MINUTES);// sometimes on slow nodes the replication / recovery is just dead slow
final ClusterHealthResponse response = client().admin().cluster()
.health(Requests.clusterHealthRequest("test")).get();
if (response.getStatus() != ClusterHealthStatus.RED) {
logger.info("Cluster turned red in busy loop: {}", didClusterTurnRed);
logger.info("cluster state:\n{}\n{}",
client().admin().cluster().prepareState().get().getState(), client().admin().cluster().preparePendingClusterTasks().get());
client().admin().cluster().prepareState().get().getState(),
client().admin().cluster().preparePendingClusterTasks().get());
}
assertThat(response.getStatus(), is(ClusterHealthStatus.RED));
ClusterState state = client().admin().cluster().prepareState().get().getState();

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
@ -51,7 +52,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
@ -224,25 +224,23 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase {
private static void assertShardStatesMatch(final IndexShardStateChangeListener stateChangeListener,
final int numShards,
final IndexShardState... shardStates)
throws InterruptedException {
throws Exception {
CheckedRunnable<Exception> waitPredicate = () -> {
assertEquals(stateChangeListener.shardStates.size(), numShards);
BooleanSupplier waitPredicate = () -> {
if (stateChangeListener.shardStates.size() != numShards) {
return false;
}
for (List<IndexShardState> indexShardStates : stateChangeListener.shardStates.values()) {
if (indexShardStates == null || indexShardStates.size() != shardStates.length) {
return false;
}
assertNotNull(indexShardStates);
assertThat(indexShardStates.size(), equalTo(shardStates.length));
for (int i = 0; i < shardStates.length; i++) {
if (indexShardStates.get(i) != shardStates[i]) {
return false;
}
assertThat(indexShardStates.get(i), equalTo(shardStates[i]));
}
}
return true;
};
if (!awaitBusy(waitPredicate, 1, TimeUnit.MINUTES)) {
try {
assertBusy(waitPredicate, 1, TimeUnit.MINUTES);
} catch (AssertionError ae) {
fail("failed to observe expect shard states\n" +
"expected: [" + numShards + "] shards with states: " + Strings.arrayToCommaDelimitedString(shardStates) + "\n" +
"observed:\n" + stateChangeListener);

View File

@ -135,6 +135,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.empty;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class IndexRecoveryIT extends ESIntegTestCase {
@ -789,9 +790,11 @@ public class IndexRecoveryIT extends ESIntegTestCase {
if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action) && count.incrementAndGet() == 1) {
// ensures that it's considered as valid recovery attempt by source
try {
awaitBusy(() -> client(blueNodeName).admin().cluster().prepareState().setLocal(true).get()
.getState().getRoutingTable().index("test").shard(0).getAllInitializingShards().isEmpty() == false);
} catch (InterruptedException e) {
assertBusy(() -> assertThat(
"Expected there to be some initializing shards",
client(blueNodeName).admin().cluster().prepareState().setLocal(true).get()
.getState().getRoutingTable().index("test").shard(0).getAllInitializingShards(), not(empty())));
} catch (Exception e) {
throw new RuntimeException(e);
}
connection.sendRequest(requestId, action, request, options);

View File

@ -145,8 +145,8 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
.get();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(waitForShardDeletion(node_1, index, 0), equalTo(false));
assertThat(waitForIndexDeletion(node_1, index), equalTo(false));
assertShardDeleted(node_1, index, 0);
assertIndexDeleted(node_1, index);
assertThat(Files.exists(shardDirectory(node_2, index, 0)), equalTo(true));
assertThat(Files.exists(indexDirectory(node_2, index)), equalTo(true));
assertThat(Files.exists(shardDirectory(node_3, index, 0)), equalTo(true));
@ -240,12 +240,13 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
// it must still delete the shard, even if it cannot find it anymore in indicesservice
client().admin().indices().prepareDelete("test").get();
assertThat(waitForShardDeletion(node_1, index, 0), equalTo(false));
assertThat(waitForIndexDeletion(node_1, index), equalTo(false));
assertShardDeleted(node_1, index, 0);
assertIndexDeleted(node_1, index);
assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(false));
assertThat(Files.exists(indexDirectory(node_1, index)), equalTo(false));
assertThat(waitForShardDeletion(node_2, index, 0), equalTo(false));
assertThat(waitForIndexDeletion(node_2, index), equalTo(false));
assertShardDeleted(node_2, index, 0);
assertIndexDeleted(node_2, index);
assertThat(Files.exists(shardDirectory(node_2, index, 0)), equalTo(false));
assertThat(Files.exists(indexDirectory(node_2, index)), equalTo(false));
}
@ -277,7 +278,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
assertThat(clusterHealth.isTimedOut(), equalTo(false));
logger.info("--> making sure that shard is not allocated on server3");
assertThat(waitForShardDeletion(node_3, index, 0), equalTo(false));
assertShardDeleted(node_3, index, 0);
Path server2Shard = shardDirectory(node_2, index, 0);
logger.info("--> stopping node {}", node_2);
@ -308,7 +309,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
logger.info("--> making sure that shard and its replica are allocated on server1 and server3 but not on server2");
assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(true));
assertThat(Files.exists(shardDirectory(node_3, index, 0)), equalTo(true));
assertThat(waitForShardDeletion(node_4, index, 0), equalTo(false));
assertShardDeleted(node_4, index, 0);
}
public void testShardActiveElsewhereDoesNotDeleteAnother() throws Exception {
@ -453,7 +454,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
waitNoPendingTasksOnAll();
logger.info("Checking if shards aren't removed");
for (int shard : node2Shards) {
assertTrue(waitForShardDeletion(nonMasterNode, index, shard));
assertShardExists(nonMasterNode, index, shard);
}
}
@ -471,13 +472,18 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
return paths[0];
}
private boolean waitForShardDeletion(final String server, final Index index, final int shard) throws InterruptedException {
awaitBusy(() -> !Files.exists(shardDirectory(server, index, shard)));
return Files.exists(shardDirectory(server, index, shard));
private void assertShardDeleted(final String server, final Index index, final int shard) throws Exception {
final Path path = shardDirectory(server, index, shard);
assertBusy(() -> assertFalse("Expected shard to not exist: " + path, Files.exists(path)));
}
private boolean waitForIndexDeletion(final String server, final Index index) throws InterruptedException {
awaitBusy(() -> !Files.exists(indexDirectory(server, index)));
return Files.exists(indexDirectory(server, index));
private void assertShardExists(final String server, final Index index, final int shard) throws Exception {
final Path path = shardDirectory(server, index, shard);
assertBusy(() -> assertTrue("Expected shard to exist: " + path, Files.exists(path)));
}
private void assertIndexDeleted(final String server, final Index index) throws Exception {
final Path path = indexDirectory(server, index);
assertBusy(() -> assertFalse("Expected index to be deleted: " + path, Files.exists(path)));
}
}

View File

@ -75,7 +75,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Objects.requireNonNull;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.test.ESTestCase.awaitBusy;
import static org.elasticsearch.test.ESTestCase.assertBusy;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -335,10 +335,15 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
AtomicInteger phase = new AtomicInteger();
while (true) {
// wait for something to happen
assertTrue(awaitBusy(() -> testTask.isCancelled() ||
testTask.getOperation() != null ||
clusterService.lifecycleState() != Lifecycle.State.STARTED, // speedup finishing on closed nodes
45, TimeUnit.SECONDS)); // This can take a while during large cluster restart
try {
assertBusy(() -> assertTrue(testTask.isCancelled() ||
testTask.getOperation() != null ||
clusterService.lifecycleState() != Lifecycle.State.STARTED), // speedup finishing on closed nodes
45, TimeUnit.SECONDS); // This can take a while during large cluster restart
} catch (Exception ex) {
throw new RuntimeException(ex);
}
if (clusterService.lifecycleState() != Lifecycle.State.STARTED) {
return;
}

View File

@ -362,23 +362,23 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
//if there was an error we try to wait and see if at some point it'll get fixed
logger.info("--> trying to wait");
assertTrue(awaitBusy(() -> {
boolean errorOccurred = false;
for (int i = 0; i < iterations; i++) {
SearchResponse searchResponse = client().prepareSearch()
.setTrackTotalHits(true)
.setSize(0)
.setQuery(matchAllQuery())
.get();
if (searchResponse.getHits().getTotalHits().value != numberOfDocs) {
errorOccurred = true;
}
}
return !errorOccurred;
},
5,
TimeUnit.MINUTES
)
assertBusy(
() -> {
boolean errorOccurred = false;
for (int i = 0; i < iterations; i++) {
SearchResponse searchResponse = client().prepareSearch()
.setTrackTotalHits(true)
.setSize(0)
.setQuery(matchAllQuery())
.get();
if (searchResponse.getHits().getTotalHits().value != numberOfDocs) {
errorOccurred = true;
}
}
assertFalse("An error occurred while waiting", errorOccurred);
},
5,
TimeUnit.MINUTES
);
assertEquals(numberOfDocs, ids.size());
}

View File

@ -272,7 +272,7 @@ public class SearchCancellationIT extends ESIntegTestCase {
LogManager.getLogger(SearchCancellationIT.class).info("Blocking on the document {}", fieldsLookup.get("_id"));
hits.incrementAndGet();
try {
awaitBusy(() -> shouldBlock.get() == false);
assertBusy(() -> assertFalse(shouldBlock.get()));
} catch (Exception e) {
throw new RuntimeException(e);
}

View File

@ -41,7 +41,7 @@ public class SearchWithRejectionsIT extends ESIntegTestCase {
.build();
}
public void testOpenContextsAfterRejections() throws InterruptedException {
public void testOpenContextsAfterRejections() throws Exception {
createIndex("test");
ensureGreen("test");
final int docs = scaledRandomIntBetween(20, 50);
@ -68,10 +68,8 @@ public class SearchWithRejectionsIT extends ESIntegTestCase {
} catch (Exception t) {
}
}
awaitBusy(
() -> client().admin().indices().prepareStats().get().getTotal().getSearch().getOpenContexts() == 0,
1, TimeUnit.SECONDS);
indicesStats = client().admin().indices().prepareStats().get();
assertThat(indicesStats.getTotal().getSearch().getOpenContexts(), equalTo(0L));
assertBusy(
() -> assertThat(client().admin().indices().prepareStats().get().getTotal().getSearch().getOpenContexts(), equalTo(0L)),
1, TimeUnit.SECONDS);
}
}

View File

@ -208,17 +208,17 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
}
public void waitForBlockOnAnyDataNode(String repository, TimeValue timeout) throws InterruptedException {
if (false == awaitBusy(() -> {
for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
final boolean blocked = waitUntil(() -> {
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository);
if (mockRepository.blocked()) {
return true;
}
}
return false;
}, timeout.millis(), TimeUnit.MILLISECONDS)) {
fail("Timeout waiting for repository block on any data node!!!");
}
}, timeout.millis(), TimeUnit.MILLISECONDS);
assertTrue("No repository is blocked waiting on a data node", blocked);
}
public static void unblockNode(final String repository, final String node) {

View File

@ -1214,7 +1214,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> wait for the index to appear");
// that would mean that recovery process started and failing
assertThat(waitForIndex("test-idx", TimeValue.timeValueSeconds(10)), equalTo(true));
waitForIndex("test-idx", TimeValue.timeValueSeconds(10));
logger.info("--> delete index");
cluster().wipeIndices("test-idx");
@ -2095,6 +2095,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertEquals("test-snap", response.getSnapshots().get(0).getSnapshot().getSnapshotId().getName());
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/46276")
public void testSnapshotRelocatingPrimary() throws Exception {
Client client = client();
logger.info("--> creating repository");
@ -2619,9 +2620,14 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
restoreFut.get();
}
private boolean waitForIndex(final String index, TimeValue timeout) throws InterruptedException {
return awaitBusy(() -> client().admin().indices().prepareExists(index).execute().actionGet().isExists(),
timeout.millis(), TimeUnit.MILLISECONDS);
private void waitForIndex(final String index, TimeValue timeout) throws Exception {
assertBusy(
() -> {
boolean exists = client().admin().indices().prepareExists(index).execute().actionGet().isExists();
assertTrue("Expected index [" + index + "] to exist", exists);
},
timeout.millis(),
TimeUnit.MILLISECONDS);
}
public void testSnapshotName() throws Exception {

View File

@ -178,7 +178,7 @@ public class ScheduleWithFixedDelayTests extends ESTestCase {
// rarely wait and make sure the runnable didn't run at the next interval
if (rarely()) {
assertFalse(awaitBusy(runAfterDone::get, 1L, TimeUnit.SECONDS));
assertBusy(() -> assertFalse("Runnable was run after being cancelled", runAfterDone.get()), 1L, TimeUnit.SECONDS);
}
}
@ -283,10 +283,10 @@ public class ScheduleWithFixedDelayTests extends ESTestCase {
assertThat(counterValue, equalTo(iterations));
if (rarely()) {
awaitBusy(() -> {
final int value = counter.get();
return value == iterations;
}, 5 * interval.millis(), TimeUnit.MILLISECONDS);
assertBusy(
() -> assertThat(counter.get(), equalTo(iterations)),
5 * interval.millis(),
TimeUnit.MILLISECONDS);
}
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.common.util;
import com.carrotsearch.randomizedtesting.RandomizedContext;
import com.carrotsearch.randomizedtesting.SeedUtils;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.BytesRef;
@ -29,7 +28,6 @@ import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import java.util.Collection;
import java.util.Collections;
@ -41,6 +39,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.test.ESTestCase.assertBusy;
import static org.junit.Assert.assertTrue;
public class MockBigArrays extends BigArrays {
/**
@ -57,8 +58,9 @@ public class MockBigArrays extends BigArrays {
// not empty, we might be executing on a shared cluster that keeps on obtaining
// and releasing arrays, lets make sure that after a reasonable timeout, all master
// copy (snapshot) have been released
boolean success = ESTestCase.awaitBusy(() -> Sets.haveEmptyIntersection(masterCopy.keySet(), ACQUIRED_ARRAYS.keySet()));
if (!success) {
try {
assertBusy(() -> assertTrue(Sets.haveEmptyIntersection(masterCopy.keySet(), ACQUIRED_ARRAYS.keySet())));
} catch (AssertionError ex) {
masterCopy.keySet().retainAll(ACQUIRED_ARRAYS.keySet());
ACQUIRED_ARRAYS.keySet().removeAll(masterCopy.keySet()); // remove all existing master copy we will report on
if (!masterCopy.isEmpty()) {

View File

@ -23,7 +23,6 @@ import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.common.recycler.Recycler.V;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.test.ESTestCase;
import java.lang.reflect.Array;
import java.util.Arrays;
@ -34,6 +33,8 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.elasticsearch.test.ESTestCase.waitUntil;
public class MockPageCacheRecycler extends PageCacheRecycler {
private static final ConcurrentMap<Object, Throwable> ACQUIRED_PAGES = new ConcurrentHashMap<>();
@ -44,8 +45,8 @@ public class MockPageCacheRecycler extends PageCacheRecycler {
// not empty, we might be executing on a shared cluster that keeps on obtaining
// and releasing pages, lets make sure that after a reasonable timeout, all master
// copy (snapshot) have been released
boolean success =
ESTestCase.awaitBusy(() -> Sets.haveEmptyIntersection(masterCopy.keySet(), ACQUIRED_PAGES.keySet()));
final boolean success =
waitUntil(() -> Sets.haveEmptyIntersection(masterCopy.keySet(), ACQUIRED_PAGES.keySet()));
if (!success) {
masterCopy.keySet().retainAll(ACQUIRED_PAGES.keySet());
ACQUIRED_PAGES.keySet().removeAll(masterCopy.keySet()); // remove all existing master copy we will report on

View File

@ -1,4 +1,4 @@
package org.elasticsearch.test;/*
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
@ -17,6 +17,8 @@ package org.elasticsearch.test;/*
* under the License.
*/
package org.elasticsearch.test;
import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import com.carrotsearch.randomizedtesting.generators.RandomStrings;

View File

@ -176,8 +176,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -200,6 +198,7 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.startsWith;
@ -967,67 +966,50 @@ public abstract class ESIntegTestCase extends ESTestCase {
* Waits until at least a give number of document is visible for searchers
*
* @param numDocs number of documents to wait for
* @param indexer a {@link org.elasticsearch.test.BackgroundIndexer}. If supplied it will be first checked for documents indexed.
* @param indexer a {@link org.elasticsearch.test.BackgroundIndexer}. It will be first checked for documents indexed.
* This saves on unneeded searches.
* @return the actual number of docs seen.
*/
public long waitForDocs(final long numDocs, @Nullable final BackgroundIndexer indexer) throws InterruptedException {
public void waitForDocs(final long numDocs, final BackgroundIndexer indexer) throws Exception {
// indexing threads can wait for up to ~1m before retrying when they first try to index into a shard which is not STARTED.
return waitForDocs(numDocs, 90, TimeUnit.SECONDS, indexer);
}
final long maxWaitTimeMs = Math.max(90 * 1000, 200 * numDocs);
/**
* Waits until at least a give number of document is visible for searchers
*
* @param numDocs number of documents to wait for
* @param maxWaitTime if not progress have been made during this time, fail the test
* @param maxWaitTimeUnit the unit in which maxWaitTime is specified
* @param indexer If supplied it will be first checked for documents indexed.
* This saves on unneeded searches.
* @return the actual number of docs seen.
*/
public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTimeUnit, @Nullable final BackgroundIndexer indexer)
throws InterruptedException {
final AtomicLong lastKnownCount = new AtomicLong(-1);
long lastStartCount = -1;
BooleanSupplier testDocs = () -> {
if (indexer != null) {
lastKnownCount.set(indexer.totalIndexedDocs());
}
if (lastKnownCount.get() >= numDocs) {
try {
assertBusy(
() -> {
long lastKnownCount = indexer.totalIndexedDocs();
long count = client().prepareSearch()
.setTrackTotalHits(true)
.setSize(0)
.setQuery(matchAllQuery())
.get()
.getHits().getTotalHits().value;
if (lastKnownCount >= numDocs) {
try {
long count = client().prepareSearch()
.setTrackTotalHits(true)
.setSize(0)
.setQuery(matchAllQuery())
.get()
.getHits().getTotalHits().value;
if (count == lastKnownCount.get()) {
// no progress - try to refresh for the next time
client().admin().indices().prepareRefresh().get();
if (count == lastKnownCount) {
// no progress - try to refresh for the next time
client().admin().indices().prepareRefresh().get();
}
lastKnownCount = count;
} catch (Exception e) { // count now acts like search and barfs if all shards failed...
logger.debug("failed to executed count", e);
throw e;
}
lastKnownCount.set(count);
} catch (Exception e) { // count now acts like search and barfs if all shards failed...
logger.debug("failed to executed count", e);
return false;
}
logger.debug("[{}] docs visible for search. waiting for [{}]", lastKnownCount.get(), numDocs);
} else {
logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount.get(), numDocs);
}
return lastKnownCount.get() >= numDocs;
};
while (!awaitBusy(testDocs, maxWaitTime, maxWaitTimeUnit)) {
if (lastStartCount == lastKnownCount.get()) {
// we didn't make any progress
fail("failed to reach " + numDocs + "docs");
}
lastStartCount = lastKnownCount.get();
}
return lastKnownCount.get();
if (logger.isDebugEnabled()) {
if (lastKnownCount < numDocs) {
logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount, numDocs);
} else {
logger.debug("[{}] docs visible for search (needed [{}])", lastKnownCount, numDocs);
}
}
assertThat(lastKnownCount, greaterThanOrEqualTo(numDocs));
},
maxWaitTimeMs,
TimeUnit.MILLISECONDS
);
}
/**

View File

@ -871,6 +871,7 @@ public abstract class ESTestCase extends LuceneTestCase {
*/
public static void assertBusy(CheckedRunnable<Exception> codeBlock, long maxWaitTime, TimeUnit unit) throws Exception {
long maxTimeInMillis = TimeUnit.MILLISECONDS.convert(maxWaitTime, unit);
// In case you've forgotten your high-school studies, log10(x) / log10(y) == log y(x)
long iterations = Math.max(Math.round(Math.log10(maxTimeInMillis) / Math.log10(2)), 1);
long timeInMillis = 1;
long sum = 0;
@ -898,14 +899,34 @@ public abstract class ESTestCase extends LuceneTestCase {
}
}
public static boolean awaitBusy(BooleanSupplier breakSupplier) throws InterruptedException {
return awaitBusy(breakSupplier, 10, TimeUnit.SECONDS);
/**
* Periodically execute the supplied function until it returns true, or a timeout
* is reached. This version uses a timeout of 10 seconds. If at all possible,
* use {@link ESTestCase#assertBusy(CheckedRunnable)} instead.
*
* @param breakSupplier determines whether to return immediately or continue waiting.
* @return the last value returned by <code>breakSupplier</code>
* @throws InterruptedException if any sleep calls were interrupted.
*/
public static boolean waitUntil(BooleanSupplier breakSupplier) throws InterruptedException {
return waitUntil(breakSupplier, 10, TimeUnit.SECONDS);
}
// After 1s, we stop growing the sleep interval exponentially and just sleep 1s until maxWaitTime
private static final long AWAIT_BUSY_THRESHOLD = 1000L;
public static boolean awaitBusy(BooleanSupplier breakSupplier, long maxWaitTime, TimeUnit unit) throws InterruptedException {
/**
* Periodically execute the supplied function until it returns true, or until the
* specified maximum wait time has elapsed. If at all possible, use
* {@link ESTestCase#assertBusy(CheckedRunnable)} instead.
*
* @param breakSupplier determines whether to return immediately or continue waiting.
* @param maxWaitTime the maximum amount of time to wait
* @param unit the unit of tie for <code>maxWaitTime</code>
* @return the last value returned by <code>breakSupplier</code>
* @throws InterruptedException if any sleep calls were interrupted.
*/
public static boolean waitUntil(BooleanSupplier breakSupplier, long maxWaitTime, TimeUnit unit) throws InterruptedException {
long maxTimeInMillis = TimeUnit.MILLISECONDS.convert(maxWaitTime, unit);
long timeInMillis = 1;
long sum = 0;

View File

@ -155,13 +155,13 @@ import static org.elasticsearch.discovery.DiscoverySettings.INITIAL_STATE_TIMEOU
import static org.elasticsearch.discovery.FileBasedSeedHostsProvider.UNICAST_HOSTS_FILE;
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
import static org.elasticsearch.test.ESTestCase.assertBusy;
import static org.elasticsearch.test.ESTestCase.awaitBusy;
import static org.elasticsearch.test.ESTestCase.getTestTransportType;
import static org.elasticsearch.test.ESTestCase.randomFrom;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertFalse;
@ -1252,22 +1252,17 @@ public final class InternalTestCluster extends TestCluster {
logger.trace("validating cluster formed via [{}], expecting {}", viaNode, expectedNodes);
final Client client = client(viaNode);
try {
if (awaitBusy(() -> {
assertBusy(() -> {
DiscoveryNodes discoveryNodes = client.admin().cluster().prepareState().get().getState().nodes();
if (discoveryNodes.getSize() != expectedNodes.size()) {
return false;
}
assertEquals(expectedNodes.size(), discoveryNodes.getSize());
for (DiscoveryNode expectedNode : expectedNodes) {
if (discoveryNodes.nodeExists(expectedNode) == false) {
return false;
}
assertTrue("Expected node to exist: " + expectedNode, discoveryNodes.nodeExists(expectedNode));
}
return true;
}, 30, TimeUnit.SECONDS) == false) {
throw new IllegalStateException("cluster failed to form with expected nodes " + expectedNodes + " and actual nodes " +
client.admin().cluster().prepareState().get().getState().nodes());
}
} catch (InterruptedException e) {
}, 30, TimeUnit.SECONDS);
} catch (AssertionError ae) {
throw new IllegalStateException("cluster failed to form with expected nodes " + expectedNodes + " and actual nodes " +
client.admin().cluster().prepareState().get().getState().nodes());
} catch (Exception e) {
throw new IllegalStateException(e);
}
}

View File

@ -105,8 +105,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -618,62 +616,48 @@ public abstract class CcrIntegTestCase extends ESTestCase {
* @param numDocs number of documents to wait for
* @param indexer a {@link org.elasticsearch.test.BackgroundIndexer}. Will be first checked for documents indexed.
* This saves on unneeded searches.
* @return the actual number of docs seen.
*/
public long waitForDocs(final long numDocs, final BackgroundIndexer indexer) throws InterruptedException {
public void waitForDocs(final long numDocs, final BackgroundIndexer indexer) throws Exception {
// indexing threads can wait for up to ~1m before retrying when they first try to index into a shard which is not STARTED.
return waitForDocs(numDocs, 90, TimeUnit.SECONDS, indexer);
}
final long maxWaitTimeMs = Math.max(90 * 1000, 200 * numDocs);
/**
* Waits until at least a give number of document is visible for searchers
*
* @param numDocs number of documents to wait for
* @param maxWaitTime if not progress have been made during this time, fail the test
* @param maxWaitTimeUnit the unit in which maxWaitTime is specified
* @param indexer Will be first checked for documents indexed.
* This saves on unneeded searches.
* @return the actual number of docs seen.
*/
public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTimeUnit, final BackgroundIndexer indexer)
throws InterruptedException {
final AtomicLong lastKnownCount = new AtomicLong(-1);
long lastStartCount = -1;
BooleanSupplier testDocs = () -> {
lastKnownCount.set(indexer.totalIndexedDocs());
if (lastKnownCount.get() >= numDocs) {
try {
long count = indexer.getClient().prepareSearch()
.setTrackTotalHits(true)
.setSize(0)
.setQuery(QueryBuilders.matchAllQuery())
.get()
.getHits().getTotalHits().value;
assertBusy(
() -> {
long lastKnownCount = indexer.totalIndexedDocs();
if (count == lastKnownCount.get()) {
// no progress - try to refresh for the next time
indexer.getClient().admin().indices().prepareRefresh().get();
if (lastKnownCount >= numDocs) {
try {
long count = indexer.getClient().prepareSearch()
.setTrackTotalHits(true)
.setSize(0)
.setQuery(QueryBuilders.matchAllQuery())
.get()
.getHits().getTotalHits().value;
if (count == lastKnownCount) {
// no progress - try to refresh for the next time
indexer.getClient().admin().indices().prepareRefresh().get();
}
lastKnownCount = count;
} catch (Exception e) { // count now acts like search and barfs if all shards failed...
logger.debug("failed to executed count", e);
throw e;
}
lastKnownCount.set(count);
} catch (Exception e) { // count now acts like search and barfs if all shards failed...
logger.debug("failed to executed count", e);
return false;
}
logger.debug("[{}] docs visible for search. waiting for [{}]", lastKnownCount.get(), numDocs);
} else {
logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount.get(), numDocs);
}
return lastKnownCount.get() >= numDocs;
};
while (!awaitBusy(testDocs, maxWaitTime, maxWaitTimeUnit)) {
if (lastStartCount == lastKnownCount.get()) {
// we didn't make any progress
fail("failed to reach " + numDocs + "docs");
}
lastStartCount = lastKnownCount.get();
}
return lastKnownCount.get();
if (logger.isDebugEnabled()) {
if (lastKnownCount < numDocs) {
logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount, numDocs);
} else {
logger.debug("[{}] docs visible for search (needed [{}])", lastKnownCount, numDocs);
}
}
assertThat(lastKnownCount, greaterThanOrEqualTo(numDocs));
},
maxWaitTimeMs,
TimeUnit.MILLISECONDS
);
}
protected ActionListener<RestoreService.RestoreCompletionResponse> waitForRestore(

View File

@ -97,16 +97,15 @@ public abstract class AbstractLicensesIntegrationTestCase extends ESIntegTestCas
latch.await();
}
protected void assertLicenseActive(boolean active) throws InterruptedException {
boolean success = awaitBusy(() -> {
protected void assertLicenseActive(boolean active) throws Exception {
assertBusy(() -> {
for (XPackLicenseState licenseState : internalCluster().getDataNodeInstances(XPackLicenseState.class)) {
if (licenseState.isActive() == active) {
return true;
return;
}
}
return false;
fail("No data nodes have a license active state of [" + active + "]");
});
assertTrue(success);
}
}

View File

@ -174,16 +174,15 @@ public class LicenseServiceClusterTests extends AbstractLicensesIntegrationTestC
assertLicenseActive(true);
}
private void assertOperationMode(License.OperationMode operationMode) throws InterruptedException {
boolean success = awaitBusy(() -> {
private void assertOperationMode(License.OperationMode operationMode) throws Exception {
assertBusy(() -> {
for (XPackLicenseState licenseState : internalCluster().getDataNodeInstances(XPackLicenseState.class)) {
if (licenseState.getOperationMode() == operationMode) {
return true;
return;
}
}
return false;
fail("No data nodes found with operation mode [" + operationMode + "]");
});
assertTrue(success);
}
private void writeCloudInternalMode(String mode) throws Exception {

View File

@ -331,9 +331,10 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
assertTrue(awaitBusy(() -> indexer.getPosition() == 2));
assertBusy(() -> assertThat(indexer.getPosition(), equalTo(2)));
countDownLatch.countDown();
assertTrue(awaitBusy(() -> isFinished.get()));
assertBusy(() -> assertTrue(isFinished.get()));
assertThat(indexer.getPosition(), equalTo(3));
assertFalse(isStopped.get());
@ -347,24 +348,24 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
}
}
public void testStateMachineBrokenSearch() throws InterruptedException {
public void testStateMachineBrokenSearch() throws Exception {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
try {
MockIndexerThrowsFirstSearch indexer = new MockIndexerThrowsFirstSearch(executor, state, 2);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertTrue(awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS));
assertBusy(() -> assertTrue(isFinished.get()), 10000, TimeUnit.SECONDS);
assertThat(indexer.getStep(), equalTo(3));
} finally {
executor.shutdownNow();
}
}
public void testStop_WhileIndexing() throws InterruptedException {
public void testStop_WhileIndexing() throws Exception {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
@ -378,14 +379,14 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
countDownLatch.countDown();
assertThat(indexer.getPosition(), equalTo(2));
assertTrue(awaitBusy(() -> isStopped.get()));
assertBusy(() -> assertTrue(isStopped.get()));
assertFalse(isFinished.get());
} finally {
executor.shutdownNow();
}
}
public void testFiveRuns() throws InterruptedException {
public void testFiveRuns() throws Exception {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
@ -393,7 +394,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertTrue(awaitBusy(() -> isFinished.get()));
assertBusy(() -> assertTrue(isFinished.get()));
indexer.assertCounters();
} finally {
executor.shutdownNow();

View File

@ -245,7 +245,7 @@ public class PermissionsIT extends ESRestTestCase {
*/
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/41440")
public void testWhenUserLimitedByOnlyAliasOfIndexCanWriteToIndexWhichWasRolledoverByILMPolicy()
throws IOException, InterruptedException {
throws Exception {
/*
* Setup:
* - ILM policy to rollover index when max docs condition is met
@ -264,33 +264,24 @@ public class PermissionsIT extends ESRestTestCase {
refresh("foo_alias");
// wait so the ILM policy triggers rollover action, verify that the new index exists
assertThat(awaitBusy(() -> {
assertBusy(() -> {
Request request = new Request("HEAD", "/" + "foo-logs-000002");
int status;
try {
status = adminClient().performRequest(request).getStatusLine().getStatusCode();
} catch (IOException e) {
throw new RuntimeException(e);
}
return status == 200;
}), is(true));
int status = adminClient().performRequest(request).getStatusLine().getStatusCode();
assertThat(status, equalTo(200));
});
// test_user: index docs using alias, now should be able write to new index
indexDocs("test_user", "x-pack-test-password", "foo_alias", 1);
refresh("foo_alias");
// verify that the doc has been indexed into new write index
awaitBusy(() -> {
assertBusy(() -> {
Request request = new Request("GET", "/foo-logs-000002/_search");
Response response;
try {
response = adminClient().performRequest(request);
try (InputStream content = response.getEntity().getContent()) {
Map<String, Object> map = XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
return ((Integer) XContentMapValues.extractValue("hits.total.value", map)) == 1;
}
} catch (IOException e) {
throw new RuntimeException(e);
Response response = adminClient().performRequest(request);
try (InputStream content = response.getEntity().getContent()) {
Map<String, Object> map = XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
Integer totalHits = (Integer) XContentMapValues.extractValue("hits.total.value", map);
assertThat(totalHits, equalTo(1));
}
});
}

View File

@ -168,7 +168,8 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
client().admin().indices().prepareRefresh("*").get();
// We need to wait a second to ensure the second time around model snapshots will have a different ID (it depends on epoch seconds)
awaitBusy(() -> false, 1, TimeUnit.SECONDS);
// FIXME it would be better to wait for something concrete instead of wait for time to elapse
assertBusy(() -> {}, 1, TimeUnit.SECONDS);
for (Job.Builder job : getJobs()) {
// Run up to now

View File

@ -69,7 +69,7 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase {
assertThat(revertPointBucket.isInterim(), is(true));
// We need to wait a second to ensure the second time around model snapshot will have a different ID (it depends on epoch seconds)
awaitBusy(() -> false, 1, TimeUnit.SECONDS);
waitUntil(() -> false, 1, TimeUnit.SECONDS);
openJob(job.getId());
postData(job.getId(), generateData(startTime + 10 * bucketSpan.getMillis(), bucketSpan, 10, Arrays.asList("foo", "bar"),

View File

@ -315,12 +315,12 @@ public class PainlessDomainSplitIT extends ESRestTestCase {
client().performRequest(createFeedRequest);
client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "datafeeds/hrd-split-datafeed/_start"));
boolean passed = awaitBusy(() -> {
try {
try {
assertBusy(() -> {
client().performRequest(new Request("POST", "/_refresh"));
Response response = client().performRequest(new Request("GET",
MachineLearning.BASE_PATH + "anomaly_detectors/hrd-split-job/results/records"));
MachineLearning.BASE_PATH + "anomaly_detectors/hrd-split-job/results/records"));
String responseBody = EntityUtils.toString(response.getEntity());
if (responseBody.contains("\"count\":2")) {
@ -339,27 +339,19 @@ public class PainlessDomainSplitIT extends ESRestTestCase {
// domainSplit() tests had subdomain, testHighestRegisteredDomainCases() do not
if (test.subDomainExpected != null) {
assertThat("Expected subdomain [" + test.subDomainExpected + "] but found [" + actualSubDomain
+ "]. Actual " + actualTotal + " vs Expected " + expectedTotal, actualSubDomain,
equalTo(test.subDomainExpected));
+ "]. Actual " + actualTotal + " vs Expected " + expectedTotal, actualSubDomain,
equalTo(test.subDomainExpected));
}
assertThat("Expected domain [" + test.domainExpected + "] but found [" + actualDomain + "]. Actual "
+ actualTotal + " vs Expected " + expectedTotal, actualDomain, equalTo(test.domainExpected));
return true;
+ actualTotal + " vs Expected " + expectedTotal, actualDomain, equalTo(test.domainExpected));
} else {
logger.error(responseBody);
return false;
fail("Response body didn't contain [\"count\":2]");
}
} catch (Exception e) {
logger.error(e.getMessage());
return false;
}
}, 5, TimeUnit.SECONDS);
if (!passed) {
}, 5, TimeUnit.SECONDS);
} catch (Exception e) {
logger.error(e.getMessage());
fail("Anomaly records were not found within 5 seconds");
}
}

View File

@ -404,7 +404,7 @@ public class RollupJobTaskTests extends ESTestCase {
}
@SuppressWarnings("unchecked")
public void testTriggerWithoutHeaders() throws InterruptedException {
public void testTriggerWithoutHeaders() throws Exception {
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
Client client = mock(Client.class);
@ -475,7 +475,7 @@ public class RollupJobTaskTests extends ESTestCase {
fail("Should not have entered onFailure");
}
});
ESTestCase.awaitBusy(started::get);
assertBusy(() -> assertTrue(started.get()));
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
@ -484,11 +484,11 @@ public class RollupJobTaskTests extends ESTestCase {
latch.countDown();
// Wait for the final persistent status to finish
ESTestCase.awaitBusy(finished::get);
assertBusy(() -> assertTrue(finished.get()));
}
@SuppressWarnings("unchecked")
public void testTriggerWithHeaders() throws InterruptedException {
public void testTriggerWithHeaders() throws Exception {
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
Map<String, String> headers = new HashMap<>(1);
headers.put("es-security-runas-user", "foo");
@ -565,7 +565,7 @@ public class RollupJobTaskTests extends ESTestCase {
fail("Should not have entered onFailure");
}
});
ESTestCase.awaitBusy(started::get);
assertBusy(() -> assertTrue(started.get()));
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
@ -574,11 +574,11 @@ public class RollupJobTaskTests extends ESTestCase {
latch.countDown();
// Wait for the final persistent status to finish
ESTestCase.awaitBusy(finished::get);
assertBusy(() -> assertTrue(finished.get()));
}
@SuppressWarnings("unchecked")
public void testSaveStateChangesIDScheme() throws InterruptedException {
public void testSaveStateChangesIDScheme() throws Exception {
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
Map<String, String> headers = new HashMap<>(1);
headers.put("es-security-runas-user", "foo");
@ -656,7 +656,7 @@ public class RollupJobTaskTests extends ESTestCase {
fail("Should not have entered onFailure");
}
});
ESTestCase.awaitBusy(started::get);
assertBusy(() -> assertTrue(started.get()));
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
@ -665,7 +665,7 @@ public class RollupJobTaskTests extends ESTestCase {
latch.countDown();
// Wait for the final persistent status to finish
ESTestCase.awaitBusy(finished::get);
assertBusy(() -> assertTrue(finished.get()));
}
public void testStopWhenStopped() throws InterruptedException {

View File

@ -129,7 +129,7 @@ public class LicensingTests extends SecurityIntegTestCase {
}
@Before
public void resetLicensing() throws InterruptedException {
public void resetLicensing() throws Exception {
enableLicensing(OperationMode.MISSING);
}
@ -297,64 +297,50 @@ public class LicensingTests extends SecurityIntegTestCase {
assertThat(ee.status(), is(RestStatus.FORBIDDEN));
}
private void disableLicensing() throws InterruptedException {
private void disableLicensing() throws Exception {
// This method first makes sure licensing is enabled everywhere so that we can execute
// monitoring actions to ensure we have a stable cluster and only then do we disable.
// This is done in an await busy since there is a chance that the enabling of the license
// This is done in an assertBusy since there is a chance that the enabling of the license
// is overwritten by some other cluster activity and the node throws an exception while we
// wait for things to stabilize!
final boolean success = awaitBusy(() -> {
try {
for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) {
if (licenseState.isAuthAllowed() == false) {
enableLicensing(OperationMode.BASIC);
break;
}
assertBusy(() -> {
for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) {
if (licenseState.isAuthAllowed() == false) {
enableLicensing(OperationMode.BASIC);
break;
}
ensureGreen();
ensureClusterSizeConsistency();
ensureClusterStateConsistency();
// apply the disabling of the license once the cluster is stable
for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) {
licenseState.update(OperationMode.BASIC, false, null);
}
} catch (Exception e) {
logger.error("Caught exception while disabling license", e);
return false;
}
return true;
ensureGreen();
ensureClusterSizeConsistency();
ensureClusterStateConsistency();
// apply the disabling of the license once the cluster is stable
for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) {
licenseState.update(OperationMode.BASIC, false, null);
}
}, 30L, TimeUnit.SECONDS);
assertTrue(success);
}
private void enableLicensing(License.OperationMode operationMode) throws InterruptedException {
private void enableLicensing(License.OperationMode operationMode) throws Exception {
// do this in an await busy since there is a chance that the enabling of the license is
// overwritten by some other cluster activity and the node throws an exception while we
// wait for things to stabilize!
final boolean success = awaitBusy(() -> {
try {
// first update the license so we can execute monitoring actions
for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) {
licenseState.update(operationMode, true, null);
}
ensureGreen();
ensureClusterSizeConsistency();
ensureClusterStateConsistency();
// re-apply the update in case any node received an updated cluster state that triggered the license state
// to change
for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) {
licenseState.update(operationMode, true, null);
}
} catch (Exception e) {
logger.error("Caught exception while enabling license", e);
return false;
assertBusy(() -> {
// first update the license so we can execute monitoring actions
for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) {
licenseState.update(operationMode, true, null);
}
ensureGreen();
ensureClusterSizeConsistency();
ensureClusterStateConsistency();
// re-apply the update in case any node received an updated cluster state that triggered the license state
// to change
for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) {
licenseState.update(operationMode, true, null);
}
return true;
}, 30L, TimeUnit.SECONDS);
assertTrue(success);
}
}

View File

@ -80,7 +80,7 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
}
@After
public void wipeSecurityIndex() throws InterruptedException {
public void wipeSecurityIndex() throws Exception {
// get the api key service and wait until api key expiration is not in progress!
awaitApiKeysRemoverCompletion();
deleteSecurityIndex();
@ -111,10 +111,9 @@ public class ApiKeyIntegTests extends SecurityIntegTestCase {
"manage_own_api_key_role:user_with_manage_own_api_key_role\n";
}
private void awaitApiKeysRemoverCompletion() throws InterruptedException {
private void awaitApiKeysRemoverCompletion() throws Exception {
for (ApiKeyService apiKeyService : internalCluster().getInstances(ApiKeyService.class)) {
final boolean done = awaitBusy(() -> apiKeyService.isExpirationInProgress() == false);
assertTrue(done);
assertBusy(() -> assertFalse(apiKeyService.isExpirationInProgress()));
}
}

View File

@ -161,9 +161,7 @@ public class RunAsIntegTests extends SecurityIntegTestCase {
.put(SecurityField.USER_SETTING.getKey(), TRANSPORT_CLIENT_USER + ":"
+ SecuritySettingsSourceField.TEST_PASSWORD).build())) {
//ensure the client can connect
awaitBusy(() -> {
return client.connectedNodes().size() > 0;
});
assertBusy(() -> assertFalse(client.connectedNodes().isEmpty()));
try {
Map<String, String> headers = new HashMap<>();
@ -193,9 +191,7 @@ public class RunAsIntegTests extends SecurityIntegTestCase {
.put(SecurityField.USER_SETTING.getKey(), TRANSPORT_CLIENT_USER + ":" +
SecuritySettingsSourceField.TEST_PASSWORD).build())) {
//ensure the client can connect
awaitBusy(() -> {
return client.connectedNodes().size() > 0;
});
assertBusy(() -> assertFalse(client.connectedNodes().isEmpty()));
try {
Map<String, String> headers = new HashMap<>();

View File

@ -558,11 +558,10 @@ public class TokenAuthIntegTests extends SecurityIntegTestCase {
}
@After
public void wipeSecurityIndex() throws InterruptedException {
public void wipeSecurityIndex() throws Exception {
// get the token service and wait until token expiration is not in progress!
for (TokenService tokenService : internalCluster().getInstances(TokenService.class)) {
final boolean done = awaitBusy(() -> tokenService.isExpirationInProgress() == false);
assertTrue(done);
assertBusy(() -> assertFalse(tokenService.isExpirationInProgress()));
}
super.deleteSecurityIndex();
}

View File

@ -320,7 +320,7 @@ public class SessionFactoryLoadBalancingTests extends LdapTestCase {
final List<Socket> openedSockets = new ArrayList<>();
final List<InetAddress> blacklistedAddress = new ArrayList<>();
try {
final boolean allSocketsOpened = awaitBusy(() -> {
final boolean allSocketsOpened = waitUntil(() -> {
try {
InetAddress[] allAddresses = InetAddressHelper.getAllAddresses();
if (serverAddress instanceof Inet4Address) {
@ -337,10 +337,7 @@ public class SessionFactoryLoadBalancingTests extends LdapTestCase {
final Socket socket = openMockSocket(serverAddress, serverPort, localAddress, portToBind);
openedSockets.add(socket);
logger.debug("opened socket [{}]", socket);
} catch (NoRouteToHostException e) {
logger.debug(new ParameterizedMessage("blacklisting address [{}] due to:", localAddress), e);
blacklistedAddress.add(localAddress);
} catch (ConnectException e) {
} catch (NoRouteToHostException | ConnectException e) {
logger.debug(new ParameterizedMessage("blacklisting address [{}] due to:", localAddress), e);
blacklistedAddress.add(localAddress);
}

View File

@ -283,7 +283,7 @@ public class NativePrivilegeStoreTests extends ESTestCase {
));
}
awaitBusy(() -> requests.size() > 0, 1, TimeUnit.SECONDS);
assertBusy(() -> assertFalse(requests.isEmpty()), 1, TimeUnit.SECONDS);
assertThat(requests, iterableWithSize(1));
assertThat(requests.get(0), instanceOf(ClearRolesCacheRequest.class));
@ -324,7 +324,7 @@ public class NativePrivilegeStoreTests extends ESTestCase {
));
}
awaitBusy(() -> requests.size() > 0, 1, TimeUnit.SECONDS);
assertBusy(() -> assertFalse(requests.isEmpty()), 1, TimeUnit.SECONDS);
assertThat(requests, iterableWithSize(1));
assertThat(requests.get(0), instanceOf(ClearRolesCacheRequest.class));

View File

@ -39,7 +39,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static java.util.Collections.emptyList;
@ -49,6 +48,7 @@ import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
@ -213,7 +213,7 @@ public class XPackRestIT extends ESClientYamlSuiteTestCase {
},
() -> "Exception when disabling monitoring");
awaitBusy(() -> {
assertBusy(() -> {
try {
ClientYamlTestResponse response =
callApi("xpack.usage", singletonMap("filter_path", "monitoring.enabled_exporters"), emptyList(),
@ -222,7 +222,7 @@ public class XPackRestIT extends ESClientYamlSuiteTestCase {
@SuppressWarnings("unchecked")
final Map<String, ?> exporters = (Map<String, ?>) response.evaluate("monitoring.enabled_exporters");
if (exporters.isEmpty() == false) {
return false;
fail("Exporters were not found");
}
final Map<String, String> params = new HashMap<>();
@ -237,7 +237,8 @@ public class XPackRestIT extends ESClientYamlSuiteTestCase {
final Map<String, Object> node = (Map<String, Object>) nodes.values().iterator().next();
final Number activeWrites = (Number) extractValue("thread_pool.write.active", node);
return activeWrites != null && activeWrites.longValue() == 0L;
assertNotNull(activeWrites);
assertThat(activeWrites, equalTo(0));
} catch (Exception e) {
throw new ElasticsearchException("Failed to wait for monitoring exporters to stop:", e);
}
@ -281,26 +282,15 @@ public class XPackRestIT extends ESClientYamlSuiteTestCase {
Map<String, String> params,
List<Map<String, Object>> bodies,
CheckedFunction<ClientYamlTestResponse, Boolean, IOException> success,
Supplier<String> error) throws Exception {
AtomicReference<IOException> exceptionHolder = new AtomicReference<>();
awaitBusy(() -> {
try {
ClientYamlTestResponse response = callApi(apiName, params, bodies, getApiCallHeaders());
if (response.getStatusCode() == HttpStatus.SC_OK) {
exceptionHolder.set(null);
return success.apply(response);
}
return false;
} catch (IOException e) {
exceptionHolder.set(e);
}
return false;
});
IOException exception = exceptionHolder.get();
if (exception != null) {
throw new IllegalStateException(error.get(), exception);
Supplier<String> error) {
try {
// The actual method call that sends the API requests returns a Future, but we immediately
// call .get() on it so there's no need for this method to do any other awaiting.
ClientYamlTestResponse response = callApi(apiName, params, bodies, getApiCallHeaders());
assertEquals(response.getStatusCode(), HttpStatus.SC_OK);
success.apply(response);
} catch (Exception e) {
throw new IllegalStateException(error.get(), e);
}
}

View File

@ -195,7 +195,7 @@ public class TransformIndexerTests extends ESTestCase {
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
}
public void testPageSizeAdapt() throws InterruptedException {
public void testPageSizeAdapt() throws Exception {
Integer pageSize = randomBoolean() ? null : randomIntBetween(500, 10_000);
TransformConfig config = new TransformConfig(randomAlphaOfLength(10),
randomSourceConfig(),
@ -232,8 +232,9 @@ public class TransformIndexerTests extends ESTestCase {
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
latch.countDown();
awaitBusy(() -> indexer.getState() == IndexerState.STOPPED);
assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)));
long pageSizeAfterFirstReduction = indexer.getPageSize();
assertThat(initialPageSize, greaterThan(pageSizeAfterFirstReduction));
assertThat(pageSizeAfterFirstReduction, greaterThan((long)TransformIndexer.MINIMUM_PAGE_SIZE));
@ -245,8 +246,9 @@ public class TransformIndexerTests extends ESTestCase {
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
secondRunLatch.countDown();
awaitBusy(() -> indexer.getState() == IndexerState.STOPPED);
assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)));
// assert that page size has been reduced again
assertThat(pageSizeAfterFirstReduction, greaterThan((long)indexer.getPageSize()));

View File

@ -90,9 +90,10 @@ public class ActivateWatchTests extends AbstractWatcherIntegrationTestCase {
Thread.sleep(5000);
refresh();
long count2 = docCount(".watcher-history*", matchAllQuery());
assertThat(count2, is(count1));
// Ensure no new watch history. The assertion ought to always return false, but if it returns true
// then we know that more history has been written.
boolean hasNewHistory = waitUntil(() -> count1 != docCount(".watcher-history*", matchAllQuery()), 5, TimeUnit.SECONDS);
assertFalse("Watcher should have stopped executing but new history found", hasNewHistory);
// lets activate it again
logger.info("Activating watch again");

View File

@ -35,6 +35,9 @@ import java.util.concurrent.TimeUnit;
import javax.net.ServerSocketFactory;
import static org.elasticsearch.test.ESTestCase.assertBusy;
import static org.junit.Assert.assertTrue;
/**
* Utility wrapper around Apache {@link SimpleKdcServer} backed by Unboundid
* {@link InMemoryDirectoryServer}.<br>
@ -90,9 +93,7 @@ public class SimpleKdcLdapServer {
AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
if (ESTestCase.awaitBusy(() -> init()) == false) {
throw new IllegalStateException("could not initialize SimpleKdcLdapServer");
}
assertBusy(() -> assertTrue("Failed to initialize SimpleKdcLdapServer", init()));
return null;
}
});
@ -218,7 +219,7 @@ public class SimpleKdcLdapServer {
/**
* Stop Simple Kdc Server
*
*
* @throws PrivilegedActionException when privileged action threw exception
*/
public synchronized void stop() throws PrivilegedActionException {

View File

@ -6,15 +6,18 @@
package org.elasticsearch.upgrades;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.xpack.test.SecuritySettingsSourceField;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.test.SecuritySettingsSourceField;
import org.junit.Before;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.test.SecuritySettingsSourceField.basicAuthHeaderValue;
@ -77,27 +80,31 @@ public abstract class AbstractUpgradeTestCase extends ESRestTestCase {
}
protected Collection<String> templatesToWaitFor() {
return Collections.singletonList("security-index-template");
return Collections.emptyList();
}
@Before
public void setupForTests() throws Exception {
awaitBusy(() -> {
boolean success = true;
for (String template : templatesToWaitFor()) {
try {
final Request headRequest = new Request("HEAD", "_template/" + template);
headRequest.setOptions(allowTypesRemovalWarnings());
final boolean exists = adminClient()
.performRequest(headRequest)
.getStatusLine().getStatusCode() == 200;
success &= exists;
logger.debug("template [{}] exists [{}]", template, exists);
} catch (IOException e) {
logger.warn("error calling template api", e);
}
final Collection<String> expectedTemplates = templatesToWaitFor();
if (expectedTemplates.isEmpty()) {
return;
}
assertBusy(() -> {
final Request catRequest = new Request("GET", "_cat/templates?h=n&s=n");
final Response catResponse = adminClient().performRequest(catRequest);
final List<String> templates = Streams.readAllLines(catResponse.getEntity().getContent());
final List<String> missingTemplates = expectedTemplates.stream()
.filter(each -> templates.contains(each) == false)
.collect(Collectors.toList());
// While it's possible to use a Hamcrest matcher for this, the failure is much less legible.
if (missingTemplates.isEmpty() == false) {
fail("Some expected templates are missing: " + missingTemplates + ". The templates that exist are: " + templates + "");
}
return success;
});
}
}

View File

@ -37,6 +37,7 @@ import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -68,8 +69,12 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
@Override
protected Collection<String> templatesToWaitFor() {
return Stream.concat(XPackRestTestConstants.DATA_FRAME_TEMPLATES.stream(),
super.templatesToWaitFor().stream()).collect(Collectors.toSet());
if (UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_4_0)) {
return Stream.concat(XPackRestTestConstants.DATA_FRAME_TEMPLATES.stream(),
super.templatesToWaitFor().stream()).collect(Collectors.toSet());
} else {
return Collections.emptySet();
}
}
protected static void waitForPendingDataFrameTasks() throws Exception {

View File

@ -51,9 +51,7 @@ public class SecurityTransportClientIT extends ESIntegTestCase {
public void testThatTransportClientWithoutAuthenticationDoesNotWork() throws Exception {
try (TransportClient client = transportClient(Settings.EMPTY)) {
boolean connected = awaitBusy(() -> {
return client.connectedNodes().size() > 0;
}, 5L, TimeUnit.SECONDS);
boolean connected = waitUntil(() -> client.connectedNodes().size() > 0, 5L, TimeUnit.SECONDS);
assertThat(connected, is(false));
}
@ -64,11 +62,7 @@ public class SecurityTransportClientIT extends ESIntegTestCase {
.put(SecurityField.USER_SETTING.getKey(), TRANSPORT_USER_PW)
.build();
try (TransportClient client = transportClient(settings)) {
boolean connected = awaitBusy(() -> {
return client.connectedNodes().size() > 0;
}, 5L, TimeUnit.SECONDS);
assertThat(connected, is(true));
assertBusy(() -> assertFalse(client.connectedNodes().isEmpty()), 5L, TimeUnit.SECONDS);
// this checks that the transport client is really running in a limited state
try {
@ -86,11 +80,7 @@ public class SecurityTransportClientIT extends ESIntegTestCase {
.put(SecurityField.USER_SETTING.getKey(), useTransportUser ? TRANSPORT_USER_PW : ADMIN_USER_PW)
.build();
try (TransportClient client = transportClient(settings)) {
boolean connected = awaitBusy(() -> {
return client.connectedNodes().size() > 0;
}, 5L, TimeUnit.SECONDS);
assertThat(connected, is(true));
assertBusy(() -> assertFalse(client.connectedNodes().isEmpty()), 5L, TimeUnit.SECONDS);
// this checks that the transport client is really running in a limited state
ClusterHealthResponse response;

View File

@ -23,25 +23,23 @@ public final class XPackRestTestConstants {
// ML constants:
public static final String ML_META_INDEX_NAME = ".ml-meta";
public static final String AUDITOR_NOTIFICATIONS_INDEX = ".ml-notifications-000001";
public static final String CONFIG_INDEX = ".ml-config";
public static final String RESULTS_INDEX_PREFIX = ".ml-anomalies-";
public static final String STATE_INDEX_PREFIX = ".ml-state";
public static final String RESULTS_INDEX_DEFAULT = "shared";
public static final List<String> ML_PRE_V660_TEMPLATES = Collections.unmodifiableList(Arrays.asList(
AUDITOR_NOTIFICATIONS_INDEX, ML_META_INDEX_NAME, STATE_INDEX_PREFIX, RESULTS_INDEX_PREFIX));
ML_META_INDEX_NAME, STATE_INDEX_PREFIX, RESULTS_INDEX_PREFIX));
public static final List<String> ML_POST_V660_TEMPLATES =
Collections.unmodifiableList(Arrays.asList(
AUDITOR_NOTIFICATIONS_INDEX,
ML_META_INDEX_NAME,
STATE_INDEX_PREFIX,
RESULTS_INDEX_PREFIX,
CONFIG_INDEX));
// Data Frame constants:
public static final String DATA_FRAME_INTERNAL_INDEX = ".data-frame-internal-1";
public static final String DATA_FRAME_INTERNAL_INDEX = ".data-frame-internal-2";
public static final String DATA_FRAME_NOTIFICATIONS_INDEX = ".data-frame-notifications-1";
public static final List<String> DATA_FRAME_TEMPLATES =

View File

@ -5,22 +5,25 @@
*/
package org.elasticsearch.xpack.test.rest;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.elasticsearch.test.ESTestCase.assertBusy;
import static org.elasticsearch.test.rest.ESRestTestCase.allowTypesRemovalWarnings;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.fail;
public final class XPackRestTestHelper {
@ -31,50 +34,53 @@ public final class XPackRestTestHelper {
* For each template name wait for the template to be created and
* for the template version to be equal to the master node version.
*
* @param client The rest client
* @param templateNames Names of the templates to wait for
* @param client The rest client
* @param expectedTemplates Names of the templates to wait for
* @throws InterruptedException If the wait is interrupted
*/
public static void waitForTemplates(RestClient client, List<String> templateNames) throws InterruptedException {
public static void waitForTemplates(RestClient client, List<String> expectedTemplates) throws Exception {
AtomicReference<Version> masterNodeVersion = new AtomicReference<>();
ESTestCase.awaitBusy(() -> {
String response;
try {
Request request = new Request("GET", "/_cat/nodes");
request.addParameter("h", "master,version");
response = EntityUtils.toString(client.performRequest(request).getEntity());
} catch (IOException e) {
throw new RuntimeException(e);
}
assertBusy(() -> {
Request request = new Request("GET", "/_cat/nodes");
request.addParameter("h", "master,version");
String response = EntityUtils.toString(client.performRequest(request).getEntity());
for (String line : response.split("\n")) {
if (line.startsWith("*")) {
masterNodeVersion.set(Version.fromString(line.substring(2).trim()));
return true;
return;
}
}
return false;
fail("No master elected");
});
for (String template : templateNames) {
ESTestCase.awaitBusy(() -> {
Map<?, ?> response;
try {
final Request getRequest = new Request("GET", "_template/" + template);
getRequest.setOptions(allowTypesRemovalWarnings());
String string = EntityUtils.toString(client.performRequest(getRequest).getEntity());
response = XContentHelper.convertToMap(JsonXContent.jsonXContent, string, false);
} catch (ResponseException e) {
if (e.getResponse().getStatusLine().getStatusCode() == 404) {
return false;
}
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
assertBusy(() -> {
final Request request = new Request("GET", "_template");
request.setOptions(allowTypesRemovalWarnings());
String string = EntityUtils.toString(client.performRequest(request).getEntity());
Map<String, Object> response = XContentHelper.convertToMap(JsonXContent.jsonXContent, string, false);
final Set<String> templates = new TreeSet<>(response.keySet());
final List<String> missingTemplates = expectedTemplates.stream()
.filter(each -> templates.contains(each) == false)
.collect(Collectors.toList());
// While it's possible to use a Hamcrest matcher for this, the failure is much less legible.
if (missingTemplates.isEmpty() == false) {
fail("Some expected templates are missing: " + missingTemplates + ". The templates that exist are: " + templates + "");
}
expectedTemplates.forEach(template -> {
Map<?, ?> templateDefinition = (Map<?, ?>) response.get(template);
return Version.fromId((Integer) templateDefinition.get("version")).equals(masterNodeVersion.get());
assertThat(
"Template [" + template + "] has unexpected version",
Version.fromId((Integer) templateDefinition.get("version")),
equalTo(masterNodeVersion.get()));
});
}
});
}
public static String resultsWriteAlias(String jobId) {