NIFI-1895 PutHBaseJSON processor treats all values as Strings

The operator will now inspect the node value to determine type and convert as such.
Numeric integral - Long (assumes widest type)
Numeric not integral - Double (assumes widest type)
Logical - Boolean
everything else (including current Complex Type logic) - String

Values that represent the row key continue to be implictly treated as Strings by the processor

Removed depenency on HBase utility Bytes class from the PutHBaseJSON processor.
Convenience methods to encode to byte array are now wrapped by the appropriate HBaseClientService instance.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
rtempleton 2016-06-09 10:16:49 -05:00 committed by Bryan Bende
parent f5060a6d63
commit 0d2a9dc7e5
7 changed files with 173 additions and 62 deletions

View File

@ -17,7 +17,14 @@
package org.apache.nifi.hbase; package org.apache.nifi.hbase;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.put.PutFlowFile;
@ -28,12 +35,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/** /**
* Base class for processors that put data to HBase. * Base class for processors that put data to HBase.
*/ */
@ -180,4 +181,11 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
*/ */
protected abstract PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile); protected abstract PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile);
protected HBaseClientService cliSvc;
@OnScheduled
public void onScheduled(final ProcessContext context) {
cliSvc = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
}
} }

View File

@ -17,6 +17,16 @@
package org.apache.nifi.hbase; package org.apache.nifi.hbase;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
@ -40,17 +50,6 @@ import org.apache.nifi.util.ObjectHolder;
import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@EventDriven @EventDriven
@SupportsBatching @SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@ -175,13 +174,13 @@ public class PutHBaseJSON extends AbstractPutHBase {
final Iterator<String> fieldNames = rootNode.getFieldNames(); final Iterator<String> fieldNames = rootNode.getFieldNames();
while (fieldNames.hasNext()) { while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next(); final String fieldName = fieldNames.next();
final ObjectHolder<String> fieldValueHolder = new ObjectHolder<>(null); final ObjectHolder<byte[]> fieldValueHolder = new ObjectHolder<>(null);
final JsonNode fieldNode = rootNode.get(fieldName); final JsonNode fieldNode = rootNode.get(fieldName);
if (fieldNode.isNull()) { if (fieldNode.isNull()) {
getLogger().debug("Skipping {} because value was null", new Object[]{fieldName}); getLogger().debug("Skipping {} because value was null", new Object[]{fieldName});
} else if (fieldNode.isValueNode()) { } else if (fieldNode.isValueNode()) {
fieldValueHolder.set(fieldNode.asText()); fieldValueHolder.set(extractJNodeValue(fieldNode));
} else { } else {
// for non-null, non-value nodes, determine what to do based on the handling strategy // for non-null, non-value nodes, determine what to do based on the handling strategy
switch (complexFieldStrategy) { switch (complexFieldStrategy) {
@ -194,7 +193,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
case TEXT_VALUE: case TEXT_VALUE:
// use toString() here because asText() is only guaranteed to be supported on value nodes // use toString() here because asText() is only guaranteed to be supported on value nodes
// some other types of nodes, like ArrayNode, provide toString implementations // some other types of nodes, like ArrayNode, provide toString implementations
fieldValueHolder.set(fieldNode.toString()); fieldValueHolder.set(cliSvc.toBytes(fieldNode.toString()));
break; break;
case IGNORE_VALUE: case IGNORE_VALUE:
// silently skip // silently skip
@ -208,9 +207,9 @@ public class PutHBaseJSON extends AbstractPutHBase {
// otherwise add a new column where the fieldName and fieldValue are the column qualifier and value // otherwise add a new column where the fieldName and fieldValue are the column qualifier and value
if (fieldValueHolder.get() != null) { if (fieldValueHolder.get() != null) {
if (extractRowId && fieldName.equals(rowFieldName)) { if (extractRowId && fieldName.equals(rowFieldName)) {
rowIdHolder.set(fieldValueHolder.get()); rowIdHolder.set(fieldNode.asText());
} else { } else {
columns.add(new PutColumn(columnFamily, fieldName, fieldValueHolder.get().getBytes(StandardCharsets.UTF_8))); columns.add(new PutColumn(columnFamily, fieldName, fieldValueHolder.get()));
} }
} }
} }
@ -227,4 +226,25 @@ public class PutHBaseJSON extends AbstractPutHBase {
return new PutFlowFile(tableName, putRowId, columns, flowFile); return new PutFlowFile(tableName, putRowId, columns, flowFile);
} }
/*
*Handles the conversion of the JsonNode value into it correct underlying data type in the form of a byte array as expected by the columns.add function
*/
private byte[] extractJNodeValue(JsonNode n){
if (n.isBoolean()){
//boolean
return cliSvc.toBytes(n.asBoolean());
}else if(n.isNumber()){
if(n.isIntegralNumber()){
//interpret as Long
return cliSvc.toBytes(n.asLong());
}else{
//interpret as Double
return cliSvc.toBytes(n.asDouble());
}
}else{
//if all else fails, interpret as String
return cliSvc.toBytes(n.asText());
}
}
} }

View File

@ -17,20 +17,20 @@
*/ */
package org.apache.nifi.hbase; package org.apache.nifi.hbase;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.nifi.hbase.put.PutColumn; import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceEventType;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertTrue;
public class HBaseTestUtil { public class HBaseTestUtil {
public static void verifyPut(final String row, final String columnFamily, final Map<String,String> columns, final List<PutFlowFile> puts) { public static void verifyPut(final String row, final String columnFamily, final Map<String,byte[]> columns, final List<PutFlowFile> puts) {
boolean foundPut = false; boolean foundPut = false;
for (final PutFlowFile put : puts) { for (final PutFlowFile put : puts) {
@ -45,13 +45,12 @@ public class HBaseTestUtil {
// start off assuming we have all the columns // start off assuming we have all the columns
boolean foundAllColumns = true; boolean foundAllColumns = true;
for (Map.Entry<String, String> entry : columns.entrySet()) { for (Map.Entry<String, byte[]> entry : columns.entrySet()) {
// determine if we have the current expected column // determine if we have the current expected column
boolean foundColumn = false; boolean foundColumn = false;
for (PutColumn putColumn : put.getColumns()) { for (PutColumn putColumn : put.getColumns()) {
final String colVal = new String(putColumn.getBuffer(), StandardCharsets.UTF_8);
if (columnFamily.equals(putColumn.getColumnFamily()) && entry.getKey().equals(putColumn.getColumnQualifier()) if (columnFamily.equals(putColumn.getColumnFamily()) && entry.getKey().equals(putColumn.getColumnQualifier())
&& entry.getValue().equals(colVal)) { && Arrays.equals(entry.getValue(), putColumn.getBuffer())) {
foundColumn = true; foundColumn = true;
break; break;
} }

View File

@ -105,4 +105,30 @@ public class MockHBaseClientService extends AbstractControllerService implements
public void setThrowException(boolean throwException) { public void setThrowException(boolean throwException) {
this.throwException = throwException; this.throwException = throwException;
} }
@Override
public byte[] toBytes(final boolean b) {
return new byte[] { b ? (byte) -1 : (byte) 0 };
}
@Override
public byte[] toBytes(long l) {
byte [] b = new byte[8];
for (int i = 7; i > 0; i--) {
b[i] = (byte) l;
l >>>= 8;
}
b[0] = (byte) l;
return b;
}
@Override
public byte[] toBytes(final double d) {
return toBytes(Double.doubleToRawLongBits(d));
}
@Override
public byte[] toBytes(final String s) {
return s.getBytes(StandardCharsets.UTF_8);
}
} }

View File

@ -16,6 +16,15 @@
*/ */
package org.apache.nifi.hbase; package org.apache.nifi.hbase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceEventType;
@ -25,15 +34,6 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class TestPutHBaseJSON { public class TestPutHBaseJSON {
public static final String DEFAULT_TABLE_NAME = "nifi"; public static final String DEFAULT_TABLE_NAME = "nifi";
@ -87,9 +87,42 @@ public class TestPutHBaseJSON {
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME); final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size()); assertEquals(1, puts.size());
final Map<String,String> expectedColumns = new HashMap<>(); final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field1", "value1"); expectedColumns.put("field1", hBaseClient.toBytes("value1"));
expectedColumns.put("field2", "value2"); expectedColumns.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
assertEquals("hbase://" + DEFAULT_TABLE_NAME + "/" + DEFAULT_ROW, event.getTransitUri());
}
@Test
public void testSingleJsonDocAndProvidedRowIdwithNonString() throws IOException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
final String content = "{ \"field1\" : 1.23456, \"field2\" : 2345235, \"field3\" : false }";
runner.enqueue(content.getBytes("UTF-8"));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field1", hBaseClient.toBytes(1.23456d));
expectedColumns.put("field2", hBaseClient.toBytes(2345235l));
expectedColumns.put("field3", hBaseClient.toBytes(false));
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
@ -120,9 +153,9 @@ public class TestPutHBaseJSON {
assertEquals(1, puts.size()); assertEquals(1, puts.size());
// should be a put with row id of myRowId, and rowField shouldn't end up in the columns // should be a put with row id of myRowId, and rowField shouldn't end up in the columns
final Map<String,String> expectedColumns1 = new HashMap<>(); final Map<String,byte[]> expectedColumns1 = new HashMap<>();
expectedColumns1.put("field1", "value1"); expectedColumns1.put("field1", hBaseClient.toBytes("value1"));
expectedColumns1.put("field2", "value2"); expectedColumns1.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut("myRowId", DEFAULT_COLUMN_FAMILY, expectedColumns1, puts); HBaseTestUtil.verifyPut("myRowId", DEFAULT_COLUMN_FAMILY, expectedColumns1, puts);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
@ -200,9 +233,9 @@ public class TestPutHBaseJSON {
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get("myTable"); final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get("myTable");
assertEquals(1, puts.size()); assertEquals(1, puts.size());
final Map<String,String> expectedColumns = new HashMap<>(); final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field1", "value1"); expectedColumns.put("field1", hBaseClient.toBytes("value1"));
expectedColumns.put("field2", "value2"); expectedColumns.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut("myRowId", "myColFamily", expectedColumns, puts); HBaseTestUtil.verifyPut("myRowId", "myColFamily", expectedColumns, puts);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
@ -235,8 +268,8 @@ public class TestPutHBaseJSON {
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get("myTable"); final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get("myTable");
assertEquals(1, puts.size()); assertEquals(1, puts.size());
final Map<String,String> expectedColumns = new HashMap<>(); final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field2", "value2"); expectedColumns.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut("value1", "myColFamily", expectedColumns, puts); HBaseTestUtil.verifyPut("value1", "myColFamily", expectedColumns, puts);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
@ -264,8 +297,8 @@ public class TestPutHBaseJSON {
assertEquals(1, puts.size()); assertEquals(1, puts.size());
// should have skipped field1 and field3 // should have skipped field1 and field3
final Map<String,String> expectedColumns = new HashMap<>(); final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field2", "value2"); expectedColumns.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
} }
@ -289,8 +322,8 @@ public class TestPutHBaseJSON {
assertEquals(1, puts.size()); assertEquals(1, puts.size());
// should have skipped field1 and field3 // should have skipped field1 and field3
final Map<String,String> expectedColumns = new HashMap<>(); final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field2", "value2"); expectedColumns.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
} }
@ -337,9 +370,9 @@ public class TestPutHBaseJSON {
assertEquals(1, puts.size()); assertEquals(1, puts.size());
// should have skipped field1 and field3 // should have skipped field1 and field3
final Map<String,String> expectedColumns = new HashMap<>(); final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field1", "[{\"child_field1\":\"child_value1\"}]"); expectedColumns.put("field1", hBaseClient.toBytes("[{\"child_field1\":\"child_value1\"}]"));
expectedColumns.put("field2", "value2"); expectedColumns.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
} }
@ -363,9 +396,9 @@ public class TestPutHBaseJSON {
assertEquals(1, puts.size()); assertEquals(1, puts.size());
// should have skipped field1 and field3 // should have skipped field1 and field3
final Map<String,String> expectedColumns = new HashMap<>(); final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field1", "{\"child_field1\":\"child_value1\"}"); expectedColumns.put("field1", hBaseClient.toBytes("{\"child_field1\":\"child_value1\"}"));
expectedColumns.put("field2", "value2"); expectedColumns.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
} }

View File

@ -98,4 +98,8 @@ public interface HBaseClientService extends ControllerService {
*/ */
void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, ResultHandler handler) throws IOException; void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, ResultHandler handler) throws IOException;
byte[] toBytes(boolean b);
byte[] toBytes(long l);
byte[] toBytes(double d);
byte[] toBytes(String s);
} }

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.ParseFilter; import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -405,4 +406,24 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
} }
} }
@Override
public byte[] toBytes(boolean b) {
return Bytes.toBytes(b);
}
@Override
public byte[] toBytes(long l) {
return Bytes.toBytes(l);
}
@Override
public byte[] toBytes(double d) {
return Bytes.toBytes(d);
}
@Override
public byte[] toBytes(String s) {
return Bytes.toBytes(s);
}
} }