Rollup: Consolidate rollup cleanup for http tests (#34342)

This moves the rollup cleanup code for http tests from the high level rest
client into the test framework and then entirely removes the rollup cleanup
code for http tests that lived in x-pack. This is nice because it
consolidates the cleanup into one spot, automatically invokes the cleanup
without the test having to know that it is "about rollup", and should allow
us to run the rollup docs tests.

Part of #34530
This commit is contained in:
Nik Everett 2018-10-17 09:32:16 -04:00 committed by GitHub
parent fb579d2d9a
commit 139bbc3f03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 98 additions and 191 deletions

View File

@ -208,8 +208,7 @@ public class RollupIT extends ESRestHighLevelClientTestCase {
}
});
// TODO when we move cleaning rollup into ESTestCase we can randomly choose the _all version of this request
GetRollupJobRequest getRollupJobRequest = new GetRollupJobRequest(id);
GetRollupJobRequest getRollupJobRequest = randomBoolean() ? new GetRollupJobRequest() : new GetRollupJobRequest(id);
GetRollupJobResponse getResponse = execute(getRollupJobRequest, rollupClient::getRollupJob, rollupClient::getRollupJobAsync);
assertThat(getResponse.getJobs(), hasSize(1));
JobWrapper job = getResponse.getJobs().get(0);

View File

