From d3dbac50a8f354503838e5a0bdf22872d878b078 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Tue, 22 Nov 2016 20:08:17 -0500 Subject: [PATCH] NIFI-3087: Fixed issue with partial failure responses in PutElasticsearch(Http) --- .../elasticsearch/PutElasticsearch.java | 22 +++++++++++-------- .../elasticsearch/PutElasticsearchHttp.java | 2 +- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java index f64180b20c..216efd4a77 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java @@ -211,17 +211,21 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces final BulkResponse response = bulk.execute().actionGet(); if (response.hasFailures()) { - for (final BulkItemResponse item : response.getItems()) { - final FlowFile flowFile = flowFilesToTransfer.get(item.getItemId()); - if (item.isFailed()) { - logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure", - new Object[]{flowFile, item.getFailure().getMessage()}); - session.transfer(flowFile, REL_FAILURE); + // Responses are guaranteed to be in order, remove them in reverse order + BulkItemResponse[] responses = response.getItems(); + if (responses != null && responses.length > 0) { + for (int i = responses.length - 1; i >= 0; i--) { + final FlowFile flowFile = flowFilesToTransfer.get(i); + if (responses[i].isFailed()) { + logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure", + new Object[]{flowFile, responses[i].getFailure().getMessage()}); + session.transfer(flowFile, REL_FAILURE); - } else { - session.transfer(flowFile, REL_SUCCESS); + } else { + session.transfer(flowFile, REL_SUCCESS); + } + flowFilesToTransfer.remove(flowFile); } - flowFilesToTransfer.remove(flowFile); } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java index 71171006c0..3ba46bb362 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java @@ -328,7 +328,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { if (itemNodeArray.size() > 0) { // All items are returned whether they succeeded or failed, so iterate through the item array // at the same time as the flow file list, moving each to success or failure accordingly - for (int i = 0; i < itemNodeArray.size(); i++) { + for (int i = itemNodeArray.size() - 1; i >= 0; i--) { JsonNode itemNode = itemNodeArray.get(i); FlowFile flowFile = flowFilesToTransfer.remove(i); int status = itemNode.findPath("status").asInt();