From 8593bd771f12834ece278eb3f511592df045a874 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Fri, 17 Jun 2016 17:10:40 -0400 Subject: [PATCH] NIFI-1895 Adding a property to PutHBaseJSON to allow specifying how to store the values This closes #542. Signed-off-by: Bryan Bende --- .../apache/nifi/hbase/AbstractPutHBase.java | 17 ++++---- .../org/apache/nifi/hbase/PutHBaseJSON.java | 41 +++++++++++++++---- .../apache/nifi/hbase/TestPutHBaseJSON.java | 2 + .../apache/nifi/hbase/HBaseClientService.java | 28 +++++++++++++ 4 files changed, 72 insertions(+), 16 deletions(-) diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java index 50813963ae..05f4b7ebbc 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java @@ -92,6 +92,13 @@ public abstract class AbstractPutHBase extends AbstractProcessor { .description("A FlowFile is routed to this relationship if it cannot be sent to HBase") .build(); + protected HBaseClientService clientService; + + @OnScheduled + public void onScheduled(final ProcessContext context) { + clientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); @@ -135,11 +142,10 @@ public abstract class AbstractPutHBase extends AbstractProcessor { final long start = System.nanoTime(); final List successes = new ArrayList<>(); - final HBaseClientService hBaseClientService = 
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); for (Map.Entry> entry : tablePuts.entrySet()) { try { - hBaseClientService.put(entry.getKey(), entry.getValue()); + clientService.put(entry.getKey(), entry.getValue()); successes.addAll(entry.getValue()); } catch (Exception e) { getLogger().error(e.getMessage(), e); @@ -181,11 +187,4 @@ public abstract class AbstractPutHBase extends AbstractProcessor { */ protected abstract PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile); - protected HBaseClientService cliSvc; - - @OnScheduled - public void onScheduled(final ProcessContext context) { - cliSvc = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); - } - } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java index 3c10e66ec2..9a57d6e446 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java @@ -89,6 +89,25 @@ public class PutHBaseJSON extends AbstractPutHBase { .defaultValue(COMPLEX_FIELD_TEXT.getValue()) .build(); + protected static final String STRING_ENCODING_VALUE = "String"; + protected static final String BYTES_ENCODING_VALUE = "Bytes"; + + protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE, + "Stores the value of each field as a UTF-8 String."); + protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE, + "Stores the value of each field as the byte representation of the type derived from the JSON."); + + protected static final 
PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder() + .name("Field Encoding Strategy") + .description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " + + "JSON to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from " + + "the JSON, and convert the value to the byte representation of that type, meaning an integer will be stored as the " + + "byte representation of that integer.")) + .required(true) + .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES) + .defaultValue(FIELD_ENCODING_STRING.getValue()) + .build(); + @Override public final List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(); @@ -99,6 +118,7 @@ public class PutHBaseJSON extends AbstractPutHBase { properties.add(COLUMN_FAMILY); properties.add(BATCH_SIZE); properties.add(COMPLEX_FIELD_STRATEGY); + properties.add(FIELD_ENCODING_STRATEGY); return properties; } @@ -142,6 +162,7 @@ public class PutHBaseJSON extends AbstractPutHBase { final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue(); final boolean extractRowId = !StringUtils.isBlank(rowFieldName); final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue(); + final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue(); // Parse the JSON document final ObjectMapper mapper = new ObjectMapper(); @@ -180,7 +201,13 @@ public class PutHBaseJSON extends AbstractPutHBase { if (fieldNode.isNull()) { getLogger().debug("Skipping {} because value was null", new Object[]{fieldName}); } else if (fieldNode.isValueNode()) { - fieldValueHolder.set(extractJNodeValue(fieldNode)); + // for a value node we need to determine if we are storing the bytes of a string, or the bytes of actual types + if (STRING_ENCODING_VALUE.equals(fieldEncodingStrategy)) { + final byte[] valueBytes = 
clientService.toBytes(fieldNode.asText()); + fieldValueHolder.set(valueBytes); + } else { + fieldValueHolder.set(extractJNodeValue(fieldNode)); + } } else { // for non-null, non-value nodes, determine what to do based on the handling strategy switch (complexFieldStrategy) { @@ -193,7 +220,7 @@ public class PutHBaseJSON extends AbstractPutHBase { case TEXT_VALUE: // use toString() here because asText() is only guaranteed to be supported on value nodes // some other types of nodes, like ArrayNode, provide toString implementations - fieldValueHolder.set(cliSvc.toBytes(fieldNode.toString())); + fieldValueHolder.set(clientService.toBytes(fieldNode.toString())); break; case IGNORE_VALUE: // silently skip @@ -229,21 +256,21 @@ public class PutHBaseJSON extends AbstractPutHBase { /* *Handles the conversion of the JsonNode value into it correct underlying data type in the form of a byte array as expected by the columns.add function */ - private byte[] extractJNodeValue(JsonNode n){ + private byte[] extractJNodeValue(final JsonNode n){ if (n.isBoolean()){ //boolean - return cliSvc.toBytes(n.asBoolean()); + return clientService.toBytes(n.asBoolean()); }else if(n.isNumber()){ if(n.isIntegralNumber()){ //interpret as Long - return cliSvc.toBytes(n.asLong()); + return clientService.toBytes(n.asLong()); }else{ //interpret as Double - return cliSvc.toBytes(n.asDouble()); + return clientService.toBytes(n.asDouble()); } }else{ //if all else fails, interpret as String - return cliSvc.toBytes(n.asText()); + return clientService.toBytes(n.asText()); } } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java index 92c96cc8f0..28d9105d4c 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java +++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java @@ -102,6 +102,8 @@ public class TestPutHBaseJSON { @Test public void testSingleJsonDocAndProvidedRowIdwithNonString() throws IOException, InitializationException { final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + runner.setProperty(PutHBaseJSON.FIELD_ENCODING_STRATEGY, PutHBaseJSON.BYTES_ENCODING_VALUE); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java index 3a65f5dee5..47f6e2ecb5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java @@ -98,8 +98,36 @@ public interface HBaseClientService extends ControllerService { */ void scan(String tableName, Collection columns, String filterExpression, long minTime, ResultHandler handler) throws IOException; + /** + * Converts the given boolean to its byte representation. + * + * @param b a boolean + * @return the boolean represented as bytes + */ byte[] toBytes(boolean b); + + /** + * Converts the given long to its byte representation. + * + * @param l a long + * @return the long represented as bytes + */ byte[] toBytes(long l); + + /** + * Converts the given double to its byte representation. + * + * @param d a double + * @return the double represented as bytes + */ byte[] toBytes(double d); + + /** + * Converts the given string to its byte representation. 
+ * + * @param s a string + * @return the string represented as bytes + */ byte[] toBytes(String s); + }