diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml index 7423777951..dd321b50f4 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml @@ -71,5 +71,16 @@ test + + org.apache.hbase + hbase-client + + + org.slf4j + slf4j-log4j12 + + + test + diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java index 05f4b7ebbc..f5d11f1e4f 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java @@ -17,6 +17,7 @@ package org.apache.nifi.hbase; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.hbase.put.PutFlowFile; @@ -60,6 +62,28 @@ public abstract class AbstractPutHBase extends AbstractProcessor { .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + + static final String STRING_ENCODING_VALUE = "String"; + static final String BYTES_ENCODING_VALUE = "Bytes"; + static final String BINARY_ENCODING_VALUE = "Binary"; + + + protected static final AllowableValue ROW_ID_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE, + "Stores the value of row id as a UTF-8 String."); + protected static final AllowableValue ROW_ID_ENCODING_BINARY = new AllowableValue(BINARY_ENCODING_VALUE, BINARY_ENCODING_VALUE, + "Stores the value of the rows id as a binary byte array. It expects that the row id is a binary formated string."); + + static final PropertyDescriptor ROW_ID_ENCODING_STRATEGY = new PropertyDescriptor.Builder() + .name("Row Identifier Encoding Strategy") + .description("Specifies the data type of Row ID used when inserting data into HBase. The default behaviror is" + + " to convert the row id to a UTF-8 byte array. Choosing Binary will convert a binary formatted string" + + " to the correct byte[] representation. The Binary option should be used if you are using Binary row" + + " keys in HBase") + .required(false) // not all sub-classes will require this + .expressionLanguageSupported(false) + .defaultValue(ROW_ID_ENCODING_STRING.getValue()) + .allowableValues(ROW_ID_ENCODING_STRING,ROW_ID_ENCODING_BINARY) + .build(); protected static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder() .name("Column Family") .description("The Column Family to use when inserting data into HBase") @@ -119,7 +143,7 @@ public abstract class AbstractPutHBase extends AbstractProcessor { } else if (!putFlowFile.isValid()) { if (StringUtils.isBlank(putFlowFile.getTableName())) { getLogger().error("Missing table name for FlowFile {}; routing to failure", new Object[]{flowFile}); - } else if (StringUtils.isBlank(putFlowFile.getRow())) { + } else if (null == putFlowFile.getRow()) { getLogger().error("Missing row id for FlowFile {}; routing to failure", new Object[]{flowFile}); } else if (putFlowFile.getColumns() == null || putFlowFile.getColumns().isEmpty()) { getLogger().error("No columns provided for FlowFile {}; routing to failure", new Object[]{flowFile}); @@ -170,9 +194,19 @@ public abstract class AbstractPutHBase extends AbstractProcessor { } protected String getTransitUri(PutFlowFile putFlowFile) { - return "hbase://" + putFlowFile.getTableName() + "/" + putFlowFile.getRow(); + return "hbase://" + putFlowFile.getTableName() + "/" + new String(putFlowFile.getRow(), StandardCharsets.UTF_8); } + protected byte[] getRow(final String row, final String encoding) { + //check to see if we need to modify the rowKey before we pass it down to the PutFlowFile + byte[] rowKeyBytes = null; + if(BINARY_ENCODING_VALUE.contentEquals(encoding)){ + rowKeyBytes = clientService.toBytesBinary(row); + }else{ + rowKeyBytes = row.getBytes(StandardCharsets.UTF_8); + } + return rowKeyBytes; + } /** * Sub-classes provide the implementation to create a put from a FlowFile. * diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java index 759d91ef60..eb1f636c41 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java @@ -33,6 +33,7 @@ import org.apache.nifi.stream.io.StreamUtils; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -53,6 +54,7 @@ public class PutHBaseCell extends AbstractPutHBase { properties.add(HBASE_CLIENT_SERVICE); properties.add(TABLE_NAME); properties.add(ROW_ID); + properties.add(ROW_ID_ENCODING_STRATEGY); properties.add(COLUMN_FAMILY); properties.add(COLUMN_QUALIFIER); properties.add(BATCH_SIZE); @@ -82,8 +84,15 @@ public class PutHBaseCell extends AbstractPutHBase { } }); - final Collection columns = Collections.singletonList(new PutColumn(columnFamily, columnQualifier, buffer)); - return new PutFlowFile(tableName, row, columns, flowFile); + + final Collection columns = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8), + columnQualifier.getBytes(StandardCharsets.UTF_8), buffer)); + byte[] rowKeyBytes = getRow(row,context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue()); + + + return new PutFlowFile(tableName,rowKeyBytes , columns, flowFile); } + + } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java index 4c4c2073c5..1294d9b164 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java @@ -20,6 +20,7 @@ package org.apache.nifi.hbase; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -89,8 +90,7 @@ public class PutHBaseJSON extends AbstractPutHBase { .defaultValue(COMPLEX_FIELD_TEXT.getValue()) .build(); - protected static final String STRING_ENCODING_VALUE = "String"; - protected static final String BYTES_ENCODING_VALUE = "Bytes"; + protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE, "Stores the value of each field as a UTF-8 String."); @@ -115,6 +115,7 @@ public class PutHBaseJSON extends AbstractPutHBase { properties.add(TABLE_NAME); properties.add(ROW_ID); properties.add(ROW_FIELD_NAME); + properties.add(ROW_ID_ENCODING_STRATEGY); properties.add(COLUMN_FAMILY); properties.add(BATCH_SIZE); properties.add(COMPLEX_FIELD_STRATEGY); @@ -163,6 +164,7 @@ public class PutHBaseJSON extends AbstractPutHBase { final boolean extractRowId = !StringUtils.isBlank(rowFieldName); final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue(); final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue(); + final String rowIdEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue(); // Parse the JSON document final ObjectMapper mapper = new ObjectMapper(); @@ -236,7 +238,7 @@ public class PutHBaseJSON extends AbstractPutHBase { if (extractRowId && fieldName.equals(rowFieldName)) { rowIdHolder.set(fieldNode.asText()); } else { - columns.add(new PutColumn(columnFamily, fieldName, fieldValueHolder.get())); + columns.add(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8), fieldName.getBytes(StandardCharsets.UTF_8), fieldValueHolder.get())); } } } @@ -250,7 +252,9 @@ public class PutHBaseJSON extends AbstractPutHBase { } final String putRowId = (extractRowId ? rowIdHolder.get() : rowId); - return new PutFlowFile(tableName, putRowId, columns, flowFile); + + byte[] rowKeyBytes = getRow(putRowId,context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue()); + return new PutFlowFile(tableName, rowKeyBytes, columns, flowFile); } /* diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java index f1c6689f0e..90d8838c68 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java @@ -19,6 +19,7 @@ package org.apache.nifi.hbase; import static org.junit.Assert.assertTrue; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -34,7 +35,7 @@ public class HBaseTestUtil { boolean foundPut = false; for (final PutFlowFile put : puts) { - if (!row.equals(put.getRow())) { + if (!row.equals(new String(put.getRow(), StandardCharsets.UTF_8))) { continue; } @@ -49,7 +50,8 @@ public class HBaseTestUtil { // determine if we have the current expected column boolean foundColumn = false; for (PutColumn putColumn : put.getColumns()) { - if (columnFamily.equals(putColumn.getColumnFamily()) && entry.getKey().equals(putColumn.getColumnQualifier()) + if (columnFamily.equals(new String(putColumn.getColumnFamily(), StandardCharsets.UTF_8)) + && entry.getKey().equals(new String(putColumn.getColumnQualifier(), StandardCharsets.UTF_8)) && Arrays.equals(entry.getValue(), putColumn.getBuffer())) { foundColumn = true; break; diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java index 35a96bb095..71304e5d5a 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.hbase; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.hbase.put.PutColumn; import org.apache.nifi.hbase.put.PutFlowFile; @@ -23,6 +24,7 @@ import org.apache.nifi.hbase.scan.Column; import org.apache.nifi.hbase.scan.ResultCell; import org.apache.nifi.hbase.scan.ResultHandler; + import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -47,7 +49,7 @@ public class MockHBaseClientService extends AbstractControllerService implements } @Override - public void put(String tableName, String rowId, Collection columns) throws IOException { + public void put(String tableName, byte[] rowId, Collection columns) throws IOException { throw new UnsupportedOperationException(); } @@ -131,4 +133,9 @@ public class MockHBaseClientService extends AbstractControllerService implements public byte[] toBytes(final String s) { return s.getBytes(StandardCharsets.UTF_8); } + + @Override + public byte[] toBytesBinary(String s) { + return Bytes.toBytesBinary(s); + } } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java index 0cd8ff7a55..2d9068f9e1 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java @@ -253,6 +253,52 @@ public class TestPutHBaseCell { assertEquals(2, runner.getProvenanceEvents().size()); } + @Test + public void testSingleFlowFileWithBinaryRowKey() throws IOException, InitializationException { + final String tableName = "nifi"; + final String row = "\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00" + + "\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00" + + "\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00" + + "\\x00\\x00\\x00\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00" + + "\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x01\\x01\\x00\\x00\\x00\\x00\\x00" + + "\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00" + + "\\x00\\x00\\x00\\x01\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00" + + "\\x01\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x01\\x01\\x01\\x00\\x01\\x00\\x01\\x01\\x01\\x00\\x00\\x00" + + "\\x00\\x00\\x00\\x01\\x01\\x01\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x01\\x01\\x00\\x01\\x00\\x01\\x00" + + "\\x00\\x01\\x01\\x01\\x01\\x00\\x00\\x01\\x01\\x01\\x00\\x01\\x00\\x00"; + + final String columnFamily = "family1"; + final String columnQualifier = "qualifier1"; + + final TestRunner runner = TestRunners.newTestRunner(PutHBaseCell.class); + runner.setProperty(PutHBaseCell.TABLE_NAME, tableName); + runner.setProperty(PutHBaseCell.ROW_ID, row); + runner.setProperty(PutHBaseCell.ROW_ID_ENCODING_STRATEGY,PutHBaseCell.ROW_ID_ENCODING_BINARY.getValue()); + runner.setProperty(PutHBaseCell.COLUMN_FAMILY, columnFamily); + runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, columnQualifier); + runner.setProperty(PutHBaseCell.BATCH_SIZE, "1"); + + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + + final byte[] expectedRowKey = hBaseClient.toBytesBinary(row); + + final String content = "some content"; + runner.enqueue(content.getBytes("UTF-8")); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS); + + final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); + + List puts = hBaseClient.getFlowFilePuts().get(tableName); + assertEquals(1, puts.size()); + verifyPut(expectedRowKey, columnFamily.getBytes(StandardCharsets.UTF_8), columnQualifier.getBytes(StandardCharsets.UTF_8), content, puts.get(0)); + + assertEquals(1, runner.getProvenanceEvents().size()); + } private Map getAtrributeMapWithEL(String tableName, String row, String columnFamily, String columnQualifier) { final Map attributes1 = new HashMap<>(); attributes1.put("hbase.tableName", tableName); @@ -280,14 +326,18 @@ public class TestPutHBaseCell { } private void verifyPut(String row, String columnFamily, String columnQualifier, String content, PutFlowFile put) { - assertEquals(row, put.getRow()); + verifyPut(row.getBytes(StandardCharsets.UTF_8),columnFamily.getBytes(StandardCharsets.UTF_8), + columnQualifier.getBytes(StandardCharsets.UTF_8),content,put); + } + private void verifyPut(byte[] row, byte[] columnFamily, byte[] columnQualifier, String content, PutFlowFile put) { + assertEquals(new String(row, StandardCharsets.UTF_8), new String(put.getRow(), StandardCharsets.UTF_8)); assertNotNull(put.getColumns()); assertEquals(1, put.getColumns().size()); final PutColumn column = put.getColumns().iterator().next(); - assertEquals(columnFamily, column.getColumnFamily()); - assertEquals(columnQualifier, column.getColumnQualifier()); + assertEquals(new String(columnFamily, StandardCharsets.UTF_8), new String(column.getColumnFamily(), StandardCharsets.UTF_8)); + assertEquals(new String(columnQualifier, StandardCharsets.UTF_8), new String(column.getColumnQualifier(), StandardCharsets.UTF_8)); assertEquals(content, new String(column.getBuffer(), StandardCharsets.UTF_8)); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java index 47f6e2ecb5..f9f5bfb45a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java @@ -84,7 +84,7 @@ public interface HBaseClientService extends ControllerService { * @param columns the columns of the row to put * @throws IOException thrown when there are communication errors with HBase */ - void put(String tableName, String rowId, Collection columns) throws IOException; + void put(String tableName, byte[] rowId, Collection columns) throws IOException; /** * Scans the given table using the optional filter criteria and passing each result to the provided handler. @@ -130,4 +130,11 @@ public interface HBaseClientService extends ControllerService { */ byte[] toBytes(String s); + /** + * Converts the given binary formatted string to a byte representation + * @param s a binary encoded string + * @return the string represented as bytes + */ + byte[] toBytesBinary(String s); + } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java index 0971f94da5..7921bc2554 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java @@ -21,22 +21,22 @@ package org.apache.nifi.hbase.put; */ public class PutColumn { - private final String columnFamily; - private final String columnQualifier; + private final byte[] columnFamily; + private final byte[] columnQualifier; private final byte[] buffer; - public PutColumn(final String columnFamily, final String columnQualifier, final byte[] buffer) { + public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer) { this.columnFamily = columnFamily; this.columnQualifier = columnQualifier; this.buffer = buffer; } - public String getColumnFamily() { + public byte[] getColumnFamily() { return columnFamily; } - public String getColumnQualifier() { + public byte[] getColumnQualifier() { return columnQualifier; } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java index a97e3a4a53..428edd02fd 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java @@ -27,11 +27,11 @@ import java.util.Collection; public class PutFlowFile { private final String tableName; - private final String row; + private final byte[] row; private final Collection columns; private final FlowFile flowFile; - public PutFlowFile(String tableName, String row, Collection columns, FlowFile flowFile) { + public PutFlowFile(String tableName, byte[] row, Collection columns, FlowFile flowFile) { this.tableName = tableName; this.row = row; this.columns = columns; @@ -42,7 +42,7 @@ public class PutFlowFile { return tableName; } - public String getRow() { + public byte[] getRow() { return row; } @@ -55,12 +55,12 @@ public class PutFlowFile { } public boolean isValid() { - if (StringUtils.isBlank(tableName) || StringUtils.isBlank(row) || flowFile == null || columns == null || columns.isEmpty()) { + if (StringUtils.isBlank(tableName) || null == row || flowFile == null || columns == null || columns.isEmpty()) { return false; } for (PutColumn column : columns) { - if (StringUtils.isBlank(column.getColumnQualifier()) || StringUtils.isBlank(column.getColumnFamily()) || column.getBuffer() == null) { + if (null == column.getColumnQualifier() || null == column.getColumnFamily() || column.getBuffer() == null) { return false; } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index 5517474237..97a0d66179 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -271,16 +271,18 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme // Create one Put per row.... final Map rowPuts = new HashMap<>(); for (final PutFlowFile putFlowFile : puts) { - Put put = rowPuts.get(putFlowFile.getRow()); + //this is used for the map key as a byte[] does not work as a key. + final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8); + Put put = rowPuts.get(rowKeyString); if (put == null) { - put = new Put(putFlowFile.getRow().getBytes(StandardCharsets.UTF_8)); - rowPuts.put(putFlowFile.getRow(), put); + put = new Put(putFlowFile.getRow()); + rowPuts.put(rowKeyString, put); } for (final PutColumn column : putFlowFile.getColumns()) { put.addColumn( - column.getColumnFamily().getBytes(StandardCharsets.UTF_8), - column.getColumnQualifier().getBytes(StandardCharsets.UTF_8), + column.getColumnFamily(), + column.getColumnQualifier(), column.getBuffer()); } } @@ -290,13 +292,13 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme } @Override - public void put(final String tableName, final String rowId, final Collection columns) throws IOException { + public void put(final String tableName, final byte[] rowId, final Collection columns) throws IOException { try (final Table table = connection.getTable(TableName.valueOf(tableName))) { - Put put = new Put(rowId.getBytes(StandardCharsets.UTF_8)); + Put put = new Put(rowId); for (final PutColumn column : columns) { put.addColumn( - column.getColumnFamily().getBytes(StandardCharsets.UTF_8), - column.getColumnQualifier().getBytes(StandardCharsets.UTF_8), + column.getColumnFamily(), + column.getColumnQualifier(), column.getBuffer()); } table.put(put); @@ -428,4 +430,8 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme return Bytes.toBytes(s); } + @Override + public byte[] toBytesBinary(String s) { + return Bytes.toBytesBinary(s); + } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java index 469033d2b5..6e7230710a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java @@ -195,9 +195,9 @@ public class TestHBase_1_1_2_ClientService { final String columnQualifier = "qualifier1"; final String content = "content1"; - final Collection columns = Collections.singletonList(new PutColumn(columnFamily, columnQualifier, + final Collection columns = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8), columnQualifier.getBytes(StandardCharsets.UTF_8), content.getBytes(StandardCharsets.UTF_8))); - final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, columns, null); + final PutFlowFile putFlowFile = new PutFlowFile(tableName, row.getBytes(StandardCharsets.UTF_8), columns, null); final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); @@ -234,13 +234,15 @@ public class TestHBase_1_1_2_ClientService { final String content1 = "content1"; final String content2 = "content2"; - final Collection columns1 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier, + final Collection columns1 = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8), + columnQualifier.getBytes(StandardCharsets.UTF_8), content1.getBytes(StandardCharsets.UTF_8))); - final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row, columns1, null); + final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row.getBytes(StandardCharsets.UTF_8), columns1, null); - final Collection columns2 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier, + final Collection columns2 = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8), + columnQualifier.getBytes(StandardCharsets.UTF_8), content2.getBytes(StandardCharsets.UTF_8))); - final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row, columns2, null); + final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row.getBytes(StandardCharsets.UTF_8), columns2, null); final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); @@ -282,13 +284,15 @@ public class TestHBase_1_1_2_ClientService { final String content1 = "content1"; final String content2 = "content2"; - final Collection columns1 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier, + final Collection columns1 = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8), + columnQualifier.getBytes(StandardCharsets.UTF_8), content1.getBytes(StandardCharsets.UTF_8))); - final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row1, columns1, null); + final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row1.getBytes(StandardCharsets.UTF_8), columns1, null); - final Collection columns2 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier, + final Collection columns2 = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8), + columnQualifier.getBytes(StandardCharsets.UTF_8), content2.getBytes(StandardCharsets.UTF_8))); - final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row2, columns2, null); + final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row2.getBytes(StandardCharsets.UTF_8), columns2, null); final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);