Clean SLM and ongoing snapshots in test framework (#45564)

Adjusts the cluster cleanup routine in ESRestTestCase to clean up SLM
test cases, and optionally wait for all snapshots to be deleted.

Waiting for all snapshots to be deleted, rather than failing if any are
in progress, is necessary for tests which use SLM policies because SLM
policies may be in the process of executing when the test ends.
This commit is contained in:
Gordon Brown 2019-08-16 14:17:34 -06:00 committed by GitHub
parent c321272ae7
commit ecb3ebd796
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 69 additions and 31 deletions

View File

@ -95,6 +95,11 @@ public class SnapshotClientDocumentationIT extends ESRestHighLevelClientTestCase
private static final String snapshotName = "test_snapshot"; private static final String snapshotName = "test_snapshot";
private static final String indexName = "test_index"; private static final String indexName = "test_index";
@Override
protected boolean waitForAllSnapshotsWiped() {
return true;
}
public void testSnapshotCreateRepository() throws IOException { public void testSnapshotCreateRepository() throws IOException {
RestHighLevelClient client = highLevelClient(); RestHighLevelClient client = highLevelClient();

View File

@ -26,6 +26,7 @@ import org.apache.http.message.BasicHeader;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.ssl.SSLContexts; import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.client.Request; import org.elasticsearch.client.Request;
@ -80,11 +81,13 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Predicate; import java.util.function.Predicate;
import static java.util.Collections.sort; import static java.util.Collections.sort;
import static java.util.Collections.unmodifiableList; import static java.util.Collections.unmodifiableList;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ -468,6 +471,12 @@ public abstract class ESRestTestCase extends ESTestCase {
return false; return false;
} }
/**
* Returns whether to wait to make absolutely certain that all snapshots
* have been deleted.
*/
protected boolean waitForAllSnapshotsWiped() { return false; }
private void wipeCluster() throws Exception { private void wipeCluster() throws Exception {
// Cleanup rollup before deleting indices. A rollup job might have bulks in-flight, // Cleanup rollup before deleting indices. A rollup job might have bulks in-flight,
@ -478,7 +487,29 @@ public abstract class ESRestTestCase extends ESTestCase {
waitForPendingRollupTasks(); waitForPendingRollupTasks();
} }
final Map<String, List<Map<?,?>>> inProgressSnapshots = wipeSnapshots(); // Clean up SLM policies before trying to wipe snapshots so that no new ones get started by SLM after wiping
if (nodeVersions.first().onOrAfter(Version.V_7_4_0)) { // SLM was introduced in version 7.4
deleteAllSLMPolicies();
}
SetOnce<Map<String, List<Map<?,?>>>> inProgressSnapshots = new SetOnce<>();
if (waitForAllSnapshotsWiped()) {
AtomicReference<Map<String, List<Map<?,?>>>> snapshots = new AtomicReference<>();
try {
// Repeatedly delete the snapshots until there aren't any
assertBusy(() -> {
snapshots.set(wipeSnapshots());
assertThat(snapshots.get(), anEmptyMap());
}, 2, TimeUnit.MINUTES);
// At this point there should be no snaphots
inProgressSnapshots.set(snapshots.get());
} catch (AssertionError e) {
// This will cause an error at the end of this method, but do the rest of the cleanup first
inProgressSnapshots.set(snapshots.get());
}
} else {
inProgressSnapshots.set(wipeSnapshots());
}
if (preserveIndicesUponCompletion() == false) { if (preserveIndicesUponCompletion() == false) {
// wipe indices // wipe indices
@ -526,10 +557,10 @@ public abstract class ESRestTestCase extends ESTestCase {
} }
if (hasXPack && false == preserveILMPoliciesUponCompletion()) { if (hasXPack && false == preserveILMPoliciesUponCompletion()) {
deleteAllPolicies(); deleteAllILMPolicies();
} }
assertTrue("Found in progress snapshots [" + inProgressSnapshots + "].", inProgressSnapshots.isEmpty()); assertThat("Found in progress snapshots [" + inProgressSnapshots.get() + "].", inProgressSnapshots.get(), anEmptyMap());
} }
/** /**
@ -634,7 +665,7 @@ public abstract class ESRestTestCase extends ESTestCase {
waitForPendingTasks(adminClient(), taskName -> taskName.startsWith("xpack/rollup/job") == false); waitForPendingTasks(adminClient(), taskName -> taskName.startsWith("xpack/rollup/job") == false);
} }
private static void deleteAllPolicies() throws IOException { private static void deleteAllILMPolicies() throws IOException {
Map<String, Object> policies; Map<String, Object> policies;
try { try {
@ -657,6 +688,29 @@ public abstract class ESRestTestCase extends ESTestCase {
} }
} }
private static void deleteAllSLMPolicies() throws IOException {
Map<String, Object> policies;
try {
Response response = adminClient().performRequest(new Request("GET", "/_slm/policy"));
policies = entityAsMap(response);
} catch (ResponseException e) {
if (RestStatus.METHOD_NOT_ALLOWED.getStatus() == e.getResponse().getStatusLine().getStatusCode()) {
// If bad request returned, SLM is not enabled.
return;
}
throw e;
}
if (policies == null || policies.isEmpty()) {
return;
}
for (String policyName : policies.keySet()) {
adminClient().performRequest(new Request("DELETE", "/_slm/policy/" + policyName));
}
}
/** /**
* Logs a message if there are still running tasks. The reasoning is that any tasks still running are state the is trying to bleed into * Logs a message if there are still running tasks. The reasoning is that any tasks still running are state the is trying to bleed into
* other tests. * other tests.
@ -954,6 +1008,7 @@ public abstract class ESRestTestCase extends ESTestCase {
case ".watches": case ".watches":
case "logstash-index-template": case "logstash-index-template":
case "security_audit_log": case "security_audit_log":
case ".slm-history":
return true; return true;
default: default:
return false; return false;

View File

@ -42,6 +42,11 @@ import static org.hamcrest.Matchers.startsWith;
public class SnapshotLifecycleIT extends ESRestTestCase { public class SnapshotLifecycleIT extends ESRestTestCase {
@Override
protected boolean waitForAllSnapshotsWiped() {
return true;
}
public void testMissingRepo() throws Exception { public void testMissingRepo() throws Exception {
SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy("test-policy", "snap", SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy("test-policy", "snap",
"*/1 * * * * ?", "missing-repo", Collections.emptyMap()); "*/1 * * * * ?", "missing-repo", Collections.emptyMap());
@ -115,12 +120,6 @@ public class SnapshotLifecycleIT extends ESRestTestCase {
Request delReq = new Request("DELETE", "/_slm/policy/" + policyName); Request delReq = new Request("DELETE", "/_slm/policy/" + policyName);
assertOK(client().performRequest(delReq)); assertOK(client().performRequest(delReq));
// It's possible there could have been a snapshot in progress when the
// policy is deleted, so wait for it to be finished
assertBusy(() -> {
assertThat(wipeSnapshots().size(), equalTo(0));
});
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -159,9 +158,6 @@ public class SnapshotLifecycleIT extends ESRestTestCase {
} }
assertHistoryIsPresent(policyName, false, repoName); assertHistoryIsPresent(policyName, false, repoName);
}); });
Request delReq = new Request("DELETE", "/_slm/policy/" + policyName);
assertOK(client().performRequest(delReq));
} }
public void testPolicyManualExecution() throws Exception { public void testPolicyManualExecution() throws Exception {
@ -207,15 +203,6 @@ public class SnapshotLifecycleIT extends ESRestTestCase {
} }
}); });
} }
Request delReq = new Request("DELETE", "/_slm/policy/" + policyName);
assertOK(client().performRequest(delReq));
// It's possible there could have been a snapshot in progress when the
// policy is deleted, so wait for it to be finished
assertBusy(() -> {
assertThat(wipeSnapshots().size(), equalTo(0));
});
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -269,15 +256,6 @@ public class SnapshotLifecycleIT extends ESRestTestCase {
// Cancel the snapshot since it is not going to complete quickly // Cancel the snapshot since it is not going to complete quickly
assertOK(client().performRequest(new Request("DELETE", "/_snapshot/" + repoId + "/" + snapshotName))); assertOK(client().performRequest(new Request("DELETE", "/_snapshot/" + repoId + "/" + snapshotName)));
} }
Request delReq = new Request("DELETE", "/_slm/policy/" + policyName);
assertOK(client().performRequest(delReq));
// It's possible there could have been a snapshot in progress when the
// policy is deleted, so wait for it to be finished
assertBusy(() -> {
assertThat(wipeSnapshots().size(), equalTo(0));
});
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")