This commit is contained in:
Areek Zillur 2016-10-06 02:59:07 -04:00
parent 9b691f0d93
commit 57d8025010
5 changed files with 60 additions and 53 deletions

View File

@ -18,9 +18,15 @@
*/
package org.elasticsearch.action;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.VersionType;
import java.io.IOException;
import java.util.Locale;
/**
@ -125,7 +131,7 @@ public interface DocumentRequest<T> extends IndicesRequest {
OpType(int op) {
this.op = (byte) op;
this.lowercase = this.toString().toLowerCase(Locale.ENGLISH);
this.lowercase = this.toString().toLowerCase(Locale.ROOT);
}
public byte getId() {
@ -147,7 +153,7 @@ public interface DocumentRequest<T> extends IndicesRequest {
}
public static OpType fromString(String sOpType) {
String lowerCase = sOpType.toLowerCase(Locale.ENGLISH);
String lowerCase = sOpType.toLowerCase(Locale.ROOT);
for (OpType opType : OpType.values()) {
if (opType.getLowercase().equals(lowerCase)) {
return opType;
@ -156,4 +162,42 @@ public interface DocumentRequest<T> extends IndicesRequest {
throw new IllegalArgumentException("Unknown opType: [" + sOpType + "]");
}
}
/** read a document write (index/delete/update) request */
static DocumentRequest readDocumentRequest(StreamInput in) throws IOException {
byte type = in.readByte();
final DocumentRequest documentRequest;
if (type == 0) {
IndexRequest indexRequest = new IndexRequest();
indexRequest.readFrom(in);
documentRequest = indexRequest;
} else if (type == 1) {
DeleteRequest deleteRequest = new DeleteRequest();
deleteRequest.readFrom(in);
documentRequest = deleteRequest;
} else if (type == 2) {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.readFrom(in);
documentRequest = updateRequest;
} else {
throw new IllegalStateException("invalid request type [" + type+ " ]");
}
return documentRequest;
}
/** write a document write (index/delete/update) request*/
static void writeDocumentRequest(StreamOutput out, DocumentRequest request) throws IOException {
if (request instanceof IndexRequest) {
out.writeByte((byte) 0);
((IndexRequest) request).writeTo(out);
} else if (request instanceof DeleteRequest) {
out.writeByte((byte) 1);
((DeleteRequest) request).writeTo(out);
} else if (request instanceof UpdateRequest) {
out.writeByte((byte) 2);
((UpdateRequest) request).writeTo(out);
} else {
throw new IllegalStateException("invalid request [" + request.getClass().getSimpleName() + " ]");
}
}
}

View File

@ -89,20 +89,7 @@ public class BulkItemRequest implements Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
id = in.readVInt();
byte type = in.readByte();
if (type == 0) {
IndexRequest indexRequest = new IndexRequest();
indexRequest.readFrom(in);
request = indexRequest;
} else if (type == 1) {
DeleteRequest deleteRequest = new DeleteRequest();
deleteRequest.readFrom(in);
request = deleteRequest;
} else if (type == 2) {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.readFrom(in);
request = updateRequest;
}
request = DocumentRequest.readDocumentRequest(in);
if (in.readBoolean()) {
primaryResponse = BulkItemResponse.readBulkItem(in);
}
@ -112,16 +99,7 @@ public class BulkItemRequest implements Streamable {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
if (request instanceof IndexRequest) {
out.writeByte((byte) 0);
((IndexRequest) request).writeTo(out);
} else if (request instanceof DeleteRequest) {
out.writeByte((byte) 1);
((DeleteRequest) request).writeTo(out);
} else if (request instanceof UpdateRequest) {
out.writeByte((byte) 2);
((UpdateRequest) request).writeTo(out);
}
DocumentRequest.writeDocumentRequest(out, request);
out.writeOptionalStreamable(primaryResponse);
out.writeBoolean(ignoreOnReplica);
}

View File

@ -328,7 +328,11 @@ public class BulkItemResponse implements Streamable, StatusToXContent {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
out.writeByte(opType.getId());
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
out.writeByte(opType.getId());
} else {
out.writeString(opType.getLowercase());
}
if (response == null) {
out.writeByte((byte) 2);

View File

@ -550,20 +550,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
waitForActiveShards = ActiveShardCount.readFrom(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
byte type = in.readByte();
if (type == 0) {
IndexRequest request = new IndexRequest();
request.readFrom(in);
requests.add(request);
} else if (type == 1) {
DeleteRequest request = new DeleteRequest();
request.readFrom(in);
requests.add(request);
} else if (type == 2) {
UpdateRequest request = new UpdateRequest();
request.readFrom(in);
requests.add(request);
}
requests.add(DocumentRequest.readDocumentRequest(in));
}
refreshPolicy = RefreshPolicy.readFrom(in);
timeout = new TimeValue(in);
@ -575,16 +562,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
waitForActiveShards.writeTo(out);
out.writeVInt(requests.size());
for (DocumentRequest<?> request : requests) {
if (request instanceof IndexRequest) {
out.writeByte((byte) 0);
((IndexRequest) request).writeTo(out);
} else if (request instanceof DeleteRequest) {
out.writeByte((byte) 1);
((DeleteRequest) request).writeTo(out);
} else if (request instanceof UpdateRequest) {
out.writeByte((byte) 2);
((UpdateRequest) request).writeTo(out);
}
DocumentRequest.writeDocumentRequest(out, request);
}
refreshPolicy.writeTo(out);
timeout.writeTo(out);

View File

@ -137,8 +137,11 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
try {
WriteResult<? extends DocWriteResponse> writeResult = innerExecuteBulkItemRequest(metaData, indexShard,
request, requestIndex);
if (writeResult.getResponse().getResult() != DocWriteResponse.Result.NOOP) {
if (writeResult.getLocation() != null) {
location = locationToSync(location, writeResult.getLocation());
} else {
assert writeResult.getResponse().getResult() == DocWriteResponse.Result.NOOP
: "only noop operation can have null next operation";
}
// update the bulk item request because update request execution can mutate the bulk item request
BulkItemRequest item = request.items()[requestIndex];
@ -157,7 +160,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
BulkItemRequest item = request.items()[requestIndex];
DocumentRequest<?> documentRequest = item.request();
if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
if (isConflictException(e)) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
request.shardId(), documentRequest.opType().getLowercase(), request), e);
} else {