[reindex] Timeout if sub-requests timeout

Sadly, it isn't easy to simulate a timeout during an integration test; you just
have to cause one. Groovy's sleep should do the job.
Nik Everett 2016-03-04 14:06:32 -05:00
parent f3195cb514
commit b8d931d23c
14 changed files with 135 additions and 27 deletions
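
For context on what the change looks like from a client's point of view, the sketch below forces a reindex to time out and then checks the new timed_out flag and the 408 status the REST layer now returns; it mirrors the REST test added at the bottom of this commit. Everything outside what the diff itself introduces is an assumption: a node on localhost:9200 with the reindex module and inline Groovy scripting enabled, an existing twitter index with at least one document, and the made-up class name ReindexTimeoutExample.

import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Sketch only: force a reindex to time out and inspect the new timed_out flag.
// Assumes a local node on localhost:9200 with the reindex module installed,
// inline Groovy scripting enabled, and a "twitter" index that already has a document.
public class ReindexTimeoutExample {
    public static void main(String[] args) throws Exception {
        // Mirrors the REST test below: a 10ms search timeout plus a script that sleeps 1000ms.
        String body = "{"
                + "\"source\": {"
                + "  \"index\": \"twitter\","
                + "  \"timeout\": \"10ms\","
                + "  \"query\": { \"script\": { \"script\": \"sleep(1000); return true\" } }"
                + "},"
                + "\"dest\": { \"index\": \"new_twitter\" }"
                + "}";

        HttpURLConnection conn = (HttpURLConnection) new URL("http://localhost:9200/_reindex").openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }

        // This commit maps a timed-out sub-request to HTTP 408 and adds "timed_out" to the body.
        int status = conn.getResponseCode();
        InputStream in = status < 400 ? conn.getInputStream() : conn.getErrorStream();
        String response = new String(in.readAllBytes(), StandardCharsets.UTF_8);

        System.out.println("HTTP status: " + status);                                // expect 408
        System.out.println("timed out: " + response.contains("\"timed_out\":true")); // expect true
    }
}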

@@ -103,7 +103,8 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
     protected abstract BulkRequest buildBulk(Iterable<SearchHit> docs);
-    protected abstract Response buildResponse(TimeValue took, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures);
+    protected abstract Response buildResponse(TimeValue took, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures,
+            boolean timedOut);

     public void start() {
         initialSearch();
@@ -161,8 +162,13 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
             return;
         }
         setScroll(searchResponse.getScrollId());
-        if (searchResponse.getShardFailures() != null && searchResponse.getShardFailures().length > 0) {
-            startNormalTermination(emptyList(), unmodifiableList(Arrays.asList(searchResponse.getShardFailures())));
+        if (    // If any of the shards failed that should abort the request.
+                (searchResponse.getShardFailures() != null && searchResponse.getShardFailures().length > 0)
+                // Timeouts aren't shard failures but we still need to pass them back to the user.
+                || searchResponse.isTimedOut()
+                ) {
+            startNormalTermination(emptyList(), unmodifiableList(Arrays.asList(searchResponse.getShardFailures())),
+                    searchResponse.isTimedOut());
             return;
         }
         long total = searchResponse.getHits().totalHits();
@@ -176,7 +182,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
         SearchHit[] docs = searchResponse.getHits().getHits();
         logger.debug("scroll returned [{}] documents with a scroll id of [{}]", docs.length, searchResponse.getScrollId());
         if (docs.length == 0) {
-            startNormalTermination(emptyList(), emptyList());
+            startNormalTermination(emptyList(), emptyList(), false);
             return;
         }
         task.countBatch();
@@ -266,13 +272,13 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
         destinationIndices.addAll(destinationIndicesThisBatch);
         if (false == failures.isEmpty()) {
-            startNormalTermination(unmodifiableList(failures), emptyList());
+            startNormalTermination(unmodifiableList(failures), emptyList(), false);
             return;
         }
         if (mainRequest.getSize() != SIZE_ALL_MATCHES && task.getSuccessfullyProcessed() >= mainRequest.getSize()) {
             // We've processed all the requested docs.
-            startNormalTermination(emptyList(), emptyList());
+            startNormalTermination(emptyList(), emptyList(), false);
             return;
         }
         startNextScroll();
@@ -311,9 +317,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
         failures.add(failure);
     }

-    void startNormalTermination(List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures) {
+    void startNormalTermination(List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures, boolean timedOut) {
         if (false == mainRequest.isRefresh()) {
-            finishHim(null, indexingFailures, searchFailures);
+            finishHim(null, indexingFailures, searchFailures, timedOut);
             return;
         }
         RefreshRequest refresh = new RefreshRequest();
@@ -321,7 +327,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
         client.admin().indices().refresh(refresh, new ActionListener<RefreshResponse>() {
             @Override
             public void onResponse(RefreshResponse response) {
-                finishHim(null, indexingFailures, searchFailures);
+                finishHim(null, indexingFailures, searchFailures, timedOut);
             }

             @Override
@@ -337,7 +343,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
      * @param failure if non null then the request failed catastrophically with this exception
      */
     void finishHim(Throwable failure) {
-        finishHim(failure, emptyList(), emptyList());
+        finishHim(failure, emptyList(), emptyList(), false);
     }

     /**
@@ -346,8 +352,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
      * @param failure if non null then the request failed catastrophically with this exception
      * @param indexingFailures any indexing failures accumulated during the request
      * @param searchFailures any search failures accumulated during the request
+     * @param timedOut have any of the sub-requests timed out?
      */
-    void finishHim(Throwable failure, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures) {
+    void finishHim(Throwable failure, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures, boolean timedOut) {
         String scrollId = scroll.get();
         if (Strings.hasLength(scrollId)) {
             /*
@@ -369,7 +376,8 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
             });
         }
         if (failure == null) {
-            listener.onResponse(buildResponse(timeValueNanos(System.nanoTime() - startTime.get()), indexingFailures, searchFailures));
+            listener.onResponse(
+                    buildResponse(timeValueNanos(System.nanoTime() - startTime.get()), indexingFailures, searchFailures, timedOut));
         } else {
             listener.onFailure(failure);
         }

@@ -45,16 +45,18 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
     private BulkByScrollTask.Status status;
     private List<Failure> indexingFailures;
     private List<ShardSearchFailure> searchFailures;
+    private boolean timedOut;

     public BulkIndexByScrollResponse() {
     }

     public BulkIndexByScrollResponse(TimeValue took, BulkByScrollTask.Status status, List<Failure> indexingFailures,
-            List<ShardSearchFailure> searchFailures) {
+            List<ShardSearchFailure> searchFailures, boolean timedOut) {
         this.took = took;
         this.status = requireNonNull(status, "Null status not supported");
         this.indexingFailures = indexingFailures;
         this.searchFailures = searchFailures;
+        this.timedOut = timedOut;
     }

     public TimeValue getTook() {
@@ -103,6 +105,13 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
         return searchFailures;
     }

+    /**
+     * Did any of the sub-requests that were part of this request timeout?
+     */
+    public boolean isTimedOut() {
+        return timedOut;
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
@@ -116,6 +125,7 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
         for (ShardSearchFailure failure: searchFailures) {
             failure.writeTo(out);
         }
+        out.writeBoolean(timedOut);
     }

     @Override
@@ -135,11 +145,13 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
             searchFailures.add(readShardSearchFailure(in));
         }
         this.searchFailures = unmodifiableList(searchFailures);
+        this.timedOut = in.readBoolean();
     }

     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.field("took", took.millis());
+        builder.field("timed_out", timedOut);
         status.innerXContent(builder, params, false, false);
         builder.startArray("failures");
         for (Failure failure: indexingFailures) {

@@ -36,6 +36,9 @@ public class BulkIndexByScrollResponseContentListener<R extends BulkIndexByScrol
     @Override
     protected RestStatus getStatus(R response) {
         RestStatus status = RestStatus.OK;
+        if (response.isTimedOut()) {
+            status = RestStatus.REQUEST_TIMEOUT;
+        }
         for (Failure failure : response.getIndexingFailures()) {
             if (failure.getStatus().getStatus() > status.getStatus()) {
                 status = failure.getStatus();

@@ -35,8 +35,9 @@ public class ReindexResponse extends BulkIndexByScrollResponse {
     public ReindexResponse() {
     }

-    public ReindexResponse(TimeValue took, Status status, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures) {
-        super(took, status, indexingFailures, searchFailures);
+    public ReindexResponse(TimeValue took, Status status, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures,
+            boolean timedOut) {
+        super(took, status, indexingFailures, searchFailures, timedOut);
     }

     public long getCreated() {
@@ -46,6 +47,7 @@ public class ReindexResponse extends BulkIndexByScrollResponse {
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.field("took", getTook());
+        builder.field("timed_out", isTimedOut());
         getStatus().innerXContent(builder, params, true, false);
         builder.startArray("failures");
         for (Failure failure: getIndexingFailures()) {

@@ -107,7 +107,10 @@ public class RestUpdateByQueryAction extends
         internalRequest.setSize(internalRequest.getSearchRequest().source().size());
         internalRequest.setPipeline(request.param("pipeline"));
         internalRequest.getSearchRequest().source().size(request.paramAsInt("scroll_size", scrollSize));
+        // Let the requester set search timeout. It is probably only going to be useful for testing but who knows.
+        if (request.hasParam("search_timeout")) {
+            internalRequest.getSearchRequest().source().timeout(request.paramAsTime("search_timeout", null));
+        }
         execute(request, internalRequest, channel);
     }

@@ -191,8 +191,9 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
     }

     @Override
-    protected ReindexResponse buildResponse(TimeValue took, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures) {
-        return new ReindexResponse(took, task.getStatus(), indexingFailures, searchFailures);
+    protected ReindexResponse buildResponse(TimeValue took, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures,
+            boolean timedOut) {
+        return new ReindexResponse(took, task.getStatus(), indexingFailures, searchFailures, timedOut);
     }

     /*

@@ -96,8 +96,8 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
     @Override
     protected BulkIndexByScrollResponse buildResponse(TimeValue took, List<Failure> indexingFailures,
-            List<ShardSearchFailure> searchFailures) {
-        return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures);
+            List<ShardSearchFailure> searchFailures, boolean timedOut) {
+        return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures, timedOut);
     }

     @Override

@@ -248,15 +248,33 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
      */
     public void testShardFailuresAbortRequest() throws Exception {
         ShardSearchFailure shardFailure = new ShardSearchFailure(new RuntimeException("test"));
-        new DummyAbstractAsyncBulkByScrollAction()
-                .onScrollResponse(new SearchResponse(null, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[] { shardFailure }));
+        InternalSearchResponse internalResponse = new InternalSearchResponse(null, null, null, null, false, null);
+        new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(
+                new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[] { shardFailure }));
         BulkIndexByScrollResponse response = listener.get();
         assertThat(response.getIndexingFailures(), emptyCollectionOf(Failure.class));
         assertThat(response.getSearchFailures(), contains(shardFailure));
+        assertFalse(response.isTimedOut());
         assertNull(response.getReasonCancelled());
         assertThat(client.scrollsCleared, contains(scrollId));
     }

+    /**
+     * Mimicks search timeouts.
+     */
+    public void testSearchTimeoutsAbortRequest() throws Exception {
+        InternalSearchResponse internalResponse = new InternalSearchResponse(null, null, null, null, true, null);
+        new DummyAbstractAsyncBulkByScrollAction()
+                .onScrollResponse(new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[0]));
+        BulkIndexByScrollResponse response = listener.get();
+        assertThat(response.getIndexingFailures(), emptyCollectionOf(Failure.class));
+        assertThat(response.getSearchFailures(), emptyCollectionOf(ShardSearchFailure.class));
+        assertTrue(response.isTimedOut());
+        assertNull(response.getReasonCancelled());
+        assertThat(client.scrollsCleared, contains(scrollId));
+    }
+
     /**
      * Mimicks bulk indexing failures.
      */
@@ -396,7 +414,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
     public void testCancelBeforeStartNormalTermination() throws Exception {
         // Refresh or not doesn't matter - we don't try to refresh.
         mainRequest.setRefresh(usually());
-        cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNormalTermination(emptyList(), emptyList()));
+        cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNormalTermination(emptyList(), emptyList(), false));
         // This wouldn't return if we called refresh - the action would hang waiting for the refresh that we haven't mocked.
     }
@@ -430,8 +448,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         @Override
         protected BulkIndexByScrollResponse buildResponse(TimeValue took, List<Failure> indexingFailures,
-                List<ShardSearchFailure> searchFailures) {
-            return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures);
+                List<ShardSearchFailure> searchFailures, boolean timedOut) {
+            return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures, timedOut);
         }
     }

@@ -102,7 +102,7 @@ public class RoundTripTests extends ESTestCase {
     public void testReindexResponse() throws IOException {
         ReindexResponse response = new ReindexResponse(timeValueMillis(randomPositiveLong()), randomStatus(), randomIndexingFailures(),
-                randomSearchFailures());
+                randomSearchFailures(), randomBoolean());
         ReindexResponse tripped = new ReindexResponse();
         roundTrip(response, tripped);
         assertResponseEquals(response, tripped);
@@ -110,7 +110,7 @@ public class RoundTripTests extends ESTestCase {
     public void testBulkIndexByScrollResponse() throws IOException {
         BulkIndexByScrollResponse response = new BulkIndexByScrollResponse(timeValueMillis(randomPositiveLong()), randomStatus(),
-                randomIndexingFailures(), randomSearchFailures());
+                randomIndexingFailures(), randomSearchFailures(), randomBoolean());
         BulkIndexByScrollResponse tripped = new BulkIndexByScrollResponse();
         roundTrip(response, tripped);
         assertResponseEquals(response, tripped);

@@ -75,6 +75,7 @@
             index: source
           dest:
             index: dest
+  - is_false: timed_out
   - match: {task: '/.+:\d+/'}
   - set: {task: task}
   - is_false: updated

@@ -12,6 +12,7 @@
   - do:
       update-by-query:
         index: test
+  - is_false: timed_out
   - match: {updated: 1}
   - match: {version_conflicts: 0}
   - match: {batches: 1}

@@ -0,0 +1,29 @@
+---
+"Timeout":
+  - do:
+      index:
+        index: twitter
+        type: tweet
+        id: 1
+        body: { "user": "kimchy" }
+  - do:
+      indices.refresh: {}
+
+  - do:
+      catch: request_timeout
+      reindex:
+        refresh: true
+        body:
+          source:
+            index: twitter
+            timeout: 10ms
+            query:
+              script:
+                # Sleep 100x longer than the timeout. That should cause a timeout!
+                # Return true causes the document to try to be collected which is what actually triggers the timeout.
+                script: sleep(1000); return true
+          dest:
+            index: new_twitter
+  - is_true: timed_out
+  - match: {created: 0}
+  - match: {noops: 0}

@@ -0,0 +1,26 @@
+---
+"Timeout":
+  - do:
+      index:
+        index: twitter
+        type: tweet
+        id: 1
+        body: { "user": "kimchy" }
+  - do:
+      indices.refresh: {}
+
+  - do:
+      catch: request_timeout
+      update-by-query:
+        index: twitter
+        refresh: true
+        search_timeout: 10ms
+        body:
+          query:
+            script:
+              # Sleep 100x longer than the timeout. That should cause a timeout!
+              # Return true causes the document to try to be collected which is what actually triggers the timeout.
+              script: sleep(1000); return true
+  - is_true: timed_out
+  - match: {updated: 0}
+  - match: {noops: 0}

@@ -105,6 +105,10 @@
         "options" : ["query_then_fetch", "dfs_query_then_fetch"],
         "description" : "Search operation type"
       },
+      "search_timeout": {
+        "type" : "time",
+        "description" : "Explicit timeout for each search request. Defaults to no timeout."
+      },
       "size": {
         "type" : "number",
         "description" : "Number of hits to return (default: 10)"