From e1cf37fb8953cc04a8f92382b558cf89c1563a5d Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Fri, 11 Mar 2016 11:32:51 -0500 Subject: [PATCH] NIFI-1619: Fix Elasticsearch processor bug when flow file missing ID attribute This closes #269 Signed-off-by: Aldrin Piri --- .../elasticsearch/PutElasticsearch.java | 42 +++++++++++-------- .../elasticsearch/TestPutElasticsearch.java | 20 +++++++++ 2 files changed, 44 insertions(+), 18 deletions(-) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java index 244c4322c8..6319791e9a 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java @@ -49,6 +49,7 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -156,46 +157,51 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor { } final ProcessorLog logger = getLogger(); - + // Keep track of the list of flow files that need to be transferred. As they are transferred, remove them from the list. + List flowFilesToTransfer = new LinkedList<>(flowFiles); try { final BulkRequestBuilder bulk = esClient.get().prepareBulk(); if (authToken != null) { bulk.putHeader("Authorization", authToken); } + for (FlowFile file : flowFiles) { final String id = file.getAttribute(id_attribute); if (id == null) { - logger.error("No value in identifier attribute {} for {}", new Object[]{id_attribute, file}); + logger.error("No value in identifier attribute {} for {}, transferring to failure", new Object[]{id_attribute, file}); + flowFilesToTransfer.remove(file); session.transfer(file, REL_FAILURE); + } else { + session.read(file, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + String json = IOUtils.toString(in, charset) + .replace("\r\n", " ").replace('\n', ' ').replace('\r', ' '); + bulk.add(esClient.get().prepareIndex(index, docType, id) + .setSource(json.getBytes(charset))); + } + }); } - session.read(file, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - String json = IOUtils.toString(in, charset) - .replace("\r\n", " ").replace('\n', ' ').replace('\r', ' '); - bulk.add(esClient.get().prepareIndex(index, docType, id) - .setSource(json.getBytes(charset))); - } - }); } final BulkResponse response = bulk.execute().actionGet(); if (response.hasFailures()) { for (final BulkItemResponse item : response.getItems()) { - final FlowFile flowFile = flowFiles.get(item.getItemId()); + final FlowFile flowFile = flowFilesToTransfer.get(item.getItemId()); if (item.isFailed()) { - logger.error("Failed to insert {} into Elasticsearch due to {}", + logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure", new Object[]{flowFile, item.getFailure().getMessage()}); session.transfer(flowFile, REL_FAILURE); } else { session.transfer(flowFile, REL_SUCCESS); } + flowFilesToTransfer.remove(flowFile); } - } else { - session.transfer(flowFiles, REL_SUCCESS); } + // Transfer any remaining flowfiles to success + session.transfer(flowFilesToTransfer, REL_SUCCESS); } catch (NoNodeAvailableException | ElasticsearchTimeoutException @@ -209,14 +215,14 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor { logger.error("Failed to insert into Elasticsearch due to {}. More detailed information may be available in " + "the NiFi logs.", new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry); - session.transfer(flowFiles, REL_RETRY); + session.transfer(flowFilesToTransfer, REL_RETRY); context.yield(); } catch (Exception exceptionToFail) { - logger.error("Failed to insert into Elasticsearch due to {}", + logger.error("Failed to insert into Elasticsearch due to {}, transferring to failure", new Object[]{exceptionToFail.getLocalizedMessage()}, exceptionToFail); - session.transfer(flowFiles, REL_FAILURE); + session.transfer(flowFilesToTransfer, REL_FAILURE); context.yield(); } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java index dc1c445d27..957332a4f3 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java @@ -188,6 +188,26 @@ public class TestPutElasticsearch { runner.assertTransferCount(PutElasticsearch.REL_FAILURE, 1); } + @Test + public void testPutElasticsearchOnTriggerWithNoIdAttribute() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures + runner.setValidateExpressionUsage(false); + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(PutElasticsearch.INDEX, "doc"); + runner.setProperty(PutElasticsearch.TYPE, "status"); + runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id"); + + runner.enqueue(docExample); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0); + assertNotNull(out); + } /** * A Test class that extends the processor in order to inject/mock behavior