mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Clean up tests in Reindex module
This commit is contained in:
parent
fe05ee8552
commit
6d288dec11
@ -26,10 +26,9 @@ import org.hamcrest.TypeSafeMatcher;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public abstract class AbstractBulkIndexByScrollResponseMatcher<
|
||||
Response extends BulkIndexByScrollResponse,
|
||||
Self extends AbstractBulkIndexByScrollResponseMatcher<Response, Self>>
|
||||
extends TypeSafeMatcher<Response> {
|
||||
public class BulkIndexByScrollResponseMatcher extends TypeSafeMatcher<BulkIndexByScrollResponse> {
|
||||
|
||||
private Matcher<Long> createdMatcher = equalTo(0L);
|
||||
private Matcher<Long> updatedMatcher = equalTo(0L);
|
||||
/**
|
||||
* Matches for number of batches. Optional.
|
||||
@ -39,14 +38,21 @@ public abstract class AbstractBulkIndexByScrollResponseMatcher<
|
||||
private Matcher<Integer> failuresMatcher = equalTo(0);
|
||||
private Matcher<String> reasonCancelledMatcher = nullValue(String.class);
|
||||
|
||||
protected abstract Self self();
|
||||
|
||||
public Self updated(Matcher<Long> updatedMatcher) {
|
||||
this.updatedMatcher = updatedMatcher;
|
||||
return self();
|
||||
public BulkIndexByScrollResponseMatcher created(Matcher<Long> createdMatcher) {
|
||||
this.createdMatcher = createdMatcher;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Self updated(long updated) {
|
||||
public BulkIndexByScrollResponseMatcher created(long created) {
|
||||
return created(equalTo(created));
|
||||
}
|
||||
|
||||
public BulkIndexByScrollResponseMatcher updated(Matcher<Long> updatedMatcher) {
|
||||
this.updatedMatcher = updatedMatcher;
|
||||
return this;
|
||||
}
|
||||
|
||||
public BulkIndexByScrollResponseMatcher updated(long updated) {
|
||||
return updated(equalTo(updated));
|
||||
}
|
||||
|
||||
@ -55,26 +61,26 @@ public abstract class AbstractBulkIndexByScrollResponseMatcher<
|
||||
* integer because we usually don't care about how many batches the job
|
||||
* takes.
|
||||
*/
|
||||
public Self batches(Matcher<Integer> batchesMatcher) {
|
||||
public BulkIndexByScrollResponseMatcher batches(Matcher<Integer> batchesMatcher) {
|
||||
this.batchesMatcher = batchesMatcher;
|
||||
return self();
|
||||
return this;
|
||||
}
|
||||
|
||||
public Self batches(int batches) {
|
||||
public BulkIndexByScrollResponseMatcher batches(int batches) {
|
||||
return batches(equalTo(batches));
|
||||
}
|
||||
|
||||
public Self batches(int total, int batchSize) {
|
||||
public BulkIndexByScrollResponseMatcher batches(int total, int batchSize) {
|
||||
// Round up
|
||||
return batches((total + batchSize - 1) / batchSize);
|
||||
}
|
||||
|
||||
public Self versionConflicts(Matcher<Long> versionConflictsMatcher) {
|
||||
public BulkIndexByScrollResponseMatcher versionConflicts(Matcher<Long> versionConflictsMatcher) {
|
||||
this.versionConflictsMatcher = versionConflictsMatcher;
|
||||
return self();
|
||||
return this;
|
||||
}
|
||||
|
||||
public Self versionConflicts(long versionConflicts) {
|
||||
public BulkIndexByScrollResponseMatcher versionConflicts(long versionConflicts) {
|
||||
return versionConflicts(equalTo(versionConflicts));
|
||||
}
|
||||
|
||||
@ -83,35 +89,37 @@ public abstract class AbstractBulkIndexByScrollResponseMatcher<
|
||||
* matching do it by hand. The type signatures required to match the
|
||||
* actual failures list here just don't work.
|
||||
*/
|
||||
public Self failures(Matcher<Integer> failuresMatcher) {
|
||||
public BulkIndexByScrollResponseMatcher failures(Matcher<Integer> failuresMatcher) {
|
||||
this.failuresMatcher = failuresMatcher;
|
||||
return self();
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the expected size of the failures list.
|
||||
*/
|
||||
public Self failures(int failures) {
|
||||
public BulkIndexByScrollResponseMatcher failures(int failures) {
|
||||
return failures(equalTo(failures));
|
||||
}
|
||||
|
||||
public Self reasonCancelled(Matcher<String> reasonCancelledMatcher) {
|
||||
public BulkIndexByScrollResponseMatcher reasonCancelled(Matcher<String> reasonCancelledMatcher) {
|
||||
this.reasonCancelledMatcher = reasonCancelledMatcher;
|
||||
return self();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean matchesSafely(Response item) {
|
||||
protected boolean matchesSafely(BulkIndexByScrollResponse item) {
|
||||
return updatedMatcher.matches(item.getUpdated()) &&
|
||||
(batchesMatcher == null || batchesMatcher.matches(item.getBatches())) &&
|
||||
versionConflictsMatcher.matches(item.getVersionConflicts()) &&
|
||||
failuresMatcher.matches(item.getIndexingFailures().size()) &&
|
||||
reasonCancelledMatcher.matches(item.getReasonCancelled());
|
||||
createdMatcher.matches(item.getCreated()) &&
|
||||
(batchesMatcher == null || batchesMatcher.matches(item.getBatches())) &&
|
||||
versionConflictsMatcher.matches(item.getVersionConflicts()) &&
|
||||
failuresMatcher.matches(item.getIndexingFailures().size()) &&
|
||||
reasonCancelledMatcher.matches(item.getReasonCancelled());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void describeTo(Description description) {
|
||||
description.appendText("indexed matches ").appendDescriptionOf(updatedMatcher);
|
||||
description.appendText(" and created matches ").appendDescriptionOf(createdMatcher);
|
||||
if (batchesMatcher != null) {
|
||||
description.appendText(" and batches matches ").appendDescriptionOf(batchesMatcher);
|
||||
}
|
@ -37,22 +37,22 @@ public class ReindexBasicTests extends ReindexTestCase {
|
||||
|
||||
// Copy all the docs
|
||||
ReindexRequestBuilder copy = reindex().source("source").destination("dest", "all").refresh(true);
|
||||
assertThat(copy.get(), reindexResponseMatcher().created(4));
|
||||
assertThat(copy.get(), matcher().created(4));
|
||||
assertHitCount(client().prepareSearch("dest").setTypes("all").setSize(0).get(), 4);
|
||||
|
||||
// Now none of them
|
||||
copy = reindex().source("source").destination("all", "none").filter(termQuery("foo", "no_match")).refresh(true);
|
||||
assertThat(copy.get(), reindexResponseMatcher().created(0));
|
||||
assertThat(copy.get(), matcher().created(0));
|
||||
assertHitCount(client().prepareSearch("dest").setTypes("none").setSize(0).get(), 0);
|
||||
|
||||
// Now half of them
|
||||
copy = reindex().source("source").destination("dest", "half").filter(termQuery("foo", "a")).refresh(true);
|
||||
assertThat(copy.get(), reindexResponseMatcher().created(2));
|
||||
assertThat(copy.get(), matcher().created(2));
|
||||
assertHitCount(client().prepareSearch("dest").setTypes("half").setSize(0).get(), 2);
|
||||
|
||||
// Limit with size
|
||||
copy = reindex().source("source").destination("dest", "size_one").size(1).refresh(true);
|
||||
assertThat(copy.get(), reindexResponseMatcher().created(1));
|
||||
assertThat(copy.get(), matcher().created(1));
|
||||
assertHitCount(client().prepareSearch("dest").setTypes("size_one").setSize(0).get(), 1);
|
||||
}
|
||||
|
||||
@ -70,7 +70,7 @@ public class ReindexBasicTests extends ReindexTestCase {
|
||||
ReindexRequestBuilder copy = reindex().source("source").destination("dest", "all").refresh(true);
|
||||
// Use a small batch size so we have to use more than one batch
|
||||
copy.source().setSize(5);
|
||||
assertThat(copy.get(), reindexResponseMatcher().created(max).batches(max, 5));
|
||||
assertThat(copy.get(), matcher().created(max).batches(max, 5));
|
||||
assertHitCount(client().prepareSearch("dest").setTypes("all").setSize(0).get(), max);
|
||||
|
||||
// Copy some of the docs
|
||||
@ -79,7 +79,7 @@ public class ReindexBasicTests extends ReindexTestCase {
|
||||
// Use a small batch size so we have to use more than one batch
|
||||
copy.source().setSize(5);
|
||||
copy.size(half); // The real "size" of the request.
|
||||
assertThat(copy.get(), reindexResponseMatcher().created(half).batches(half, 5));
|
||||
assertThat(copy.get(), matcher().created(half).batches(half, 5));
|
||||
assertHitCount(client().prepareSearch("dest").setTypes("half").setSize(0).get(), half);
|
||||
}
|
||||
}
|
||||
|
@ -35,7 +35,7 @@ public class ReindexCancelTests extends ReindexTestCase {
|
||||
public void testCancel() throws Exception {
|
||||
BulkIndexByScrollResponse response = CancelTestUtils.testCancel(this, reindex().destination("dest", "test"), ReindexAction.NAME);
|
||||
|
||||
assertThat(response, reindexResponseMatcher().created(1).reasonCancelled(equalTo("by user request")));
|
||||
assertThat(response, matcher().created(1).reasonCancelled(equalTo("by user request")));
|
||||
refresh("dest");
|
||||
assertHitCount(client().prepareSearch("dest").setSize(0).get(), 1);
|
||||
}
|
||||
|
@ -58,7 +58,7 @@ public class ReindexFailureTests extends ReindexTestCase {
|
||||
copy.source().setSize(1);
|
||||
|
||||
BulkIndexByScrollResponse response = copy.get();
|
||||
assertThat(response, reindexResponseMatcher()
|
||||
assertThat(response, matcher()
|
||||
.batches(1)
|
||||
.failures(both(greaterThan(0)).and(lessThanOrEqualTo(maximumNumberOfShards()))));
|
||||
for (Failure failure: response.getIndexingFailures()) {
|
||||
@ -78,7 +78,7 @@ public class ReindexFailureTests extends ReindexTestCase {
|
||||
copy.destination().setOpType(CREATE);
|
||||
|
||||
BulkIndexByScrollResponse response = copy.get();
|
||||
assertThat(response, reindexResponseMatcher().batches(1).versionConflicts(1).failures(1).created(99));
|
||||
assertThat(response, matcher().batches(1).versionConflicts(1).failures(1).created(99));
|
||||
for (Failure failure: response.getIndexingFailures()) {
|
||||
assertThat(failure.getMessage(), containsString("VersionConflictEngineException[[test]["));
|
||||
}
|
||||
|
@ -43,18 +43,18 @@ public class ReindexParentChildTests extends ReindexTestCase {
|
||||
|
||||
// Copy parent to the new index
|
||||
ReindexRequestBuilder copy = reindex().source("source").destination("dest").filter(findsCountry).refresh(true);
|
||||
assertThat(copy.get(), reindexResponseMatcher().created(1));
|
||||
assertThat(copy.get(), matcher().created(1));
|
||||
|
||||
// Copy the child to a new index
|
||||
copy = reindex().source("source").destination("dest").filter(findsCity).refresh(true);
|
||||
assertThat(copy.get(), reindexResponseMatcher().created(1));
|
||||
assertThat(copy.get(), matcher().created(1));
|
||||
|
||||
// Make sure parent/child is intact on that index
|
||||
assertSearchHits(client().prepareSearch("dest").setQuery(findsCity).get(), "pittsburgh");
|
||||
|
||||
// Copy the grandchild to a new index
|
||||
copy = reindex().source("source").destination("dest").filter(findsNeighborhood).refresh(true);
|
||||
assertThat(copy.get(), reindexResponseMatcher().created(1));
|
||||
assertThat(copy.get(), matcher().created(1));
|
||||
|
||||
// Make sure parent/child is intact on that index
|
||||
assertSearchHits(client().prepareSearch("dest").setQuery(findsNeighborhood).get(),
|
||||
@ -63,7 +63,7 @@ public class ReindexParentChildTests extends ReindexTestCase {
|
||||
// Copy the parent/child/grandchild structure all at once to a third index
|
||||
createParentChildIndex("dest_all_at_once");
|
||||
copy = reindex().source("source").destination("dest_all_at_once").refresh(true);
|
||||
assertThat(copy.get(), reindexResponseMatcher().created(3));
|
||||
assertThat(copy.get(), matcher().created(3));
|
||||
|
||||
// Make sure parent/child/grandchild is intact there too
|
||||
assertSearchHits(client().prepareSearch("dest_all_at_once").setQuery(findsNeighborhood).get(),
|
||||
|
@ -22,16 +22,14 @@ package org.elasticsearch.index.reindex;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.hamcrest.Description;
|
||||
import org.hamcrest.Matcher;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
@ClusterScope(scope = SUITE, transportClientRatio = 0)
|
||||
public abstract class ReindexTestCase extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(ReindexPlugin.class);
|
||||
@ -41,57 +39,15 @@ public abstract class ReindexTestCase extends ESIntegTestCase {
|
||||
return ReindexAction.INSTANCE.newRequestBuilder(client());
|
||||
}
|
||||
|
||||
protected IndexBySearchResponseMatcher reindexResponseMatcher() {
|
||||
return new IndexBySearchResponseMatcher();
|
||||
}
|
||||
|
||||
protected UpdateByQueryRequestBuilder updateByQuery() {
|
||||
return UpdateByQueryAction.INSTANCE.newRequestBuilder(client());
|
||||
}
|
||||
|
||||
protected BulkIndexbyScrollResponseMatcher updateByQueryResponseMatcher() {
|
||||
return new BulkIndexbyScrollResponseMatcher();
|
||||
}
|
||||
|
||||
protected RethrottleRequestBuilder rethrottle() {
|
||||
return RethrottleAction.INSTANCE.newRequestBuilder(client());
|
||||
}
|
||||
|
||||
protected static class IndexBySearchResponseMatcher
|
||||
extends AbstractBulkIndexByScrollResponseMatcher<BulkIndexByScrollResponse, IndexBySearchResponseMatcher> {
|
||||
private Matcher<Long> createdMatcher = equalTo(0L);
|
||||
|
||||
public IndexBySearchResponseMatcher created(Matcher<Long> updatedMatcher) {
|
||||
this.createdMatcher = updatedMatcher;
|
||||
return this;
|
||||
}
|
||||
|
||||
public IndexBySearchResponseMatcher created(long created) {
|
||||
return created(equalTo(created));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean matchesSafely(BulkIndexByScrollResponse item) {
|
||||
return super.matchesSafely(item) && createdMatcher.matches(item.getCreated());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void describeTo(Description description) {
|
||||
super.describeTo(description);
|
||||
description.appendText(" and created matches ").appendDescriptionOf(createdMatcher);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected IndexBySearchResponseMatcher self() {
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
protected static class BulkIndexbyScrollResponseMatcher
|
||||
extends AbstractBulkIndexByScrollResponseMatcher<BulkIndexByScrollResponse, BulkIndexbyScrollResponseMatcher> {
|
||||
@Override
|
||||
protected BulkIndexbyScrollResponseMatcher self() {
|
||||
return this;
|
||||
}
|
||||
protected static BulkIndexByScrollResponseMatcher matcher() {
|
||||
return new BulkIndexByScrollResponseMatcher();
|
||||
}
|
||||
}
|
||||
|
@ -33,55 +33,55 @@ public class ReindexVersioningTests extends ReindexTestCase {
|
||||
|
||||
public void testExternalVersioningCreatesWhenAbsentAndSetsVersion() throws Exception {
|
||||
setupSourceAbsent();
|
||||
assertThat(reindexExternal(), reindexResponseMatcher().created(1));
|
||||
assertThat(reindexExternal(), matcher().created(1));
|
||||
assertDest("source", SOURCE_VERSION);
|
||||
}
|
||||
|
||||
public void testExternalVersioningUpdatesOnOlderAndSetsVersion() throws Exception {
|
||||
setupDestOlder();
|
||||
assertThat(reindexExternal(), reindexResponseMatcher().updated(1));
|
||||
assertThat(reindexExternal(), matcher().updated(1));
|
||||
assertDest("source", SOURCE_VERSION);
|
||||
}
|
||||
|
||||
public void testExternalVersioningVersionConflictsOnNewer() throws Exception {
|
||||
setupDestNewer();
|
||||
assertThat(reindexExternal(), reindexResponseMatcher().versionConflicts(1));
|
||||
assertThat(reindexExternal(), matcher().versionConflicts(1));
|
||||
assertDest("dest", NEWER_VERSION);
|
||||
}
|
||||
|
||||
public void testInternalVersioningCreatesWhenAbsent() throws Exception {
|
||||
setupSourceAbsent();
|
||||
assertThat(reindexInternal(), reindexResponseMatcher().created(1));
|
||||
assertThat(reindexInternal(), matcher().created(1));
|
||||
assertDest("source", 1);
|
||||
}
|
||||
|
||||
public void testInternalVersioningUpdatesOnOlder() throws Exception {
|
||||
setupDestOlder();
|
||||
assertThat(reindexInternal(), reindexResponseMatcher().updated(1));
|
||||
assertThat(reindexInternal(), matcher().updated(1));
|
||||
assertDest("source", OLDER_VERSION + 1);
|
||||
}
|
||||
|
||||
public void testInternalVersioningUpdatesOnNewer() throws Exception {
|
||||
setupDestNewer();
|
||||
assertThat(reindexInternal(), reindexResponseMatcher().updated(1));
|
||||
assertThat(reindexInternal(), matcher().updated(1));
|
||||
assertDest("source", NEWER_VERSION + 1);
|
||||
}
|
||||
|
||||
public void testCreateCreatesWhenAbsent() throws Exception {
|
||||
setupSourceAbsent();
|
||||
assertThat(reindexCreate(), reindexResponseMatcher().created(1));
|
||||
assertThat(reindexCreate(), matcher().created(1));
|
||||
assertDest("source", 1);
|
||||
}
|
||||
|
||||
public void testCreateVersionConflictsOnOlder() throws Exception {
|
||||
setupDestOlder();
|
||||
assertThat(reindexCreate(), reindexResponseMatcher().versionConflicts(1));
|
||||
assertThat(reindexCreate(), matcher().versionConflicts(1));
|
||||
assertDest("dest", OLDER_VERSION);
|
||||
}
|
||||
|
||||
public void testCreateVersionConflictsOnNewer() throws Exception {
|
||||
setupDestNewer();
|
||||
assertThat(reindexCreate(), reindexResponseMatcher().versionConflicts(1));
|
||||
assertThat(reindexCreate(), matcher().versionConflicts(1));
|
||||
assertDest("dest", NEWER_VERSION);
|
||||
}
|
||||
|
||||
|
@ -35,19 +35,17 @@ public class UpdateByQueryBasicTests extends ReindexTestCase {
|
||||
assertEquals(1, client().prepareGet("test", "test", "4").get().getVersion());
|
||||
|
||||
// Reindex all the docs
|
||||
assertThat(updateByQuery().source("test").refresh(true).get(), updateByQueryResponseMatcher().updated(4));
|
||||
assertThat(updateByQuery().source("test").refresh(true).get(), matcher().updated(4));
|
||||
assertEquals(2, client().prepareGet("test", "test", "1").get().getVersion());
|
||||
assertEquals(2, client().prepareGet("test", "test", "4").get().getVersion());
|
||||
|
||||
// Now none of them
|
||||
assertThat(updateByQuery().source("test").filter(termQuery("foo", "no_match")).refresh(true).get(),
|
||||
updateByQueryResponseMatcher().updated(0));
|
||||
assertThat(updateByQuery().source("test").filter(termQuery("foo", "no_match")).refresh(true).get(), matcher().updated(0));
|
||||
assertEquals(2, client().prepareGet("test", "test", "1").get().getVersion());
|
||||
assertEquals(2, client().prepareGet("test", "test", "4").get().getVersion());
|
||||
|
||||
// Now half of them
|
||||
assertThat(updateByQuery().source("test").filter(termQuery("foo", "a")).refresh(true).get(),
|
||||
updateByQueryResponseMatcher().updated(2));
|
||||
assertThat(updateByQuery().source("test").filter(termQuery("foo", "a")).refresh(true).get(), matcher().updated(2));
|
||||
assertEquals(3, client().prepareGet("test", "test", "1").get().getVersion());
|
||||
assertEquals(3, client().prepareGet("test", "test", "2").get().getVersion());
|
||||
assertEquals(2, client().prepareGet("test", "test", "3").get().getVersion());
|
||||
@ -56,7 +54,7 @@ public class UpdateByQueryBasicTests extends ReindexTestCase {
|
||||
// Limit with size
|
||||
UpdateByQueryRequestBuilder request = updateByQuery().source("test").size(3).refresh(true);
|
||||
request.source().addSort("foo.keyword", SortOrder.ASC);
|
||||
assertThat(request.get(), updateByQueryResponseMatcher().updated(3));
|
||||
assertThat(request.get(), matcher().updated(3));
|
||||
// Only the first three documents are updated because of sort
|
||||
assertEquals(4, client().prepareGet("test", "test", "1").get().getVersion());
|
||||
assertEquals(4, client().prepareGet("test", "test", "2").get().getVersion());
|
||||
|
@ -36,7 +36,7 @@ public class UpdateByQueryCancelTests extends ReindexTestCase {
|
||||
public void testCancel() throws Exception {
|
||||
BulkIndexByScrollResponse response = CancelTestUtils.testCancel(this, updateByQuery(), UpdateByQueryAction.NAME);
|
||||
|
||||
assertThat(response, updateByQueryResponseMatcher().updated(1).reasonCancelled(equalTo("by user request")));
|
||||
assertThat(response, matcher().updated(1).reasonCancelled(equalTo("by user request")));
|
||||
refresh("source");
|
||||
assertHitCount(client().prepareSearch("source").setSize(0).setQuery(matchQuery("giraffes", "giraffes")).get(), 1);
|
||||
}
|
||||
|
@ -49,7 +49,7 @@ public class UpdateByQueryWhileModifyingTests extends ReindexTestCase {
|
||||
while (keepUpdating.get()) {
|
||||
try {
|
||||
BulkIndexByScrollResponse response = updateByQuery().source("test").refresh(true).abortOnVersionConflict(false).get();
|
||||
assertThat(response, updateByQueryResponseMatcher().updated(either(equalTo(0L)).or(equalTo(1L)))
|
||||
assertThat(response, matcher().updated(either(equalTo(0L)).or(equalTo(1L)))
|
||||
.versionConflicts(either(equalTo(0L)).or(equalTo(1L))));
|
||||
} catch (Throwable t) {
|
||||
failure.set(t);
|
||||
|
Loading…
x
Reference in New Issue
Block a user