NIFI-4578 Added strategy for dealing with nullable fields in PutHBaseRecord.

NIFI-4578 Added changes from code review.

This closes #2256.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Mike Thomsen 2017-11-07 11:24:46 -05:00 committed by Koji Kawamura
parent 9e9c129c21
commit ad2d12a20c
1 changed file with 35 additions and 2 deletions

View File

@ -127,6 +127,20 @@ public class PutHBaseRecord extends AbstractPutHBase {
.defaultValue("1000") .defaultValue("1000")
.build(); .build();
/** Null-handling option: a null field is written with an empty byte array as its value. */
protected static final AllowableValue NULL_FIELD_EMPTY = new AllowableValue("empty-bytes", "Empty Bytes",
        "Use empty bytes. This can be used to overwrite existing fields or to put an empty placeholder value if you want" +
        " every field to be present even if it has a null value.");

/** Null-handling option: a null field is left out of the Put altogether. */
protected static final AllowableValue NULL_FIELD_SKIP = new AllowableValue("skip-field", "Skip Field", "Skip the field (don't process it at all).");

/**
 * Required processor property that selects how null record field values are handled.
 * The default is taken from {@link #NULL_FIELD_SKIP} rather than a repeated string literal,
 * so the default cannot drift away from the allowable value it is meant to name.
 */
protected static final PropertyDescriptor NULL_FIELD_STRATEGY = new PropertyDescriptor.Builder()
        .name("hbase-record-null-field-strategy")
        .displayName("Null Field Strategy")
        .required(true)
        .defaultValue(NULL_FIELD_SKIP.getValue())
        .description("Handle null field values as either an empty string or skip them altogether.")
        .allowableValues(NULL_FIELD_EMPTY, NULL_FIELD_SKIP)
        .build();
@Override @Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(); final List<PropertyDescriptor> properties = new ArrayList<>();
@ -135,6 +149,7 @@ public class PutHBaseRecord extends AbstractPutHBase {
properties.add(TABLE_NAME); properties.add(TABLE_NAME);
properties.add(ROW_FIELD_NAME); properties.add(ROW_FIELD_NAME);
properties.add(ROW_ID_ENCODING_STRATEGY); properties.add(ROW_ID_ENCODING_STRATEGY);
properties.add(NULL_FIELD_STRATEGY);
properties.add(COLUMN_FAMILY); properties.add(COLUMN_FAMILY);
properties.add(TIMESTAMP_FIELD_NAME); properties.add(TIMESTAMP_FIELD_NAME);
properties.add(BATCH_SIZE); properties.add(BATCH_SIZE);
@ -201,6 +216,9 @@ public class PutHBaseRecord extends AbstractPutHBase {
while ((record = reader.nextRecord()) != null) { while ((record = reader.nextRecord()) != null) {
PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily,
timestampFieldName, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy); timestampFieldName, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy);
if (putFlowFile.getColumns().size() == 0) {
continue;
}
flowFiles.add(putFlowFile); flowFiles.add(putFlowFile);
index++; index++;
@ -220,7 +238,9 @@ public class PutHBaseRecord extends AbstractPutHBase {
} }
if (!failed) { if (!failed) {
if (columns > 0) {
sendProvenance(session, flowFile, columns, System.nanoTime() - start, last); sendProvenance(session, flowFile, columns, System.nanoTime() - start, last);
}
flowFile = session.removeAttribute(flowFile, "restart.index"); flowFile = session.removeAttribute(flowFile, "restart.index");
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
} else { } else {
@ -317,12 +337,15 @@ public class PutHBaseRecord extends AbstractPutHBase {
} }
} }
/**
 * Zero-length value written for a null field when the "empty-bytes" null strategy is selected.
 * Allocated directly instead of encoding an empty String, so no platform-default charset is involved.
 */
static final byte[] EMPTY = new byte[0];
protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile, String rowFieldName, protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile, String rowFieldName,
String columnFamily, String timestampFieldName, String fieldEncodingStrategy, String rowEncodingStrategy, String columnFamily, String timestampFieldName, String fieldEncodingStrategy, String rowEncodingStrategy,
String complexFieldStrategy) String complexFieldStrategy)
throws PutCreationFailedInvokedException { throws PutCreationFailedInvokedException {
PutFlowFile retVal = null; PutFlowFile retVal = null;
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String nullStrategy = context.getProperty(NULL_FIELD_STRATEGY).getValue();
boolean asString = STRING_ENCODING_VALUE.equals(fieldEncodingStrategy); boolean asString = STRING_ENCODING_VALUE.equals(fieldEncodingStrategy);
@ -350,7 +373,17 @@ public class PutHBaseRecord extends AbstractPutHBase {
continue; continue;
} }
final byte[] fieldValueBytes = asBytes(name, schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy); Object val = record.getValue(name);
final byte[] fieldValueBytes;
if (val == null && nullStrategy.equals(NULL_FIELD_SKIP.getValue())) {
continue;
} else if (val == null && nullStrategy.equals(NULL_FIELD_EMPTY.getValue())) {
fieldValueBytes = EMPTY;
} else {
fieldValueBytes = asBytes(name, schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy);
}
if (fieldValueBytes != null) { if (fieldValueBytes != null) {
columns.add(new PutColumn(fam, clientService.toBytes(name), fieldValueBytes, timestamp)); columns.add(new PutColumn(fam, clientService.toBytes(name), fieldValueBytes, timestamp));
} }