Bulk should support shard timeout like the index api

closes #4220
This commit is contained in:
Shay Banon 2013-11-25 11:48:22 +01:00
parent 2ba7c1d4a1
commit 32d073bbf8
4 changed files with 47 additions and 1 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.bulk;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.WriteConsistencyLevel;
@ -59,6 +60,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
final List<ActionRequest> requests = Lists.newArrayList(); final List<ActionRequest> requests = Lists.newArrayList();
List<Object> payloads = null; List<Object> payloads = null;
protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT;
private ReplicationType replicationType = ReplicationType.DEFAULT; private ReplicationType replicationType = ReplicationType.DEFAULT;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT; private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
private boolean refresh = false; private boolean refresh = false;
@ -426,6 +428,25 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
return this.replicationType; return this.replicationType;
} }
/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
public final BulkRequest timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
public final BulkRequest timeout(String timeout) {
return timeout(TimeValue.parseTimeValue(timeout, null));
}
public TimeValue timeout() {
return timeout;
}
private int findNextMarker(byte marker, int from, BytesReference data, int length) { private int findNextMarker(byte marker, int from, BytesReference data, int length) {
for (int i = from; i < length; i++) { for (int i = from; i < length; i++) {
if (data.get(i) == marker) { if (data.get(i) == marker) {
@ -477,6 +498,9 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
} }
} }
refresh = in.readBoolean(); refresh = in.readBoolean();
if (in.getVersion().after(Version.V_0_90_7)) {
timeout = TimeValue.readTimeValue(in);
}
} }
@Override @Override
@ -496,5 +520,8 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
request.writeTo(out); request.writeTo(out);
} }
out.writeBoolean(refresh); out.writeBoolean(refresh);
if (out.getVersion().after(Version.V_0_90_7)) {
timeout.writeTo(out);
}
} }
} }

View File

@ -32,6 +32,7 @@ import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.internal.InternalClient; import org.elasticsearch.client.internal.InternalClient;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
/** /**
* A bulk request holds an ordered {@link IndexRequest}s and {@link DeleteRequest}s and allows to executes * A bulk request holds an ordered {@link IndexRequest}s and {@link DeleteRequest}s and allows to executes
@ -78,7 +79,6 @@ public class BulkRequestBuilder extends ActionRequestBuilder<BulkRequest, BulkRe
} }
/** /**
* Adds an {@link DeleteRequest} to the list of actions to execute. * Adds an {@link DeleteRequest} to the list of actions to execute.
*/ */
@ -137,6 +137,22 @@ public class BulkRequestBuilder extends ActionRequestBuilder<BulkRequest, BulkRe
return this; return this;
} }
/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
public final BulkRequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return this;
}
/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
public final BulkRequestBuilder setTimeout(String timeout) {
request.timeout(timeout);
return this;
}
/** /**
* The number of actions currently in the bulk. * The number of actions currently in the bulk.
*/ */

View File

@ -241,6 +241,7 @@ public class TransportBulkAction extends TransportAction<BulkRequest, BulkRespon
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId.index().name(), shardId.id(), bulkRequest.refresh(), requests.toArray(new BulkItemRequest[requests.size()])); BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId.index().name(), shardId.id(), bulkRequest.refresh(), requests.toArray(new BulkItemRequest[requests.size()]));
bulkShardRequest.replicationType(bulkRequest.replicationType()); bulkShardRequest.replicationType(bulkRequest.replicationType());
bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel()); bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel());
bulkShardRequest.timeout(bulkRequest.timeout());
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() { shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
@Override @Override
public void onResponse(BulkShardResponse bulkShardResponse) { public void onResponse(BulkShardResponse bulkShardResponse) {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.support.replication.ReplicationType; import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
@ -85,6 +86,7 @@ public class RestBulkAction extends BaseRestHandler {
if (consistencyLevel != null) { if (consistencyLevel != null) {
bulkRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel)); bulkRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
} }
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.refresh(request.paramAsBoolean("refresh", bulkRequest.refresh())); bulkRequest.refresh(request.paramAsBoolean("refresh", bulkRequest.refresh()));
try { try {
bulkRequest.add(request.content(), request.contentUnsafe(), defaultIndex, defaultType, defaultRouting, null, allowExplicitIndex); bulkRequest.add(request.content(), request.contentUnsafe(), defaultIndex, defaultType, defaultRouting, null, allowExplicitIndex);