CRUD: Fix wait for refresh tests (#33973)
When we implemented `refresh=wait_for` I added a test with the wrong name. This caused us to not run it. The test asserted that running several operations with `refresh=wait_for` did not fail if the index was `_close`d while the operations were waiting. But to be honest, failure here isn't that bad. The index being waited on is closed. You can't do anything with it any way. The most important thing is actually that these operations don't hang forever. Because hanging forever means that the resources used by the operations aren't freed. Anyway, when I noticed the error I reenabled the test. But they don't pass consistently because *sometimes* the operations being tested fail. They don't seem to hang and they always fail with "this index is closed so you can't do anything with it" sorts of messages. When the test started failing we disabled it again. This reenables the test but causes it to ignore these "index is closed" failures. We'd prefer they not happen at all but in the grand scheme of things they are fine and making sure these operations don't hang is much more important. This also updates the test to bring it more in line with my current understanding of the "right" way to use the low level rest client.
This commit is contained in:
parent
16956a1a05
commit
c01b6ffb80
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.test.rest;
|
||||
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.client.Response;
|
||||
|
@ -31,57 +30,53 @@ import org.junit.After;
|
|||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
||||
/**
|
||||
* Tests that wait for refresh is fired if the index is closed.
|
||||
*/
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33533")
|
||||
public class WaitForRefreshAndCloseIT extends ESRestTestCase {
|
||||
@Before
|
||||
public void setupIndex() throws IOException {
|
||||
try {
|
||||
client().performRequest(new Request("DELETE", indexName()));
|
||||
} catch (ResponseException e) {
|
||||
// If we get an error, it should be because the index doesn't exist
|
||||
assertEquals(404, e.getResponse().getStatusLine().getStatusCode());
|
||||
}
|
||||
Request request = new Request("PUT", indexName());
|
||||
Request request = new Request("PUT", "/test");
|
||||
request.setJsonEntity("{\"settings\":{\"refresh_interval\":-1}}");
|
||||
client().performRequest(request);
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanupIndex() throws IOException {
|
||||
client().performRequest(new Request("DELETE", indexName()));
|
||||
}
|
||||
|
||||
private String indexName() {
|
||||
return getTestName().toLowerCase(Locale.ROOT);
|
||||
client().performRequest(new Request("DELETE", "/test"));
|
||||
}
|
||||
|
||||
private String docPath() {
|
||||
return indexName() + "/test/1";
|
||||
return "test/_doc/1";
|
||||
}
|
||||
|
||||
public void testIndexAndThenClose() throws Exception {
|
||||
closeWhileListenerEngaged(start("PUT", "", "{\"test\":\"test\"}"));
|
||||
Request request = new Request("PUT", docPath());
|
||||
request.setJsonEntity("{\"test\":\"test\"}");
|
||||
closeWhileListenerEngaged(start(request));
|
||||
}
|
||||
|
||||
public void testUpdateAndThenClose() throws Exception {
|
||||
Request request = new Request("PUT", docPath());
|
||||
request.setJsonEntity("{\"test\":\"test\"}");
|
||||
client().performRequest(request);
|
||||
closeWhileListenerEngaged(start("POST", "/_update", "{\"doc\":{\"name\":\"test\"}}"));
|
||||
Request createDoc = new Request("PUT", docPath());
|
||||
createDoc.setJsonEntity("{\"test\":\"test\"}");
|
||||
client().performRequest(createDoc);
|
||||
Request updateDoc = new Request("POST", docPath() + "/_update");
|
||||
updateDoc.setJsonEntity("{\"doc\":{\"name\":\"test\"}}");
|
||||
closeWhileListenerEngaged(start(updateDoc));
|
||||
}
|
||||
|
||||
public void testDeleteAndThenClose() throws Exception {
|
||||
Request request = new Request("PUT", docPath());
|
||||
request.setJsonEntity("{\"test\":\"test\"}");
|
||||
client().performRequest(request);
|
||||
closeWhileListenerEngaged(start("DELETE", "", null));
|
||||
closeWhileListenerEngaged(start(new Request("DELETE", docPath())));
|
||||
}
|
||||
|
||||
private void closeWhileListenerEngaged(ActionFuture<String> future) throws Exception {
|
||||
|
@ -89,40 +84,52 @@ public class WaitForRefreshAndCloseIT extends ESRestTestCase {
|
|||
assertBusy(() -> {
|
||||
Map<String, Object> stats;
|
||||
try {
|
||||
stats = entityAsMap(client().performRequest(new Request("GET", indexName() + "/_stats/refresh")));
|
||||
stats = entityAsMap(client().performRequest(new Request("GET", "/test/_stats/refresh")));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> indices = (Map<String, Object>) stats.get("indices");
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> theIndex = (Map<String, Object>) indices.get(indexName());
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> total = (Map<String, Object>) theIndex.get("total");
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> refresh = (Map<String, Object>) total.get("refresh");
|
||||
int listeners = (int) refresh.get("listeners");
|
||||
Map<?, ?> indices = (Map<?, ?>) stats.get("indices");
|
||||
Map<?, ?> theIndex = (Map<?, ?>) indices.get("test");
|
||||
Map<?, ?> total = (Map<?, ?>) theIndex.get("total");
|
||||
Map<?, ?> refresh = (Map<?, ?>) total.get("refresh");
|
||||
int listeners = (Integer) refresh.get("listeners");
|
||||
assertEquals(1, listeners);
|
||||
});
|
||||
|
||||
// Close the index. That should flush the listener.
|
||||
client().performRequest(new Request("POST", indexName() + "/_close"));
|
||||
client().performRequest(new Request("POST", "/test/_close"));
|
||||
|
||||
// The request shouldn't fail. It certainly shouldn't hang.
|
||||
future.get();
|
||||
/*
|
||||
* The request may fail, but we really, really, really want to make
|
||||
* sure that it doesn't time out.
|
||||
*/
|
||||
try {
|
||||
future.get(1, TimeUnit.MINUTES);
|
||||
} catch (ExecutionException ee) {
|
||||
/*
|
||||
* If it *does* fail it should fail with a FORBIDDEN error because
|
||||
* it attempts to take an action on a closed index. Again, it'd be
|
||||
* nice if all requests waiting for refresh came back even though
|
||||
* the index is closed and most do, but sometimes they bump into
|
||||
* the index being closed. At least they don't hang forever. That'd
|
||||
* be a nightmare.
|
||||
*/
|
||||
assertThat(ee.getCause(), instanceOf(ResponseException.class));
|
||||
ResponseException re = (ResponseException) ee.getCause();
|
||||
assertEquals(403, re.getResponse().getStatusLine().getStatusCode());
|
||||
assertThat(EntityUtils.toString(re.getResponse().getEntity()), containsString("FORBIDDEN/4/index closed"));
|
||||
}
|
||||
}
|
||||
|
||||
private ActionFuture<String> start(String method, String path, String body) {
|
||||
private ActionFuture<String> start(Request request) {
|
||||
PlainActionFuture<String> future = new PlainActionFuture<>();
|
||||
Request request = new Request(method, docPath() + path);
|
||||
request.addParameter("refresh", "wait_for");
|
||||
request.addParameter("error_trace", "");
|
||||
request.setJsonEntity(body);
|
||||
client().performRequestAsync(request, new ResponseListener() {
|
||||
@Override
|
||||
public void onSuccess(Response response) {
|
||||
try {
|
||||
future.onResponse(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8));
|
||||
future.onResponse(EntityUtils.toString(response.getEntity()));
|
||||
} catch (IOException e) {
|
||||
future.onFailure(e);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue