NIFI-1895 Adding a property to PutHBaseJSON to allow specifying how to store the values

This closes #542.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Bryan Bende 2016-06-17 17:10:40 -04:00
parent 0d2a9dc7e5
commit 8593bd771f
4 changed files with 72 additions and 16 deletions

View File

@ -92,6 +92,13 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
.description("A FlowFile is routed to this relationship if it cannot be sent to HBase") .description("A FlowFile is routed to this relationship if it cannot be sent to HBase")
.build(); .build();
protected HBaseClientService clientService;
@OnScheduled
public void onScheduled(final ProcessContext context) {
clientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
}
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
@ -135,11 +142,10 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
final long start = System.nanoTime(); final long start = System.nanoTime();
final List<PutFlowFile> successes = new ArrayList<>(); final List<PutFlowFile> successes = new ArrayList<>();
final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
for (Map.Entry<String, List<PutFlowFile>> entry : tablePuts.entrySet()) { for (Map.Entry<String, List<PutFlowFile>> entry : tablePuts.entrySet()) {
try { try {
hBaseClientService.put(entry.getKey(), entry.getValue()); clientService.put(entry.getKey(), entry.getValue());
successes.addAll(entry.getValue()); successes.addAll(entry.getValue());
} catch (Exception e) { } catch (Exception e) {
getLogger().error(e.getMessage(), e); getLogger().error(e.getMessage(), e);
@ -181,11 +187,4 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
*/ */
protected abstract PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile); protected abstract PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile);
protected HBaseClientService cliSvc;
@OnScheduled
public void onScheduled(final ProcessContext context) {
cliSvc = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
}
} }

View File

