From b8d931d23cb87df2838efaec20751da1397fad64 Mon Sep 17 00:00:00 2001
From: Nik Everett <nik9000@gmail.com>
Date: Fri, 4 Mar 2016 14:06:32 -0500
Subject: [PATCH] [reindex] Timeout if sub-requests timeout

Sadly, it isn't easy to simulate a timeout during an integration test, you
just have to cause one. Groovy's sleep should do the job.
---
 .../AbstractAsyncBulkByScrollAction.java      | 32 ++++++++++++-------
 .../reindex/BulkIndexByScrollResponse.java    | 14 +++++++-
 ...kIndexByScrollResponseContentListener.java |  3 ++
 .../index/reindex/ReindexResponse.java        |  6 ++--
 .../reindex/RestUpdateByQueryAction.java      |  5 ++-
 .../index/reindex/TransportReindexAction.java |  5 +--
 .../reindex/TransportUpdateByQueryAction.java |  4 +--
 .../reindex/AsyncBulkByScrollActionTests.java | 28 +++++++++++++---
 .../index/reindex/RoundTripTests.java         |  4 +--
 .../rest-api-spec/test/reindex/10_basic.yaml  |  1 +
 .../test/update-by-query/10_basic.yaml        |  1 +
 .../test/reindex/30_timeout.yaml              | 29 +++++++++++++++++
 .../test/update-by-query/30_timeout.yaml      | 26 +++++++++++++++
 .../rest-api-spec/api/update-by-query.json    |  4 +++
 14 files changed, 135 insertions(+), 27 deletions(-)
 create mode 100644 qa/smoke-test-reindex-with-groovy/src/test/resources/rest-api-spec/test/reindex/30_timeout.yaml
 create mode 100644 qa/smoke-test-reindex-with-groovy/src/test/resources/rest-api-spec/test/update-by-query/30_timeout.yaml

diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java
index 861c03cd706..4de06c88b8d 100644
--- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java
+++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java
@@ -103,7 +103,8 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
 
     protected abstract BulkRequest buildBulk(Iterable<SearchHit> docs);
 
-    protected abstract Response buildResponse(TimeValue took, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures);
+    protected abstract Response buildResponse(TimeValue took, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures,
+            boolean timedOut);
 
     public void start() {
         initialSearch();
@@ -161,8 +162,13 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
             return;
         }
         setScroll(searchResponse.getScrollId());
-        if (searchResponse.getShardFailures() != null && searchResponse.getShardFailures().length > 0) {
-            startNormalTermination(emptyList(), unmodifiableList(Arrays.asList(searchResponse.getShardFailures())));
+        if (    // If any of the shards failed that should abort the request.
+                (searchResponse.getShardFailures() != null && searchResponse.getShardFailures().length > 0)
+                // Timeouts aren't shard failures but we still need to pass them back to the user.
+                || searchResponse.isTimedOut()
+                ) {
+            startNormalTermination(emptyList(), unmodifiableList(Arrays.asList(searchResponse.getShardFailures())),
+                    searchResponse.isTimedOut());
             return;
         }
         long total = searchResponse.getHits().totalHits();
@@ -176,7 +182,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
                 SearchHit[] docs = searchResponse.getHits().getHits();
                 logger.debug("scroll returned [{}] documents with a scroll id of [{}]", docs.length, searchResponse.getScrollId());
                 if (docs.length == 0) {
-                    startNormalTermination(emptyList(), emptyList());
+                    startNormalTermination(emptyList(), emptyList(), false);
                     return;
                 }
                 task.countBatch();
@@ -266,13 +272,13 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
             destinationIndices.addAll(destinationIndicesThisBatch);
 
             if (false == failures.isEmpty()) {
-                startNormalTermination(unmodifiableList(failures), emptyList());
+                startNormalTermination(unmodifiableList(failures), emptyList(), false);
                 return;
             }
 
             if (mainRequest.getSize() != SIZE_ALL_MATCHES && task.getSuccessfullyProcessed() >= mainRequest.getSize()) {
                 // We've processed all the requested docs.
-                startNormalTermination(emptyList(), emptyList());
+                startNormalTermination(emptyList(), emptyList(), false);
                 return;
             }
             startNextScroll();
@@ -311,9 +317,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
         failures.add(failure);
     }
 
