From ad2d12a20cdfdd93e99be84a52918eaf39269063 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Tue, 7 Nov 2017 11:24:46 -0500 Subject: [PATCH] NIFI-4578 Added strategy for dealing with nullable fields in PutHBaseRecord. NIFI-4578 Added changes from code review. This closes #2256. Signed-off-by: Koji Kawamura --- .../org/apache/nifi/hbase/PutHBaseRecord.java | 37 ++++++++++++++++++- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java index 3e6d62448a..b4de3c6469 100755 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java @@ -127,6 +127,20 @@ public class PutHBaseRecord extends AbstractPutHBase { .defaultValue("1000") .build(); + protected static final AllowableValue NULL_FIELD_EMPTY = new AllowableValue("empty-bytes", "Empty Bytes", + "Use empty bytes. This can be used to overwrite existing fields or to put an empty placeholder value if you want" + + " every field to be present even if it has a null value."); + protected static final AllowableValue NULL_FIELD_SKIP = new AllowableValue("skip-field", "Skip Field", "Skip the field (don't process it at all)."); + + protected static final PropertyDescriptor NULL_FIELD_STRATEGY = new PropertyDescriptor.Builder() + .name("hbase-record-null-field-strategy") + .displayName("Null Field Strategy") + .required(true) + .defaultValue("skip-field") + .description("Handle null field values as either an empty string or skip them altogether.") + .allowableValues(NULL_FIELD_EMPTY, NULL_FIELD_SKIP) + .build(); + @Override public final List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(); @@ -135,6 +149,7 @@ public class PutHBaseRecord extends AbstractPutHBase { properties.add(TABLE_NAME); properties.add(ROW_FIELD_NAME); properties.add(ROW_ID_ENCODING_STRATEGY); + properties.add(NULL_FIELD_STRATEGY); properties.add(COLUMN_FAMILY); properties.add(TIMESTAMP_FIELD_NAME); properties.add(BATCH_SIZE); @@ -201,6 +216,9 @@ public class PutHBaseRecord extends AbstractPutHBase { while ((record = reader.nextRecord()) != null) { PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, timestampFieldName, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy); + if (putFlowFile.getColumns().size() == 0) { + continue; + } flowFiles.add(putFlowFile); index++; @@ -220,7 +238,9 @@ public class PutHBaseRecord extends AbstractPutHBase { } if (!failed) { - sendProvenance(session, flowFile, columns, System.nanoTime() - start, last); + if (columns > 0) { + sendProvenance(session, flowFile, columns, System.nanoTime() - start, last); + } flowFile = session.removeAttribute(flowFile, "restart.index"); session.transfer(flowFile, REL_SUCCESS); } else { @@ -317,12 +337,15 @@ public class PutHBaseRecord extends AbstractPutHBase { } } + static final byte[] EMPTY = "".getBytes(); + protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile, String rowFieldName, String columnFamily, String timestampFieldName, String fieldEncodingStrategy, String rowEncodingStrategy, String complexFieldStrategy) throws PutCreationFailedInvokedException { PutFlowFile retVal = null; final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String nullStrategy = context.getProperty(NULL_FIELD_STRATEGY).getValue(); boolean asString = STRING_ENCODING_VALUE.equals(fieldEncodingStrategy); @@ -350,7 +373,17 @@ public class PutHBaseRecord extends AbstractPutHBase { continue; } - final byte[] fieldValueBytes = asBytes(name, schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy); + Object val = record.getValue(name); + final byte[] fieldValueBytes; + if (val == null && nullStrategy.equals(NULL_FIELD_SKIP.getValue())) { + continue; + } else if (val == null && nullStrategy.equals(NULL_FIELD_EMPTY.getValue())) { + fieldValueBytes = EMPTY; + } else { + fieldValueBytes = asBytes(name, schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy); + } + + if (fieldValueBytes != null) { columns.add(new PutColumn(fam, clientService.toBytes(name), fieldValueBytes, timestamp)); }