@ -89,6 +89,25 @@ public class PutHBaseJSON extends AbstractPutHBase {
.defaultValue(COMPLEX_FIELD_TEXT.getValue()) .defaultValue(COMPLEX_FIELD_TEXT.getValue())
.build(); .build();
protected static final String STRING_ENCODING_VALUE = "String";
protected static final String BYTES_ENCODING_VALUE = "Bytes";
protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
"Stores the value of each field as a UTF-8 String.");
protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
"Stores the value of each field as the byte representation of the type derived from the JSON.");
protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder()
.name("Field Encoding Strategy")
.description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " +
"JSON to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from " +
"the JSON, and convert the value to the byte representation of that type, meaning an integer will be stored as the " +
"byte representation of that integer."))
.required(true)
.allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
.defaultValue(FIELD_ENCODING_STRING.getValue())
.build();
@Override @Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(); final List<PropertyDescriptor> properties = new ArrayList<>();
@ -99,6 +118,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
properties.add(COLUMN_FAMILY); properties.add(COLUMN_FAMILY);
properties.add(BATCH_SIZE); properties.add(BATCH_SIZE);
properties.add(COMPLEX_FIELD_STRATEGY); properties.add(COMPLEX_FIELD_STRATEGY);
properties.add(FIELD_ENCODING_STRATEGY);
return properties; return properties;
} }
@ -142,6 +162,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue(); final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
final boolean extractRowId = !StringUtils.isBlank(rowFieldName); final boolean extractRowId = !StringUtils.isBlank(rowFieldName);
final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue(); final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
// Parse the JSON document // Parse the JSON document
final ObjectMapper mapper = new ObjectMapper(); final ObjectMapper mapper = new ObjectMapper();
@ -180,7 +201,13 @@ public class PutHBaseJSON extends AbstractPutHBase {
if (fieldNode.isNull()) { if (fieldNode.isNull()) {
getLogger().debug("Skipping {} because value was null", new Object[]{fieldName}); getLogger().debug("Skipping {} because value was null", new Object[]{fieldName});
} else if (fieldNode.isValueNode()) { } else if (fieldNode.isValueNode()) {
fieldValueHolder.set(extractJNodeValue(fieldNode)); // for a value node we need to determine if we are storing the bytes of a string, or the bytes of actual types
if (STRING_ENCODING_VALUE.equals(fieldEncodingStrategy)) {
final byte[] valueBytes = clientService.toBytes(fieldNode.asText());
fieldValueHolder.set(valueBytes);
} else {
fieldValueHolder.set(extractJNodeValue(fieldNode));
}
} else { } else {
// for non-null, non-value nodes, determine what to do based on the handling strategy // for non-null, non-value nodes, determine what to do based on the handling strategy
switch (complexFieldStrategy) { switch (complexFieldStrategy) {
@ -193,7 +220,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
case TEXT_VALUE: case TEXT_VALUE:
// use toString() here because asText() is only guaranteed to be supported on value nodes // use toString() here because asText() is only guaranteed to be supported on value nodes
// some other types of nodes, like ArrayNode, provide toString implementations // some other types of nodes, like ArrayNode, provide toString implementations
fieldValueHolder.set(cliSvc.toBytes(fieldNode.toString())); fieldValueHolder.set(clientService.toBytes(fieldNode.toString()));
break; break;
case IGNORE_VALUE: case IGNORE_VALUE:
// silently skip // silently skip
@ -229,21 +256,21 @@ public class PutHBaseJSON extends AbstractPutHBase {
/* /*
*Handles the conversion of the JsonNode value into it correct underlying data type in the form of a byte array as expected by the columns.add function *Handles the conversion of the JsonNode value into it correct underlying data type in the form of a byte array as expected by the columns.add function
*/ */
private byte[] extractJNodeValue(JsonNode n){ private byte[] extractJNodeValue(final JsonNode n){
if (n.isBoolean()){ if (n.isBoolean()){
//boolean //boolean
return cliSvc.toBytes(n.asBoolean()); return clientService.toBytes(n.asBoolean());
}else if(n.isNumber()){ }else if(n.isNumber()){
if(n.isIntegralNumber()){ if(n.isIntegralNumber()){
//interpret as Long //interpret as Long
return cliSvc.toBytes(n.asLong()); return clientService.toBytes(n.asLong());
}else{ }else{
//interpret as Double //interpret as Double
return cliSvc.toBytes(n.asDouble()); return clientService.toBytes(n.asDouble());
} }
}else{ }else{
//if all else fails, interpret as String //if all else fails, interpret as String
return cliSvc.toBytes(n.asText()); return clientService.toBytes(n.asText());
} }
} }

View File

@ -102,6 +102,8 @@ public class TestPutHBaseJSON {
@Test @Test
public void testSingleJsonDocAndProvidedRowIdwithNonString() throws IOException, InitializationException { public void testSingleJsonDocAndProvidedRowIdwithNonString() throws IOException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
runner.setProperty(PutHBaseJSON.FIELD_ENCODING_STRATEGY, PutHBaseJSON.BYTES_ENCODING_VALUE);
final MockHBaseClientService hBaseClient = getHBaseClientService(runner); final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW); runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);

View File

@ -98,8 +98,36 @@ public interface HBaseClientService extends ControllerService {
*/ */
void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, ResultHandler handler) throws IOException; void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, ResultHandler handler) throws IOException;
/**
* Converts the given boolean to it's byte representation.
*
* @param b a boolean
* @return the boolean represented as bytes
*/
byte[] toBytes(boolean b); byte[] toBytes(boolean b);
/**
* Converts the given long to it's byte representation.
*
* @param l a long
* @return the long represented as bytes
*/
byte[] toBytes(long l); byte[] toBytes(long l);
/**
* Converts the given double to it's byte representation.
*
* @param d a double
* @return the double represented as bytes
*/
byte[] toBytes(double d); byte[] toBytes(double d);
/**
* Converts the given string to it's byte representation.
*
* @param s a string
* @return the string represented as bytes
*/
byte[] toBytes(String s); byte[] toBytes(String s);
} }