diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java index 39836fdd27..1f9cb73ab2 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java @@ -355,21 +355,34 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items"); if (itemNodeArray.size() > 0) { // All items are returned whether they succeeded or failed, so iterate through the item array - // at the same time as the flow file list, moving each to success or failure accordingly + // at the same time as the flow file list, moving each to success or failure accordingly, + // but only keep the first error for logging + String errorReason = null; for (int i = itemNodeArray.size() - 1; i >= 0; i--) { JsonNode itemNode = itemNodeArray.get(i); - FlowFile flowFile = flowFilesToTransfer.remove(i); - int status = itemNode.findPath("status").asInt(); - if (!isSuccess(status)) { - String reason = itemNode.findPath("//error/reason").asText(); - logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure", - new Object[]{flowFile, reason}); - session.transfer(flowFile, REL_FAILURE); + if (flowFilesToTransfer.size() > i) { + FlowFile flowFile = flowFilesToTransfer.remove(i); + int status = itemNode.findPath("status").asInt(); + if (!isSuccess(status)) { + if (errorReason == null) { + // Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason + String reason = itemNode.findPath("//result").asText(); + if (StringUtils.isEmpty(reason)) { + // If there was no result, we expect an error with a string description in the "reason" field + reason = itemNode.findPath("//error/reason").asText(); + } + errorReason = reason; + logger.error("Failed to process {} due to {}, transferring to failure", + new Object[]{flowFile, errorReason}); + } + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); - } else { - session.transfer(flowFile, REL_SUCCESS); - // Record provenance event - session.getProvenanceReporter().send(flowFile, url.toString()); + } else { + session.transfer(flowFile, REL_SUCCESS); + // Record provenance event + session.getProvenanceReporter().send(flowFile, url.toString()); + } } } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java index c1767f1862..f65abb0205 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java @@ -394,14 +394,24 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess ArrayNode itemNodeArray = (ArrayNode) responseJson.get("items"); if (itemNodeArray.size() > 0) { // All items are returned whether they succeeded or failed, so iterate through the item array - // at the same time as the flow file list, logging failures accordingly + // at the same time as the flow file list, moving each to success or failure accordingly, + // but only keep the first error for logging + String errorReason = null; for (int i = itemNodeArray.size() - 1; i >= 0; i--) { JsonNode itemNode = itemNodeArray.get(i); int status = itemNode.findPath("status").asInt(); if (!isSuccess(status)) { - String reason = itemNode.findPath("//error/reason").asText(); - logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure", - new Object[]{flowFile, reason}); + if (errorReason == null) { + // Use "result" if it is present; this happens for status codes like 404 Not Found, which may not have an error/reason + String reason = itemNode.findPath("//result").asText(); + if (StringUtils.isEmpty(reason)) { + // If there was no result, we expect an error with a string description in the "reason" field + reason = itemNode.findPath("//error/reason").asText(); + } + errorReason = reason; + logger.error("Failed to process {} due to {}, transferring to failure", + new Object[]{flowFile, errorReason}); + } } } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java index 36aa94e3e8..5ca38149c0 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java @@ -350,6 +350,27 @@ public class TestPutElasticsearchHttp { assertNotNull(out); } + @Test + public void testPutElasticSearchOnTriggerWithDocumentNotFound() throws IOException { + PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(true); + processor.setResultField("not_found"); + runner = TestRunners.newTestRunner(processor); // simulate failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(PutElasticsearchHttp.INDEX_OP, "delete"); + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1); + runner.clearTransferState(); + } + /** * A Test class that extends the processor in order to inject/mock behavior */ @@ -359,6 +380,7 @@ public class TestPutElasticsearchHttp { int statusCode = 200; String statusMessage = "OK"; String expectedUrl = null; + String resultField = null; PutElasticsearchTestProcessor(boolean responseHasFailures) { this.responseHasFailures = responseHasFailures; @@ -373,6 +395,10 @@ public class TestPutElasticsearchHttp { expectedUrl = url; } + public void setResultField(String resultField) { + this.resultField = resultField; + } + @Override protected void createElasticsearchClient(ProcessContext context) throws ProcessException { client = mock(OkHttpClient.class); @@ -391,7 +417,11 @@ public class TestPutElasticsearchHttp { if (responseHasFailures) { // This case is for a status code of 200 for the bulk response itself, but with an error (of 400) inside sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\","); - sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\","); + if(resultField != null) { + sb.append("\"result\":{\"not_found\","); + } else { + sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\","); + } sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at "); sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}},"); }