[reindex] Move refresh tests to unit test

The refresh tests were failing rarely due to refreshes happening
automatically on indexes with -1 refresh intervals. This commit moves
the refresh test into a unit test where we can check if it was attempted
so we never get false failures from background refreshes.

It also stopped refresh from being run if the reindex request was canceled.
This commit is contained in:
Nik Everett 2016-03-10 15:22:35 -05:00
parent 9d340e6b08
commit ebc12690bc
4 changed files with 48 additions and 85 deletions

View File

@ -47,6 +47,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
@ -267,9 +268,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
throw new IllegalArgumentException("Unknown op type: " + item.getOpType()); throw new IllegalArgumentException("Unknown op type: " + item.getOpType());
} }
// Track the indexes we've seen so we can refresh them if requested // Track the indexes we've seen so we can refresh them if requested
destinationIndices.add(item.getIndex()); destinationIndicesThisBatch.add(item.getIndex());
} }
destinationIndices.addAll(destinationIndicesThisBatch); addDestinationIndices(destinationIndicesThisBatch);
if (false == failures.isEmpty()) { if (false == failures.isEmpty()) {
startNormalTermination(unmodifiableList(failures), emptyList(), false); startNormalTermination(unmodifiableList(failures), emptyList(), false);
@ -318,7 +319,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
} }
void startNormalTermination(List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures, boolean timedOut) { void startNormalTermination(List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures, boolean timedOut) {
if (false == mainRequest.isRefresh()) { if (task.isCancelled() || false == mainRequest.isRefresh()) {
finishHim(null, indexingFailures, searchFailures, timedOut); finishHim(null, indexingFailures, searchFailures, timedOut);
return; return;
} }
@ -390,6 +391,14 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
return exponentialBackoff(mainRequest.getRetryBackoffInitialTime(), mainRequest.getMaxRetries()); return exponentialBackoff(mainRequest.getRetryBackoffInitialTime(), mainRequest.getMaxRetries());
} }
/**
* Add to the list of indices that were modified by this request. This is the list of indices refreshed at the end of the request if the
* request asks for a refresh.
*/
void addDestinationIndices(Collection<String> indices) {
destinationIndices.addAll(indices);
}
/** /**
* Wraps a backoffPolicy in another policy that counts the number of backoffs acquired. * Wraps a backoffPolicy in another policy that counts the number of backoffs acquired.
*/ */

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure; import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
@ -74,10 +75,12 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
import static java.util.Collections.emptyList; import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static org.apache.lucene.util.TestUtil.randomSimpleString; import static org.apache.lucene.util.TestUtil.randomSimpleString;
import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff; import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
@ -388,6 +391,32 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
assertEquals(defaultBackoffBeforeFailing, millis); assertEquals(defaultBackoffBeforeFailing, millis);
} }
public void testRefreshIsFalseByDefault() throws Exception {
refreshTestCase(null, false);
}
public void testRefreshFalseDoesntMakeVisible() throws Exception {
refreshTestCase(false, false);
}
public void testRefreshTrueMakesVisible() throws Exception {
refreshTestCase(true, true);
}
private void refreshTestCase(Boolean refresh, boolean shouldRefresh) {
if (refresh != null) {
mainRequest.setRefresh(refresh);
}
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
action.addDestinationIndices(singleton("foo"));
action.startNormalTermination(emptyList(), emptyList(), false);
if (shouldRefresh) {
assertArrayEquals(new String[] {"foo"}, client.lastRefreshRequest.get().indices());
} else {
assertNull("No refresh was attempted", client.lastRefreshRequest.get());
}
}
public void testCancelBeforeInitialSearch() throws Exception { public void testCancelBeforeInitialSearch() throws Exception {
cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.initialSearch()); cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.initialSearch());
} }
@ -415,7 +444,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
// Refresh or not doesn't matter - we don't try to refresh. // Refresh or not doesn't matter - we don't try to refresh.
mainRequest.setRefresh(usually()); mainRequest.setRefresh(usually());
cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNormalTermination(emptyList(), emptyList(), false)); cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNormalTermination(emptyList(), emptyList(), false));
// This wouldn't return if we called refresh - the action would hang waiting for the refresh that we haven't mocked. assertNull("No refresh was attempted", client.lastRefreshRequest.get());
} }
private void cancelTaskCase(Consumer<DummyAbstractAsyncBulkByScrollAction> testMe) throws Exception { private void cancelTaskCase(Consumer<DummyAbstractAsyncBulkByScrollAction> testMe) throws Exception {
@ -463,6 +492,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
private static class MyMockClient extends FilterClient { private static class MyMockClient extends FilterClient {
private final List<String> scrollsCleared = new ArrayList<>(); private final List<String> scrollsCleared = new ArrayList<>();
private final AtomicInteger bulksAttempts = new AtomicInteger(); private final AtomicInteger bulksAttempts = new AtomicInteger();
private final AtomicReference<RefreshRequest> lastRefreshRequest = new AtomicReference<>();
private int bulksToReject = 0; private int bulksToReject = 0;
@ -475,6 +505,11 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
protected <Request extends ActionRequest<Request>, Response extends ActionResponse, protected <Request extends ActionRequest<Request>, Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute( RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(
Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) { Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
if (request instanceof RefreshRequest) {
lastRefreshRequest.set((RefreshRequest) request);
listener.onResponse(null);
return;
}
if (request instanceof ClearScrollRequest) { if (request instanceof ClearScrollRequest) {
ClearScrollRequest clearScroll = (ClearScrollRequest) request; ClearScrollRequest clearScroll = (ClearScrollRequest) request;
scrollsCleared.addAll(clearScroll.getScrollIds()); scrollsCleared.addAll(clearScroll.getScrollIds());

View File

@ -19,14 +19,12 @@
package org.elasticsearch.index.reindex; package org.elasticsearch.index.reindex;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
public class ReindexBasicTests extends ReindexTestCase { public class ReindexBasicTests extends ReindexTestCase {
@ -84,40 +82,4 @@ public class ReindexBasicTests extends ReindexTestCase {
assertThat(copy.get(), responseMatcher().created(half).batches(half, 5)); assertThat(copy.get(), responseMatcher().created(half).batches(half, 5));
assertHitCount(client().prepareSearch("dest").setTypes("half").setSize(0).get(), half); assertHitCount(client().prepareSearch("dest").setTypes("half").setSize(0).get(), half);
} }
public void testRefreshIsFalseByDefault() throws Exception {
refreshTestCase(null, false);
}
public void testRefreshFalseDoesntMakeVisible() throws Exception {
refreshTestCase(false, false);
}
public void testRefreshTrueMakesVisible() throws Exception {
refreshTestCase(true, true);
}
/**
* Executes a reindex into an index with -1 refresh_interval and checks that
* the documents are visible properly.
*/
private void refreshTestCase(Boolean refresh, boolean visible) throws Exception {
CreateIndexRequestBuilder create = client().admin().indices().prepareCreate("dest").setSettings("refresh_interval", -1);
assertAcked(create);
ensureYellow();
indexRandom(true, client().prepareIndex("source", "test", "1").setSource("foo", "a"),
client().prepareIndex("source", "test", "2").setSource("foo", "a"),
client().prepareIndex("source", "test", "3").setSource("foo", "b"),
client().prepareIndex("source", "test", "4").setSource("foo", "c"));
assertHitCount(client().prepareSearch("source").setSize(0).get(), 4);
// Copy all the docs
ReindexRequestBuilder copy = reindex().source("source").destination("dest", "all");
if (refresh != null) {
copy.refresh(refresh);
}
assertThat(copy.get(), responseMatcher().created(4));
assertHitCount(client().prepareSearch("dest").setTypes("all").setSize(0).get(), visible ? 4 : 0);
}
} }

View File

@ -19,12 +19,9 @@
package org.elasticsearch.index.reindex; package org.elasticsearch.index.reindex;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.sort.SortOrder;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
public class UpdateByQueryBasicTests extends UpdateByQueryTestCase { public class UpdateByQueryBasicTests extends UpdateByQueryTestCase {
@ -64,44 +61,4 @@ public class UpdateByQueryBasicTests extends UpdateByQueryTestCase {
assertEquals(3, client().prepareGet("test", "test", "3").get().getVersion()); assertEquals(3, client().prepareGet("test", "test", "3").get().getVersion());
assertEquals(2, client().prepareGet("test", "test", "4").get().getVersion()); assertEquals(2, client().prepareGet("test", "test", "4").get().getVersion());
} }
public void testRefreshIsFalseByDefault() throws Exception {
refreshTestCase(null, false);
}
public void testRefreshFalseDoesntMakeVisible() throws Exception {
refreshTestCase(false, false);
}
public void testRefreshTrueMakesVisible() throws Exception {
refreshTestCase(true, true);
}
/**
* Executes an update_by_query on an index with -1 refresh_interval and
* checks that the documents are visible properly.
*/
private void refreshTestCase(Boolean refresh, boolean visible) throws Exception {
CreateIndexRequestBuilder create = client().admin().indices().prepareCreate("test").setSettings("refresh_interval", -1);
create.addMapping("test", "{\"dynamic\": \"false\"}");
assertAcked(create);
ensureYellow();
indexRandom(true, client().prepareIndex("test", "test", "1").setSource("foo", "a"),
client().prepareIndex("test", "test", "2").setSource("foo", "a"),
client().prepareIndex("test", "test", "3").setSource("foo", "b"),
client().prepareIndex("test", "test", "4").setSource("foo", "c"));
assertHitCount(client().prepareSearch("test").setQuery(matchQuery("foo", "a")).setSize(0).get(), 0);
// Now make foo searchable
assertAcked(client().admin().indices().preparePutMapping("test").setType("test")
.setSource("{\"test\": {\"properties\":{\"foo\": {\"type\": \"text\"}}}}"));
UpdateByQueryRequestBuilder update = request().source("test");
if (refresh != null) {
update.refresh(refresh);
}
assertThat(update.get(), responseMatcher().updated(4));
assertHitCount(client().prepareSearch("test").setQuery(matchQuery("foo", "a")).setSize(0).get(), visible ? 2 : 0);
}
} }