diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java index 87424f967f..50813963ae 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java @@ -17,7 +17,14 @@ package org.apache.nifi.hbase; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.hbase.put.PutFlowFile; @@ -28,12 +35,6 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - /** * Base class for processors that put data to HBase. */ @@ -180,4 +181,11 @@ public abstract class AbstractPutHBase extends AbstractProcessor { */ protected abstract PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile); + protected HBaseClientService cliSvc; + + @OnScheduled + public void onScheduled(final ProcessContext context) { + cliSvc = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); + } + } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java index 0dba7eecd4..3c10e66ec2 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java @@ -17,6 +17,16 @@ package org.apache.nifi.hbase; +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -40,17 +50,6 @@ import org.apache.nifi.util.ObjectHolder; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - @EventDriven @SupportsBatching @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @@ -175,13 +174,13 @@ public class PutHBaseJSON extends AbstractPutHBase { final Iterator fieldNames = rootNode.getFieldNames(); while (fieldNames.hasNext()) { final String fieldName = fieldNames.next(); - final ObjectHolder fieldValueHolder = new ObjectHolder<>(null); + final ObjectHolder fieldValueHolder = new ObjectHolder<>(null); final JsonNode fieldNode = rootNode.get(fieldName); if (fieldNode.isNull()) { getLogger().debug("Skipping {} because value was null", new Object[]{fieldName}); } else if (fieldNode.isValueNode()) { - fieldValueHolder.set(fieldNode.asText()); + fieldValueHolder.set(extractJNodeValue(fieldNode)); } else { // for non-null, non-value nodes, determine what to do based on the handling strategy switch (complexFieldStrategy) { @@ -194,7 +193,7 @@ public class PutHBaseJSON extends AbstractPutHBase { case TEXT_VALUE: // use toString() here because asText() is only guaranteed to be supported on value nodes // some other types of nodes, like ArrayNode, provide toString implementations - fieldValueHolder.set(fieldNode.toString()); + fieldValueHolder.set(cliSvc.toBytes(fieldNode.toString())); break; case IGNORE_VALUE: // silently skip @@ -208,9 +207,9 @@ public class PutHBaseJSON extends AbstractPutHBase { // otherwise add a new column where the fieldName and fieldValue are the column qualifier and value if (fieldValueHolder.get() != null) { if (extractRowId && fieldName.equals(rowFieldName)) { - rowIdHolder.set(fieldValueHolder.get()); + rowIdHolder.set(fieldNode.asText()); } else { - columns.add(new PutColumn(columnFamily, fieldName, fieldValueHolder.get().getBytes(StandardCharsets.UTF_8))); + columns.add(new PutColumn(columnFamily, fieldName, fieldValueHolder.get())); } } } @@ -227,4 +226,25 @@ public class PutHBaseJSON extends AbstractPutHBase { return new PutFlowFile(tableName, putRowId, columns, flowFile); } + /* + *Handles the conversion of the JsonNode value into it correct underlying data type in the form of a byte array as expected by the columns.add function + */ + private byte[] extractJNodeValue(JsonNode n){ + if (n.isBoolean()){ + //boolean + return cliSvc.toBytes(n.asBoolean()); + }else if(n.isNumber()){ + if(n.isIntegralNumber()){ + //interpret as Long + return cliSvc.toBytes(n.asLong()); + }else{ + //interpret as Double + return cliSvc.toBytes(n.asDouble()); + } + }else{ + //if all else fails, interpret as String + return cliSvc.toBytes(n.asText()); + } + } + } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java index fc30f73948..f1c6689f0e 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java @@ -17,20 +17,20 @@ */ package org.apache.nifi.hbase; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + import org.apache.nifi.hbase.put.PutColumn; import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertTrue; - public class HBaseTestUtil { - public static void verifyPut(final String row, final String columnFamily, final Map columns, final List puts) { + public static void verifyPut(final String row, final String columnFamily, final Map columns, final List puts) { boolean foundPut = false; for (final PutFlowFile put : puts) { @@ -45,13 +45,12 @@ public class HBaseTestUtil { // start off assuming we have all the columns boolean foundAllColumns = true; - for (Map.Entry entry : columns.entrySet()) { + for (Map.Entry entry : columns.entrySet()) { // determine if we have the current expected column boolean foundColumn = false; for (PutColumn putColumn : put.getColumns()) { - final String colVal = new String(putColumn.getBuffer(), StandardCharsets.UTF_8); if (columnFamily.equals(putColumn.getColumnFamily()) && entry.getKey().equals(putColumn.getColumnQualifier()) - && entry.getValue().equals(colVal)) { + && Arrays.equals(entry.getValue(), putColumn.getBuffer())) { foundColumn = true; break; } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java index bca8b4f983..35a96bb095 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -105,4 +105,30 @@ public class MockHBaseClientService extends AbstractControllerService implements public void setThrowException(boolean throwException) { this.throwException = throwException; } + + @Override + public byte[] toBytes(final boolean b) { + return new byte[] { b ? (byte) -1 : (byte) 0 }; + } + + @Override + public byte[] toBytes(long l) { + byte [] b = new byte[8]; + for (int i = 7; i > 0; i--) { + b[i] = (byte) l; + l >>>= 8; + } + b[0] = (byte) l; + return b; + } + + @Override + public byte[] toBytes(final double d) { + return toBytes(Double.doubleToRawLongBits(d)); + } + + @Override + public byte[] toBytes(final String s) { + return s.getBytes(StandardCharsets.UTF_8); + } } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java index 7b5991986f..92c96cc8f0 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java @@ -16,6 +16,15 @@ */ package org.apache.nifi.hbase; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; @@ -25,15 +34,6 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - public class TestPutHBaseJSON { public static final String DEFAULT_TABLE_NAME = "nifi"; @@ -87,9 +87,42 @@ public class TestPutHBaseJSON { final List puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME); assertEquals(1, puts.size()); - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", "value1"); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", hBaseClient.toBytes("value1")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); + HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); + + final List events = runner.getProvenanceEvents(); + assertEquals(1, events.size()); + + final ProvenanceEventRecord event = events.get(0); + assertEquals("hbase://" + DEFAULT_TABLE_NAME + "/" + DEFAULT_ROW, event.getTransitUri()); + } + + @Test + public void testSingleJsonDocAndProvidedRowIdwithNonString() throws IOException, InitializationException { + final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW); + + final String content = "{ \"field1\" : 1.23456, \"field2\" : 2345235, \"field3\" : false }"; + runner.enqueue(content.getBytes("UTF-8")); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS); + + final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); + + final List puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME); + assertEquals(1, puts.size()); + + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", hBaseClient.toBytes(1.23456d)); + expectedColumns.put("field2", hBaseClient.toBytes(2345235l)); + expectedColumns.put("field3", hBaseClient.toBytes(false)); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); final List events = runner.getProvenanceEvents(); @@ -120,9 +153,9 @@ public class TestPutHBaseJSON { assertEquals(1, puts.size()); // should be a put with row id of myRowId, and rowField shouldn't end up in the columns - final Map expectedColumns1 = new HashMap<>(); - expectedColumns1.put("field1", "value1"); - expectedColumns1.put("field2", "value2"); + final Map expectedColumns1 = new HashMap<>(); + expectedColumns1.put("field1", hBaseClient.toBytes("value1")); + expectedColumns1.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut("myRowId", DEFAULT_COLUMN_FAMILY, expectedColumns1, puts); final List events = runner.getProvenanceEvents(); @@ -200,9 +233,9 @@ public class TestPutHBaseJSON { final List puts = hBaseClient.getFlowFilePuts().get("myTable"); assertEquals(1, puts.size()); - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", "value1"); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", hBaseClient.toBytes("value1")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut("myRowId", "myColFamily", expectedColumns, puts); final List events = runner.getProvenanceEvents(); @@ -235,8 +268,8 @@ public class TestPutHBaseJSON { final List puts = hBaseClient.getFlowFilePuts().get("myTable"); assertEquals(1, puts.size()); - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut("value1", "myColFamily", expectedColumns, puts); final List events = runner.getProvenanceEvents(); @@ -264,8 +297,8 @@ public class TestPutHBaseJSON { assertEquals(1, puts.size()); // should have skipped field1 and field3 - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } @@ -289,8 +322,8 @@ public class TestPutHBaseJSON { assertEquals(1, puts.size()); // should have skipped field1 and field3 - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } @@ -337,9 +370,9 @@ public class TestPutHBaseJSON { assertEquals(1, puts.size()); // should have skipped field1 and field3 - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", "[{\"child_field1\":\"child_value1\"}]"); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", hBaseClient.toBytes("[{\"child_field1\":\"child_value1\"}]")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } @@ -363,9 +396,9 @@ public class TestPutHBaseJSON { assertEquals(1, puts.size()); // should have skipped field1 and field3 - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", "{\"child_field1\":\"child_value1\"}"); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", hBaseClient.toBytes("{\"child_field1\":\"child_value1\"}")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java index 2f5b6a5e57..3a65f5dee5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java @@ -98,4 +98,8 @@ public interface HBaseClientService extends ControllerService { */ void scan(String tableName, Collection columns, String filterExpression, long minTime, ResultHandler handler) throws IOException; + byte[] toBytes(boolean b); + byte[] toBytes(long l); + byte[] toBytes(double d); + byte[] toBytes(String s); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index 1791cfee00..e07b728950 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.ParseFilter; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.security.UserGroupInformation; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -405,4 +406,24 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme } } + @Override + public byte[] toBytes(boolean b) { + return Bytes.toBytes(b); + } + + @Override + public byte[] toBytes(long l) { + return Bytes.toBytes(l); + } + + @Override + public byte[] toBytes(double d) { + return Bytes.toBytes(d); + } + + @Override + public byte[] toBytes(String s) { + return Bytes.toBytes(s); + } + }