diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java index 079d4efe9bf..8c7786cb2ad 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java @@ -35,7 +35,7 @@ import java.io.IOException; public class BulkItemRequest implements Streamable { private int id; - private DocumentRequest request; + private DocumentRequest request; private volatile BulkItemResponse primaryResponse; private volatile boolean ignoreOnReplica; @@ -43,7 +43,7 @@ public class BulkItemRequest implements Streamable { } - public BulkItemRequest(int id, DocumentRequest request) { + public BulkItemRequest(int id, DocumentRequest request) { this.id = id; this.request = request; } @@ -52,7 +52,7 @@ public class BulkItemRequest implements Streamable { return id; } - public DocumentRequest request() { + public DocumentRequest request() { return request; } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index f32bfaa775c..55347b2da13 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -250,24 +250,24 @@ public class BulkProcessor implements Closeable { * (for example, if no id is provided, one will be generated, or usage of the create flag). */ public BulkProcessor add(IndexRequest request) { - return add((DocumentRequest) request); + return add((DocumentRequest) request); } /** * Adds an {@link DeleteRequest} to the list of actions to execute. */ public BulkProcessor add(DeleteRequest request) { - return add((DocumentRequest) request); + return add((DocumentRequest) request); } /** * Adds either a delete or an index request. */ - public BulkProcessor add(DocumentRequest request) { + public BulkProcessor add(DocumentRequest request) { return add(request, null); } - public BulkProcessor add(DocumentRequest request, @Nullable Object payload) { + public BulkProcessor add(DocumentRequest request, @Nullable Object payload) { internalAdd(request, payload); return this; } @@ -282,7 +282,7 @@ public class BulkProcessor implements Closeable { } } - private synchronized void internalAdd(DocumentRequest request, @Nullable Object payload) { + private synchronized void internalAdd(DocumentRequest request, @Nullable Object payload) { ensureOpen(); bulkRequest.add(request, payload); executeIfNeeded(); diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index dc72407cf42..292ecdd33e7 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -72,7 +72,7 @@ public class BulkRequest extends ActionRequest implements Composite * {@link WriteRequest}s to this but java doesn't support syntax to declare that everything in the array has both types so we declare * the one with the least casts. */ - final List> requests = new ArrayList<>(); + final List requests = new ArrayList<>(); List payloads = null; protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT; @@ -87,14 +87,14 @@ public class BulkRequest extends ActionRequest implements Composite /** * Adds a list of requests to be executed. Either index or delete requests. */ - public BulkRequest add(DocumentRequest... requests) { - for (DocumentRequest request : requests) { + public BulkRequest add(DocumentRequest... requests) { + for (DocumentRequest request : requests) { add(request, null); } return this; } - public BulkRequest add(DocumentRequest request) { + public BulkRequest add(DocumentRequest request) { return add(request, null); } @@ -104,7 +104,7 @@ public class BulkRequest extends ActionRequest implements Composite * @param payload Optional payload * @return the current bulk request */ - public BulkRequest add(DocumentRequest request, @Nullable Object payload) { + public BulkRequest add(DocumentRequest request, @Nullable Object payload) { if (request instanceof IndexRequest) { add((IndexRequest) request, payload); } else if (request instanceof DeleteRequest) { @@ -120,8 +120,8 @@ public class BulkRequest extends ActionRequest implements Composite /** * Adds a list of requests to be executed. Either index or delete requests. */ - public BulkRequest add(Iterable> requests) { - for (DocumentRequest request : requests) { + public BulkRequest add(Iterable requests) { + for (DocumentRequest request : requests) { add(request); } return this; @@ -207,7 +207,7 @@ public class BulkRequest extends ActionRequest implements Composite /** * The list of requests in this bulk request. */ - public List> requests() { + public List requests() { return this.requests; } @@ -508,7 +508,7 @@ public class BulkRequest extends ActionRequest implements Composite * @return Whether this bulk request contains index request with an ingest pipeline enabled. */ public boolean hasIndexRequestsWithPipelines() { - for (DocumentRequest actionRequest : requests) { + for (DocumentRequest actionRequest : requests) { if (actionRequest instanceof IndexRequest) { IndexRequest indexRequest = (IndexRequest) actionRequest; if (Strings.hasText(indexRequest.getPipeline())) { @@ -526,7 +526,7 @@ public class BulkRequest extends ActionRequest implements Composite if (requests.isEmpty()) { validationException = addValidationError("no requests added", validationException); } - for (DocumentRequest request : requests) { + for (DocumentRequest request : requests) { // We first check if refresh has been set if (((WriteRequest) request).getRefreshPolicy() != RefreshPolicy.NONE) { validationException = addValidationError( @@ -561,7 +561,7 @@ public class BulkRequest extends ActionRequest implements Composite super.writeTo(out); waitForActiveShards.writeTo(out); out.writeVInt(requests.size()); - for (DocumentRequest request : requests) { + for (DocumentRequest request : requests) { DocumentRequest.writeDocumentRequest(out, request); } refreshPolicy.writeTo(out); diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index f7861d1e093..48edb528fe1 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -145,7 +145,7 @@ public class TransportBulkAction extends HandledTransportAction request = bulkRequest.requests.get(i); + DocumentRequest request = bulkRequest.requests.get(i); if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) { bulkRequest.requests.set(i, null); } @@ -180,7 +180,7 @@ public class TransportBulkAction extends HandledTransportAction responses, int idx, DocumentRequest request, String index, Exception e) { + private boolean setResponseFailureIfIndexMatches(AtomicArray responses, int idx, DocumentRequest request, String index, Exception e) { if (index.equals(request.index())) { responses.set(idx, new BulkItemResponse(idx, request.opType(), new BulkItemResponse.Failure(request.index(), request.type(), request.id(), e))); return true; @@ -211,7 +211,7 @@ public class TransportBulkAction extends HandledTransportAction documentRequest = bulkRequest.requests.get(i); + DocumentRequest documentRequest = bulkRequest.requests.get(i); //the request can only be null because we set it to null in the previous step, so it gets ignored if (documentRequest == null) { continue; @@ -253,7 +253,7 @@ public class TransportBulkAction extends HandledTransportAction Operations mapping Map> requestsByShard = new HashMap<>(); for (int i = 0; i < bulkRequest.requests.size(); i++) { - DocumentRequest request = bulkRequest.requests.get(i); + DocumentRequest request = bulkRequest.requests.get(i); if (request == null) { continue; } @@ -300,7 +300,7 @@ public class TransportBulkAction extends HandledTransportAction documentRequest = request.request(); + DocumentRequest documentRequest = request.request(); responses.set(request.id(), new BulkItemResponse(request.id(), documentRequest.opType(), new BulkItemResponse.Failure(indexName, documentRequest.type(), documentRequest.id(), e))); } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 6b4ffc56b40..338e7db4cb7 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -147,14 +147,14 @@ public class TransportShardBulkAction extends TransportWriteAction documentRequest = request.items()[j].request(); + DocumentRequest documentRequest = request.items()[j].request(); documentRequest.version(preVersions[j]); documentRequest.versionType(preVersionTypes[j]); } throw (ElasticsearchException) e; } BulkItemRequest item = request.items()[requestIndex]; - DocumentRequest documentRequest = item.request(); + DocumentRequest documentRequest = item.request(); if (isConflictException(e)) { logger.trace((Supplier) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", request.shardId(), documentRequest.opType().getLowercase(), request), e); @@ -179,7 +179,7 @@ public class TransportShardBulkAction extends TransportWriteAction innerExecuteBulkItemRequest(IndexMetaData metaData, IndexShard indexShard, BulkShardRequest request, int requestIndex) throws Exception { - DocumentRequest itemRequest = request.items()[requestIndex].request(); + DocumentRequest itemRequest = request.items()[requestIndex].request(); switch (itemRequest.opType()) { case CREATE: case INDEX: @@ -268,7 +268,7 @@ public class TransportShardBulkAction extends TransportWriteAction documentRequest = item.request(); + DocumentRequest documentRequest = item.request(); final Engine.Operation operation; try { switch (documentRequest.opType()) { diff --git a/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java b/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java index 70a117bf1f7..31801bac85a 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java @@ -135,7 +135,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio return Integer.MAX_VALUE; } - static final class BulkRequestModifier implements Iterator> { + static final class BulkRequestModifier implements Iterator { final BulkRequest bulkRequest; final Set failedSlots; @@ -151,7 +151,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio } @Override - public DocumentRequest next() { + public DocumentRequest next() { return bulkRequest.requests().get(++currentSlot); } @@ -172,7 +172,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio int slot = 0; originalSlots = new int[bulkRequest.requests().size() - failedSlots.size()]; for (int i = 0; i < bulkRequest.requests().size(); i++) { - DocumentRequest request = bulkRequest.requests().get(i); + DocumentRequest request = bulkRequest.requests().get(i); if (failedSlots.contains(i) == false) { modifiedBulkRequest.add(request); originalSlots[slot++] = i; diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java index 57eb7afcb5a..1e11dbf78f2 100644 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java @@ -68,7 +68,7 @@ public class PipelineExecutionService implements ClusterStateListener { }); } - public void executeBulkRequest(Iterable> actionRequests, + public void executeBulkRequest(Iterable actionRequests, BiConsumer itemFailureHandler, Consumer completionHandler) { threadPool.executor(ThreadPool.Names.BULK).execute(new AbstractRunnable() { @@ -80,7 +80,7 @@ public class PipelineExecutionService implements ClusterStateListener { @Override protected void doRun() throws Exception { - for (DocumentRequest actionRequest : actionRequests) { + for (DocumentRequest actionRequest : actionRequests) { if ((actionRequest instanceof IndexRequest)) { IndexRequest indexRequest = (IndexRequest) actionRequest; if (Strings.hasText(indexRequest.getPipeline())) { diff --git a/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java b/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java index 230373f7415..5d2145ddc3c 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java @@ -113,7 +113,7 @@ public class BulkRequestTests extends ESTestCase { public void testBulkAddIterable() { BulkRequest bulkRequest = Requests.bulkRequest(); - List> requests = new ArrayList<>(); + List requests = new ArrayList<>(); requests.add(new IndexRequest("test", "test", "id").source("field", "value")); requests.add(new UpdateRequest("test", "test", "id").doc("field", "value")); requests.add(new DeleteRequest("test", "test", "id"));