NIFI-7044: Fixed 'InputStream not closed' issue in PutElasticsearchRecord and DeleteHBaseCells

This closes #3997

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
Matthew Burgess 2020-01-17 23:08:25 -05:00 committed by Mike Thomsen
parent 4ec9155cbc
commit 71226ce077
No known key found for this signature in database
GPG Key ID: 88511C3D4CAD246F
4 changed files with 34 additions and 3 deletions

View File

@ -224,8 +224,6 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
badRecords.add(bad);
}
}
session.transfer(input, REL_SUCCESS);
} catch (ElasticsearchError ese) {
String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
ese.isElastic() ? "Moving to retry." : "Moving to failure");
@ -234,11 +232,14 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
session.penalize(input);
session.transfer(input, rel);
removeBadRecordFlowFiles(badRecords, session);
return;
} catch (Exception ex) {
getLogger().error("Could not index documents.", ex);
session.transfer(input, REL_FAILURE);
removeBadRecordFlowFiles(badRecords, session);
return;
}
session.transfer(input, REL_SUCCESS);
}
private void removeBadRecordFlowFiles(List<FlowFile> bad, ProcessSession session) {

View File

@ -25,6 +25,8 @@ import org.apache.nifi.json.JsonRecordSetWriter
import org.apache.nifi.json.JsonTreeReader
import org.apache.nifi.processors.elasticsearch.mock.MockBulkLoadClientService
import org.apache.nifi.schema.access.SchemaAccessUtils
import org.apache.nifi.serialization.RecordReaderFactory
import org.apache.nifi.serialization.record.MockRecordParser
import org.apache.nifi.serialization.record.MockSchemaRegistry
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
@ -38,7 +40,7 @@ import static groovy.json.JsonOutput.toJson
class PutElasticsearchRecordTest {
MockBulkLoadClientService clientService
MockSchemaRegistry registry
JsonTreeReader reader
RecordReaderFactory reader
TestRunner runner
static final String SCHEMA = prettyPrint(toJson([
@ -95,6 +97,15 @@ class PutElasticsearchRecordTest {
basicTest(0, 0, 1)
}
@Test
void simpleTestWithMockReader() {
reader = new MockRecordParser()
runner.addControllerService("mockReader", reader)
runner.setProperty(PutElasticsearchRecord.RECORD_READER, "mockReader")
runner.enableControllerService(reader)
basicTest(0, 0, 1)
}
@Test
void testFatalError() {
clientService.throwFatalError = true

View File

@ -109,6 +109,7 @@ public class DeleteHBaseCells extends AbstractDeleteHBase {
String[] parts = line.split(separator);
if (parts.length < 3 || parts.length > 4) {
final String msg = String.format("Invalid line length. It must have 3 or 4 components. It had %d.", parts.length);
is.close();
input = writeErrorAttributes(lineNum, msg, input, session);
session.transfer(input, REL_FAILURE);
getLogger().error(msg);

View File

@ -17,6 +17,7 @@
package org.apache.nifi.hbase;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Before;
import org.junit.Test;
@ -43,4 +44,21 @@ public class TestDeleteHBaseCells extends DeleteTestBase {
runner.run();
runner.assertAllFlowFilesTransferred(DeleteHBaseCells.REL_SUCCESS);
}
@Test
public void testWrongNumberOfInputs() {
final String SEP = "::::";
List<String> ids = populateTable(10000);
runner.setProperty(DeleteHBaseCells.SEPARATOR, SEP);
runner.assertValid();
StringBuilder sb = new StringBuilder();
for (String id : ids) {
sb.append(String.format("%s%sX\n", id, SEP));
}
runner.enqueue(sb.toString().trim());
runner.run();
runner.assertAllFlowFilesTransferred(DeleteHBaseCells.REL_FAILURE);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHBaseCells.REL_FAILURE).get(0);
flowFile.assertAttributeEquals(DeleteHBaseCells.ERROR_MSG, "Invalid line length. It must have 3 or 4 components. It had 2.");
}
}