-    void startNormalTermination(List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures) {
+    void startNormalTermination(List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures, boolean timedOut) {
         if (false == mainRequest.isRefresh()) {
-            finishHim(null, indexingFailures, searchFailures);
+            finishHim(null, indexingFailures, searchFailures, timedOut);
             return;
         }
         RefreshRequest refresh = new RefreshRequest();
@@ -321,7 +327,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
         client.admin().indices().refresh(refresh, new ActionListener<RefreshResponse>() {
             @Override
             public void onResponse(RefreshResponse response) {
-                finishHim(null, indexingFailures, searchFailures);
+                finishHim(null, indexingFailures, searchFailures, timedOut);
             }
 
             @Override
@@ -337,7 +343,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
      * @param failure if non null then the request failed catastrophically with this exception
      */
     void finishHim(Throwable failure) {
-        finishHim(failure, emptyList(), emptyList());
+        finishHim(failure, emptyList(), emptyList(), false);
     }
 
     /**
@@ -346,8 +352,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
      * @param failure if non null then the request failed catastrophically with this exception
      * @param indexingFailures any indexing failures accumulated during the request
      * @param searchFailures any search failures accumulated during the request
+     * @param timedOut have any of the sub-requests timed out?
      */
-    void finishHim(Throwable failure, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures) {
+    void finishHim(Throwable failure, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures, boolean timedOut) {
         String scrollId = scroll.get();
         if (Strings.hasLength(scrollId)) {
             /*
@@ -369,7 +376,8 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
             });
         }
         if (failure == null) {
-            listener.onResponse(buildResponse(timeValueNanos(System.nanoTime() - startTime.get()), indexingFailures, searchFailures));
+            listener.onResponse(
+                    buildResponse(timeValueNanos(System.nanoTime() - startTime.get()), indexingFailures, searchFailures, timedOut));
         } else {
             listener.onFailure(failure);
         }
diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponse.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponse.java
index ca1a53ef999..60de9bfbd03 100644
--- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponse.java
+++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponse.java
@@ -45,16 +45,18 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
     private BulkByScrollTask.Status status;
     private List<Failure> indexingFailures;
     private List<ShardSearchFailure> searchFailures;
+    private boolean timedOut;
 
     public BulkIndexByScrollResponse() {
     }
 
     public BulkIndexByScrollResponse(TimeValue took, BulkByScrollTask.Status status, List<Failure> indexingFailures,
-            List<ShardSearchFailure> searchFailures) {
+            List<ShardSearchFailure> searchFailures, boolean timedOut) {
         this.took = took;
         this.status = requireNonNull(status, "Null status not supported");
         this.indexingFailures = indexingFailures;
         this.searchFailures = searchFailures;
+        this.timedOut = timedOut;
     }
 
     public TimeValue getTook() {
@@ -103,6 +105,13 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
         return searchFailures;
     }
 
+    /**
+     * Did any of the sub-requests that were part of this request timeout?
+     */
+    public boolean isTimedOut() {
+        return timedOut;
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
@@ -116,6 +125,7 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
         for (ShardSearchFailure failure: searchFailures) {
             failure.writeTo(out);
         }
+        out.writeBoolean(timedOut);
     }
 
     @Override
@@ -135,11 +145,13 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
             searchFailures.add(readShardSearchFailure(in));
         }
         this.searchFailures = unmodifiableList(searchFailures);
+        this.timedOut = in.readBoolean();
     }
 
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.field("took", took.millis());
+        builder.field("timed_out", timedOut);
         status.innerXContent(builder, params, false, false);
         builder.startArray("failures");
         for (Failure failure: indexingFailures) {
diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java
index 24fdb16b397..6a46a2c8e49 100644
--- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java
+++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java
@@ -36,6 +36,9 @@ public class BulkIndexByScrollResponseContentListener<R extends BulkIndexByScrol
     @Override
     protected RestStatus getStatus(R response) {
         RestStatus status = RestStatus.OK;
+        if (response.isTimedOut()) {
+            status = RestStatus.REQUEST_TIMEOUT;
+        }
         for (Failure failure : response.getIndexingFailures()) {
             if (failure.getStatus().getStatus() > status.getStatus()) {
                 status = failure.getStatus();
diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexResponse.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexResponse.java
index a4aee0c00d3..7e74fe26ec2 100644
--- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexResponse.java
+++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexResponse.java
@@ -35,8 +35,9 @@ public class ReindexResponse extends BulkIndexByScrollResponse {
     public ReindexResponse() {
     }
 
-    public ReindexResponse(TimeValue took, Status status, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures) {
-        super(took, status, indexingFailures, searchFailures);
+    public ReindexResponse(TimeValue took, Status status, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures,
+            boolean timedOut) {
+        super(took, status, indexingFailures, searchFailures, timedOut);
     }
 
     public long getCreated() {
@@ -46,6 +47,7 @@ public class ReindexResponse extends BulkIndexByScrollResponse {
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.field("took", getTook());
+        builder.field("timed_out", isTimedOut());
         getStatus().innerXContent(builder, params, true, false);
         builder.startArray("failures");
         for (Failure failure: getIndexingFailures()) {
diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java
index f4afd8c36e1..17214ad15c5 100644
--- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java
+++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java
@@ -107,7 +107,10 @@ public class RestUpdateByQueryAction extends
         internalRequest.setSize(internalRequest.getSearchRequest().source().size());
         internalRequest.setPipeline(request.param("pipeline"));
         internalRequest.getSearchRequest().source().size(request.paramAsInt("scroll_size", scrollSize));
-
+        // Let the requester set search timeout. It is probably only going to be useful for testing but who knows.
+        if (request.hasParam("search_timeout")) {
+            internalRequest.getSearchRequest().source().timeout(request.paramAsTime("search_timeout", null));
+        }
 
         execute(request, internalRequest, channel);
     }
diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java
index 069ee032f8e..dbe464e98b4 100644
--- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java
+++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java
@@ -191,8 +191,9 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
         }
 
         @Override
-        protected ReindexResponse buildResponse(TimeValue took, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures) {
-            return new ReindexResponse(took, task.getStatus(), indexingFailures, searchFailures);
+        protected ReindexResponse buildResponse(TimeValue took, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures,
+                boolean timedOut) {
+            return new ReindexResponse(took, task.getStatus(), indexingFailures, searchFailures, timedOut);
         }
 
         /*
diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java
index 0e13c6718dd..d004e86ac0c 100644
--- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java
+++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java
@@ -96,8 +96,8 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
 
         @Override
         protected BulkIndexByScrollResponse buildResponse(TimeValue took, List<Failure> indexingFailures,
-                List<ShardSearchFailure> searchFailures) {
-            return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures);
+                List<ShardSearchFailure> searchFailures, boolean timedOut) {
+            return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures, timedOut);
         }
 
         @Override
diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java
index ae05f3270df..2aedd603fbc 100644
--- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java
+++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java
@@ -248,15 +248,33 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
      */
     public void testShardFailuresAbortRequest() throws Exception {
         ShardSearchFailure shardFailure = new ShardSearchFailure(new RuntimeException("test"));
-        new DummyAbstractAsyncBulkByScrollAction()
-                .onScrollResponse(new SearchResponse(null, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[] { shardFailure }));
+        InternalSearchResponse internalResponse = new InternalSearchResponse(null, null, null, null, false, null);
+        new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(
+                new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[] { shardFailure }));
         BulkIndexByScrollResponse response = listener.get();
         assertThat(response.getIndexingFailures(), emptyCollectionOf(Failure.class));
         assertThat(response.getSearchFailures(), contains(shardFailure));
+        assertFalse(response.isTimedOut());
         assertNull(response.getReasonCancelled());
         assertThat(client.scrollsCleared, contains(scrollId));
     }
 
+    /**
+     * Mimicks search timeouts.
+     */
+    public void testSearchTimeoutsAbortRequest() throws Exception {
+        InternalSearchResponse internalResponse = new InternalSearchResponse(null, null, null, null, true, null);
+        new DummyAbstractAsyncBulkByScrollAction()
+                .onScrollResponse(new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[0]));
+        BulkIndexByScrollResponse response = listener.get();
+        assertThat(response.getIndexingFailures(), emptyCollectionOf(Failure.class));
+        assertThat(response.getSearchFailures(), emptyCollectionOf(ShardSearchFailure.class));
+        assertTrue(response.isTimedOut());
+        assertNull(response.getReasonCancelled());
+        assertThat(client.scrollsCleared, contains(scrollId));
+    }
+
+
     /**
      * Mimicks bulk indexing failures.
      */
@@ -396,7 +414,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
     public void testCancelBeforeStartNormalTermination() throws Exception {
         // Refresh or not doesn't matter - we don't try to refresh.
         mainRequest.setRefresh(usually());
-        cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNormalTermination(emptyList(), emptyList()));
+        cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNormalTermination(emptyList(), emptyList(), false));
         // This wouldn't return if we called refresh - the action would hang waiting for the refresh that we haven't mocked.
     }
 
@@ -430,8 +448,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
 
         @Override
         protected BulkIndexByScrollResponse buildResponse(TimeValue took, List<Failure> indexingFailures,
-                List<ShardSearchFailure> searchFailures) {
-            return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures);
+                List<ShardSearchFailure> searchFailures, boolean timedOut) {
+            return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures, timedOut);
         }
     }
 
diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java
index f5c31fe8f42..6e1cbb59e86 100644
--- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java
+++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java
@@ -102,7 +102,7 @@ public class RoundTripTests extends ESTestCase {
 
     public void testReindexResponse() throws IOException {
         ReindexResponse response = new ReindexResponse(timeValueMillis(randomPositiveLong()), randomStatus(), randomIndexingFailures(),
-                randomSearchFailures());
+                randomSearchFailures(), randomBoolean());
         ReindexResponse tripped = new ReindexResponse();
         roundTrip(response, tripped);
         assertResponseEquals(response, tripped);
@@ -110,7 +110,7 @@ public class RoundTripTests extends ESTestCase {
 
     public void testBulkIndexByScrollResponse() throws IOException {
         BulkIndexByScrollResponse response = new BulkIndexByScrollResponse(timeValueMillis(randomPositiveLong()), randomStatus(),
-                randomIndexingFailures(), randomSearchFailures());
+                randomIndexingFailures(), randomSearchFailures(), randomBoolean());
         BulkIndexByScrollResponse tripped = new BulkIndexByScrollResponse();
         roundTrip(response, tripped);
         assertResponseEquals(response, tripped);
diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml
index 7f84c1aac8b..a00fefc444a 100644
--- a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml
+++ b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml
@@ -75,6 +75,7 @@
             index: source
           dest:
             index: dest
+  - is_false: timed_out
   - match: {task: '/.+:\d+/'}
   - set: {task: task}
   - is_false: updated
diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/10_basic.yaml b/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/10_basic.yaml
index 94ffa2349a9..b4ebb93c327 100644
--- a/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/10_basic.yaml
+++ b/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/10_basic.yaml
@@ -12,6 +12,7 @@
   - do:
       update-by-query:
         index: test
+  - is_false: timed_out
   - match: {updated: 1}
   - match: {version_conflicts: 0}
   - match: {batches: 1}
diff --git a/qa/smoke-test-reindex-with-groovy/src/test/resources/rest-api-spec/test/reindex/30_timeout.yaml b/qa/smoke-test-reindex-with-groovy/src/test/resources/rest-api-spec/test/reindex/30_timeout.yaml
new file mode 100644
index 00000000000..533dbc3462b
--- /dev/null
+++ b/qa/smoke-test-reindex-with-groovy/src/test/resources/rest-api-spec/test/reindex/30_timeout.yaml
@@ -0,0 +1,29 @@
+---
+"Timeout":
+  - do:
+      index:
+        index:  twitter
+        type:   tweet
+        id:     1
+        body:   { "user": "kimchy" }
+  - do:
+      indices.refresh: {}
+
+  - do:
+      catch: request_timeout
+      reindex:
+        refresh: true
+        body:
+          source:
+            index: twitter
+            timeout: 10ms
+            query:
+              script:
+                # Sleep 100x longer than the timeout. That should cause a timeout!
+                # Return true causes the document to try to be collected which is what actually triggers the timeout.
+                script: sleep(1000); return true
+          dest:
+            index: new_twitter
+  - is_true: timed_out
+  - match: {created: 0}
+  - match: {noops: 0}
diff --git a/qa/smoke-test-reindex-with-groovy/src/test/resources/rest-api-spec/test/update-by-query/30_timeout.yaml b/qa/smoke-test-reindex-with-groovy/src/test/resources/rest-api-spec/test/update-by-query/30_timeout.yaml
new file mode 100644
index 00000000000..2a291bf0541
--- /dev/null
+++ b/qa/smoke-test-reindex-with-groovy/src/test/resources/rest-api-spec/test/update-by-query/30_timeout.yaml
@@ -0,0 +1,26 @@
+---
+"Timeout":
+  - do:
+      index:
+        index:  twitter
+        type:   tweet
+        id:     1
+        body:   { "user": "kimchy" }
+  - do:
+      indices.refresh: {}
+
+  - do:
+      catch: request_timeout
+      update-by-query:
+        index:   twitter
+        refresh: true
+        search_timeout: 10ms
+        body:
+          query:
+            script:
+              # Sleep 100x longer than the timeout. That should cause a timeout!
+              # Return true causes the document to try to be collected which is what actually triggers the timeout.
+              script: sleep(1000); return true
+  - is_true: timed_out
+  - match: {updated: 0}
+  - match: {noops: 0}
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/update-by-query.json b/rest-api-spec/src/main/resources/rest-api-spec/api/update-by-query.json
index 9d5183ee4f3..dca49cbcc6a 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/api/update-by-query.json
+++ b/rest-api-spec/src/main/resources/rest-api-spec/api/update-by-query.json
@@ -105,6 +105,10 @@
           "options" : ["query_then_fetch", "dfs_query_then_fetch"],
           "description" : "Search operation type"
         },
+        "search_timeout": {
+          "type" : "time",
+          "description" : "Explicit timeout for each search request. Defaults to no timeout."
+        },
         "size": {
           "type" : "number",
           "description" : "Number of hits to return (default: 10)"