[reindex] Switch throttle to Float.POSITIVE_INFINITY/"unlimited"

All other values are errors.

Add a Java test for throttling. We had a REST test, but it only ran against
one node so it didn't catch serialization errors.

Add a simple round trip test for the rethrottle request.
Nik Everett 2016-04-27 18:10:57 -04:00
parent 67c0734bf3
commit 230697c202
30 changed files with 458 additions and 256 deletions

View File

@ -342,14 +342,15 @@ request. `timeout` controls how long each write request waits for unavailable
shards to become available. Both work exactly how they work in the
<<docs-bulk,Bulk API>>.
`requests_per_second` can be set to any decimal number (1.4, 6, 1000, etc) and
throttle the number of requests per second that the reindex issues. The
`requests_per_second` can be set to any decimal number (`1.4`, `6`, `1000`, etc)
and throttles the number of requests per second that the reindex issues. The
throttling is done by waiting between bulk batches so that it can manipulate the
scroll timeout. The wait time is the difference between the time it took the
batch to complete and the target time `requests_in_the_batch / requests_per_second`.
Since the batch isn't broken into multiple bulk requests, large batch sizes will
cause Elasticsearch to create many requests and then wait for a while before
starting the next set. This is "bursty" instead of "smooth".
starting the next set. This is "bursty" instead of "smooth". The default is
`unlimited`, which is also the only non-number value that it accepts.
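For example, with `requests_per_second` set to `500`, a batch of `1000` documents
has a target time of `1000 / 500 = 2` seconds, so if the batch itself completes in
half a second reindex waits another 1.5 seconds before starting the next one.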
[float]
=== Response body
@ -464,6 +465,46 @@ progress by adding the `updated`, `created`, and `deleted` fields. The request
will finish when their sum is equal to the `total` field.
[float]
[[docs-reindex-cancel-task-api]]
=== Works with the Cancel Task API
Any Reindex can be canceled using the <<tasks,Task Cancel API>>:
[source,js]
--------------------------------------------------
POST /_tasks/{task_id}/_cancel
--------------------------------------------------
// AUTOSENSE
The `task_id` can be found using the tasks API above.
Cancellation should happen quickly but might take a few seconds. The task status
API above will continue to list the task until it wakes to cancel itself.
[float]
[[docs-reindex-rethrottle]]
=== Rethrottling
The value of `requests_per_second` can be changed on a running reindex using
the `_rethrottle` API:
[source,js]
--------------------------------------------------
POST /_reindex/{task_id}/_rethrottle?requests_per_second=unlimited
--------------------------------------------------
// AUTOSENSE
The `task_id` can be found using the tasks API above.
Just like when setting it on the `_reindex` API, `requests_per_second` can be
either `unlimited` to disable throttling or any decimal number like `1.7` or
`12` to throttle to that level. Rethrottling that speeds up the query takes
effect immediately, but rethrottling that slows down the query will take effect
after completing the current batch. This prevents scroll timeouts.
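For example, this throttles the same task down to `1.7` requests per second:
[source,js]
--------------------------------------------------
POST /_reindex/{task_id}/_rethrottle?requests_per_second=1.7
--------------------------------------------------
// AUTOSENSE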
[float]
=== Reindex to change the name of a field

View File

@ -172,14 +172,15 @@ request. `timeout` controls how long each write request waits for unavailable
shards to become available. Both work exactly how they work in the
<<docs-bulk,Bulk API>>.
`requests_per_second` can be set to any decimal number (1.4, 6, 1000, etc) and
throttle the number of requests per second that the update by query issues. The
throttling is done waiting between bulk batches so that it can manipulate the
scroll timeout. The wait time is the difference between the time it took the
`requests_per_second` can be set to any decimal number (`1.4`, `6`, `1000`, etc)
and throttles the number of requests per second that the update by query issues.
The throttling is done by waiting between bulk batches so that it can manipulate
the scroll timeout. The wait time is the difference between the time it took the
batch to complete and the target time `requests_in_the_batch / requests_per_second`.
Since the batch isn't broken into multiple bulk requests, large batch sizes will
cause Elasticsearch to create many requests and then wait for a while before
starting the next set. This is "bursty" instead of "smooth".
starting the next set. This is "bursty" instead of "smooth". The default is
`unlimited`, which is also the only non-number value that it accepts.
[float]
=== Response body
@ -290,6 +291,46 @@ progress by adding the `updated`, `created`, and `deleted` fields. The request
will finish when their sum is equal to the `total` field.
[float]
[[docs-update-by-query-cancel-task-api]]
=== Works with the Cancel Task API
Any Update By Query can be canceled using the <<tasks,Task Cancel API>>:
[source,js]
--------------------------------------------------
POST /_tasks/{task_id}/_cancel
--------------------------------------------------
// AUTOSENSE
The `task_id` can be found using the tasks API above.
Cancellation should happen quickly but might take a few seconds. The task status
API above will continue to list the task until it wakes to cancel itself.
[float]
[[docs-update-by-query-rethrottle]]
=== Rethrottling
The value of `requests_per_second` can be changed on a running update by query
using the `_rethrottle` API:
[source,js]
--------------------------------------------------
POST /_update_by_query/{task_id}/_rethrottle?requests_per_second=unlimited
--------------------------------------------------
// AUTOSENSE
The `task_id` can be found using the tasks API above.
Just like when setting it on the `_update_by_query` API, `requests_per_second`
can be either `unlimited` to disable throttling or any decimal number like `1.7`
or `12` to throttle to that level. Rethrottling that speeds up the query takes
effect immediately, but rethrottling that slows down the query will take effect
after completing the current batch. This prevents scroll timeouts.
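For example, this throttles the same task down to `12` requests per second:
[source,js]
--------------------------------------------------
POST /_update_by_query/{task_id}/_rethrottle?requests_per_second=12
--------------------------------------------------
// AUTOSENSE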
[float]
[[picking-up-a-new-property]]
=== Pick up a new property

View File

@ -339,7 +339,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
* How many nanoseconds should a batch of lastBatchSize have taken if it were perfectly throttled? Package private for testing.
*/
float perfectlyThrottledBatchTime(int lastBatchSize) {
if (task.getRequestsPerSecond() == 0) {
if (task.getRequestsPerSecond() == Float.POSITIVE_INFINITY) {
return 0;
}
// requests
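// ------------------- == seconds
// requests per second
// The hunk cuts off here; what follows is an assumed sketch of the remainder of
// the method, based on the comment above and the nanosecond contract in the
// javadoc. It is not part of this diff.
float targetBatchTimeInSeconds = lastBatchSize / task.getRequestsPerSecond();
// seconds -> nanoseconds (uses java.util.concurrent.TimeUnit)
return TimeUnit.SECONDS.toNanos(1) * targetBatchTimeInSeconds;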

View File

@ -48,17 +48,26 @@ public abstract class AbstractBaseReindexRestHandler<
* @return requests_per_second from the request as a float if it was on the request, null otherwise
*/
public static Float parseRequestsPerSecond(RestRequest request) {
String requestsPerSecond = request.param("requests_per_second");
if (requestsPerSecond == null) {
String requestsPerSecondString = request.param("requests_per_second");
if (requestsPerSecondString == null) {
return null;
}
if ("".equals(requestsPerSecond)) {
throw new IllegalArgumentException("requests_per_second cannot be an empty string");
if ("unlimited".equals(requestsPerSecondString)) {
return Float.POSITIVE_INFINITY;
}
if ("unlimited".equals(requestsPerSecond)) {
return 0f;
float requestsPerSecond;
try {
requestsPerSecond = Float.parseFloat(requestsPerSecondString);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"[requests_per_second] must be a float greater than 0. Use \"unlimited\" to disable throttling.", e);
}
return Float.parseFloat(requestsPerSecond);
if (requestsPerSecond <= 0) {
// We validate here and in the setters because the setters use "Float.POSITIVE_INFINITY" instead of "unlimited"
throw new IllegalArgumentException(
"[requests_per_second] must be a float greater than 0. Use \"unlimited\" to disable throttling.");
}
return requestsPerSecond;
}
protected final IndicesQueriesRegistry indicesQueriesRegistry;

View File

@ -87,11 +87,11 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
private int maxRetries = 11;
/**
* The throttle for this request in sub-requests per second. 0 means set no throttle and that is the default. Throttling is done between
* batches, as we start the next scroll requests. That way we can increase the scroll's timeout to make sure that it contains any time
* that we might wait.
* The throttle for this request in sub-requests per second. {@link Float#POSITIVE_INFINITY} means set no throttle and that is the
* default. Throttling is done between batches, as we start the next scroll requests. That way we can increase the scroll's timeout to
* make sure that it contains any time that we might wait.
*/
private float requestsPerSecond = 0;
private float requestsPerSecond = Float.POSITIVE_INFINITY;
public AbstractBulkByScrollRequest() {
}
@ -264,16 +264,24 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
}
/**
* The throttle for this request in sub-requests per second. 0 means set no throttle and that is the default.
* The throttle for this request in sub-requests per second. {@link Float#POSITIVE_INFINITY} means set no throttle and that is the
* default. Throttling is done between batches, as we start the next scroll requests. That way we can increase the scroll's timeout to
* make sure that it contains any time that we might wait.
*/
public float getRequestsPerSecond() {
return requestsPerSecond;
}
/**
* Set the throttle for this request in sub-requests per second. 0 means set no throttle and that is the default.
* Set the throttle for this request in sub-requests per second. {@link Float#POSITIVE_INFINITY} means set no throttle and that is the
* default. Throttling is done between batches, as we start the next scroll requests. That way we can increase the scroll's timeout to
* make sure that it contains any time that we might wait.
*/
public Self setRequestsPerSecond(float requestsPerSecond) {
if (requestsPerSecond <= 0) {
throw new IllegalArgumentException(
"[requests_per_second] must be greater than 0. Use Float.POSITIVE_INFINITY to disable throttling.");
}
this.requestsPerSecond = requestsPerSecond;
return self();
}
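A minimal usage sketch of this contract (the concrete `ReindexRequest` type is assumed here for illustration):
ReindexRequest request = new ReindexRequest();
request.setRequestsPerSecond(1.7f);                    // throttle to 1.7 sub-requests per second
request.setRequestsPerSecond(Float.POSITIVE_INFINITY); // disable throttling (the default)
// request.setRequestsPerSecond(0);                    // would throw IllegalArgumentException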

View File

@ -106,4 +106,14 @@ public abstract class AbstractBulkByScrollRequestBuilder<
request.setConsistency(consistency);
return self();
}
/**
* Set the throttle for this request in sub-requests per second. {@link Float#POSITIVE_INFINITY} means set no throttle and that is the
* default. Throttling is done between batches, as we start the next scroll requests. That way we can increase the scroll's timeout to
* make sure that it contains any time that we might wait.
*/
public Self setRequestsPerSecond(float requestsPerSecond) {
request.setRequestsPerSecond(requestsPerSecond);
return self();
}
}

View File

@ -20,14 +20,13 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.script.Script;
public abstract class AbstractBulkIndexByScrollRequestBuilder<
Request extends AbstractBulkIndexByScrollRequest<Request>,
Response extends ActionResponse,
Response extends BulkIndexByScrollResponse,
Self extends AbstractBulkIndexByScrollRequestBuilder<Request, Response, Self>>
extends AbstractBulkByScrollRequestBuilder<Request, Response, Self> {

View File

@ -190,7 +190,7 @@ public class BulkByScrollTask extends CancellableTask {
builder.field("noops", noops);
builder.field("retries", retries);
builder.timeValueField("throttled_millis", "throttled", throttled);
builder.field("requests_per_second", requestsPerSecond == 0 ? "unlimited" : requestsPerSecond);
builder.field("requests_per_second", requestsPerSecond == Float.POSITIVE_INFINITY ? "unlimited" : requestsPerSecond);
if (reasonCancelled != null) {
builder.field("canceled", reasonCancelled);
}
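With throttling disabled the rendered status therefore contains `"requests_per_second": "unlimited"`, while a finite throttle is rendered as the raw float, for example `"requests_per_second": 1.7`.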
@ -393,9 +393,6 @@ public class BulkByScrollTask extends CancellableTask {
}
private void setRequestsPerSecond(float requestsPerSecond) {
if (requestsPerSecond == -1) {
requestsPerSecond = 0;
}
this.requestsPerSecond = requestsPerSecond;
}

View File

@ -21,6 +21,10 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import static org.elasticsearch.action.ValidateActions.addValidationError;
@ -29,11 +33,11 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
*/
public class RethrottleRequest extends BaseTasksRequest<RethrottleRequest> {
/**
* The throttle to apply to all matching requests in sub-requests per second. 0 means set no throttle and that is the default.
* Throttling is done between batches, as we start the next scroll requests. That way we can increase the scroll's timeout to make sure
* that it contains any time that we might wait.
* The throttle to apply to all matching requests in sub-requests per second. {@link Float#POSITIVE_INFINITY} means set no throttle. Throttling is done between
* batches, as we start the next scroll requests. That way we can increase the scroll's timeout to make sure that it contains any time
* that we might wait.
*/
private float requestsPerSecond = 0;
private Float requestsPerSecond;
/**
* The throttle to apply to all matching requests in sub-requests per second. {@link Float#POSITIVE_INFINITY} means set no throttle.
@ -43,9 +47,15 @@ public class RethrottleRequest extends BaseTasksRequest<RethrottleRequest> {
}
/**
* Set the throttle to apply to all matching requests in sub-requests per second. 0 means set no throttle and that is the default.
* Set the throttle to apply to all matching requests in sub-requests per second. {@link Float#POSITIVE_INFINITY} means set no throttle.
* Throttling is done between batches, as we start the next scroll requests. That way we can increase the scroll's timeout to make sure
* that it contains any time that we might wait.
*/
public RethrottleRequest setRequestsPerSecond(float requestsPerSecond) {
if (requestsPerSecond <= 0) {
throw new IllegalArgumentException(
"[requests_per_second] must be greater than 0. Use Float.POSITIVE_INFINITY to disable throttling.");
}
this.requestsPerSecond = requestsPerSecond;
return this;
}
@ -53,6 +63,9 @@ public class RethrottleRequest extends BaseTasksRequest<RethrottleRequest> {
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
if (requestsPerSecond == null) {
validationException = addValidationError("requests_per_second must be set", validationException);
}
for (String action : getActions()) {
switch (action) {
case ReindexAction.NAME:
@ -65,4 +78,16 @@ public class RethrottleRequest extends BaseTasksRequest<RethrottleRequest> {
}
return validationException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestsPerSecond = in.readFloat();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeFloat(requestsPerSecond);
}
}

View File

@ -34,7 +34,9 @@ public class RethrottleRequestBuilder extends TasksRequestBuilder<RethrottleRequ
}
/**
* Set the throttle to apply to all matching requests in sub-requests per second. 0 means set no throttle and that is the default.
* Set the throttle to apply to all matching requests in sub-requests per second. {@link Float#POSITIVE_INFINITY} means set no throttle.
* Throttling is done between batches, as we start the next scroll requests. That way we can increase the scroll's timeout to make sure
* that it contains any time that we might wait.
*/
public RethrottleRequestBuilder setRequestsPerSecond(float requestsPerSecond) {
request.setRequestsPerSecond(requestsPerSecond);

View File

@ -369,7 +369,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
public void testPerfectlyThrottledBatchTime() {
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
testRequest.setRequestsPerSecond(0);
testRequest.setRequestsPerSecond(Float.POSITIVE_INFINITY);
assertThat((double) action.perfectlyThrottledBatchTime(randomInt()), closeTo(0f, 0f));
int total = between(0, 1000000);

View File

@ -21,6 +21,9 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
@ -40,6 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ -49,7 +53,7 @@ public class BulkByScrollTaskTests extends ESTestCase {
@Before
public void createTask() {
task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID, 0);
task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID, Float.POSITIVE_INFINITY);
}
public void testBasicData() {
@ -270,4 +274,10 @@ public class BulkByScrollTaskTests extends ESTestCase {
threadPool.shutdown();
}
}
public void testXContentRepresentationOfUnlimitedRequestsPerSecond() throws IOException {
XContentBuilder builder = JsonXContent.contentBuilder();
task.getStatus().toXContent(builder, ToXContent.EMPTY_PARAMS);
assertThat(builder.string(), containsString("\"requests_per_second\":\"unlimited\""));
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
@ -51,7 +50,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
/**
* Utilities for testing reindex and update-by-query cancellation. This whole class isn't thread safe. Luckily we run out tests in separate
* Utilities for testing reindex and update-by-query cancellation. This whole class isn't thread safe. Luckily we run our tests in separate
* jvms.
*/
public class CancelTestUtils {
@ -62,7 +61,7 @@ public class CancelTestUtils {
private static final CyclicBarrier barrier = new CyclicBarrier(2);
public static <Request extends AbstractBulkIndexByScrollRequest<Request>,
Response extends ActionResponse,
Response extends BulkIndexByScrollResponse,
Builder extends AbstractBulkIndexByScrollRequestBuilder<Request, Response, Builder>>
Response testCancel(ESIntegTestCase test, Builder request, String actionToCancel) throws Exception {

View File

@ -37,22 +37,22 @@ public class ReindexBasicTests extends ReindexTestCase {
// Copy all the docs
ReindexRequestBuilder copy = reindex().source("source").destination("dest", "all").refresh(true);
assertThat(copy.get(), responseMatcher().created(4));
assertThat(copy.get(), reindexResponseMatcher().created(4));
assertHitCount(client().prepareSearch("dest").setTypes("all").setSize(0).get(), 4);
// Now none of them
copy = reindex().source("source").destination("all", "none").filter(termQuery("foo", "no_match")).refresh(true);
assertThat(copy.get(), responseMatcher().created(0));
assertThat(copy.get(), reindexResponseMatcher().created(0));
assertHitCount(client().prepareSearch("dest").setTypes("none").setSize(0).get(), 0);
// Now half of them
copy = reindex().source("source").destination("dest", "half").filter(termQuery("foo", "a")).refresh(true);
assertThat(copy.get(), responseMatcher().created(2));
assertThat(copy.get(), reindexResponseMatcher().created(2));
assertHitCount(client().prepareSearch("dest").setTypes("half").setSize(0).get(), 2);
// Limit with size
copy = reindex().source("source").destination("dest", "size_one").size(1).refresh(true);
assertThat(copy.get(), responseMatcher().created(1));
assertThat(copy.get(), reindexResponseMatcher().created(1));
assertHitCount(client().prepareSearch("dest").setTypes("size_one").setSize(0).get(), 1);
}
@ -70,7 +70,7 @@ public class ReindexBasicTests extends ReindexTestCase {
ReindexRequestBuilder copy = reindex().source("source").destination("dest", "all").refresh(true);
// Use a small batch size so we have to use more than one batch
copy.source().setSize(5);
assertThat(copy.get(), responseMatcher().created(max).batches(max, 5));
assertThat(copy.get(), reindexResponseMatcher().created(max).batches(max, 5));
assertHitCount(client().prepareSearch("dest").setTypes("all").setSize(0).get(), max);
// Copy some of the docs
@ -79,7 +79,7 @@ public class ReindexBasicTests extends ReindexTestCase {
// Use a small batch size so we have to use more than one batch
copy.source().setSize(5);
copy.size(half); // The real "size" of the request.
assertThat(copy.get(), responseMatcher().created(half).batches(half, 5));
assertThat(copy.get(), reindexResponseMatcher().created(half).batches(half, 5));
assertHitCount(client().prepareSearch("dest").setTypes("half").setSize(0).get(), half);
}
}

View File

@ -35,7 +35,7 @@ public class ReindexCancelTests extends ReindexTestCase {
public void testCancel() throws Exception {
ReindexResponse response = CancelTestUtils.testCancel(this, reindex().destination("dest", "test"), ReindexAction.NAME);
assertThat(response, responseMatcher().created(1).reasonCancelled(equalTo("by user request")));
assertThat(response, reindexResponseMatcher().created(1).reasonCancelled(equalTo("by user request")));
refresh("dest");
assertHitCount(client().prepareSearch("dest").setSize(0).get(), 1);
}

View File

@ -58,7 +58,7 @@ public class ReindexFailureTests extends ReindexTestCase {
copy.source().setSize(1);
ReindexResponse response = copy.get();
assertThat(response, responseMatcher()
assertThat(response, reindexResponseMatcher()
.batches(1)
.failures(both(greaterThan(0)).and(lessThanOrEqualTo(maximumNumberOfShards()))));
for (Failure failure: response.getIndexingFailures()) {
@ -78,7 +78,7 @@ public class ReindexFailureTests extends ReindexTestCase {
copy.destination().setOpType(CREATE);
ReindexResponse response = copy.get();
assertThat(response, responseMatcher().batches(1).versionConflicts(1).failures(1).created(99));
assertThat(response, reindexResponseMatcher().batches(1).versionConflicts(1).failures(1).created(99));
for (Failure failure: response.getIndexingFailures()) {
assertThat(failure.getMessage(), containsString("VersionConflictEngineException[[test]["));
}

View File

@ -43,18 +43,18 @@ public class ReindexParentChildTests extends ReindexTestCase {
// Copy parent to the new index
ReindexRequestBuilder copy = reindex().source("source").destination("dest").filter(findsCountry).refresh(true);
assertThat(copy.get(), responseMatcher().created(1));
assertThat(copy.get(), reindexResponseMatcher().created(1));
// Copy the child to a new index
copy = reindex().source("source").destination("dest").filter(findsCity).refresh(true);
assertThat(copy.get(), responseMatcher().created(1));
assertThat(copy.get(), reindexResponseMatcher().created(1));
// Make sure parent/child is intact on that index
assertSearchHits(client().prepareSearch("dest").setQuery(findsCity).get(), "pittsburgh");
// Copy the grandchild to a new index
copy = reindex().source("source").destination("dest").filter(findsNeighborhood).refresh(true);
assertThat(copy.get(), responseMatcher().created(1));
assertThat(copy.get(), reindexResponseMatcher().created(1));
// Make sure parent/child is intact on that index
assertSearchHits(client().prepareSearch("dest").setQuery(findsNeighborhood).get(),
@ -63,7 +63,7 @@ public class ReindexParentChildTests extends ReindexTestCase {
// Copy the parent/child/grandchild structure all at once to a third index
createParentChildIndex("dest_all_at_once");
copy = reindex().source("source").destination("dest_all_at_once").refresh(true);
assertThat(copy.get(), responseMatcher().created(3));
assertThat(copy.get(), reindexResponseMatcher().created(3));
// Make sure parent/child/grandchild is intact there too
assertSearchHits(client().prepareSearch("dest_all_at_once").setQuery(findsNeighborhood).get(),

View File

@ -41,11 +41,23 @@ public abstract class ReindexTestCase extends ESIntegTestCase {
return ReindexAction.INSTANCE.newRequestBuilder(client());
}
public IndexBySearchResponseMatcher responseMatcher() {
protected IndexBySearchResponseMatcher reindexResponseMatcher() {
return new IndexBySearchResponseMatcher();
}
public static class IndexBySearchResponseMatcher
protected UpdateByQueryRequestBuilder updateByQuery() {
return UpdateByQueryAction.INSTANCE.newRequestBuilder(client());
}
protected BulkIndexbyScrollResponseMatcher updateByQueryResponseMatcher() {
return new BulkIndexbyScrollResponseMatcher();
}
protected RethrottleRequestBuilder rethrottle() {
return RethrottleAction.INSTANCE.newRequestBuilder(client());
}
protected static class IndexBySearchResponseMatcher
extends AbstractBulkIndexByScrollResponseMatcher<ReindexResponse, IndexBySearchResponseMatcher> {
private Matcher<Long> createdMatcher = equalTo(0L);
@ -74,4 +86,12 @@ public abstract class ReindexTestCase extends ESIntegTestCase {
return this;
}
}
protected static class BulkIndexbyScrollResponseMatcher
extends AbstractBulkIndexByScrollResponseMatcher<BulkIndexByScrollResponse, BulkIndexbyScrollResponseMatcher> {
@Override
protected BulkIndexbyScrollResponseMatcher self() {
return this;
}
}
}

View File

@ -33,55 +33,55 @@ public class ReindexVersioningTests extends ReindexTestCase {
public void testExternalVersioningCreatesWhenAbsentAndSetsVersion() throws Exception {
setupSourceAbsent();
assertThat(reindexExternal(), responseMatcher().created(1));
assertThat(reindexExternal(), reindexResponseMatcher().created(1));
assertDest("source", SOURCE_VERSION);
}
public void testExternalVersioningUpdatesOnOlderAndSetsVersion() throws Exception {
setupDestOlder();
assertThat(reindexExternal(), responseMatcher().updated(1));
assertThat(reindexExternal(), reindexResponseMatcher().updated(1));
assertDest("source", SOURCE_VERSION);
}
public void testExternalVersioningVersionConflictsOnNewer() throws Exception {
setupDestNewer();
assertThat(reindexExternal(), responseMatcher().versionConflicts(1));
assertThat(reindexExternal(), reindexResponseMatcher().versionConflicts(1));
assertDest("dest", NEWER_VERSION);
}
public void testInternalVersioningCreatesWhenAbsent() throws Exception {
setupSourceAbsent();
assertThat(reindexInternal(), responseMatcher().created(1));
assertThat(reindexInternal(), reindexResponseMatcher().created(1));
assertDest("source", 1);
}
public void testInternalVersioningUpdatesOnOlder() throws Exception {
setupDestOlder();
assertThat(reindexInternal(), responseMatcher().updated(1));
assertThat(reindexInternal(), reindexResponseMatcher().updated(1));
assertDest("source", OLDER_VERSION + 1);
}
public void testInternalVersioningUpdatesOnNewer() throws Exception {
setupDestNewer();
assertThat(reindexInternal(), responseMatcher().updated(1));
assertThat(reindexInternal(), reindexResponseMatcher().updated(1));
assertDest("source", NEWER_VERSION + 1);
}
public void testCreateCreatesWhenAbsent() throws Exception {
setupSourceAbsent();
assertThat(reindexCreate(), responseMatcher().created(1));
assertThat(reindexCreate(), reindexResponseMatcher().created(1));
assertDest("source", 1);
}
public void testCreateVersionConflictsOnOlder() throws Exception {
setupDestOlder();
assertThat(reindexCreate(), responseMatcher().versionConflicts(1));
assertThat(reindexCreate(), reindexResponseMatcher().versionConflicts(1));
assertDest("dest", OLDER_VERSION);
}
public void testCreateVersionConflictsOnNewer() throws Exception {
setupDestNewer();
assertThat(reindexCreate(), responseMatcher().versionConflicts(1));
assertThat(reindexCreate(), reindexResponseMatcher().versionConflicts(1));
assertDest("dest", NEWER_VERSION);
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import static org.hamcrest.Matchers.hasSize;
/**
* Tests that you can set requests_per_second over the Java API and that you can rethrottle running requests. There are REST tests for this
* too, but this is the only place that tests running against multiple nodes, so it is the only integration test that checks for
* serialization.
*/
public class RethrottleTests extends ReindexTestCase {
public void testReindex() throws Exception {
testCase(reindex().source("test").destination("dest"), ReindexAction.NAME);
}
public void testUpdateByQuery() throws Exception {
testCase(updateByQuery().source("test"), UpdateByQueryAction.NAME);
}
private void testCase(AbstractBulkIndexByScrollRequestBuilder<?, ? extends BulkIndexByScrollResponse, ?> request, String actionName)
throws Exception {
// Use a single shard so the reindex has to happen in multiple batches
client().admin().indices().prepareCreate("test").setSettings("index.number_of_shards", 1).get();
indexRandom(true,
client().prepareIndex("test", "test", "1").setSource("foo", "bar"),
client().prepareIndex("test", "test", "2").setSource("foo", "bar"),
client().prepareIndex("test", "test", "3").setSource("foo", "bar"));
// Start a request that will never finish unless we rethrottle it
request.setRequestsPerSecond(.000001f); // Throttle forever
request.source().setSize(1); // Make sure we use multiple batches
ListenableActionFuture<? extends BulkIndexByScrollResponse> responseListener = request.execute();
// Now rethrottle it so it'll finish
ListTasksResponse rethrottleResponse = rethrottle().setActions(actionName).setRequestsPerSecond(Float.POSITIVE_INFINITY).get();
assertThat(rethrottleResponse.getTasks(), hasSize(1));
// Now the response should come back quickly because we've rethrottled the request
BulkIndexByScrollResponse response = responseListener.get();
assertEquals("Batches didn't match, this may invalidate the test as throttling is done between batches", 3, response.getBatches());
}
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService.ScriptType;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
@ -120,9 +121,24 @@ public class RoundTripTests extends ESTestCase {
assertResponseEquals(response, tripped);
}
public void testRethrottleRequest() throws IOException {
RethrottleRequest request = new RethrottleRequest();
request.setRequestsPerSecond((float) randomDoubleBetween(0, Float.POSITIVE_INFINITY, false));
if (randomBoolean()) {
request.setActions(randomFrom(UpdateByQueryAction.NAME, ReindexAction.NAME));
} else {
request.setTaskId(new TaskId(randomAsciiOfLength(5), randomLong()));
}
RethrottleRequest tripped = new RethrottleRequest();
roundTrip(request, tripped);
assertEquals(request.getRequestsPerSecond(), tripped.getRequestsPerSecond(), 0.00001);
assertArrayEquals(request.getActions(), tripped.getActions());
assertEquals(request.getTaskId(), tripped.getTaskId());
}
private BulkByScrollTask.Status randomStatus() {
return new BulkByScrollTask.Status(randomPositiveLong(), randomPositiveLong(), randomPositiveLong(), randomPositiveLong(),
randomPositiveInt(), randomPositiveLong(), randomPositiveLong(), randomPositiveLong(),
randomInt(Integer.MAX_VALUE), randomPositiveLong(), randomPositiveLong(), randomPositiveLong(),
parseTimeValue(randomPositiveTimeValue(), "test"), abs(random().nextFloat()),
random().nextBoolean() ? null : randomSimpleString(random()), parseTimeValue(randomPositiveTimeValue(), "test"));
}
@ -163,10 +179,6 @@ public class RoundTripTests extends ESTestCase {
return l;
}
private int randomPositiveInt() {
return randomInt(Integer.MAX_VALUE);
}
private void assertResponseEquals(BulkIndexByScrollResponse expected, BulkIndexByScrollResponse actual) {
assertEquals(expected.getTook(), actual.getTook());
assertTaskStatusEquals(expected.getStatus(), actual.getStatus());

View File

@ -24,7 +24,7 @@ import org.elasticsearch.search.sort.SortOrder;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
public class UpdateByQueryBasicTests extends UpdateByQueryTestCase {
public class UpdateByQueryBasicTests extends ReindexTestCase {
public void testBasics() throws Exception {
indexRandom(true, client().prepareIndex("test", "test", "1").setSource("foo", "a"),
client().prepareIndex("test", "test", "2").setSource("foo", "a"),
@ -35,26 +35,28 @@ public class UpdateByQueryBasicTests extends UpdateByQueryTestCase {
assertEquals(1, client().prepareGet("test", "test", "4").get().getVersion());
// Reindex all the docs
assertThat(request().source("test").refresh(true).get(), responseMatcher().updated(4));
assertThat(updateByQuery().source("test").refresh(true).get(), updateByQueryResponseMatcher().updated(4));
assertEquals(2, client().prepareGet("test", "test", "1").get().getVersion());
assertEquals(2, client().prepareGet("test", "test", "4").get().getVersion());
// Now none of them
assertThat(request().source("test").filter(termQuery("foo", "no_match")).refresh(true).get(), responseMatcher().updated(0));
assertThat(updateByQuery().source("test").filter(termQuery("foo", "no_match")).refresh(true).get(),
updateByQueryResponseMatcher().updated(0));
assertEquals(2, client().prepareGet("test", "test", "1").get().getVersion());
assertEquals(2, client().prepareGet("test", "test", "4").get().getVersion());
// Now half of them
assertThat(request().source("test").filter(termQuery("foo", "a")).refresh(true).get(), responseMatcher().updated(2));
assertThat(updateByQuery().source("test").filter(termQuery("foo", "a")).refresh(true).get(),
updateByQueryResponseMatcher().updated(2));
assertEquals(3, client().prepareGet("test", "test", "1").get().getVersion());
assertEquals(3, client().prepareGet("test", "test", "2").get().getVersion());
assertEquals(2, client().prepareGet("test", "test", "3").get().getVersion());
assertEquals(2, client().prepareGet("test", "test", "4").get().getVersion());
// Limit with size
UpdateByQueryRequestBuilder request = request().source("test").size(3).refresh(true);
UpdateByQueryRequestBuilder request = updateByQuery().source("test").size(3).refresh(true);
request.source().addSort("foo.keyword", SortOrder.ASC);
assertThat(request.get(), responseMatcher().updated(3));
assertThat(request.get(), updateByQueryResponseMatcher().updated(3));
// Only the first three documents are updated because of sort
assertEquals(4, client().prepareGet("test", "test", "1").get().getVersion());
assertEquals(4, client().prepareGet("test", "test", "2").get().getVersion());

View File

@ -32,11 +32,11 @@ import static org.hamcrest.Matchers.equalTo;
* places - that is the responsibility of {@link AsyncBulkByScrollActionTests} which have more precise control to simulate failures but do
* not exercise important portion of the stack like transport and task management.
*/
public class UpdateByQueryCancelTests extends UpdateByQueryTestCase {
public class UpdateByQueryCancelTests extends ReindexTestCase {
public void testCancel() throws Exception {
BulkIndexByScrollResponse response = CancelTestUtils.testCancel(this, request(), UpdateByQueryAction.NAME);
BulkIndexByScrollResponse response = CancelTestUtils.testCancel(this, updateByQuery(), UpdateByQueryAction.NAME);
assertThat(response, responseMatcher().updated(1).reasonCancelled(equalTo("by user request")));
assertThat(response, updateByQueryResponseMatcher().updated(1).reasonCancelled(equalTo("by user request")));
refresh("source");
assertHitCount(client().prepareSearch("source").setSize(0).setQuery(matchQuery("giraffes", "giraffes")).get(), 1);
}

View File

@ -1,52 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.reindex;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import java.util.Collection;
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
@ClusterScope(scope = SUITE, transportClientRatio = 0)
public abstract class UpdateByQueryTestCase extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return pluginList(ReindexPlugin.class);
}
protected UpdateByQueryRequestBuilder request() {
return UpdateByQueryAction.INSTANCE.newRequestBuilder(client());
}
public BulkIndexbyScrollResponseMatcher responseMatcher() {
return new BulkIndexbyScrollResponseMatcher();
}
public static class BulkIndexbyScrollResponseMatcher extends
AbstractBulkIndexByScrollResponseMatcher<BulkIndexByScrollResponse, BulkIndexbyScrollResponseMatcher> {
@Override
protected BulkIndexbyScrollResponseMatcher self() {
return this;
}
}
}

View File

@ -35,7 +35,7 @@ import static org.hamcrest.Matchers.equalTo;
* Mutates a document while update-by-query-ing it and asserts that the mutation
* always sticks. Update-by-query should never revert documents.
*/
public class UpdateByQueryWhileModifyingTests extends UpdateByQueryTestCase {
public class UpdateByQueryWhileModifyingTests extends ReindexTestCase {
private static final int MAX_MUTATIONS = 50;
private static final int MAX_ATTEMPTS = 10;
@ -48,8 +48,9 @@ public class UpdateByQueryWhileModifyingTests extends UpdateByQueryTestCase {
Thread updater = new Thread(() -> {
while (keepUpdating.get()) {
try {
assertThat(request().source("test").refresh(true).abortOnVersionConflict(false).get(), responseMatcher()
.updated(either(equalTo(0L)).or(equalTo(1L))).versionConflicts(either(equalTo(0L)).or(equalTo(1L))));
BulkIndexByScrollResponse response = updateByQuery().source("test").refresh(true).abortOnVersionConflict(false).get();
assertThat(response, updateByQueryResponseMatcher().updated(either(equalTo(0L)).or(equalTo(1L)))
.versionConflicts(either(equalTo(0L)).or(equalTo(1L))));
} catch (Throwable t) {
failure.set(t);
}

View File

@ -152,7 +152,7 @@
---
"requests_per_second cannot be an empty string":
- do:
catch: /requests_per_second cannot be an empty string/
catch: /\[requests_per_second\] must be a float greater than 0. Use "unlimited" to disable throttling./
reindex:
requests_per_second: ""
body:
@ -161,6 +161,30 @@
dest:
index: dest
---
"requests_per_second cannot be negative":
- do:
catch: /\[requests_per_second\] must be a float greater than 0. Use "unlimited" to disable throttling./
reindex:
requests_per_second: -12
body:
source:
index: test
dest:
index: dest
---
"requests_per_second cannot be zero":
- do:
catch: /\[requests_per_second\] must be a float greater than 0. Use "unlimited" to disable throttling./
reindex:
requests_per_second: 0
body:
source:
index: test
dest:
index: dest
---
"reindex without source gives useful error message":
- do:

View File

@ -52,6 +52,56 @@
- gte: { took: 1000 }
- is_false: task
---
"requests_per_second supports unlimited to turn off throttling":
- do:
indices.create:
index: source
body:
settings:
number_of_shards: "1"
number_of_replicas: "0"
- do:
cluster.health:
wait_for_status: yellow
- do:
index:
index: source
type: foo
id: 1
body: { "text": "test" }
- do:
index:
index: source
type: foo
id: 2
body: { "text": "test" }
- do:
index:
index: source
type: foo
id: 3
body: { "text": "test" }
- do:
indices.refresh: {}
- do:
reindex:
requests_per_second: unlimited
body:
source:
index: source
size: 1
dest:
index: dest
- match: {created: 3}
- match: {updated: 0}
- match: {version_conflicts: 0}
- match: {batches: 3}
- match: {failures: []}
- match: {throttled_millis: 0}
- is_false: task
---
"Rethrottle":
# Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard
@ -110,64 +160,6 @@
wait_for_completion: true
task_id: $task
---
"Rethrottle to -1 which also means unlimited":
# Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard
# and a small batch size on the request
- do:
indices.create:
index: source
body:
settings:
number_of_shards: "1"
number_of_replicas: "0"
- do:
cluster.health:
wait_for_status: yellow
- do:
index:
index: source
type: foo
id: 1
body: { "text": "test" }
- do:
index:
index: source
type: foo
id: 2
body: { "text": "test" }
- do:
index:
index: source
type: foo
id: 3
body: { "text": "test" }
- do:
indices.refresh: {}
- do:
reindex:
requests_per_second: .00000001 # About 9.5 years to complete the request
wait_for_completion: false
body:
source:
index: source
size: 1
dest:
index: dest
- match: {task: '/.+:\d+/'}
- set: {task: task}
- do:
reindex.rethrottle:
requests_per_second: -1
task_id: $task
- do:
tasks.list:
wait_for_completion: true
task_id: $task
---
"Rethrottle but not unlimited":
# Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard

View File

@ -40,20 +40,6 @@
index: test
scroll_size: asdf
---
"requests_per_second cannot be an empty string":
- do:
index:
index: test
type: test
id: 1
body: { "text": "test" }
- do:
catch: /requests_per_second cannot be an empty string/
update_by_query:
index: test
requests_per_second: ''
---
"update_by_query without source gives useful error message":
- do:
@ -86,3 +72,27 @@
index: test
body:
fields: [_id]
---
"requests_per_second cannot be an empty string":
- do:
catch: /\[requests_per_second\] must be a float greater than 0. Use "unlimited" to disable throttling./
update_by_query:
requests_per_second: ""
index: test
---
"requests_per_second cannot be negative":
- do:
catch: /\[requests_per_second\] must be a float greater than 0. Use "unlimited" to disable throttling./
update_by_query:
requests_per_second: -12
index: test
---
"requests_per_second cannot be zero":
- do:
catch: /\[requests_per_second\] must be a float greater than 0. Use "unlimited" to disable throttling./
update_by_query:
requests_per_second: 0
index: test

View File

@ -38,6 +38,44 @@
- gt: {throttled_millis: 1000}
- lt: {throttled_millis: 4000}
---
"requests_per_second supports unlimited which turns off throttling":
- do:
indices.create:
index: test
body:
settings:
number_of_shards: 1
- do:
cluster.health:
wait_for_status: yellow
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
indices.refresh: {}
- do:
update_by_query:
index: test
scroll_size: 1
requests_per_second: unlimited
- match: {batches: 3}
- match: {updated: 3}
- match: {throttled_millis: 0}
---
"Rethrottle":
# Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard
@ -88,56 +126,6 @@
wait_for_completion: true
task_id: $task
---
"Rethrottle to -1 which also means unlimited":
# Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard
# and a small batch size on the request
- do:
indices.create:
index: test
body:
settings:
number_of_shards: 1
- do:
cluster.health:
wait_for_status: yellow
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
indices.refresh: {}
- do:
update_by_query:
requests_per_second: .00000001 # About 9.5 years to complete the request
wait_for_completion: false
index: test
scroll_size: 1
- match: {task: '/.+:\d+/'}
- set: {task: task}
- do:
reindex.rethrottle:
requests_per_second: -1
task_id: $task
- do:
tasks.list:
wait_for_completion: true
task_id: $task
---
"Rethrottle but not unlimited":
# Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard

View File

@ -14,7 +14,7 @@
"params": {
"requests_per_second": {
"type": "float",
"default": 0,
"required": true,
"description": "The throttle to set on this request in sub-requests per second. 0 means set no throttle. As does \"unlimited\". Otherwise it must be a float."
}
}