diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java index 861c03cd706..4de06c88b8d 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java @@ -103,7 +103,8 @@ public abstract class AbstractAsyncBulkByScrollAction docs); - protected abstract Response buildResponse(TimeValue took, List indexingFailures, List searchFailures); + protected abstract Response buildResponse(TimeValue took, List indexingFailures, List searchFailures, + boolean timedOut); public void start() { initialSearch(); @@ -161,8 +162,13 @@ public abstract class AbstractAsyncBulkByScrollAction 0) { - startNormalTermination(emptyList(), unmodifiableList(Arrays.asList(searchResponse.getShardFailures()))); + if ( // If any of the shards failed that should abort the request. + (searchResponse.getShardFailures() != null && searchResponse.getShardFailures().length > 0) + // Timeouts aren't shard failures but we still need to pass them back to the user. + || searchResponse.isTimedOut() + ) { + startNormalTermination(emptyList(), unmodifiableList(Arrays.asList(searchResponse.getShardFailures())), + searchResponse.isTimedOut()); return; } long total = searchResponse.getHits().totalHits(); @@ -176,7 +182,7 @@ public abstract class AbstractAsyncBulkByScrollAction= mainRequest.getSize()) { // We've processed all the requested docs. - startNormalTermination(emptyList(), emptyList()); + startNormalTermination(emptyList(), emptyList(), false); return; } startNextScroll(); @@ -311,9 +317,9 @@ public abstract class AbstractAsyncBulkByScrollAction indexingFailures, List searchFailures) { + void startNormalTermination(List indexingFailures, List searchFailures, boolean timedOut) { if (false == mainRequest.isRefresh()) { - finishHim(null, indexingFailures, searchFailures); + finishHim(null, indexingFailures, searchFailures, timedOut); return; } RefreshRequest refresh = new RefreshRequest(); @@ -321,7 +327,7 @@ public abstract class AbstractAsyncBulkByScrollAction() { @Override public void onResponse(RefreshResponse response) { - finishHim(null, indexingFailures, searchFailures); + finishHim(null, indexingFailures, searchFailures, timedOut); } @Override @@ -337,7 +343,7 @@ public abstract class AbstractAsyncBulkByScrollAction indexingFailures, List searchFailures) { + void finishHim(Throwable failure, List indexingFailures, List searchFailures, boolean timedOut) { String scrollId = scroll.get(); if (Strings.hasLength(scrollId)) { /* @@ -369,7 +376,8 @@ public abstract class AbstractAsyncBulkByScrollAction indexingFailures; private List searchFailures; + private boolean timedOut; public BulkIndexByScrollResponse() { } public BulkIndexByScrollResponse(TimeValue took, BulkByScrollTask.Status status, List indexingFailures, - List searchFailures) { + List searchFailures, boolean timedOut) { this.took = took; this.status = requireNonNull(status, "Null status not supported"); this.indexingFailures = indexingFailures; this.searchFailures = searchFailures; + this.timedOut = timedOut; } public TimeValue getTook() { @@ -103,6 +105,13 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont return searchFailures; } + /** + * Did any of the sub-requests that were part of this request timeout? + */ + public boolean isTimedOut() { + return timedOut; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -116,6 +125,7 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont for (ShardSearchFailure failure: searchFailures) { failure.writeTo(out); } + out.writeBoolean(timedOut); } @Override @@ -135,11 +145,13 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont searchFailures.add(readShardSearchFailure(in)); } this.searchFailures = unmodifiableList(searchFailures); + this.timedOut = in.readBoolean(); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field("took", took.millis()); + builder.field("timed_out", timedOut); status.innerXContent(builder, params, false, false); builder.startArray("failures"); for (Failure failure: indexingFailures) { diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java index 24fdb16b397..6a46a2c8e49 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java @@ -36,6 +36,9 @@ public class BulkIndexByScrollResponseContentListener status.getStatus()) { status = failure.getStatus(); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexResponse.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexResponse.java index a4aee0c00d3..7e74fe26ec2 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexResponse.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexResponse.java @@ -35,8 +35,9 @@ public class ReindexResponse extends BulkIndexByScrollResponse { public ReindexResponse() { } - public ReindexResponse(TimeValue took, Status status, List indexingFailures, List searchFailures) { - super(took, status, indexingFailures, searchFailures); + public ReindexResponse(TimeValue took, Status status, List indexingFailures, List searchFailures, + boolean timedOut) { + super(took, status, indexingFailures, searchFailures, timedOut); } public long getCreated() { @@ -46,6 +47,7 @@ public class ReindexResponse extends BulkIndexByScrollResponse { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field("took", getTook()); + builder.field("timed_out", isTimedOut()); getStatus().innerXContent(builder, params, true, false); builder.startArray("failures"); for (Failure failure: getIndexingFailures()) { diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java index f4afd8c36e1..17214ad15c5 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java @@ -107,7 +107,10 @@ public class RestUpdateByQueryAction extends internalRequest.setSize(internalRequest.getSearchRequest().source().size()); internalRequest.setPipeline(request.param("pipeline")); internalRequest.getSearchRequest().source().size(request.paramAsInt("scroll_size", scrollSize)); - + // Let the requester set search timeout. It is probably only going to be useful for testing but who knows. + if (request.hasParam("search_timeout")) { + internalRequest.getSearchRequest().source().timeout(request.paramAsTime("search_timeout", null)); + } execute(request, internalRequest, channel); } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java index 069ee032f8e..dbe464e98b4 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java @@ -191,8 +191,9 @@ public class TransportReindexAction extends HandledTransportAction indexingFailures, List searchFailures) { - return new ReindexResponse(took, task.getStatus(), indexingFailures, searchFailures); + protected ReindexResponse buildResponse(TimeValue took, List indexingFailures, List searchFailures, + boolean timedOut) { + return new ReindexResponse(took, task.getStatus(), indexingFailures, searchFailures, timedOut); } /* diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java index 0e13c6718dd..d004e86ac0c 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java @@ -96,8 +96,8 @@ public class TransportUpdateByQueryAction extends HandledTransportAction indexingFailures, - List searchFailures) { - return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures); + List searchFailures, boolean timedOut) { + return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures, timedOut); } @Override diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index ae05f3270df..2aedd603fbc 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -248,15 +248,33 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { */ public void testShardFailuresAbortRequest() throws Exception { ShardSearchFailure shardFailure = new ShardSearchFailure(new RuntimeException("test")); - new DummyAbstractAsyncBulkByScrollAction() - .onScrollResponse(new SearchResponse(null, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[] { shardFailure })); + InternalSearchResponse internalResponse = new InternalSearchResponse(null, null, null, null, false, null); + new DummyAbstractAsyncBulkByScrollAction().onScrollResponse( + new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[] { shardFailure })); BulkIndexByScrollResponse response = listener.get(); assertThat(response.getIndexingFailures(), emptyCollectionOf(Failure.class)); assertThat(response.getSearchFailures(), contains(shardFailure)); + assertFalse(response.isTimedOut()); assertNull(response.getReasonCancelled()); assertThat(client.scrollsCleared, contains(scrollId)); } + /** + * Mimicks search timeouts. + */ + public void testSearchTimeoutsAbortRequest() throws Exception { + InternalSearchResponse internalResponse = new InternalSearchResponse(null, null, null, null, true, null); + new DummyAbstractAsyncBulkByScrollAction() + .onScrollResponse(new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[0])); + BulkIndexByScrollResponse response = listener.get(); + assertThat(response.getIndexingFailures(), emptyCollectionOf(Failure.class)); + assertThat(response.getSearchFailures(), emptyCollectionOf(ShardSearchFailure.class)); + assertTrue(response.isTimedOut()); + assertNull(response.getReasonCancelled()); + assertThat(client.scrollsCleared, contains(scrollId)); + } + + /** * Mimicks bulk indexing failures. */ @@ -396,7 +414,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { public void testCancelBeforeStartNormalTermination() throws Exception { // Refresh or not doesn't matter - we don't try to refresh. mainRequest.setRefresh(usually()); - cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNormalTermination(emptyList(), emptyList())); + cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNormalTermination(emptyList(), emptyList(), false)); // This wouldn't return if we called refresh - the action would hang waiting for the refresh that we haven't mocked. } @@ -430,8 +448,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { @Override protected BulkIndexByScrollResponse buildResponse(TimeValue took, List indexingFailures, - List searchFailures) { - return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures); + List searchFailures, boolean timedOut) { + return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures, timedOut); } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java index f5c31fe8f42..6e1cbb59e86 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java @@ -102,7 +102,7 @@ public class RoundTripTests extends ESTestCase { public void testReindexResponse() throws IOException { ReindexResponse response = new ReindexResponse(timeValueMillis(randomPositiveLong()), randomStatus(), randomIndexingFailures(), - randomSearchFailures()); + randomSearchFailures(), randomBoolean()); ReindexResponse tripped = new ReindexResponse(); roundTrip(response, tripped); assertResponseEquals(response, tripped); @@ -110,7 +110,7 @@ public class RoundTripTests extends ESTestCase { public void testBulkIndexByScrollResponse() throws IOException { BulkIndexByScrollResponse response = new BulkIndexByScrollResponse(timeValueMillis(randomPositiveLong()), randomStatus(), - randomIndexingFailures(), randomSearchFailures()); + randomIndexingFailures(), randomSearchFailures(), randomBoolean()); BulkIndexByScrollResponse tripped = new BulkIndexByScrollResponse(); roundTrip(response, tripped); assertResponseEquals(response, tripped); diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml index 7f84c1aac8b..a00fefc444a 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml @@ -75,6 +75,7 @@ index: source dest: index: dest + - is_false: timed_out - match: {task: '/.+:\d+/'} - set: {task: task} - is_false: updated diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/10_basic.yaml b/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/10_basic.yaml index 94ffa2349a9..b4ebb93c327 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/10_basic.yaml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/10_basic.yaml @@ -12,6 +12,7 @@ - do: update-by-query: index: test + - is_false: timed_out - match: {updated: 1} - match: {version_conflicts: 0} - match: {batches: 1} diff --git a/qa/smoke-test-reindex-with-groovy/src/test/resources/rest-api-spec/test/reindex/30_timeout.yaml b/qa/smoke-test-reindex-with-groovy/src/test/resources/rest-api-spec/test/reindex/30_timeout.yaml new file mode 100644 index 00000000000..533dbc3462b --- /dev/null +++ b/qa/smoke-test-reindex-with-groovy/src/test/resources/rest-api-spec/test/reindex/30_timeout.yaml @@ -0,0 +1,29 @@ +--- +"Timeout": + - do: + index: + index: twitter + type: tweet + id: 1 + body: { "user": "kimchy" } + - do: + indices.refresh: {} + + - do: + catch: request_timeout + reindex: + refresh: true + body: + source: + index: twitter + timeout: 10ms + query: + script: + # Sleep 100x longer than the timeout. That should cause a timeout! + # Return true causes the document to try to be collected which is what actually triggers the timeout. + script: sleep(1000); return true + dest: + index: new_twitter + - is_true: timed_out + - match: {created: 0} + - match: {noops: 0} diff --git a/qa/smoke-test-reindex-with-groovy/src/test/resources/rest-api-spec/test/update-by-query/30_timeout.yaml b/qa/smoke-test-reindex-with-groovy/src/test/resources/rest-api-spec/test/update-by-query/30_timeout.yaml new file mode 100644 index 00000000000..2a291bf0541 --- /dev/null +++ b/qa/smoke-test-reindex-with-groovy/src/test/resources/rest-api-spec/test/update-by-query/30_timeout.yaml @@ -0,0 +1,26 @@ +--- +"Timeout": + - do: + index: + index: twitter + type: tweet + id: 1 + body: { "user": "kimchy" } + - do: + indices.refresh: {} + + - do: + catch: request_timeout + update-by-query: + index: twitter + refresh: true + search_timeout: 10ms + body: + query: + script: + # Sleep 100x longer than the timeout. That should cause a timeout! + # Return true causes the document to try to be collected which is what actually triggers the timeout. + script: sleep(1000); return true + - is_true: timed_out + - match: {updated: 0} + - match: {noops: 0} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/update-by-query.json b/rest-api-spec/src/main/resources/rest-api-spec/api/update-by-query.json index 9d5183ee4f3..dca49cbcc6a 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/update-by-query.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/update-by-query.json @@ -105,6 +105,10 @@ "options" : ["query_then_fetch", "dfs_query_then_fetch"], "description" : "Search operation type" }, + "search_timeout": { + "type" : "time", + "description" : "Explicit timeout for each search request. Defaults to no timeout." + }, "size": { "type" : "number", "description" : "Number of hits to return (default: 10)"