From 0d2a9dc7e51dc9c271276ce42fa7fe140857aeae Mon Sep 17 00:00:00 2001 From: rtempleton Date: Thu, 9 Jun 2016 10:16:49 -0500 Subject: [PATCH] NIFI-1895 PutHBaseJSON processor treats all values as Strings The operator will now inspect the node value to determine type and convert as such. Numeric integral - Long (assumes widest type) Numeric not integral - Double (assumes widest type) Logical - Boolean everything else (including current Complex Type logic) - String Values that represent the row key continue to be implictly treated as Strings by the processor Removed depenency on HBase utility Bytes class from the PutHBaseJSON processor. Convenience methods to encode to byte array are now wrapped by the appropriate HBaseClientService instance. Signed-off-by: Bryan Bende --- .../apache/nifi/hbase/AbstractPutHBase.java | 20 ++-- .../org/apache/nifi/hbase/PutHBaseJSON.java | 52 +++++++---- .../org/apache/nifi/hbase/HBaseTestUtil.java | 19 ++-- .../nifi/hbase/MockHBaseClientService.java | 26 ++++++ .../apache/nifi/hbase/TestPutHBaseJSON.java | 93 +++++++++++++------ .../apache/nifi/hbase/HBaseClientService.java | 4 + .../nifi/hbase/HBase_1_1_2_ClientService.java | 21 +++++ 7 files changed, 173 insertions(+), 62 deletions(-) diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java index 87424f967f..50813963ae 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java @@ -17,7 +17,14 @@ package org.apache.nifi.hbase; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.hbase.put.PutFlowFile; @@ -28,12 +35,6 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - /** * Base class for processors that put data to HBase. */ @@ -180,4 +181,11 @@ public abstract class AbstractPutHBase extends AbstractProcessor { */ protected abstract PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile); + protected HBaseClientService cliSvc; + + @OnScheduled + public void onScheduled(final ProcessContext context) { + cliSvc = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); + } + } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java index 0dba7eecd4..3c10e66ec2 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java @@ -17,6 +17,16 @@ package org.apache.nifi.hbase; +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -40,17 +50,6 @@ import org.apache.nifi.util.ObjectHolder; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - @EventDriven @SupportsBatching @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @@ -175,13 +174,13 @@ public class PutHBaseJSON extends AbstractPutHBase { final Iterator fieldNames = rootNode.getFieldNames(); while (fieldNames.hasNext()) { final String fieldName = fieldNames.next(); - final ObjectHolder fieldValueHolder = new ObjectHolder<>(null); + final ObjectHolder fieldValueHolder = new ObjectHolder<>(null); final JsonNode fieldNode = rootNode.get(fieldName); if (fieldNode.isNull()) { getLogger().debug("Skipping {} because value was null", new Object[]{fieldName}); } else if (fieldNode.isValueNode()) { - fieldValueHolder.set(fieldNode.asText()); + fieldValueHolder.set(extractJNodeValue(fieldNode)); } else { // for non-null, non-value nodes, determine what to do based on the handling strategy switch (complexFieldStrategy) { @@ -194,7 +193,7 @@ public class PutHBaseJSON extends AbstractPutHBase { case TEXT_VALUE: // use toString() here because asText() is only guaranteed to be supported on value nodes // some other types of nodes, like ArrayNode, provide toString implementations - fieldValueHolder.set(fieldNode.toString()); + fieldValueHolder.set(cliSvc.toBytes(fieldNode.toString())); break; case IGNORE_VALUE: // silently skip @@ -208,9 +207,9 @@ public class PutHBaseJSON extends AbstractPutHBase { // otherwise add a new column where the fieldName and fieldValue are the column qualifier and value if (fieldValueHolder.get() != null) { if (extractRowId && fieldName.equals(rowFieldName)) { - rowIdHolder.set(fieldValueHolder.get()); + rowIdHolder.set(fieldNode.asText()); } else { - columns.add(new PutColumn(columnFamily, fieldName, fieldValueHolder.get().getBytes(StandardCharsets.UTF_8))); + columns.add(new PutColumn(columnFamily, fieldName, fieldValueHolder.get())); } } } @@ -227,4 +226,25 @@ public class PutHBaseJSON extends AbstractPutHBase { return new PutFlowFile(tableName, putRowId, columns, flowFile); } + /* + *Handles the conversion of the JsonNode value into it correct underlying data type in the form of a byte array as expected by the columns.add function + */ + private byte[] extractJNodeValue(JsonNode n){ + if (n.isBoolean()){ + //boolean + return cliSvc.toBytes(n.asBoolean()); + }else if(n.isNumber()){ + if(n.isIntegralNumber()){ + //interpret as Long + return cliSvc.toBytes(n.asLong()); + }else{ + //interpret as Double + return cliSvc.toBytes(n.asDouble()); + } + }else{ + //if all else fails, interpret as String + return cliSvc.toBytes(n.asText()); + } + } + } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java index fc30f73948..f1c6689f0e 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java @@ -17,20 +17,20 @@ */ package org.apache.nifi.hbase; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + import org.apache.nifi.hbase.put.PutColumn; import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertTrue; - public class HBaseTestUtil { - public static void verifyPut(final String row, final String columnFamily, final Map columns, final List puts) { + public static void verifyPut(final String row, final String columnFamily, final Map columns, final List puts) { boolean foundPut = false; for (final PutFlowFile put : puts) { @@ -45,13 +45,12 @@ public class HBaseTestUtil { // start off assuming we have all the columns boolean foundAllColumns = true; - for (Map.Entry entry : columns.entrySet()) { + for (Map.Entry entry : columns.entrySet()) { // determine if we have the current expected column boolean foundColumn = false; for (PutColumn putColumn : put.getColumns()) { - final String colVal = new String(putColumn.getBuffer(), StandardCharsets.UTF_8); if (columnFamily.equals(putColumn.getColumnFamily()) && entry.getKey().equals(putColumn.getColumnQualifier()) - && entry.getValue().equals(colVal)) { + && Arrays.equals(entry.getValue(), putColumn.getBuffer())) { foundColumn = true; break; } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java index bca8b4f983..35a96bb095 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -105,4 +105,30 @@ public class MockHBaseClientService extends AbstractControllerService implements public void setThrowException(boolean throwException) { this.throwException = throwException; } + + @Override + public byte[] toBytes(final boolean b) { + return new byte[] { b ? (byte) -1 : (byte) 0 }; + } + + @Override + public byte[] toBytes(long l) { + byte [] b = new byte[8]; + for (int i = 7; i > 0; i--) { + b[i] = (byte) l; + l >>>= 8; + } + b[0] = (byte) l; + return b; + } + + @Override + public byte[] toBytes(final double d) { + return toBytes(Double.doubleToRawLongBits(d)); + } + + @Override + public byte[] toBytes(final String s) { + return s.getBytes(StandardCharsets.UTF_8); + } } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java index 7b5991986f..92c96cc8f0 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java @@ -16,6 +16,15 @@ */ package org.apache.nifi.hbase; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; @@ -25,15 +34,6 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - public class TestPutHBaseJSON { public static final String DEFAULT_TABLE_NAME = "nifi"; @@ -87,9 +87,42 @@ public class TestPutHBaseJSON { final List puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME); assertEquals(1, puts.size()); - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", "value1"); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", hBaseClient.toBytes("value1")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); + HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); + + final List events = runner.getProvenanceEvents(); + assertEquals(1, events.size()); + + final ProvenanceEventRecord event = events.get(0); + assertEquals("hbase://" + DEFAULT_TABLE_NAME + "/" + DEFAULT_ROW, event.getTransitUri()); + } + + @Test + public void testSingleJsonDocAndProvidedRowIdwithNonString() throws IOException, InitializationException { + final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW); + + final String content = "{ \"field1\" : 1.23456, \"field2\" : 2345235, \"field3\" : false }"; + runner.enqueue(content.getBytes("UTF-8")); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS); + + final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); + + final List puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME); + assertEquals(1, puts.size()); + + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", hBaseClient.toBytes(1.23456d)); + expectedColumns.put("field2", hBaseClient.toBytes(2345235l)); + expectedColumns.put("field3", hBaseClient.toBytes(false)); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); final List events = runner.getProvenanceEvents(); @@ -120,9 +153,9 @@ public class TestPutHBaseJSON { assertEquals(1, puts.size()); // should be a put with row id of myRowId, and rowField shouldn't end up in the columns - final Map expectedColumns1 = new HashMap<>(); - expectedColumns1.put("field1", "value1"); - expectedColumns1.put("field2", "value2"); + final Map expectedColumns1 = new HashMap<>(); + expectedColumns1.put("field1", hBaseClient.toBytes("value1")); + expectedColumns1.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut("myRowId", DEFAULT_COLUMN_FAMILY, expectedColumns1, puts); final List events = runner.getProvenanceEvents(); @@ -200,9 +233,9 @@ public class TestPutHBaseJSON { final List puts = hBaseClient.getFlowFilePuts().get("myTable"); assertEquals(1, puts.size()); - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", "value1"); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", hBaseClient.toBytes("value1")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut("myRowId", "myColFamily", expectedColumns, puts); final List events = runner.getProvenanceEvents(); @@ -235,8 +268,8 @@ public class TestPutHBaseJSON { final List puts = hBaseClient.getFlowFilePuts().get("myTable"); assertEquals(1, puts.size()); - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut("value1", "myColFamily", expectedColumns, puts); final List events = runner.getProvenanceEvents(); @@ -264,8 +297,8 @@ public class TestPutHBaseJSON { assertEquals(1, puts.size()); // should have skipped field1 and field3 - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } @@ -289,8 +322,8 @@ public class TestPutHBaseJSON { assertEquals(1, puts.size()); // should have skipped field1 and field3 - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } @@ -337,9 +370,9 @@ public class TestPutHBaseJSON { assertEquals(1, puts.size()); // should have skipped field1 and field3 - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", "[{\"child_field1\":\"child_value1\"}]"); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", hBaseClient.toBytes("[{\"child_field1\":\"child_value1\"}]")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } @@ -363,9 +396,9 @@ public class TestPutHBaseJSON { assertEquals(1, puts.size()); // should have skipped field1 and field3 - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", "{\"child_field1\":\"child_value1\"}"); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", hBaseClient.toBytes("{\"child_field1\":\"child_value1\"}")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java index 2f5b6a5e57..3a65f5dee5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java @@ -98,4 +98,8 @@ public interface HBaseClientService extends ControllerService { */ void scan(String tableName, Collection columns, String filterExpression, long minTime, ResultHandler handler) throws IOException; + byte[] toBytes(boolean b); + byte[] toBytes(long l); + byte[] toBytes(double d); + byte[] toBytes(String s); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index 1791cfee00..e07b728950 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.ParseFilter; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.security.UserGroupInformation; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -405,4 +406,24 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme } } + @Override + public byte[] toBytes(boolean b) { + return Bytes.toBytes(b); + } + + @Override + public byte[] toBytes(long l) { + return Bytes.toBytes(l); + } + + @Override + public byte[] toBytes(double d) { + return Bytes.toBytes(d); + } + + @Override + public byte[] toBytes(String s) { + return Bytes.toBytes(s); + } + }