From f8f1cc8d0df4893f35d7cbe3413b7a3dd521bd18 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Thu, 3 Aug 2017 13:00:25 -0400 Subject: [PATCH] NIFI-4024 Ensuring InputStream gets closed and cleaning up complex field handling This closes #1961. Signed-off-by: Bryan Bende --- .../org/apache/nifi/hbase/PutHBaseRecord.java | 104 ++++++++++-------- 1 file changed, 61 insertions(+), 43 deletions(-) diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java index 66f95e024e..8aa84ea4fd 100755 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java @@ -40,6 +40,7 @@ import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -146,7 +147,6 @@ public class PutHBaseRecord extends AbstractPutHBase { return columns; } - private RecordReaderFactory recordParserFactory; @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); @@ -176,7 +176,8 @@ public class PutHBaseRecord extends AbstractPutHBase { } PutFlowFile last = null; - try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) { + try (final InputStream in = session.read(flowFile); + final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) { Record record; if (startIndex >= 0) { while ( index++ < startIndex && (reader.nextRecord()) != null) {} @@ -242,15 +243,29 @@ public class PutHBaseRecord extends AbstractPutHBase { byte[] retVal; if (asString) { - retVal = clientService.toBytes(record.getAsString(field)); + switch (fieldType) { + case RECORD: + case CHOICE: + case ARRAY: + case MAP: + retVal = handleComplexField(record, field, complexFieldStrategy); + break; + default: + final String value = record.getAsString(field); + retVal = clientService.toBytes(value); + break; + } } else { switch (fieldType) { + case RECORD: + case CHOICE: + case ARRAY: + case MAP: + retVal = handleComplexField(record, field, complexFieldStrategy); + break; case BOOLEAN: retVal = clientService.toBytes(record.getAsBoolean(field)); break; - case CHAR: - retVal = clientService.toBytes(record.getAsString(field)); - break; case DOUBLE: retVal = clientService.toBytes(record.getAsDouble(field)); break; @@ -264,31 +279,37 @@ public class PutHBaseRecord extends AbstractPutHBase { retVal = clientService.toBytes(record.getAsLong(field)); break; default: - retVal = null; - switch (complexFieldStrategy) { - case FAIL_VALUE: - getLogger().error("Complex value found for {}; routing to failure", new Object[]{field}); - throw new PutCreationFailedInvokedException(String.format("Complex value found for %s; routing to failure", field)); - case WARN_VALUE: - getLogger().warn("Complex value found for {}; skipping", new Object[]{field}); - break; - case TEXT_VALUE: - retVal = clientService.toBytes(record.getAsString(field)); - break; - case IGNORE_VALUE: - // silently skip - break; - default: - break; - } + final String value = record.getAsString(field); + retVal = clientService.toBytes(value); + break; } } return retVal; } + private byte[] handleComplexField(Record record, String field, String complexFieldStrategy) throws PutCreationFailedInvokedException { + switch (complexFieldStrategy) { + case FAIL_VALUE: + getLogger().error("Complex value found for {}; routing to failure", new Object[]{field}); + throw new PutCreationFailedInvokedException(String.format("Complex value found for %s; routing to failure", field)); + case WARN_VALUE: + getLogger().warn("Complex value found for {}; skipping", new Object[]{field}); + return null; + case TEXT_VALUE: + final String value = record.getAsString(field); + return clientService.toBytes(value); + case IGNORE_VALUE: + // silently skip + return null; + default: + return null; + } + } + protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile, - String rowFieldName, String columnFamily, String fieldEncodingStrategy, String rowEncodingStrategy, String complexFieldStrategy) throws PutCreationFailedInvokedException { + String rowFieldName, String columnFamily, String fieldEncodingStrategy, String rowEncodingStrategy, String complexFieldStrategy) + throws PutCreationFailedInvokedException { PutFlowFile retVal = null; final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); @@ -296,29 +317,26 @@ public class PutHBaseRecord extends AbstractPutHBase { final byte[] fam = clientService.toBytes(columnFamily); - //try { - if (record != null) { - List columns = new ArrayList<>(); - for (String name : schema.getFieldNames()) { - if (name.equals(rowFieldName)) { - continue; - } - columns.add(new PutColumn(fam, clientService.toBytes(name), asBytes(name, - schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy))); + if (record != null) { + List columns = new ArrayList<>(); + for (String name : schema.getFieldNames()) { + if (name.equals(rowFieldName)) { + continue; } - String rowIdValue = record.getAsString(rowFieldName); - if (rowIdValue == null) { - throw new PutCreationFailedInvokedException(String.format("Row ID was null for flowfile with ID %s", flowFile.getAttribute("uuid"))); - } - byte[] rowId = getRow(rowIdValue, rowEncodingStrategy); - retVal = new PutFlowFile(tableName, rowId, columns, flowFile); + final byte[] fieldValueBytes = asBytes(name, schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy); + if (fieldValueBytes != null) { + columns.add(new PutColumn(fam, clientService.toBytes(name), fieldValueBytes)); + } } + String rowIdValue = record.getAsString(rowFieldName); + if (rowIdValue == null) { + throw new PutCreationFailedInvokedException(String.format("Row ID was null for flowfile with ID %s", flowFile.getAttribute("uuid"))); + } + byte[] rowId = getRow(rowIdValue, rowEncodingStrategy); -/* } catch (Exception ex) { - getLogger().error("Error running createPuts", ex); - throw new RuntimeException(ex); - }*/ + retVal = new PutFlowFile(tableName, rowId, columns, flowFile); + } return retVal; }