From 57d8025010a5d0bf60a0c98279ee2719fe336005 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Thu, 6 Oct 2016 02:59:07 -0400 Subject: [PATCH] cleanup --- .../elasticsearch/action/DocumentRequest.java | 48 ++++++++++++++++++- .../action/bulk/BulkItemRequest.java | 26 +--------- .../action/bulk/BulkItemResponse.java | 6 ++- .../action/bulk/BulkRequest.java | 26 +--------- .../action/bulk/TransportShardBulkAction.java | 7 ++- 5 files changed, 60 insertions(+), 53 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/DocumentRequest.java b/core/src/main/java/org/elasticsearch/action/DocumentRequest.java index ce957bb0d61..f4c88e159c7 100644 --- a/core/src/main/java/org/elasticsearch/action/DocumentRequest.java +++ b/core/src/main/java/org/elasticsearch/action/DocumentRequest.java @@ -18,9 +18,15 @@ */ package org.elasticsearch.action; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.VersionType; +import java.io.IOException; import java.util.Locale; /** @@ -125,7 +131,7 @@ public interface DocumentRequest extends IndicesRequest { OpType(int op) { this.op = (byte) op; - this.lowercase = this.toString().toLowerCase(Locale.ENGLISH); + this.lowercase = this.toString().toLowerCase(Locale.ROOT); } public byte getId() { @@ -147,7 +153,7 @@ public interface DocumentRequest extends IndicesRequest { } public static OpType fromString(String sOpType) { - String lowerCase = sOpType.toLowerCase(Locale.ENGLISH); + String lowerCase = sOpType.toLowerCase(Locale.ROOT); for (OpType opType : OpType.values()) { if (opType.getLowercase().equals(lowerCase)) { return opType; @@ -156,4 +162,42 @@ public interface DocumentRequest extends IndicesRequest { throw new IllegalArgumentException("Unknown opType: [" + sOpType + "]"); } } + + /** read a document write (index/delete/update) request */ + static DocumentRequest readDocumentRequest(StreamInput in) throws IOException { + byte type = in.readByte(); + final DocumentRequest documentRequest; + if (type == 0) { + IndexRequest indexRequest = new IndexRequest(); + indexRequest.readFrom(in); + documentRequest = indexRequest; + } else if (type == 1) { + DeleteRequest deleteRequest = new DeleteRequest(); + deleteRequest.readFrom(in); + documentRequest = deleteRequest; + } else if (type == 2) { + UpdateRequest updateRequest = new UpdateRequest(); + updateRequest.readFrom(in); + documentRequest = updateRequest; + } else { + throw new IllegalStateException("invalid request type [" + type+ " ]"); + } + return documentRequest; + } + + /** write a document write (index/delete/update) request*/ + static void writeDocumentRequest(StreamOutput out, DocumentRequest request) throws IOException { + if (request instanceof IndexRequest) { + out.writeByte((byte) 0); + ((IndexRequest) request).writeTo(out); + } else if (request instanceof DeleteRequest) { + out.writeByte((byte) 1); + ((DeleteRequest) request).writeTo(out); + } else if (request instanceof UpdateRequest) { + out.writeByte((byte) 2); + ((UpdateRequest) request).writeTo(out); + } else { + throw new IllegalStateException("invalid request [" + request.getClass().getSimpleName() + " ]"); + } + } } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java index 79503fcf9ee..079d4efe9bf 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java @@ -89,20 +89,7 @@ public class BulkItemRequest implements Streamable { @Override public void readFrom(StreamInput in) throws IOException { id = in.readVInt(); - byte type = in.readByte(); - if (type == 0) { - IndexRequest indexRequest = new IndexRequest(); - indexRequest.readFrom(in); - request = indexRequest; - } else if (type == 1) { - DeleteRequest deleteRequest = new DeleteRequest(); - deleteRequest.readFrom(in); - request = deleteRequest; - } else if (type == 2) { - UpdateRequest updateRequest = new UpdateRequest(); - updateRequest.readFrom(in); - request = updateRequest; - } + request = DocumentRequest.readDocumentRequest(in); if (in.readBoolean()) { primaryResponse = BulkItemResponse.readBulkItem(in); } @@ -112,16 +99,7 @@ public class BulkItemRequest implements Streamable { @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(id); - if (request instanceof IndexRequest) { - out.writeByte((byte) 0); - ((IndexRequest) request).writeTo(out); - } else if (request instanceof DeleteRequest) { - out.writeByte((byte) 1); - ((DeleteRequest) request).writeTo(out); - } else if (request instanceof UpdateRequest) { - out.writeByte((byte) 2); - ((UpdateRequest) request).writeTo(out); - } + DocumentRequest.writeDocumentRequest(out, request); out.writeOptionalStreamable(primaryResponse); out.writeBoolean(ignoreOnReplica); } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java index 83e8f8b7b46..9f0714784bc 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -328,7 +328,11 @@ public class BulkItemResponse implements Streamable, StatusToXContent { @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(id); - out.writeByte(opType.getId()); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { + out.writeByte(opType.getId()); + } else { + out.writeString(opType.getLowercase()); + } if (response == null) { out.writeByte((byte) 2); diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 57a04314593..dc72407cf42 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -550,20 +550,7 @@ public class BulkRequest extends ActionRequest implements Composite waitForActiveShards = ActiveShardCount.readFrom(in); int size = in.readVInt(); for (int i = 0; i < size; i++) { - byte type = in.readByte(); - if (type == 0) { - IndexRequest request = new IndexRequest(); - request.readFrom(in); - requests.add(request); - } else if (type == 1) { - DeleteRequest request = new DeleteRequest(); - request.readFrom(in); - requests.add(request); - } else if (type == 2) { - UpdateRequest request = new UpdateRequest(); - request.readFrom(in); - requests.add(request); - } + requests.add(DocumentRequest.readDocumentRequest(in)); } refreshPolicy = RefreshPolicy.readFrom(in); timeout = new TimeValue(in); @@ -575,16 +562,7 @@ public class BulkRequest extends ActionRequest implements Composite waitForActiveShards.writeTo(out); out.writeVInt(requests.size()); for (DocumentRequest request : requests) { - if (request instanceof IndexRequest) { - out.writeByte((byte) 0); - ((IndexRequest) request).writeTo(out); - } else if (request instanceof DeleteRequest) { - out.writeByte((byte) 1); - ((DeleteRequest) request).writeTo(out); - } else if (request instanceof UpdateRequest) { - out.writeByte((byte) 2); - ((UpdateRequest) request).writeTo(out); - } + DocumentRequest.writeDocumentRequest(out, request); } refreshPolicy.writeTo(out); timeout.writeTo(out); diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 098092ef1ed..9289fc0cab8 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -137,8 +137,11 @@ public class TransportShardBulkAction extends TransportWriteAction writeResult = innerExecuteBulkItemRequest(metaData, indexShard, request, requestIndex); - if (writeResult.getResponse().getResult() != DocWriteResponse.Result.NOOP) { + if (writeResult.getLocation() != null) { location = locationToSync(location, writeResult.getLocation()); + } else { + assert writeResult.getResponse().getResult() == DocWriteResponse.Result.NOOP + : "only noop operation can have null next operation"; } // update the bulk item request because update request execution can mutate the bulk item request BulkItemRequest item = request.items()[requestIndex]; @@ -157,7 +160,7 @@ public class TransportShardBulkAction extends TransportWriteAction documentRequest = item.request(); - if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) { + if (isConflictException(e)) { logger.trace((Supplier) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", request.shardId(), documentRequest.opType().getLowercase(), request), e); } else {