mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-25 01:19:02 +00:00
Make bulk item-level requests implement DocumentRequest interface
Currently, bulk item requests can be any ActionRequest, this commit restricts bulk item requests to DocumentRequest. This simplifies handling failures during bulk requests. Additionally, a new enum is added to DocumentRequest to represent the intended operation to be performed by a document request. Now, index operation type also uses the new enum to specify whether the request should create or index a document.
This commit is contained in:
parent
d4dec26aa0
commit
80ca78479f
@ -19,11 +19,13 @@
|
||||
package org.elasticsearch.action;
|
||||
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
|
||||
import java.util.Locale;
|
||||
|
||||
/**
|
||||
* Generic interface to group ActionRequest, which work on single document level
|
||||
*
|
||||
* Forces this class return index/type/id getters
|
||||
* Generic interface to group ActionRequest, which perform writes to a single document
|
||||
* Action requests implementing this can be part of {@link org.elasticsearch.action.bulk.BulkRequest}
|
||||
*/
|
||||
public interface DocumentRequest<T> extends IndicesRequest {
|
||||
|
||||
@ -70,4 +72,78 @@ public interface DocumentRequest<T> extends IndicesRequest {
|
||||
*/
|
||||
String parent();
|
||||
|
||||
/**
|
||||
* Get the document version for this request
|
||||
* @return the document version
|
||||
*/
|
||||
long version();
|
||||
|
||||
/**
|
||||
* Sets the version, which will perform the operation only if a matching
|
||||
* version exists and no changes happened on the doc since then.
|
||||
*/
|
||||
T version(long version);
|
||||
|
||||
/**
|
||||
* Get the document version type for this request
|
||||
* @return the document version type
|
||||
*/
|
||||
VersionType versionType();
|
||||
|
||||
/**
|
||||
* Sets the versioning type. Defaults to {@link VersionType#INTERNAL}.
|
||||
*/
|
||||
T versionType(VersionType versionType);
|
||||
|
||||
/**
|
||||
* Get the requested document operation type of the request
|
||||
* @return the operation type {@link OpType}
|
||||
*/
|
||||
OpType opType();
|
||||
|
||||
/**
|
||||
* Requested operation type to perform on the document
|
||||
*/
|
||||
enum OpType {
|
||||
/**
|
||||
* Creates the resource. Simply adds it to the index, if there is an existing
|
||||
* document with the id, then it won't be removed.
|
||||
*/
|
||||
CREATE(0),
|
||||
/**
|
||||
* Index the source. If there an existing document with the id, it will
|
||||
* be replaced.
|
||||
*/
|
||||
INDEX(1),
|
||||
/** Updates a document */
|
||||
UPDATE(2),
|
||||
/** Deletes a document */
|
||||
DELETE(3);
|
||||
|
||||
private final byte op;
|
||||
private final String lowercase;
|
||||
|
||||
OpType(int op) {
|
||||
this.op = (byte) op;
|
||||
this.lowercase = this.toString().toLowerCase(Locale.ENGLISH);
|
||||
}
|
||||
|
||||
public byte getId() {
|
||||
return op;
|
||||
}
|
||||
|
||||
public String getLowercase() {
|
||||
return lowercase;
|
||||
}
|
||||
|
||||
public static OpType fromId(byte id) {
|
||||
switch (id) {
|
||||
case 0: return CREATE;
|
||||
case 1: return INDEX;
|
||||
case 2: return UPDATE;
|
||||
case 3: return DELETE;
|
||||
default: throw new IllegalArgumentException("Unknown opType: [" + id + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -19,8 +19,7 @@
|
||||
|
||||
package org.elasticsearch.action.bulk;
|
||||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
@ -36,7 +35,7 @@ import java.io.IOException;
|
||||
public class BulkItemRequest implements Streamable {
|
||||
|
||||
private int id;
|
||||
private ActionRequest request;
|
||||
private DocumentRequest<?> request;
|
||||
private volatile BulkItemResponse primaryResponse;
|
||||
private volatile boolean ignoreOnReplica;
|
||||
|
||||
@ -44,8 +43,7 @@ public class BulkItemRequest implements Streamable {
|
||||
|
||||
}
|
||||
|
||||
public BulkItemRequest(int id, ActionRequest request) {
|
||||
assert request instanceof IndicesRequest;
|
||||
public BulkItemRequest(int id, DocumentRequest<?> request) {
|
||||
this.id = id;
|
||||
this.request = request;
|
||||
}
|
||||
@ -54,14 +52,13 @@ public class BulkItemRequest implements Streamable {
|
||||
return id;
|
||||
}
|
||||
|
||||
public ActionRequest request() {
|
||||
public DocumentRequest<?> request() {
|
||||
return request;
|
||||
}
|
||||
|
||||
public String index() {
|
||||
IndicesRequest indicesRequest = (IndicesRequest) request;
|
||||
assert indicesRequest.indices().length == 1;
|
||||
return indicesRequest.indices()[0];
|
||||
assert request.indices().length == 1;
|
||||
return request.indices()[0];
|
||||
}
|
||||
|
||||
BulkItemResponse getPrimaryResponse() {
|
||||
@ -94,13 +91,18 @@ public class BulkItemRequest implements Streamable {
|
||||
id = in.readVInt();
|
||||
byte type = in.readByte();
|
||||
if (type == 0) {
|
||||
request = new IndexRequest();
|
||||
IndexRequest indexRequest = new IndexRequest();
|
||||
indexRequest.readFrom(in);
|
||||
request = indexRequest;
|
||||
} else if (type == 1) {
|
||||
request = new DeleteRequest();
|
||||
DeleteRequest deleteRequest = new DeleteRequest();
|
||||
deleteRequest.readFrom(in);
|
||||
request = deleteRequest;
|
||||
} else if (type == 2) {
|
||||
request = new UpdateRequest();
|
||||
UpdateRequest updateRequest = new UpdateRequest();
|
||||
updateRequest.readFrom(in);
|
||||
request = updateRequest;
|
||||
}
|
||||
request.readFrom(in);
|
||||
if (in.readBoolean()) {
|
||||
primaryResponse = BulkItemResponse.readBulkItem(in);
|
||||
}
|
||||
@ -112,12 +114,14 @@ public class BulkItemRequest implements Streamable {
|
||||
out.writeVInt(id);
|
||||
if (request instanceof IndexRequest) {
|
||||
out.writeByte((byte) 0);
|
||||
((IndexRequest) request).writeTo(out);
|
||||
} else if (request instanceof DeleteRequest) {
|
||||
out.writeByte((byte) 1);
|
||||
((DeleteRequest) request).writeTo(out);
|
||||
} else if (request instanceof UpdateRequest) {
|
||||
out.writeByte((byte) 2);
|
||||
((UpdateRequest) request).writeTo(out);
|
||||
}
|
||||
request.writeTo(out);
|
||||
out.writeOptionalStreamable(primaryResponse);
|
||||
out.writeBoolean(ignoreOnReplica);
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ package org.elasticsearch.action.bulk;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.action.DocumentRequest.OpType;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.update.UpdateResponse;
|
||||
@ -50,7 +51,7 @@ public class BulkItemResponse implements Streamable, StatusToXContent {
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(opType);
|
||||
builder.startObject(opType.getLowercase());
|
||||
if (failure == null) {
|
||||
response.toXContent(builder, params);
|
||||
builder.field(Fields.STATUS, response.status().getStatus());
|
||||
@ -183,7 +184,7 @@ public class BulkItemResponse implements Streamable, StatusToXContent {
|
||||
|
||||
private int id;
|
||||
|
||||
private String opType;
|
||||
private OpType opType;
|
||||
|
||||
private DocWriteResponse response;
|
||||
|
||||
@ -193,13 +194,13 @@ public class BulkItemResponse implements Streamable, StatusToXContent {
|
||||
|
||||
}
|
||||
|
||||
public BulkItemResponse(int id, String opType, DocWriteResponse response) {
|
||||
public BulkItemResponse(int id, OpType opType, DocWriteResponse response) {
|
||||
this.id = id;
|
||||
this.opType = opType;
|
||||
this.response = response;
|
||||
this.opType = opType;
|
||||
}
|
||||
|
||||
public BulkItemResponse(int id, String opType, Failure failure) {
|
||||
public BulkItemResponse(int id, OpType opType, Failure failure) {
|
||||
this.id = id;
|
||||
this.opType = opType;
|
||||
this.failure = failure;
|
||||
@ -215,7 +216,7 @@ public class BulkItemResponse implements Streamable, StatusToXContent {
|
||||
/**
|
||||
* The operation type ("index", "create" or "delete").
|
||||
*/
|
||||
public String getOpType() {
|
||||
public OpType getOpType() {
|
||||
return this.opType;
|
||||
}
|
||||
|
||||
@ -300,7 +301,7 @@ public class BulkItemResponse implements Streamable, StatusToXContent {
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
id = in.readVInt();
|
||||
opType = in.readString();
|
||||
opType = OpType.fromId(in.readByte());
|
||||
|
||||
byte type = in.readByte();
|
||||
if (type == 0) {
|
||||
@ -322,7 +323,7 @@ public class BulkItemResponse implements Streamable, StatusToXContent {
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVInt(id);
|
||||
out.writeString(opType);
|
||||
out.writeByte(opType.getId());
|
||||
|
||||
if (response == null) {
|
||||
out.writeByte((byte) 2);
|
||||
|
@ -19,7 +19,7 @@
|
||||
|
||||
package org.elasticsearch.action.bulk;
|
||||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
@ -250,24 +250,24 @@ public class BulkProcessor implements Closeable {
|
||||
* (for example, if no id is provided, one will be generated, or usage of the create flag).
|
||||
*/
|
||||
public BulkProcessor add(IndexRequest request) {
|
||||
return add((ActionRequest<?>) request);
|
||||
return add((DocumentRequest<?>) request);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds an {@link DeleteRequest} to the list of actions to execute.
|
||||
*/
|
||||
public BulkProcessor add(DeleteRequest request) {
|
||||
return add((ActionRequest<?>) request);
|
||||
return add((DocumentRequest<?>) request);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds either a delete or an index request.
|
||||
*/
|
||||
public BulkProcessor add(ActionRequest<?> request) {
|
||||
public BulkProcessor add(DocumentRequest<?> request) {
|
||||
return add(request, null);
|
||||
}
|
||||
|
||||
public BulkProcessor add(ActionRequest<?> request, @Nullable Object payload) {
|
||||
public BulkProcessor add(DocumentRequest<?> request, @Nullable Object payload) {
|
||||
internalAdd(request, payload);
|
||||
return this;
|
||||
}
|
||||
@ -282,7 +282,7 @@ public class BulkProcessor implements Closeable {
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void internalAdd(ActionRequest<?> request, @Nullable Object payload) {
|
||||
private synchronized void internalAdd(DocumentRequest<?> request, @Nullable Object payload) {
|
||||
ensureOpen();
|
||||
bulkRequest.add(request, payload);
|
||||
executeIfNeeded();
|
||||
|
@ -22,6 +22,7 @@ package org.elasticsearch.action.bulk;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.CompositeIndicesRequest;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
@ -46,6 +47,7 @@ import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
|
||||
@ -65,7 +67,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
|
||||
* {@link WriteRequest}s to this but java doesn't support syntax to declare that everything in the array has both types so we declare
|
||||
* the one with the least casts.
|
||||
*/
|
||||
final List<ActionRequest<?>> requests = new ArrayList<>();
|
||||
final List<DocumentRequest<?>> requests = new ArrayList<>();
|
||||
List<Object> payloads = null;
|
||||
|
||||
protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT;
|
||||
@ -80,14 +82,14 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
|
||||
/**
|
||||
* Adds a list of requests to be executed. Either index or delete requests.
|
||||
*/
|
||||
public BulkRequest add(ActionRequest<?>... requests) {
|
||||
for (ActionRequest<?> request : requests) {
|
||||
public BulkRequest add(DocumentRequest<?>... requests) {
|
||||
for (DocumentRequest<?> request : requests) {
|
||||
add(request, null);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public BulkRequest add(ActionRequest<?> request) {
|
||||
public BulkRequest add(DocumentRequest<?> request) {
|
||||
return add(request, null);
|
||||
}
|
||||
|
||||
@ -97,7 +99,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
|
||||
* @param payload Optional payload
|
||||
* @return the current bulk request
|
||||
*/
|
||||
public BulkRequest add(ActionRequest<?> request, @Nullable Object payload) {
|
||||
public BulkRequest add(DocumentRequest<?> request, @Nullable Object payload) {
|
||||
if (request instanceof IndexRequest) {
|
||||
add((IndexRequest) request, payload);
|
||||
} else if (request instanceof DeleteRequest) {
|
||||
@ -113,8 +115,8 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
|
||||
/**
|
||||
* Adds a list of requests to be executed. Either index or delete requests.
|
||||
*/
|
||||
public BulkRequest add(Iterable<ActionRequest<?>> requests) {
|
||||
for (ActionRequest<?> request : requests) {
|
||||
public BulkRequest add(Iterable<DocumentRequest<?>> requests) {
|
||||
for (DocumentRequest<?> request : requests) {
|
||||
add(request);
|
||||
}
|
||||
return this;
|
||||
@ -200,18 +202,13 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
|
||||
/**
|
||||
* The list of requests in this bulk request.
|
||||
*/
|
||||
public List<ActionRequest<?>> requests() {
|
||||
public List<DocumentRequest<?>> requests() {
|
||||
return this.requests;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<? extends IndicesRequest> subRequests() {
|
||||
List<IndicesRequest> indicesRequests = new ArrayList<>();
|
||||
for (ActionRequest<?> request : requests) {
|
||||
assert request instanceof IndicesRequest;
|
||||
indicesRequests.add((IndicesRequest) request);
|
||||
}
|
||||
return indicesRequests;
|
||||
return requests.stream().collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -497,7 +494,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
|
||||
* @return Whether this bulk request contains index request with an ingest pipeline enabled.
|
||||
*/
|
||||
public boolean hasIndexRequestsWithPipelines() {
|
||||
for (ActionRequest<?> actionRequest : requests) {
|
||||
for (DocumentRequest<?> actionRequest : requests) {
|
||||
if (actionRequest instanceof IndexRequest) {
|
||||
IndexRequest indexRequest = (IndexRequest) actionRequest;
|
||||
if (Strings.hasText(indexRequest.getPipeline())) {
|
||||
@ -515,13 +512,13 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
|
||||
if (requests.isEmpty()) {
|
||||
validationException = addValidationError("no requests added", validationException);
|
||||
}
|
||||
for (ActionRequest<?> request : requests) {
|
||||
for (DocumentRequest<?> request : requests) {
|
||||
// We first check if refresh has been set
|
||||
if (((WriteRequest<?>) request).getRefreshPolicy() != RefreshPolicy.NONE) {
|
||||
validationException = addValidationError(
|
||||
"RefreshPolicy is not supported on an item request. Set it on the BulkRequest instead.", validationException);
|
||||
}
|
||||
ActionRequestValidationException ex = request.validate();
|
||||
ActionRequestValidationException ex = ((WriteRequest<?>) request).validate();
|
||||
if (ex != null) {
|
||||
if (validationException == null) {
|
||||
validationException = new ActionRequestValidationException();
|
||||
@ -563,15 +560,17 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
|
||||
super.writeTo(out);
|
||||
waitForActiveShards.writeTo(out);
|
||||
out.writeVInt(requests.size());
|
||||
for (ActionRequest<?> request : requests) {
|
||||
for (DocumentRequest<?> request : requests) {
|
||||
if (request instanceof IndexRequest) {
|
||||
out.writeByte((byte) 0);
|
||||
((IndexRequest) request).writeTo(out);
|
||||
} else if (request instanceof DeleteRequest) {
|
||||
out.writeByte((byte) 1);
|
||||
((DeleteRequest) request).writeTo(out);
|
||||
} else if (request instanceof UpdateRequest) {
|
||||
out.writeByte((byte) 2);
|
||||
((UpdateRequest) request).writeTo(out);
|
||||
}
|
||||
request.writeTo(out);
|
||||
}
|
||||
refreshPolicy.writeTo(out);
|
||||
timeout.writeTo(out);
|
||||
|
@ -19,11 +19,9 @@
|
||||
|
||||
package org.elasticsearch.action.bulk;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.RoutingMissingException;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
@ -58,18 +56,18 @@ import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.LongSupplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
*
|
||||
* Groups bulk request items by shard, optionally creating non-existent indices and
|
||||
* delegates to {@link TransportShardBulkAction} for shard-level bulk execution
|
||||
*/
|
||||
public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {
|
||||
|
||||
@ -119,15 +117,9 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
|
||||
if (needToCheck()) {
|
||||
// Keep track of all unique indices and all unique types per index for the create index requests:
|
||||
final Set<String> autoCreateIndices = new HashSet<>();
|
||||
for (ActionRequest request : bulkRequest.requests) {
|
||||
if (request instanceof DocumentRequest) {
|
||||
DocumentRequest req = (DocumentRequest) request;
|
||||
autoCreateIndices.add(req.index());
|
||||
} else {
|
||||
throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
final Set<String> autoCreateIndices = bulkRequest.requests.stream()
|
||||
.map(DocumentRequest::index)
|
||||
.collect(Collectors.toSet());
|
||||
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
|
||||
ClusterState state = clusterService.state();
|
||||
for (String index : autoCreateIndices) {
|
||||
@ -153,7 +145,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
if (!(ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException)) {
|
||||
// fail all requests involving this index, if create didnt work
|
||||
for (int i = 0; i < bulkRequest.requests.size(); i++) {
|
||||
ActionRequest request = bulkRequest.requests.get(i);
|
||||
DocumentRequest<?> request = bulkRequest.requests.get(i);
|
||||
if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
|
||||
bulkRequest.requests.set(i, null);
|
||||
}
|
||||
@ -188,27 +180,10 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
return autoCreateIndex.shouldAutoCreate(index, state);
|
||||
}
|
||||
|
||||
private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, ActionRequest request, String index, Exception e) {
|
||||
if (request instanceof IndexRequest) {
|
||||
IndexRequest indexRequest = (IndexRequest) request;
|
||||
if (index.equals(indexRequest.index())) {
|
||||
responses.set(idx, new BulkItemResponse(idx, "index", new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e)));
|
||||
return true;
|
||||
}
|
||||
} else if (request instanceof DeleteRequest) {
|
||||
DeleteRequest deleteRequest = (DeleteRequest) request;
|
||||
if (index.equals(deleteRequest.index())) {
|
||||
responses.set(idx, new BulkItemResponse(idx, "delete", new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), e)));
|
||||
return true;
|
||||
}
|
||||
} else if (request instanceof UpdateRequest) {
|
||||
UpdateRequest updateRequest = (UpdateRequest) request;
|
||||
if (index.equals(updateRequest.index())) {
|
||||
responses.set(idx, new BulkItemResponse(idx, "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), e)));
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName());
|
||||
private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, DocumentRequest<?> request, String index, Exception e) {
|
||||
if (index.equals(request.index())) {
|
||||
responses.set(idx, new BulkItemResponse(idx, request.opType(), new BulkItemResponse.Failure(request.index(), request.type(), request.id(), e)));
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
@ -236,95 +211,56 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
|
||||
MetaData metaData = clusterState.metaData();
|
||||
for (int i = 0; i < bulkRequest.requests.size(); i++) {
|
||||
ActionRequest request = bulkRequest.requests.get(i);
|
||||
DocumentRequest<?> documentRequest = bulkRequest.requests.get(i);
|
||||
//the request can only be null because we set it to null in the previous step, so it gets ignored
|
||||
if (request == null) {
|
||||
if (documentRequest == null) {
|
||||
continue;
|
||||
}
|
||||
DocumentRequest documentRequest = (DocumentRequest) request;
|
||||
if (addFailureIfIndexIsUnavailable(documentRequest, bulkRequest, responses, i, concreteIndices, metaData)) {
|
||||
continue;
|
||||
}
|
||||
Index concreteIndex = concreteIndices.resolveIfAbsent(documentRequest);
|
||||
if (request instanceof IndexRequest) {
|
||||
IndexRequest indexRequest = (IndexRequest) request;
|
||||
MappingMetaData mappingMd = null;
|
||||
final IndexMetaData indexMetaData = metaData.index(concreteIndex);
|
||||
if (indexMetaData != null) {
|
||||
mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
|
||||
try {
|
||||
switch (documentRequest.opType()) {
|
||||
case CREATE:
|
||||
case INDEX:
|
||||
IndexRequest indexRequest = (IndexRequest) documentRequest;
|
||||
MappingMetaData mappingMd = null;
|
||||
final IndexMetaData indexMetaData = metaData.index(concreteIndex);
|
||||
if (indexMetaData != null) {
|
||||
mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
|
||||
}
|
||||
indexRequest.resolveRouting(metaData);
|
||||
indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName());
|
||||
break;
|
||||
case UPDATE:
|
||||
TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest)documentRequest);
|
||||
break;
|
||||
case DELETE:
|
||||
TransportDeleteAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (DeleteRequest)documentRequest);
|
||||
break;
|
||||
default: throw new AssertionError("request type not supported: [" + documentRequest.opType() + "]");
|
||||
}
|
||||
try {
|
||||
indexRequest.resolveRouting(metaData);
|
||||
indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName());
|
||||
} catch (ElasticsearchParseException | RoutingMissingException e) {
|
||||
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), indexRequest.type(), indexRequest.id(), e);
|
||||
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure);
|
||||
responses.set(i, bulkItemResponse);
|
||||
// make sure the request gets never processed again
|
||||
bulkRequest.requests.set(i, null);
|
||||
}
|
||||
} else if (request instanceof DeleteRequest) {
|
||||
try {
|
||||
TransportDeleteAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (DeleteRequest)request);
|
||||
} catch(RoutingMissingException e) {
|
||||
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), documentRequest.type(), documentRequest.id(), e);
|
||||
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "delete", failure);
|
||||
responses.set(i, bulkItemResponse);
|
||||
// make sure the request gets never processed again
|
||||
bulkRequest.requests.set(i, null);
|
||||
}
|
||||
|
||||
} else if (request instanceof UpdateRequest) {
|
||||
try {
|
||||
TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest)request);
|
||||
} catch(RoutingMissingException e) {
|
||||
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), documentRequest.type(), documentRequest.id(), e);
|
||||
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "update", failure);
|
||||
responses.set(i, bulkItemResponse);
|
||||
// make sure the request gets never processed again
|
||||
bulkRequest.requests.set(i, null);
|
||||
}
|
||||
} else {
|
||||
throw new AssertionError("request type not supported: [" + request.getClass().getName() + "]");
|
||||
} catch (ElasticsearchParseException | RoutingMissingException e) {
|
||||
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), documentRequest.type(), documentRequest.id(), e);
|
||||
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, documentRequest.opType(), failure);
|
||||
responses.set(i, bulkItemResponse);
|
||||
// make sure the request gets never processed again
|
||||
bulkRequest.requests.set(i, null);
|
||||
}
|
||||
}
|
||||
|
||||
// first, go over all the requests and create a ShardId -> Operations mapping
|
||||
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
|
||||
|
||||
for (int i = 0; i < bulkRequest.requests.size(); i++) {
|
||||
ActionRequest request = bulkRequest.requests.get(i);
|
||||
if (request instanceof IndexRequest) {
|
||||
IndexRequest indexRequest = (IndexRequest) request;
|
||||
String concreteIndex = concreteIndices.getConcreteIndex(indexRequest.index()).getName();
|
||||
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, indexRequest.id(), indexRequest.routing()).shardId();
|
||||
List<BulkItemRequest> list = requestsByShard.get(shardId);
|
||||
if (list == null) {
|
||||
list = new ArrayList<>();
|
||||
requestsByShard.put(shardId, list);
|
||||
}
|
||||
list.add(new BulkItemRequest(i, request));
|
||||
} else if (request instanceof DeleteRequest) {
|
||||
DeleteRequest deleteRequest = (DeleteRequest) request;
|
||||
String concreteIndex = concreteIndices.getConcreteIndex(deleteRequest.index()).getName();
|
||||
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, deleteRequest.id(), deleteRequest.routing()).shardId();
|
||||
List<BulkItemRequest> list = requestsByShard.get(shardId);
|
||||
if (list == null) {
|
||||
list = new ArrayList<>();
|
||||
requestsByShard.put(shardId, list);
|
||||
}
|
||||
list.add(new BulkItemRequest(i, request));
|
||||
} else if (request instanceof UpdateRequest) {
|
||||
UpdateRequest updateRequest = (UpdateRequest) request;
|
||||
String concreteIndex = concreteIndices.getConcreteIndex(updateRequest.index()).getName();
|
||||
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, updateRequest.id(), updateRequest.routing()).shardId();
|
||||
List<BulkItemRequest> list = requestsByShard.get(shardId);
|
||||
if (list == null) {
|
||||
list = new ArrayList<>();
|
||||
requestsByShard.put(shardId, list);
|
||||
}
|
||||
list.add(new BulkItemRequest(i, request));
|
||||
DocumentRequest<?> request = bulkRequest.requests.get(i);
|
||||
if (request == null) {
|
||||
continue;
|
||||
}
|
||||
String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
|
||||
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
|
||||
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
|
||||
shardRequests.add(new BulkItemRequest(i, request));
|
||||
}
|
||||
|
||||
if (requestsByShard.isEmpty()) {
|
||||
@ -364,19 +300,9 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
// create failures for all relevant requests
|
||||
for (BulkItemRequest request : requests) {
|
||||
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
|
||||
if (request.request() instanceof IndexRequest) {
|
||||
IndexRequest indexRequest = (IndexRequest) request.request();
|
||||
responses.set(request.id(), new BulkItemResponse(request.id(), indexRequest.opType().toString().toLowerCase(Locale.ENGLISH),
|
||||
new BulkItemResponse.Failure(indexName, indexRequest.type(), indexRequest.id(), e)));
|
||||
} else if (request.request() instanceof DeleteRequest) {
|
||||
DeleteRequest deleteRequest = (DeleteRequest) request.request();
|
||||
responses.set(request.id(), new BulkItemResponse(request.id(), "delete",
|
||||
new BulkItemResponse.Failure(indexName, deleteRequest.type(), deleteRequest.id(), e)));
|
||||
} else if (request.request() instanceof UpdateRequest) {
|
||||
UpdateRequest updateRequest = (UpdateRequest) request.request();
|
||||
responses.set(request.id(), new BulkItemResponse(request.id(), "update",
|
||||
new BulkItemResponse.Failure(indexName, updateRequest.type(), updateRequest.id(), e)));
|
||||
}
|
||||
DocumentRequest<?> documentRequest = request.request();
|
||||
responses.set(request.id(), new BulkItemResponse(request.id(), documentRequest.opType(),
|
||||
new BulkItemResponse.Failure(indexName, documentRequest.type(), documentRequest.id(), e)));
|
||||
}
|
||||
if (counter.decrementAndGet() == 0) {
|
||||
finishHim();
|
||||
@ -413,15 +339,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||
if (unavailableException != null) {
|
||||
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(),
|
||||
unavailableException);
|
||||
String operationType = "unknown";
|
||||
if (request instanceof IndexRequest) {
|
||||
operationType = "index";
|
||||
} else if (request instanceof DeleteRequest) {
|
||||
operationType = "delete";
|
||||
} else if (request instanceof UpdateRequest) {
|
||||
operationType = "update";
|
||||
}
|
||||
BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, operationType, failure);
|
||||
BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, request.opType(), failure);
|
||||
responses.set(idx, bulkItemResponse);
|
||||
// make sure the request gets never processed again
|
||||
bulkRequest.requests.set(idx, null);
|
||||
|
@ -71,9 +71,6 @@ import static org.elasticsearch.action.support.replication.ReplicationOperation.
|
||||
*/
|
||||
public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardResponse> {
|
||||
|
||||
private static final String OP_TYPE_UPDATE = "update";
|
||||
private static final String OP_TYPE_DELETE = "delete";
|
||||
|
||||
public static final String ACTION_NAME = BulkAction.NAME + "[s]";
|
||||
|
||||
private final UpdateHelper updateHelper;
|
||||
@ -157,7 +154,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
||||
location = locationToSync(location, result.getLocation());
|
||||
// add the response
|
||||
IndexResponse indexResponse = result.getResponse();
|
||||
setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse));
|
||||
setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType(), indexResponse));
|
||||
} catch (Exception e) {
|
||||
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
|
||||
if (retryPrimaryException(e)) {
|
||||
@ -174,7 +171,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
||||
if (item.getPrimaryResponse() != null && isConflictException(e)) {
|
||||
setResponse(item, item.getPrimaryResponse());
|
||||
} else {
|
||||
setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
|
||||
setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType(),
|
||||
new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), e)));
|
||||
}
|
||||
}
|
||||
@ -199,7 +196,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
||||
final WriteResult<DeleteResponse> writeResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard);
|
||||
DeleteResponse deleteResponse = writeResult.getResponse();
|
||||
location = locationToSync(location, writeResult.getLocation());
|
||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE, deleteResponse));
|
||||
setResponse(item, new BulkItemResponse(item.id(), deleteRequest.opType(), deleteResponse));
|
||||
} catch (Exception e) {
|
||||
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
|
||||
if (retryPrimaryException(e)) {
|
||||
@ -216,7 +213,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
||||
if (item.getPrimaryResponse() != null && isConflictException(e)) {
|
||||
setResponse(item, item.getPrimaryResponse());
|
||||
} else {
|
||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE,
|
||||
setResponse(item, new BulkItemResponse(item.id(), deleteRequest.opType(),
|
||||
new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), e)));
|
||||
}
|
||||
}
|
||||
@ -254,7 +251,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
||||
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
|
||||
}
|
||||
item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest);
|
||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse));
|
||||
setResponse(item, new BulkItemResponse(item.id(), updateRequest.opType(), updateResponse));
|
||||
break;
|
||||
case DELETED:
|
||||
@SuppressWarnings("unchecked")
|
||||
@ -265,10 +262,10 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
||||
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
|
||||
// Replace the update request to the translated delete request to execute on the replica.
|
||||
item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
|
||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse));
|
||||
setResponse(item, new BulkItemResponse(item.id(), updateRequest.opType(), updateResponse));
|
||||
break;
|
||||
case NOOP:
|
||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResult.noopResult));
|
||||
setResponse(item, new BulkItemResponse(item.id(), updateRequest.opType(), updateResult.noopResult));
|
||||
item.setIgnoreOnReplica(); // no need to go to the replica
|
||||
break;
|
||||
default:
|
||||
@ -281,7 +278,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
||||
if (updateResult.retry) {
|
||||
// updateAttemptCount is 0 based and marks current attempt, if it's equal to retryOnConflict we are going out of the iteration
|
||||
if (updateAttemptsCount >= updateRequest.retryOnConflict()) {
|
||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE,
|
||||
setResponse(item, new BulkItemResponse(item.id(), updateRequest.opType(),
|
||||
new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), e)));
|
||||
}
|
||||
} else {
|
||||
@ -299,20 +296,20 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
||||
if (item.getPrimaryResponse() != null && isConflictException(e)) {
|
||||
setResponse(item, item.getPrimaryResponse());
|
||||
} else if (updateResult.result == null) {
|
||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), e)));
|
||||
setResponse(item, new BulkItemResponse(item.id(), updateRequest.opType(), new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), e)));
|
||||
} else {
|
||||
switch (updateResult.result.getResponseResult()) {
|
||||
case CREATED:
|
||||
case UPDATED:
|
||||
IndexRequest indexRequest = updateResult.request();
|
||||
logFailure(e, "index", request.shardId(), indexRequest);
|
||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE,
|
||||
setResponse(item, new BulkItemResponse(item.id(), updateRequest.opType(),
|
||||
new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), e)));
|
||||
break;
|
||||
case DELETED:
|
||||
DeleteRequest deleteRequest = updateResult.request();
|
||||
logFailure(e, "delete", request.shardId(), deleteRequest);
|
||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE,
|
||||
setResponse(item, new BulkItemResponse(item.id(), deleteRequest.opType(),
|
||||
new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), e)));
|
||||
break;
|
||||
default:
|
||||
|
@ -164,28 +164,33 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest> impleme
|
||||
return this.routing;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the version, which will cause the delete operation to only be performed if a matching
|
||||
* version exists and no changes happened on the doc since then.
|
||||
*/
|
||||
@Override
|
||||
public DeleteRequest version(long version) {
|
||||
this.version = version;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long version() {
|
||||
return this.version;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteRequest versionType(VersionType versionType) {
|
||||
this.versionType = versionType;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public VersionType versionType() {
|
||||
return this.versionType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpType opType() {
|
||||
return OpType.DELETE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
|
@ -69,67 +69,6 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
*/
|
||||
public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implements DocumentRequest<IndexRequest> {
|
||||
|
||||
/**
|
||||
* Operation type controls if the type of the index operation.
|
||||
*/
|
||||
public static enum OpType {
|
||||
/**
|
||||
* Index the source. If there an existing document with the id, it will
|
||||
* be replaced.
|
||||
*/
|
||||
INDEX((byte) 0),
|
||||
/**
|
||||
* Creates the resource. Simply adds it to the index, if there is an existing
|
||||
* document with the id, then it won't be removed.
|
||||
*/
|
||||
CREATE((byte) 1);
|
||||
|
||||
private final byte id;
|
||||
private final String lowercase;
|
||||
|
||||
OpType(byte id) {
|
||||
this.id = id;
|
||||
this.lowercase = this.toString().toLowerCase(Locale.ENGLISH);
|
||||
}
|
||||
|
||||
/**
|
||||
* The internal representation of the operation type.
|
||||
*/
|
||||
public byte id() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public String lowercase() {
|
||||
return this.lowercase;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs the operation type from its internal representation.
|
||||
*/
|
||||
public static OpType fromId(byte id) {
|
||||
if (id == 0) {
|
||||
return INDEX;
|
||||
} else if (id == 1) {
|
||||
return CREATE;
|
||||
} else {
|
||||
throw new IllegalArgumentException("No type match for [" + id + "]");
|
||||
}
|
||||
}
|
||||
|
||||
public static OpType fromString(String sOpType) {
|
||||
String lowersOpType = sOpType.toLowerCase(Locale.ROOT);
|
||||
switch (lowersOpType) {
|
||||
case "create":
|
||||
return OpType.CREATE;
|
||||
case "index":
|
||||
return OpType.INDEX;
|
||||
default:
|
||||
throw new IllegalArgumentException("opType [" + sOpType + "] not allowed, either [index] or [create] are allowed");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private String type;
|
||||
private String id;
|
||||
@Nullable
|
||||
@ -506,6 +445,9 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
|
||||
* Sets the type of operation to perform.
|
||||
*/
|
||||
public IndexRequest opType(OpType opType) {
|
||||
if (opType != OpType.CREATE && opType != OpType.INDEX) {
|
||||
throw new IllegalArgumentException("opType must be 'create' or 'index', found: [" + opType + "]");
|
||||
}
|
||||
this.opType = opType;
|
||||
if (opType == OpType.CREATE) {
|
||||
version(Versions.MATCH_DELETED);
|
||||
@ -515,11 +457,19 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets a string representation of the {@link #opType(org.elasticsearch.action.index.IndexRequest.OpType)}. Can
|
||||
* Sets a string representation of the {@link #opType(OpType)}. Can
|
||||
* be either "index" or "create".
|
||||
*/
|
||||
public IndexRequest opType(String opType) {
|
||||
return opType(OpType.fromString(opType));
|
||||
String op = opType.toLowerCase(Locale.ROOT);
|
||||
if (op.equals("create")) {
|
||||
opType(OpType.CREATE);
|
||||
} else if (op.equals("index")) {
|
||||
opType(OpType.INDEX);
|
||||
} else {
|
||||
throw new IllegalArgumentException("opType must be 'create' or 'index', found: [" + opType + "]");
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
@ -534,34 +484,29 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The type of operation to perform.
|
||||
*/
|
||||
@Override
|
||||
public OpType opType() {
|
||||
return this.opType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the version, which will cause the index operation to only be performed if a matching
|
||||
* version exists and no changes happened on the doc since then.
|
||||
*/
|
||||
@Override
|
||||
public IndexRequest version(long version) {
|
||||
this.version = version;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long version() {
|
||||
return this.version;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the versioning type. Defaults to {@link VersionType#INTERNAL}.
|
||||
*/
|
||||
@Override
|
||||
public IndexRequest versionType(VersionType versionType) {
|
||||
this.versionType = versionType;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public VersionType versionType() {
|
||||
return this.versionType;
|
||||
}
|
||||
@ -651,7 +596,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
|
||||
out.writeOptionalString(timestamp);
|
||||
out.writeOptionalWriteable(ttl);
|
||||
out.writeBytesReference(source);
|
||||
out.writeByte(opType.id());
|
||||
out.writeByte(opType.getId());
|
||||
out.writeLong(version);
|
||||
out.writeByte(versionType.getValue());
|
||||
out.writeOptionalString(pipeline);
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.elasticsearch.action.index;
|
||||
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.support.WriteRequestBuilder;
|
||||
import org.elasticsearch.action.support.replication.ReplicationRequestBuilder;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
@ -200,17 +201,17 @@ public class IndexRequestBuilder extends ReplicationRequestBuilder<IndexRequest,
|
||||
/**
|
||||
* Sets the type of operation to perform.
|
||||
*/
|
||||
public IndexRequestBuilder setOpType(IndexRequest.OpType opType) {
|
||||
public IndexRequestBuilder setOpType(DocumentRequest.OpType opType) {
|
||||
request.opType(opType);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets a string representation of the {@link #setOpType(org.elasticsearch.action.index.IndexRequest.OpType)}. Can
|
||||
* Sets a string representation of the {@link #setOpType(DocumentRequest.OpType)}. Can
|
||||
* be either "index" or "create".
|
||||
*/
|
||||
public IndexRequestBuilder setOpType(String opType) {
|
||||
request.opType(IndexRequest.OpType.fromString(opType));
|
||||
request.opType(opType);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -22,6 +22,7 @@ package org.elasticsearch.action.ingest;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.bulk.BulkAction;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
@ -132,7 +133,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
|
||||
return Integer.MAX_VALUE;
|
||||
}
|
||||
|
||||
static final class BulkRequestModifier implements Iterator<ActionRequest<?>> {
|
||||
static final class BulkRequestModifier implements Iterator<DocumentRequest<?>> {
|
||||
|
||||
final BulkRequest bulkRequest;
|
||||
final Set<Integer> failedSlots;
|
||||
@ -148,7 +149,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequest next() {
|
||||
public DocumentRequest<?> next() {
|
||||
return bulkRequest.requests().get(++currentSlot);
|
||||
}
|
||||
|
||||
@ -169,7 +170,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
|
||||
int slot = 0;
|
||||
originalSlots = new int[bulkRequest.requests().size() - failedSlots.size()];
|
||||
for (int i = 0; i < bulkRequest.requests().size(); i++) {
|
||||
ActionRequest request = bulkRequest.requests().get(i);
|
||||
DocumentRequest<?> request = bulkRequest.requests().get(i);
|
||||
if (failedSlots.contains(i) == false) {
|
||||
modifiedBulkRequest.add(request);
|
||||
originalSlots[slot++] = i;
|
||||
@ -205,7 +206,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
|
||||
// 3) Continue with the next request in the bulk.
|
||||
failedSlots.add(currentSlot);
|
||||
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e);
|
||||
itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType().lowercase(), failure));
|
||||
itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType(), failure));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -21,7 +21,6 @@ package org.elasticsearch.action.termvectors;
|
||||
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.RealtimeRequest;
|
||||
import org.elasticsearch.action.ValidateActions;
|
||||
import org.elasticsearch.action.get.MultiGetRequest;
|
||||
@ -56,7 +55,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
* Note, the {@link #index()}, {@link #type(String)} and {@link #id(String)} are
|
||||
* required.
|
||||
*/
|
||||
public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> implements DocumentRequest<TermVectorsRequest>, RealtimeRequest {
|
||||
public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> implements RealtimeRequest {
|
||||
|
||||
private String type;
|
||||
|
||||
@ -200,7 +199,6 @@ public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> i
|
||||
/**
|
||||
* Returns the type of document to get the term vector for.
|
||||
*/
|
||||
@Override
|
||||
public String type() {
|
||||
return type;
|
||||
}
|
||||
@ -208,7 +206,6 @@ public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> i
|
||||
/**
|
||||
* Returns the id of document the term vector is requested for.
|
||||
*/
|
||||
@Override
|
||||
public String id() {
|
||||
return id;
|
||||
}
|
||||
@ -250,18 +247,15 @@ public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> i
|
||||
/**
|
||||
* @return The routing for this request.
|
||||
*/
|
||||
@Override
|
||||
public String routing() {
|
||||
return routing;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TermVectorsRequest routing(String routing) {
|
||||
this.routing = routing;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String parent() {
|
||||
return parent;
|
||||
}
|
||||
|
@ -398,31 +398,33 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
|
||||
return this.retryOnConflict;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the version, which will cause the index operation to only be performed if a matching
|
||||
* version exists and no changes happened on the doc since then.
|
||||
*/
|
||||
@Override
|
||||
public UpdateRequest version(long version) {
|
||||
this.version = version;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long version() {
|
||||
return this.version;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the versioning type. Defaults to {@link VersionType#INTERNAL}.
|
||||
*/
|
||||
@Override
|
||||
public UpdateRequest versionType(VersionType versionType) {
|
||||
this.versionType = versionType;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public VersionType versionType() {
|
||||
return this.versionType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpType opType() {
|
||||
return OpType.UPDATE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateRequest setRefreshPolicy(RefreshPolicy refreshPolicy) {
|
||||
this.refreshPolicy = refreshPolicy;
|
||||
|
@ -19,7 +19,7 @@
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
@ -68,7 +68,7 @@ public class PipelineExecutionService implements ClusterStateListener {
|
||||
});
|
||||
}
|
||||
|
||||
public void executeBulkRequest(Iterable<ActionRequest<?>> actionRequests,
|
||||
public void executeBulkRequest(Iterable<DocumentRequest<?>> actionRequests,
|
||||
BiConsumer<IndexRequest, Exception> itemFailureHandler,
|
||||
Consumer<Exception> completionHandler) {
|
||||
threadPool.executor(ThreadPool.Names.BULK).execute(new AbstractRunnable() {
|
||||
@ -80,7 +80,7 @@ public class PipelineExecutionService implements ClusterStateListener {
|
||||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
for (ActionRequest actionRequest : actionRequests) {
|
||||
for (DocumentRequest<?> actionRequest : actionRequests) {
|
||||
if ((actionRequest instanceof IndexRequest)) {
|
||||
IndexRequest indexRequest = (IndexRequest) actionRequest;
|
||||
if (Strings.hasText(indexRequest.getPipeline())) {
|
||||
|
@ -86,7 +86,7 @@ public class RestIndexAction extends BaseRestHandler {
|
||||
String sOpType = request.param("op_type");
|
||||
if (sOpType != null) {
|
||||
try {
|
||||
indexRequest.opType(IndexRequest.OpType.fromString(sOpType));
|
||||
indexRequest.opType(sOpType);
|
||||
} catch (IllegalArgumentException eia){
|
||||
try {
|
||||
XContentBuilder builder = channel.newErrorBuilder();
|
||||
|
@ -20,8 +20,8 @@
|
||||
package org.elasticsearch.action.bulk;
|
||||
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
|
||||
@ -111,7 +111,7 @@ public class BulkRequestTests extends ESTestCase {
|
||||
|
||||
public void testBulkAddIterable() {
|
||||
BulkRequest bulkRequest = Requests.bulkRequest();
|
||||
List<ActionRequest<?>> requests = new ArrayList<>();
|
||||
List<DocumentRequest<?>> requests = new ArrayList<>();
|
||||
requests.add(new IndexRequest("test", "test", "id").source("field", "value"));
|
||||
requests.add(new UpdateRequest("test", "test", "id").doc("field", "value"));
|
||||
requests.add(new DeleteRequest("test", "test", "id"));
|
||||
|
@ -47,6 +47,7 @@ import java.util.Map;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.elasticsearch.action.DocumentRequest.OpType;
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.script.ScriptService.ScriptType;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
@ -319,7 +320,7 @@ public class BulkWithUpdatesIT extends ESIntegTestCase {
|
||||
assertThat(response.getItems()[i].getVersion(), equalTo(1L));
|
||||
assertThat(response.getItems()[i].getIndex(), equalTo("test"));
|
||||
assertThat(response.getItems()[i].getType(), equalTo("type1"));
|
||||
assertThat(response.getItems()[i].getOpType(), equalTo("update"));
|
||||
assertThat(response.getItems()[i].getOpType(), equalTo(OpType.UPDATE));
|
||||
assertThat(response.getItems()[i].getResponse().getId(), equalTo(Integer.toString(i)));
|
||||
assertThat(response.getItems()[i].getResponse().getVersion(), equalTo(1L));
|
||||
assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getGetResult().field("counter").getValue(), equalTo(1));
|
||||
@ -357,7 +358,7 @@ public class BulkWithUpdatesIT extends ESIntegTestCase {
|
||||
assertThat(response.getItems()[i].getVersion(), equalTo(2L));
|
||||
assertThat(response.getItems()[i].getIndex(), equalTo("test"));
|
||||
assertThat(response.getItems()[i].getType(), equalTo("type1"));
|
||||
assertThat(response.getItems()[i].getOpType(), equalTo("update"));
|
||||
assertThat(response.getItems()[i].getOpType(), equalTo(OpType.UPDATE));
|
||||
assertThat(response.getItems()[i].getResponse().getId(), equalTo(Integer.toString(i)));
|
||||
assertThat(response.getItems()[i].getResponse().getVersion(), equalTo(2L));
|
||||
assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getGetResult().field("counter").getValue(), equalTo(2));
|
||||
@ -381,7 +382,7 @@ public class BulkWithUpdatesIT extends ESIntegTestCase {
|
||||
assertThat(response.getItems()[i].getVersion(), equalTo(3L));
|
||||
assertThat(response.getItems()[i].getIndex(), equalTo("test"));
|
||||
assertThat(response.getItems()[i].getType(), equalTo("type1"));
|
||||
assertThat(response.getItems()[i].getOpType(), equalTo("update"));
|
||||
assertThat(response.getItems()[i].getOpType(), equalTo(OpType.UPDATE));
|
||||
}
|
||||
}
|
||||
|
||||
@ -398,7 +399,7 @@ public class BulkWithUpdatesIT extends ESIntegTestCase {
|
||||
assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i)));
|
||||
assertThat(response.getItems()[i].getIndex(), equalTo("test"));
|
||||
assertThat(response.getItems()[i].getType(), equalTo("type1"));
|
||||
assertThat(response.getItems()[i].getOpType(), equalTo("update"));
|
||||
assertThat(response.getItems()[i].getOpType(), equalTo(OpType.UPDATE));
|
||||
}
|
||||
|
||||
builder = client().prepareBulk();
|
||||
@ -414,7 +415,7 @@ public class BulkWithUpdatesIT extends ESIntegTestCase {
|
||||
assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i)));
|
||||
assertThat(response.getItems()[i].getIndex(), equalTo("test"));
|
||||
assertThat(response.getItems()[i].getType(), equalTo("type1"));
|
||||
assertThat(response.getItems()[i].getOpType(), equalTo("update"));
|
||||
assertThat(response.getItems()[i].getOpType(), equalTo(OpType.UPDATE));
|
||||
for (int j = 0; j < 5; j++) {
|
||||
GetResponse getResponse = client().prepareGet("test", "type1", Integer.toString(i)).setFields("counter").execute()
|
||||
.actionGet();
|
||||
@ -755,12 +756,12 @@ public class BulkWithUpdatesIT extends ESIntegTestCase {
|
||||
|
||||
assertNoFailures(indexBulkItemResponse);
|
||||
assertThat(bulkItemResponse.getItems().length, is(6));
|
||||
assertThat(bulkItemResponse.getItems()[0].getOpType(), is("index"));
|
||||
assertThat(bulkItemResponse.getItems()[1].getOpType(), is("index"));
|
||||
assertThat(bulkItemResponse.getItems()[2].getOpType(), is("update"));
|
||||
assertThat(bulkItemResponse.getItems()[3].getOpType(), is("update"));
|
||||
assertThat(bulkItemResponse.getItems()[4].getOpType(), is("delete"));
|
||||
assertThat(bulkItemResponse.getItems()[5].getOpType(), is("delete"));
|
||||
assertThat(bulkItemResponse.getItems()[0].getOpType(), is(OpType.INDEX));
|
||||
assertThat(bulkItemResponse.getItems()[1].getOpType(), is(OpType.INDEX));
|
||||
assertThat(bulkItemResponse.getItems()[2].getOpType(), is(OpType.UPDATE));
|
||||
assertThat(bulkItemResponse.getItems()[3].getOpType(), is(OpType.UPDATE));
|
||||
assertThat(bulkItemResponse.getItems()[4].getOpType(), is(OpType.DELETE));
|
||||
assertThat(bulkItemResponse.getItems()[5].getOpType(), is(OpType.DELETE));
|
||||
}
|
||||
|
||||
private static String indexOrAlias() {
|
||||
@ -805,9 +806,9 @@ public class BulkWithUpdatesIT extends ESIntegTestCase {
|
||||
assertThat(bulkResponse.hasFailures(), is(true));
|
||||
BulkItemResponse[] responseItems = bulkResponse.getItems();
|
||||
assertThat(responseItems.length, is(3));
|
||||
assertThat(responseItems[0].getOpType(), is("index"));
|
||||
assertThat(responseItems[1].getOpType(), is("update"));
|
||||
assertThat(responseItems[2].getOpType(), is("delete"));
|
||||
assertThat(responseItems[0].getOpType(), is(OpType.INDEX));
|
||||
assertThat(responseItems[1].getOpType(), is(OpType.UPDATE));
|
||||
assertThat(responseItems[2].getOpType(), is(OpType.DELETE));
|
||||
}
|
||||
|
||||
// issue 9821
|
||||
@ -817,9 +818,9 @@ public class BulkWithUpdatesIT extends ESIntegTestCase {
|
||||
.add(client().prepareUpdate().setIndex("INVALID.NAME").setType("type1").setId("1").setDoc("field", randomInt()))
|
||||
.add(client().prepareDelete().setIndex("INVALID.NAME").setType("type1").setId("1")).get();
|
||||
assertThat(bulkResponse.getItems().length, is(3));
|
||||
assertThat(bulkResponse.getItems()[0].getOpType(), is("index"));
|
||||
assertThat(bulkResponse.getItems()[1].getOpType(), is("update"));
|
||||
assertThat(bulkResponse.getItems()[2].getOpType(), is("delete"));
|
||||
assertThat(bulkResponse.getItems()[0].getOpType(), is(OpType.INDEX));
|
||||
assertThat(bulkResponse.getItems()[1].getOpType(), is(OpType.UPDATE));
|
||||
assertThat(bulkResponse.getItems()[2].getOpType(), is(OpType.DELETE));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -20,7 +20,12 @@ package org.elasticsearch.action.bulk;
|
||||
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.DocumentRequest.OpType;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
@ -212,11 +217,11 @@ public class RetryTests extends ESTestCase {
|
||||
}
|
||||
|
||||
private BulkItemResponse successfulResponse() {
|
||||
return new BulkItemResponse(1, "update", new DeleteResponse());
|
||||
return new BulkItemResponse(1, OpType.DELETE, new DeleteResponse());
|
||||
}
|
||||
|
||||
private BulkItemResponse failedResponse() {
|
||||
return new BulkItemResponse(1, "update", new BulkItemResponse.Failure("test", "test", "1", new EsRejectedExecutionException("pool full")));
|
||||
return new BulkItemResponse(1, OpType.INDEX, new BulkItemResponse.Failure("test", "test", "1", new EsRejectedExecutionException("pool full")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
package org.elasticsearch.action.index;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
@ -43,18 +44,24 @@ public class IndexRequestTests extends ESTestCase {
|
||||
String createUpper = "CREATE";
|
||||
String indexUpper = "INDEX";
|
||||
|
||||
assertThat(IndexRequest.OpType.fromString(create), equalTo(IndexRequest.OpType.CREATE));
|
||||
assertThat(IndexRequest.OpType.fromString(index), equalTo(IndexRequest.OpType.INDEX));
|
||||
assertThat(IndexRequest.OpType.fromString(createUpper), equalTo(IndexRequest.OpType.CREATE));
|
||||
assertThat(IndexRequest.OpType.fromString(indexUpper), equalTo(IndexRequest.OpType.INDEX));
|
||||
IndexRequest indexRequest = new IndexRequest("");
|
||||
indexRequest.opType(create);
|
||||
assertThat(indexRequest.opType() , equalTo(DocumentRequest.OpType.CREATE));
|
||||
indexRequest.opType(createUpper);
|
||||
assertThat(indexRequest.opType() , equalTo(DocumentRequest.OpType.CREATE));
|
||||
indexRequest.opType(index);
|
||||
assertThat(indexRequest.opType() , equalTo(DocumentRequest.OpType.INDEX));
|
||||
indexRequest.opType(indexUpper);
|
||||
assertThat(indexRequest.opType() , equalTo(DocumentRequest.OpType.INDEX));
|
||||
}
|
||||
|
||||
public void testReadBogusString() {
|
||||
try {
|
||||
IndexRequest.OpType.fromString("foobar");
|
||||
IndexRequest indexRequest = new IndexRequest("");
|
||||
indexRequest.opType("foobar");
|
||||
fail("Expected IllegalArgumentException");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), containsString("opType [foobar] not allowed"));
|
||||
assertThat(e.getMessage(), equalTo("opType must be 'create' or 'index', found: [foobar]"));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -20,7 +20,7 @@ package org.elasticsearch.action.ingest;
|
||||
*/
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
@ -116,10 +116,10 @@ public class BulkRequestModifierTests extends ESTestCase {
|
||||
});
|
||||
|
||||
List<BulkItemResponse> originalResponses = new ArrayList<>();
|
||||
for (ActionRequest actionRequest : bulkRequest.requests()) {
|
||||
for (DocumentRequest actionRequest : bulkRequest.requests()) {
|
||||
IndexRequest indexRequest = (IndexRequest) actionRequest;
|
||||
IndexResponse indexResponse = new IndexResponse(new ShardId("index", "_na_", 0), indexRequest.type(), indexRequest.id(), 1, true);
|
||||
originalResponses.add(new BulkItemResponse(Integer.parseInt(indexRequest.id()), indexRequest.opType().lowercase(), indexResponse));
|
||||
originalResponses.add(new BulkItemResponse(Integer.parseInt(indexRequest.id()), indexRequest.opType(), indexResponse));
|
||||
}
|
||||
bulkResponseListener.onResponse(new BulkResponse(originalResponses.toArray(new BulkItemResponse[originalResponses.size()]), 0));
|
||||
|
||||
|
@ -20,7 +20,7 @@
|
||||
package org.elasticsearch.action.ingest;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.bulk.BulkAction;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
@ -174,7 +174,7 @@ public class IngestActionFilterTests extends ESTestCase {
|
||||
int numRequest = scaledRandomIntBetween(8, 64);
|
||||
for (int i = 0; i < numRequest; i++) {
|
||||
if (rarely()) {
|
||||
ActionRequest request;
|
||||
DocumentRequest request;
|
||||
if (randomBoolean()) {
|
||||
request = new DeleteRequest("_index", "_type", "_id");
|
||||
} else {
|
||||
@ -196,7 +196,7 @@ public class IngestActionFilterTests extends ESTestCase {
|
||||
verifyZeroInteractions(actionListener);
|
||||
|
||||
int assertedRequests = 0;
|
||||
for (ActionRequest actionRequest : bulkRequest.requests()) {
|
||||
for (DocumentRequest actionRequest : bulkRequest.requests()) {
|
||||
if (actionRequest instanceof IndexRequest) {
|
||||
IndexRequest indexRequest = (IndexRequest) actionRequest;
|
||||
assertThat(indexRequest.sourceAsMap().size(), equalTo(2));
|
||||
|
@ -36,6 +36,7 @@ import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.action.DocumentRequest.OpType;
|
||||
import static org.elasticsearch.client.Requests.clearIndicesCacheRequest;
|
||||
import static org.elasticsearch.client.Requests.getRequest;
|
||||
import static org.elasticsearch.client.Requests.indexRequest;
|
||||
@ -190,31 +191,31 @@ public class DocumentActionsIT extends ESIntegTestCase {
|
||||
assertThat(bulkResponse.getItems().length, equalTo(5));
|
||||
|
||||
assertThat(bulkResponse.getItems()[0].isFailed(), equalTo(false));
|
||||
assertThat(bulkResponse.getItems()[0].getOpType(), equalTo("index"));
|
||||
assertThat(bulkResponse.getItems()[0].getOpType(), equalTo(OpType.INDEX));
|
||||
assertThat(bulkResponse.getItems()[0].getIndex(), equalTo(getConcreteIndexName()));
|
||||
assertThat(bulkResponse.getItems()[0].getType(), equalTo("type1"));
|
||||
assertThat(bulkResponse.getItems()[0].getId(), equalTo("1"));
|
||||
|
||||
assertThat(bulkResponse.getItems()[1].isFailed(), equalTo(false));
|
||||
assertThat(bulkResponse.getItems()[1].getOpType(), equalTo("create"));
|
||||
assertThat(bulkResponse.getItems()[1].getOpType(), equalTo(OpType.CREATE));
|
||||
assertThat(bulkResponse.getItems()[1].getIndex(), equalTo(getConcreteIndexName()));
|
||||
assertThat(bulkResponse.getItems()[1].getType(), equalTo("type1"));
|
||||
assertThat(bulkResponse.getItems()[1].getId(), equalTo("2"));
|
||||
|
||||
assertThat(bulkResponse.getItems()[2].isFailed(), equalTo(false));
|
||||
assertThat(bulkResponse.getItems()[2].getOpType(), equalTo("index"));
|
||||
assertThat(bulkResponse.getItems()[2].getOpType(), equalTo(OpType.INDEX));
|
||||
assertThat(bulkResponse.getItems()[2].getIndex(), equalTo(getConcreteIndexName()));
|
||||
assertThat(bulkResponse.getItems()[2].getType(), equalTo("type1"));
|
||||
String generatedId3 = bulkResponse.getItems()[2].getId();
|
||||
|
||||
assertThat(bulkResponse.getItems()[3].isFailed(), equalTo(false));
|
||||
assertThat(bulkResponse.getItems()[3].getOpType(), equalTo("delete"));
|
||||
assertThat(bulkResponse.getItems()[3].getOpType(), equalTo(OpType.DELETE));
|
||||
assertThat(bulkResponse.getItems()[3].getIndex(), equalTo(getConcreteIndexName()));
|
||||
assertThat(bulkResponse.getItems()[3].getType(), equalTo("type1"));
|
||||
assertThat(bulkResponse.getItems()[3].getId(), equalTo("1"));
|
||||
|
||||
assertThat(bulkResponse.getItems()[4].isFailed(), equalTo(true));
|
||||
assertThat(bulkResponse.getItems()[4].getOpType(), equalTo("index"));
|
||||
assertThat(bulkResponse.getItems()[4].getOpType(), equalTo(OpType.INDEX));
|
||||
assertThat(bulkResponse.getItems()[4].getIndex(), equalTo(getConcreteIndexName()));
|
||||
assertThat(bulkResponse.getItems()[4].getType(), equalTo("type1"));
|
||||
|
||||
|
@ -21,7 +21,7 @@ package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
@ -314,7 +314,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
||||
int numRequest = scaledRandomIntBetween(8, 64);
|
||||
int numIndexRequests = 0;
|
||||
for (int i = 0; i < numRequest; i++) {
|
||||
ActionRequest request;
|
||||
DocumentRequest request;
|
||||
if (randomBoolean()) {
|
||||
if (randomBoolean()) {
|
||||
request = new DeleteRequest("_index", "_type", "_id");
|
||||
|
@ -20,6 +20,8 @@
|
||||
package org.elasticsearch.routing;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.RoutingMissingException;
|
||||
import org.elasticsearch.action.admin.indices.alias.Alias;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
@ -259,7 +261,7 @@ public class SimpleRoutingIT extends ESIntegTestCase {
|
||||
|
||||
for (BulkItemResponse bulkItemResponse : bulkResponse) {
|
||||
assertThat(bulkItemResponse.isFailed(), equalTo(true));
|
||||
assertThat(bulkItemResponse.getOpType(), equalTo("index"));
|
||||
assertThat(bulkItemResponse.getOpType(), equalTo(DocumentRequest.OpType.INDEX));
|
||||
assertThat(bulkItemResponse.getFailure().getStatus(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(bulkItemResponse.getFailure().getCause(), instanceOf(RoutingMissingException.class));
|
||||
assertThat(bulkItemResponse.getFailureMessage(), containsString("routing is required for [test]/[type1]/[1]"));
|
||||
@ -280,7 +282,7 @@ public class SimpleRoutingIT extends ESIntegTestCase {
|
||||
|
||||
for (BulkItemResponse bulkItemResponse : bulkResponse) {
|
||||
assertThat(bulkItemResponse.isFailed(), equalTo(true));
|
||||
assertThat(bulkItemResponse.getOpType(), equalTo("update"));
|
||||
assertThat(bulkItemResponse.getOpType(), equalTo(DocumentRequest.OpType.UPDATE));
|
||||
assertThat(bulkItemResponse.getFailure().getStatus(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(bulkItemResponse.getFailure().getCause(), instanceOf(RoutingMissingException.class));
|
||||
assertThat(bulkItemResponse.getFailureMessage(), containsString("routing is required for [test]/[type1]/[1]"));
|
||||
@ -301,7 +303,7 @@ public class SimpleRoutingIT extends ESIntegTestCase {
|
||||
|
||||
for (BulkItemResponse bulkItemResponse : bulkResponse) {
|
||||
assertThat(bulkItemResponse.isFailed(), equalTo(true));
|
||||
assertThat(bulkItemResponse.getOpType(), equalTo("delete"));
|
||||
assertThat(bulkItemResponse.getOpType(), equalTo(DocumentRequest.OpType.DELETE));
|
||||
assertThat(bulkItemResponse.getFailure().getStatus(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(bulkItemResponse.getFailure().getCause(), instanceOf(RoutingMissingException.class));
|
||||
assertThat(bulkItemResponse.getFailureMessage(), containsString("routing is required for [test]/[type1]/[1]"));
|
||||
|
@ -21,6 +21,7 @@ package org.elasticsearch.versioning;
|
||||
import org.apache.lucene.util.TestUtil;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
@ -724,7 +725,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
|
||||
client()
|
||||
.prepareIndex("test", "type", "id")
|
||||
.setSource("foo", "bar")
|
||||
.setOpType(IndexRequest.OpType.INDEX)
|
||||
.setOpType(DocumentRequest.OpType.INDEX)
|
||||
.setVersion(10)
|
||||
.setVersionType(VersionType.EXTERNAL)
|
||||
.execute()
|
||||
@ -793,7 +794,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
|
||||
client()
|
||||
.prepareIndex("test", "type", "id")
|
||||
.setSource("foo", "bar")
|
||||
.setOpType(IndexRequest.OpType.INDEX)
|
||||
.setOpType(DocumentRequest.OpType.INDEX)
|
||||
.setVersion(10)
|
||||
.setVersionType(VersionType.EXTERNAL)
|
||||
.execute()
|
||||
|
@ -256,22 +256,21 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
||||
recordFailure(item.getFailure(), failures);
|
||||
continue;
|
||||
}
|
||||
|
||||
switch (item.getOpType()) {
|
||||
case "index":
|
||||
case "create":
|
||||
IndexResponse ir = item.getResponse();
|
||||
if (ir.getResult() == DocWriteResponse.Result.CREATED) {
|
||||
task.countCreated();
|
||||
} else {
|
||||
case CREATE:
|
||||
case INDEX:
|
||||
if (item.getResponse().getResult() == DocWriteResponse.Result.CREATED) {
|
||||
task.countCreated();
|
||||
} else {
|
||||
task.countUpdated();
|
||||
}
|
||||
break;
|
||||
case UPDATE:
|
||||
task.countUpdated();
|
||||
}
|
||||
break;
|
||||
case "delete":
|
||||
task.countDeleted();
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown op type: " + item.getOpType());
|
||||
break;
|
||||
case DELETE:
|
||||
task.countDeleted();
|
||||
break;
|
||||
}
|
||||
// Track the indexes we've seen so we can refresh them if requested
|
||||
destinationIndicesThisBatch.add(item.getIndex());
|
||||
|
@ -20,7 +20,7 @@
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
@ -154,9 +154,9 @@ public abstract class AbstractAsyncBulkIndexByScrollAction<Request extends Abstr
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for the {@link ActionRequest} that are used in this action class.
|
||||
* Wrapper for the {@link DocumentRequest} that are used in this action class.
|
||||
*/
|
||||
interface RequestWrapper<Self extends ActionRequest<Self>> {
|
||||
interface RequestWrapper<Self extends DocumentRequest<Self>> {
|
||||
|
||||
void setIndex(String index);
|
||||
|
||||
|
@ -28,6 +28,8 @@ import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.action.DocWriteResponse.Result;
|
||||
import org.elasticsearch.action.DocumentRequest;
|
||||
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
||||
import org.elasticsearch.action.bulk.BackoffPolicy;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
@ -257,35 +259,36 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
BulkItemResponse[] responses = new BulkItemResponse[randomIntBetween(0, 100)];
|
||||
for (int i = 0; i < responses.length; i++) {
|
||||
ShardId shardId = new ShardId(new Index("name", "uid"), 0);
|
||||
String opType;
|
||||
if (rarely()) {
|
||||
opType = randomSimpleString(random());
|
||||
versionConflicts++;
|
||||
responses[i] = new BulkItemResponse(i, opType, new Failure(shardId.getIndexName(), "type", "id" + i,
|
||||
responses[i] = new BulkItemResponse(i, randomFrom(DocumentRequest.OpType.values()),
|
||||
new Failure(shardId.getIndexName(), "type", "id" + i,
|
||||
new VersionConflictEngineException(shardId, "type", "id", "test")));
|
||||
continue;
|
||||
}
|
||||
boolean createdResponse;
|
||||
DocumentRequest.OpType opType;
|
||||
switch (randomIntBetween(0, 2)) {
|
||||
case 0:
|
||||
opType = randomFrom("index", "create");
|
||||
createdResponse = true;
|
||||
opType = DocumentRequest.OpType.CREATE;
|
||||
created++;
|
||||
break;
|
||||
case 1:
|
||||
opType = randomFrom("index", "create");
|
||||
createdResponse = false;
|
||||
opType = randomFrom(DocumentRequest.OpType.INDEX, DocumentRequest.OpType.UPDATE);
|
||||
updated++;
|
||||
break;
|
||||
case 2:
|
||||
opType = "delete";
|
||||
createdResponse = false;
|
||||
opType = DocumentRequest.OpType.DELETE;
|
||||
deleted++;
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Bad scenario");
|
||||
}
|
||||
responses[i] = new BulkItemResponse(i, opType, new IndexResponse(shardId, "type", "id" + i, randomInt(), createdResponse));
|
||||
responses[i] = new BulkItemResponse(i, opType,
|
||||
new IndexResponse(shardId, "type", "id" + i, randomInt(), createdResponse));
|
||||
}
|
||||
new DummyAbstractAsyncBulkByScrollAction().onBulkResponse(timeValueNanos(System.nanoTime()), new BulkResponse(responses, 0));
|
||||
assertEquals(versionConflicts, testTask.getStatus().getVersionConflicts());
|
||||
@ -359,7 +362,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
public void testBulkFailuresAbortRequest() throws Exception {
|
||||
Failure failure = new Failure("index", "type", "id", new RuntimeException("test"));
|
||||
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
|
||||
BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[] {new BulkItemResponse(0, "index", failure)}, randomLong());
|
||||
BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]
|
||||
{new BulkItemResponse(0, DocumentRequest.OpType.CREATE, failure)}, randomLong());
|
||||
action.onBulkResponse(timeValueNanos(System.nanoTime()), bulkResponse);
|
||||
BulkIndexByScrollResponse response = listener.get();
|
||||
assertThat(response.getBulkFailures(), contains(failure));
|
||||
@ -765,33 +769,29 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
}
|
||||
BulkItemResponse[] responses = new BulkItemResponse[bulk.requests().size()];
|
||||
for (int i = 0; i < bulk.requests().size(); i++) {
|
||||
ActionRequest<?> item = bulk.requests().get(i);
|
||||
String opType;
|
||||
DocumentRequest<?> item = bulk.requests().get(i);
|
||||
DocWriteResponse response;
|
||||
ShardId shardId = new ShardId(new Index(((ReplicationRequest<?>) item).index(), "uuid"), 0);
|
||||
ShardId shardId = new ShardId(new Index(item.index(), "uuid"), 0);
|
||||
if (item instanceof IndexRequest) {
|
||||
IndexRequest index = (IndexRequest) item;
|
||||
opType = index.opType().lowercase();
|
||||
response = new IndexResponse(shardId, index.type(), index.id(), randomIntBetween(0, Integer.MAX_VALUE),
|
||||
true);
|
||||
} else if (item instanceof UpdateRequest) {
|
||||
UpdateRequest update = (UpdateRequest) item;
|
||||
opType = "update";
|
||||
response = new UpdateResponse(shardId, update.type(), update.id(),
|
||||
randomIntBetween(0, Integer.MAX_VALUE), DocWriteResponse.Result.CREATED);
|
||||
randomIntBetween(0, Integer.MAX_VALUE), Result.CREATED);
|
||||
} else if (item instanceof DeleteRequest) {
|
||||
DeleteRequest delete = (DeleteRequest) item;
|
||||
opType = "delete";
|
||||
response = new DeleteResponse(shardId, delete.type(), delete.id(), randomIntBetween(0, Integer.MAX_VALUE),
|
||||
true);
|
||||
} else {
|
||||
throw new RuntimeException("Unknown request: " + item);
|
||||
}
|
||||
if (i == toReject) {
|
||||
responses[i] = new BulkItemResponse(i, opType,
|
||||
responses[i] = new BulkItemResponse(i, item.opType(),
|
||||
new Failure(response.getIndex(), response.getType(), response.getId(), new EsRejectedExecutionException()));
|
||||
} else {
|
||||
responses[i] = new BulkItemResponse(i, opType, response);
|
||||
responses[i] = new BulkItemResponse(i, item.opType(), response);
|
||||
}
|
||||
}
|
||||
listener.onResponse((Response) new BulkResponse(responses, 1));
|
||||
|
@ -28,7 +28,7 @@ import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import static org.elasticsearch.action.index.IndexRequest.OpType.CREATE;
|
||||
import static org.elasticsearch.action.DocumentRequest.OpType.CREATE;
|
||||
import static org.hamcrest.Matchers.both;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.either;
|
||||
|
@ -21,7 +21,7 @@ package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
|
||||
import static org.elasticsearch.action.index.IndexRequest.OpType.CREATE;
|
||||
import static org.elasticsearch.action.DocumentRequest.OpType.CREATE;
|
||||
import static org.elasticsearch.index.VersionType.EXTERNAL;
|
||||
import static org.elasticsearch.index.VersionType.INTERNAL;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user