@ -27,9 +27,7 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.rollup.GetRollupJobRequest;
import org.elasticsearch.client.rollup.GetRollupJobResponse;
@ -47,21 +45,15 @@ import org.elasticsearch.client.rollup.job.config.MetricConfig;
import org.elasticsearch.client.rollup.job.config.RollupJobConfig;
import org.elasticsearch.client.rollup.job.config.TermsGroupConfig;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.junit.After;
import org.junit.Before;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -227,62 +219,6 @@ public class RollupDocumentationIT extends ESRestHighLevelClientTestCase {
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
@After
public void wipeRollup() throws Exception {
// TODO move this to ESRestTestCase
deleteRollupJobs();
waitForPendingRollupTasks();
}
private void deleteRollupJobs() throws Exception {
Response response = adminClient().performRequest(new Request("GET", "/_xpack/rollup/job/_all"));
Map<String, Object> jobs = entityAsMap(response);
@SuppressWarnings("unchecked")
List<Map<String, Object>> jobConfigs =
(List<Map<String, Object>>) XContentMapValues.extractValue("jobs", jobs);
if (jobConfigs == null) {
return;
}
for (Map<String, Object> jobConfig : jobConfigs) {
@SuppressWarnings("unchecked")
String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");
Request request = new Request("DELETE", "/_xpack/rollup/job/" + jobId);
request.addParameter("ignore", "404"); // Ignore 404s because they imply someone was racing us to delete this
adminClient().performRequest(request);
}
}
private void waitForPendingRollupTasks() throws Exception {
assertBusy(() -> {
try {
Request request = new Request("GET", "/_cat/tasks");
request.addParameter("detailed", "true");
Response response = adminClient().performRequest(request);
try (BufferedReader responseReader = new BufferedReader(
new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
int activeTasks = 0;
String line;
StringBuilder tasksListString = new StringBuilder();
while ((line = responseReader.readLine()) != null) {
// We only care about Rollup jobs, otherwise this fails too easily due to unrelated tasks
if (line.startsWith("xpack/rollup/job") == true) {
activeTasks++;
tasksListString.append(line).append('\n');
}
}
assertEquals(activeTasks + " active tasks found:\n" + tasksListString, 0, activeTasks);
}
} catch (IOException e) {
// Throw an assertion error so we retry
throw new AssertionError("Error getting active tasks list", e);
}
});
}
public void testDeleteRollupJob() throws Exception {
RestHighLevelClient client = highLevelClient();
@ -303,8 +239,6 @@ public class RollupDocumentationIT extends ESRestHighLevelClientTestCase {
// Swallow any exception, this test does not test actually cancelling.
}
// tag::rollup-delete-job-execute-listener
ActionListener<DeleteRollupJobResponse> listener = new ActionListener<DeleteRollupJobResponse>() {
@Override
@ -328,7 +262,5 @@ public class RollupDocumentationIT extends ESRestHighLevelClientTestCase {
// end::rollup-delete-job-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}

View File

@ -52,8 +52,11 @@ import org.junit.AfterClass;
import org.junit.Before;
import javax.net.ssl.SSLContext;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyManagementException;
@ -206,8 +209,8 @@ public abstract class ESRestTestCase extends ESTestCase {
/**
* Returns whether to preserve the state of the cluster upon completion of this test. Defaults to false. If true, overrides the value of
* {@link #preserveIndicesUponCompletion()}, {@link #preserveTemplatesUponCompletion()}, {@link #preserveReposUponCompletion()}, and
* {@link #preserveSnapshotsUponCompletion()}.
* {@link #preserveIndicesUponCompletion()}, {@link #preserveTemplatesUponCompletion()}, {@link #preserveReposUponCompletion()},
* {@link #preserveSnapshotsUponCompletion()}, and {@link #preserveRollupJobsUponCompletion()}.
*
* @return true if the state of the cluster should be preserved
*/
@ -263,7 +266,18 @@ public abstract class ESRestTestCase extends ESTestCase {
return false;
}
private void wipeCluster() throws IOException {
/**
* Returns whether to preserve the rollup jobs of this test. Defaults to
* not preserving them. Only runs at all if xpack is installed on the
* cluster being tested.
*/
protected boolean preserveRollupJobsUponCompletion() {
return false;
}
private void wipeCluster() throws Exception {
boolean hasXPack = hasXPack();
if (preserveIndicesUponCompletion() == false) {
// wipe indices
try {
@ -278,7 +292,7 @@ public abstract class ESRestTestCase extends ESTestCase {
// wipe index templates
if (preserveTemplatesUponCompletion() == false) {
if (hasXPack()) {
if (hasXPack) {
/*
* Delete only templates that xpack doesn't automatically
* recreate. Deleting them doesn't hurt anything, but it
@ -310,6 +324,11 @@ public abstract class ESRestTestCase extends ESTestCase {
if (preserveClusterSettings() == false) {
wipeClusterSettings();
}
if (hasXPack && false == preserveRollupJobsUponCompletion()) {
wipeRollupJobs();
waitForPendingRollupTasks();
}
}
/**
@ -372,6 +391,56 @@ public abstract class ESRestTestCase extends ESTestCase {
}
}
private void wipeRollupJobs() throws IOException {
Response response = adminClient().performRequest(new Request("GET", "/_xpack/rollup/job/_all"));
Map<String, Object> jobs = entityAsMap(response);
@SuppressWarnings("unchecked")
List<Map<String, Object>> jobConfigs =
(List<Map<String, Object>>) XContentMapValues.extractValue("jobs", jobs);
if (jobConfigs == null) {
return;
}
for (Map<String, Object> jobConfig : jobConfigs) {
@SuppressWarnings("unchecked")
String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");
Request request = new Request("DELETE", "/_xpack/rollup/job/" + jobId);
request.addParameter("ignore", "404"); // Ignore 404s because they imply someone was racing us to delete this
logger.debug("deleting rollup job [{}]", jobId);
adminClient().performRequest(request);
}
}
private void waitForPendingRollupTasks() throws Exception {
assertBusy(() -> {
try {
Request request = new Request("GET", "/_cat/tasks");
request.addParameter("detailed", "true");
Response response = adminClient().performRequest(request);
try (BufferedReader responseReader = new BufferedReader(
new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
int activeTasks = 0;
String line;
StringBuilder tasksListString = new StringBuilder();
while ((line = responseReader.readLine()) != null) {
// We only care about Rollup jobs, otherwise this fails too easily due to unrelated tasks
if (line.startsWith("xpack/rollup/job") == true) {
activeTasks++;
tasksListString.append(line).append('\n');
}
}
assertEquals(activeTasks + " active tasks found:\n" + tasksListString, 0, activeTasks);
}
} catch (IOException e) {
// Throw an assertion error so we retry
throw new AssertionError("Error getting active tasks list", e);
}
});
}
/**
* Logs a message if there are still running tasks. The reasoning is that any tasks still running are state the is trying to bleed into
* other tests.

View File

@ -62,4 +62,8 @@ public abstract class AbstractFullClusterRestartTestCase extends ESRestTestCase
return true;
}
@Override
protected boolean preserveRollupJobsUponCompletion() {
return true;
}
}

View File

@ -120,12 +120,6 @@ public class XDocsClientYamlTestSuiteIT extends XPackRestIT {
return testName != null && (testName.contains("ml/") || testName.contains("ml\\"));
}
@Override
protected boolean isRollupTest() {
String testName = getTestName();
return testName != null && (testName.contains("rollup/") || testName.contains("rollup\\"));
}
/**
* Deletes users after every test just in case any test adds any.
*/

View File

@ -1,92 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.rollup;
import org.apache.http.HttpStatus;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
public class RollupRestTestStateCleaner {
public static void clearRollupMetadata(RestClient adminClient) throws Exception {
deleteAllJobs(adminClient);
waitForPendingTasks(adminClient);
// indices will be deleted by the ESRestTestCase class
}
private static void waitForPendingTasks(RestClient adminClient) throws Exception {
ESTestCase.assertBusy(() -> {
try {
Request request = new Request("GET", "/_cat/tasks");
request.addParameter("detailed", "true");
Response response = adminClient.performRequest(request);
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
try (BufferedReader responseReader = new BufferedReader(
new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
int activeTasks = 0;
String line;
StringBuilder tasksListString = new StringBuilder();
while ((line = responseReader.readLine()) != null) {
// We only care about Rollup jobs, otherwise this fails too easily due to unrelated tasks
if (line.startsWith(RollupJob.NAME) == true) {
activeTasks++;
tasksListString.append(line);
tasksListString.append('\n');
}
}
assertEquals(activeTasks + " active tasks found:\n" + tasksListString, 0, activeTasks);
}
}
} catch (IOException e) {
throw new AssertionError("Error getting active tasks list", e);
}
});
}
@SuppressWarnings("unchecked")
private static void deleteAllJobs(RestClient adminClient) throws Exception {
Response response = adminClient.performRequest(new Request("GET", "/_xpack/rollup/job/_all"));
Map<String, Object> jobs = ESRestTestCase.entityAsMap(response);
List<Map<String, Object>> jobConfigs =
(List<Map<String, Object>>) XContentMapValues.extractValue("jobs", jobs);
if (jobConfigs == null) {
return;
}
for (Map<String, Object> jobConfig : jobConfigs) {
String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");
try {
response = adminClient.performRequest(new Request("DELETE", "/_xpack/rollup/job/" + jobId));
} catch (Exception e) {
// ok
}
}
}
private static String responseEntityToString(Response response) throws Exception {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(),
StandardCharsets.UTF_8))) {
return reader.lines().collect(Collectors.joining("\n"));
}
}
}

View File

@ -24,7 +24,7 @@ import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.elasticsearch.xpack.core.rollup.RollupRestTestStateCleaner;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField;
import org.junit.After;
import org.junit.Before;
@ -243,11 +243,13 @@ public class XPackRestIT extends ESClientYamlSuiteTestCase {
public void cleanup() throws Exception {
disableMonitoring();
clearMlState();
clearRollupState();
if (isWaitForPendingTasks()) {
// This waits for pending tasks to complete, so must go last (otherwise
// it could be waiting for pending tasks while monitoring is still running).
XPackRestTestHelper.waitForPendingTasks(adminClient());
XPackRestTestHelper.waitForPendingTasks(adminClient(), task -> {
// Don't check rollup jobs because we clear them in the superclass.
return task.contains(RollupJob.NAME);
});
}
}
@ -260,17 +262,6 @@ public class XPackRestIT extends ESClientYamlSuiteTestCase {
}
}
/**
* Delete any left over rollup jobs
*
* Also reuses the pending-task logic from Ml... should refactor to shared location
*/
private void clearRollupState() throws Exception {
if (isRollupTest()) {
RollupRestTestStateCleaner.clearRollupMetadata(adminClient());
}
}
/**
* Executes an API call using the admin context, waiting for it to succeed.
*/
@ -331,11 +322,6 @@ public class XPackRestIT extends ESClientYamlSuiteTestCase {
return testName != null && (testName.contains("=ml/") || testName.contains("=ml\\"));
}
protected boolean isRollupTest() {
String testName = getTestName();
return testName != null && (testName.contains("=rollup/") || testName.contains("=rollup\\"));
}
/**
* Should each test wait for pending tasks to finish after execution?
* @return Wait for pending tasks

View File

@ -24,6 +24,11 @@ public abstract class AbstractUpgradeTestCase extends ESRestTestCase {
return true;
}
@Override
protected boolean preserveRollupJobsUponCompletion() {
return true;
}
enum CLUSTER_TYPE {
OLD,
MIXED,

View File

@ -38,6 +38,11 @@ public abstract class AbstractUpgradeTestCase extends ESRestTestCase {
return true;
}
@Override
protected boolean preserveRollupJobsUponCompletion() {
return true;
}
enum ClusterType {
OLD,
MIXED,

View File

@ -42,6 +42,11 @@ public class UpgradeClusterClientYamlTestSuiteIT extends ESClientYamlSuiteTestCa
return true;
}
@Override
protected boolean preserveRollupJobsUponCompletion() {
return true;
}
public UpgradeClusterClientYamlTestSuiteIT(ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}