diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java index bbc86e8810..d139ecccdf 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java @@ -224,8 +224,6 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic badRecords.add(bad); } } - - session.transfer(input, REL_SUCCESS); } catch (ElasticsearchError ese) { String msg = String.format("Encountered a server-side problem with Elasticsearch. %s", ese.isElastic() ? "Moving to retry." : "Moving to failure"); @@ -234,11 +232,14 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic session.penalize(input); session.transfer(input, rel); removeBadRecordFlowFiles(badRecords, session); + return; } catch (Exception ex) { getLogger().error("Could not index documents.", ex); session.transfer(input, REL_FAILURE); removeBadRecordFlowFiles(badRecords, session); + return; } + session.transfer(input, REL_SUCCESS); } private void removeBadRecordFlowFiles(List bad, ProcessSession session) { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy index 62f7c1b08f..1d303c90aa 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy @@ -25,6 +25,8 @@ import org.apache.nifi.json.JsonRecordSetWriter import org.apache.nifi.json.JsonTreeReader import org.apache.nifi.processors.elasticsearch.mock.MockBulkLoadClientService import org.apache.nifi.schema.access.SchemaAccessUtils +import org.apache.nifi.serialization.RecordReaderFactory +import org.apache.nifi.serialization.record.MockRecordParser import org.apache.nifi.serialization.record.MockSchemaRegistry import org.apache.nifi.util.TestRunner import org.apache.nifi.util.TestRunners @@ -38,7 +40,7 @@ import static groovy.json.JsonOutput.toJson class PutElasticsearchRecordTest { MockBulkLoadClientService clientService MockSchemaRegistry registry - JsonTreeReader reader + RecordReaderFactory reader TestRunner runner static final String SCHEMA = prettyPrint(toJson([ @@ -95,6 +97,15 @@ class PutElasticsearchRecordTest { basicTest(0, 0, 1) } + @Test + void simpleTestWithMockReader() { + reader = new MockRecordParser() + runner.addControllerService("mockReader", reader) + runner.setProperty(PutElasticsearchRecord.RECORD_READER, "mockReader") + runner.enableControllerService(reader) + basicTest(0, 0, 1) + } + @Test void testFatalError() { clientService.throwFatalError = true diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseCells.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseCells.java index 4ca47f8b9d..5d0bacd21c 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseCells.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseCells.java @@ -109,6 +109,7 @@ public class DeleteHBaseCells extends AbstractDeleteHBase { String[] parts = line.split(separator); if (parts.length < 3 || parts.length > 4) { final String msg = String.format("Invalid line length. It must have 3 or 4 components. It had %d.", parts.length); + is.close(); input = writeErrorAttributes(lineNum, msg, input, session); session.transfer(input, REL_FAILURE); getLogger().error(msg); diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java index ce8e044b02..9ffa72e7e7 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java @@ -17,6 +17,7 @@ package org.apache.nifi.hbase; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; import org.junit.Before; import org.junit.Test; @@ -43,4 +44,21 @@ public class TestDeleteHBaseCells extends DeleteTestBase { runner.run(); runner.assertAllFlowFilesTransferred(DeleteHBaseCells.REL_SUCCESS); } + + @Test + public void testWrongNumberOfInputs() { + final String SEP = "::::"; + List ids = populateTable(10000); + runner.setProperty(DeleteHBaseCells.SEPARATOR, SEP); + runner.assertValid(); + StringBuilder sb = new StringBuilder(); + for (String id : ids) { + sb.append(String.format("%s%sX\n", id, SEP)); + } + runner.enqueue(sb.toString().trim()); + runner.run(); + runner.assertAllFlowFilesTransferred(DeleteHBaseCells.REL_FAILURE); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHBaseCells.REL_FAILURE).get(0); + flowFile.assertAttributeEquals(DeleteHBaseCells.ERROR_MSG, "Invalid line length. It must have 3 or 4 components. It had 2."); + } }