mirror of https://github.com/apache/nifi.git
NIFI-2620 Adding support for Binary Row Keys for both PutHBaseCell and PutHBaseJSON. This also involved making changes to PutFlowFile and PutColumn to carry around byte[] and not all strings. This closes #914.
Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
4a49587533
commit
0303805c01
|
@ -71,5 +71,16 @@
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hbase</groupId>
|
||||||
|
<artifactId>hbase-client</artifactId>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>org.slf4j</groupId>
|
||||||
|
<artifactId>slf4j-log4j12</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.nifi.hbase;
|
package org.apache.nifi.hbase;
|
||||||
|
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
|
import org.apache.nifi.components.AllowableValue;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.hbase.put.PutFlowFile;
|
import org.apache.nifi.hbase.put.PutFlowFile;
|
||||||
|
@ -60,6 +62,28 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
|
||||||
.expressionLanguageSupported(true)
|
.expressionLanguageSupported(true)
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
static final String STRING_ENCODING_VALUE = "String";
|
||||||
|
static final String BYTES_ENCODING_VALUE = "Bytes";
|
||||||
|
static final String BINARY_ENCODING_VALUE = "Binary";
|
||||||
|
|
||||||
|
|
||||||
|
protected static final AllowableValue ROW_ID_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
|
||||||
|
"Stores the value of row id as a UTF-8 String.");
|
||||||
|
protected static final AllowableValue ROW_ID_ENCODING_BINARY = new AllowableValue(BINARY_ENCODING_VALUE, BINARY_ENCODING_VALUE,
|
||||||
|
"Stores the value of the rows id as a binary byte array. It expects that the row id is a binary formated string.");
|
||||||
|
|
||||||
|
static final PropertyDescriptor ROW_ID_ENCODING_STRATEGY = new PropertyDescriptor.Builder()
|
||||||
|
.name("Row Identifier Encoding Strategy")
|
||||||
|
.description("Specifies the data type of Row ID used when inserting data into HBase. The default behaviror is" +
|
||||||
|
" to convert the row id to a UTF-8 byte array. Choosing Binary will convert a binary formatted string" +
|
||||||
|
" to the correct byte[] representation. The Binary option should be used if you are using Binary row" +
|
||||||
|
" keys in HBase")
|
||||||
|
.required(false) // not all sub-classes will require this
|
||||||
|
.expressionLanguageSupported(false)
|
||||||
|
.defaultValue(ROW_ID_ENCODING_STRING.getValue())
|
||||||
|
.allowableValues(ROW_ID_ENCODING_STRING,ROW_ID_ENCODING_BINARY)
|
||||||
|
.build();
|
||||||
protected static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder()
|
protected static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder()
|
||||||
.name("Column Family")
|
.name("Column Family")
|
||||||
.description("The Column Family to use when inserting data into HBase")
|
.description("The Column Family to use when inserting data into HBase")
|
||||||
|
@ -119,7 +143,7 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
|
||||||
} else if (!putFlowFile.isValid()) {
|
} else if (!putFlowFile.isValid()) {
|
||||||
if (StringUtils.isBlank(putFlowFile.getTableName())) {
|
if (StringUtils.isBlank(putFlowFile.getTableName())) {
|
||||||
getLogger().error("Missing table name for FlowFile {}; routing to failure", new Object[]{flowFile});
|
getLogger().error("Missing table name for FlowFile {}; routing to failure", new Object[]{flowFile});
|
||||||
} else if (StringUtils.isBlank(putFlowFile.getRow())) {
|
} else if (null == putFlowFile.getRow()) {
|
||||||
getLogger().error("Missing row id for FlowFile {}; routing to failure", new Object[]{flowFile});
|
getLogger().error("Missing row id for FlowFile {}; routing to failure", new Object[]{flowFile});
|
||||||
} else if (putFlowFile.getColumns() == null || putFlowFile.getColumns().isEmpty()) {
|
} else if (putFlowFile.getColumns() == null || putFlowFile.getColumns().isEmpty()) {
|
||||||
getLogger().error("No columns provided for FlowFile {}; routing to failure", new Object[]{flowFile});
|
getLogger().error("No columns provided for FlowFile {}; routing to failure", new Object[]{flowFile});
|
||||||
|
@ -170,9 +194,19 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getTransitUri(PutFlowFile putFlowFile) {
|
protected String getTransitUri(PutFlowFile putFlowFile) {
|
||||||
return "hbase://" + putFlowFile.getTableName() + "/" + putFlowFile.getRow();
|
return "hbase://" + putFlowFile.getTableName() + "/" + new String(putFlowFile.getRow(), StandardCharsets.UTF_8);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected byte[] getRow(final String row, final String encoding) {
|
||||||
|
//check to see if we need to modify the rowKey before we pass it down to the PutFlowFile
|
||||||
|
byte[] rowKeyBytes = null;
|
||||||
|
if(BINARY_ENCODING_VALUE.contentEquals(encoding)){
|
||||||
|
rowKeyBytes = clientService.toBytesBinary(row);
|
||||||
|
}else{
|
||||||
|
rowKeyBytes = row.getBytes(StandardCharsets.UTF_8);
|
||||||
|
}
|
||||||
|
return rowKeyBytes;
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Sub-classes provide the implementation to create a put from a FlowFile.
|
* Sub-classes provide the implementation to create a put from a FlowFile.
|
||||||
*
|
*
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.nifi.stream.io.StreamUtils;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -53,6 +54,7 @@ public class PutHBaseCell extends AbstractPutHBase {
|
||||||
properties.add(HBASE_CLIENT_SERVICE);
|
properties.add(HBASE_CLIENT_SERVICE);
|
||||||
properties.add(TABLE_NAME);
|
properties.add(TABLE_NAME);
|
||||||
properties.add(ROW_ID);
|
properties.add(ROW_ID);
|
||||||
|
properties.add(ROW_ID_ENCODING_STRATEGY);
|
||||||
properties.add(COLUMN_FAMILY);
|
properties.add(COLUMN_FAMILY);
|
||||||
properties.add(COLUMN_QUALIFIER);
|
properties.add(COLUMN_QUALIFIER);
|
||||||
properties.add(BATCH_SIZE);
|
properties.add(BATCH_SIZE);
|
||||||
|
@ -82,8 +84,15 @@ public class PutHBaseCell extends AbstractPutHBase {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
final Collection<PutColumn> columns = Collections.singletonList(new PutColumn(columnFamily, columnQualifier, buffer));
|
|
||||||
return new PutFlowFile(tableName, row, columns, flowFile);
|
final Collection<PutColumn> columns = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
|
||||||
|
columnQualifier.getBytes(StandardCharsets.UTF_8), buffer));
|
||||||
|
byte[] rowKeyBytes = getRow(row,context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue());
|
||||||
|
|
||||||
|
|
||||||
|
return new PutFlowFile(tableName,rowKeyBytes , columns, flowFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.nifi.hbase;
|
||||||
import java.io.BufferedInputStream;
|
import java.io.BufferedInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -89,8 +90,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
|
||||||
.defaultValue(COMPLEX_FIELD_TEXT.getValue())
|
.defaultValue(COMPLEX_FIELD_TEXT.getValue())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
protected static final String STRING_ENCODING_VALUE = "String";
|
|
||||||
protected static final String BYTES_ENCODING_VALUE = "Bytes";
|
|
||||||
|
|
||||||
protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
|
protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
|
||||||
"Stores the value of each field as a UTF-8 String.");
|
"Stores the value of each field as a UTF-8 String.");
|
||||||
|
@ -115,6 +115,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
|
||||||
properties.add(TABLE_NAME);
|
properties.add(TABLE_NAME);
|
||||||
properties.add(ROW_ID);
|
properties.add(ROW_ID);
|
||||||
properties.add(ROW_FIELD_NAME);
|
properties.add(ROW_FIELD_NAME);
|
||||||
|
properties.add(ROW_ID_ENCODING_STRATEGY);
|
||||||
properties.add(COLUMN_FAMILY);
|
properties.add(COLUMN_FAMILY);
|
||||||
properties.add(BATCH_SIZE);
|
properties.add(BATCH_SIZE);
|
||||||
properties.add(COMPLEX_FIELD_STRATEGY);
|
properties.add(COMPLEX_FIELD_STRATEGY);
|
||||||
|
@ -163,6 +164,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
|
||||||
final boolean extractRowId = !StringUtils.isBlank(rowFieldName);
|
final boolean extractRowId = !StringUtils.isBlank(rowFieldName);
|
||||||
final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
|
final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
|
||||||
final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
|
final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
|
||||||
|
final String rowIdEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue();
|
||||||
|
|
||||||
// Parse the JSON document
|
// Parse the JSON document
|
||||||
final ObjectMapper mapper = new ObjectMapper();
|
final ObjectMapper mapper = new ObjectMapper();
|
||||||
|
@ -236,7 +238,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
|
||||||
if (extractRowId && fieldName.equals(rowFieldName)) {
|
if (extractRowId && fieldName.equals(rowFieldName)) {
|
||||||
rowIdHolder.set(fieldNode.asText());
|
rowIdHolder.set(fieldNode.asText());
|
||||||
} else {
|
} else {
|
||||||
columns.add(new PutColumn(columnFamily, fieldName, fieldValueHolder.get()));
|
columns.add(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8), fieldName.getBytes(StandardCharsets.UTF_8), fieldValueHolder.get()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -250,7 +252,9 @@ public class PutHBaseJSON extends AbstractPutHBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
final String putRowId = (extractRowId ? rowIdHolder.get() : rowId);
|
final String putRowId = (extractRowId ? rowIdHolder.get() : rowId);
|
||||||
return new PutFlowFile(tableName, putRowId, columns, flowFile);
|
|
||||||
|
byte[] rowKeyBytes = getRow(putRowId,context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue());
|
||||||
|
return new PutFlowFile(tableName, rowKeyBytes, columns, flowFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.nifi.hbase;
|
||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -34,7 +35,7 @@ public class HBaseTestUtil {
|
||||||
boolean foundPut = false;
|
boolean foundPut = false;
|
||||||
|
|
||||||
for (final PutFlowFile put : puts) {
|
for (final PutFlowFile put : puts) {
|
||||||
if (!row.equals(put.getRow())) {
|
if (!row.equals(new String(put.getRow(), StandardCharsets.UTF_8))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,7 +50,8 @@ public class HBaseTestUtil {
|
||||||
// determine if we have the current expected column
|
// determine if we have the current expected column
|
||||||
boolean foundColumn = false;
|
boolean foundColumn = false;
|
||||||
for (PutColumn putColumn : put.getColumns()) {
|
for (PutColumn putColumn : put.getColumns()) {
|
||||||
if (columnFamily.equals(putColumn.getColumnFamily()) && entry.getKey().equals(putColumn.getColumnQualifier())
|
if (columnFamily.equals(new String(putColumn.getColumnFamily(), StandardCharsets.UTF_8))
|
||||||
|
&& entry.getKey().equals(new String(putColumn.getColumnQualifier(), StandardCharsets.UTF_8))
|
||||||
&& Arrays.equals(entry.getValue(), putColumn.getBuffer())) {
|
&& Arrays.equals(entry.getValue(), putColumn.getBuffer())) {
|
||||||
foundColumn = true;
|
foundColumn = true;
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.hbase;
|
package org.apache.nifi.hbase;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.nifi.controller.AbstractControllerService;
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
import org.apache.nifi.hbase.put.PutColumn;
|
import org.apache.nifi.hbase.put.PutColumn;
|
||||||
import org.apache.nifi.hbase.put.PutFlowFile;
|
import org.apache.nifi.hbase.put.PutFlowFile;
|
||||||
|
@ -23,6 +24,7 @@ import org.apache.nifi.hbase.scan.Column;
|
||||||
import org.apache.nifi.hbase.scan.ResultCell;
|
import org.apache.nifi.hbase.scan.ResultCell;
|
||||||
import org.apache.nifi.hbase.scan.ResultHandler;
|
import org.apache.nifi.hbase.scan.ResultHandler;
|
||||||
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -47,7 +49,7 @@ public class MockHBaseClientService extends AbstractControllerService implements
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void put(String tableName, String rowId, Collection<PutColumn> columns) throws IOException {
|
public void put(String tableName, byte[] rowId, Collection<PutColumn> columns) throws IOException {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,4 +133,9 @@ public class MockHBaseClientService extends AbstractControllerService implements
|
||||||
public byte[] toBytes(final String s) {
|
public byte[] toBytes(final String s) {
|
||||||
return s.getBytes(StandardCharsets.UTF_8);
|
return s.getBytes(StandardCharsets.UTF_8);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Mock implementation of binary row key conversion: passes the string straight to HBase's
 * {@code Bytes.toBytesBinary} and returns its result unchanged.
 *
 * @param s the binary formatted string to convert
 * @return the bytes produced by {@code Bytes.toBytesBinary(s)}
 */
@Override
|
||||||
|
public byte[] toBytesBinary(String s) {
|
||||||
|
return Bytes.toBytesBinary(s);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -253,6 +253,52 @@ public class TestPutHBaseCell {
|
||||||
assertEquals(2, runner.getProvenanceEvents().size());
|
assertEquals(2, runner.getProvenanceEvents().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Runs PutHBaseCell with the Binary row id encoding strategy and checks that the single resulting put
 * carries the row key bytes obtained by converting the configured binary formatted row id.
 */
@Test
public void testSingleFlowFileWithBinaryRowKey() throws IOException, InitializationException {
    final String tableName = "nifi";
    final String row = "\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00" +
            "\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00" +
            "\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00" +
            "\\x00\\x00\\x00\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00" +
            "\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x01\\x01\\x00\\x00\\x00\\x00\\x00" +
            "\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00" +
            "\\x00\\x00\\x00\\x01\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00" +
            "\\x01\\x00\\x00\\x01\\x00\\x00\\x00\\x00\\x01\\x01\\x01\\x00\\x01\\x00\\x01\\x01\\x01\\x00\\x00\\x00" +
            "\\x00\\x00\\x00\\x01\\x01\\x01\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x01\\x01\\x00\\x01\\x00\\x01\\x00" +
            "\\x00\\x01\\x01\\x01\\x01\\x00\\x00\\x01\\x01\\x01\\x00\\x01\\x00\\x00";

    final String columnFamily = "family1";
    final String columnQualifier = "qualifier1";

    final TestRunner runner = TestRunners.newTestRunner(PutHBaseCell.class);
    runner.setProperty(PutHBaseCell.TABLE_NAME, tableName);
    runner.setProperty(PutHBaseCell.ROW_ID, row);
    runner.setProperty(PutHBaseCell.ROW_ID_ENCODING_STRATEGY, PutHBaseCell.ROW_ID_ENCODING_BINARY.getValue());
    runner.setProperty(PutHBaseCell.COLUMN_FAMILY, columnFamily);
    runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, columnQualifier);
    runner.setProperty(PutHBaseCell.BATCH_SIZE, "1");

    final MockHBaseClientService hBaseClient = getHBaseClientService(runner);

    final byte[] expectedRowKey = hBaseClient.toBytesBinary(row);

    final String content = "some content";
    runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
    runner.run();
    runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);

    final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
    outFile.assertContentEquals(content);

    assertNotNull(hBaseClient.getFlowFilePuts());
    assertEquals(1, hBaseClient.getFlowFilePuts().size());

    List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
    assertEquals(1, puts.size());

    // Compare the raw key bytes directly: verifyPut only compares UTF-8 decoded strings, which is not
    // an exact check for arbitrary binary keys.
    org.junit.Assert.assertArrayEquals(expectedRowKey, puts.get(0).getRow());
    verifyPut(expectedRowKey, columnFamily.getBytes(StandardCharsets.UTF_8), columnQualifier.getBytes(StandardCharsets.UTF_8), content, puts.get(0));

    assertEquals(1, runner.getProvenanceEvents().size());
}
|
||||||
private Map<String, String> getAtrributeMapWithEL(String tableName, String row, String columnFamily, String columnQualifier) {
|
private Map<String, String> getAtrributeMapWithEL(String tableName, String row, String columnFamily, String columnQualifier) {
|
||||||
final Map<String,String> attributes1 = new HashMap<>();
|
final Map<String,String> attributes1 = new HashMap<>();
|
||||||
attributes1.put("hbase.tableName", tableName);
|
attributes1.put("hbase.tableName", tableName);
|
||||||
|
@ -280,14 +326,18 @@ public class TestPutHBaseCell {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyPut(String row, String columnFamily, String columnQualifier, String content, PutFlowFile put) {
|
private void verifyPut(String row, String columnFamily, String columnQualifier, String content, PutFlowFile put) {
|
||||||
assertEquals(row, put.getRow());
|
verifyPut(row.getBytes(StandardCharsets.UTF_8),columnFamily.getBytes(StandardCharsets.UTF_8),
|
||||||
|
columnQualifier.getBytes(StandardCharsets.UTF_8),content,put);
|
||||||
|
}
|
||||||
|
private void verifyPut(byte[] row, byte[] columnFamily, byte[] columnQualifier, String content, PutFlowFile put) {
|
||||||
|
assertEquals(new String(row, StandardCharsets.UTF_8), new String(put.getRow(), StandardCharsets.UTF_8));
|
||||||
|
|
||||||
assertNotNull(put.getColumns());
|
assertNotNull(put.getColumns());
|
||||||
assertEquals(1, put.getColumns().size());
|
assertEquals(1, put.getColumns().size());
|
||||||
|
|
||||||
final PutColumn column = put.getColumns().iterator().next();
|
final PutColumn column = put.getColumns().iterator().next();
|
||||||
assertEquals(columnFamily, column.getColumnFamily());
|
assertEquals(new String(columnFamily, StandardCharsets.UTF_8), new String(column.getColumnFamily(), StandardCharsets.UTF_8));
|
||||||
assertEquals(columnQualifier, column.getColumnQualifier());
|
assertEquals(new String(columnQualifier, StandardCharsets.UTF_8), new String(column.getColumnQualifier(), StandardCharsets.UTF_8));
|
||||||
assertEquals(content, new String(column.getBuffer(), StandardCharsets.UTF_8));
|
assertEquals(content, new String(column.getBuffer(), StandardCharsets.UTF_8));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -84,7 +84,7 @@ public interface HBaseClientService extends ControllerService {
|
||||||
* @param columns the columns of the row to put
|
* @param columns the columns of the row to put
|
||||||
* @throws IOException thrown when there are communication errors with HBase
|
* @throws IOException thrown when there are communication errors with HBase
|
||||||
*/
|
*/
|
||||||
void put(String tableName, String rowId, Collection<PutColumn> columns) throws IOException;
|
void put(String tableName, byte[] rowId, Collection<PutColumn> columns) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scans the given table using the optional filter criteria and passing each result to the provided handler.
|
* Scans the given table using the optional filter criteria and passing each result to the provided handler.
|
||||||
|
@ -130,4 +130,11 @@ public interface HBaseClientService extends ControllerService {
|
||||||
*/
|
*/
|
||||||
byte[] toBytes(String s);
|
byte[] toBytes(String s);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts the given binary formatted string to its byte representation. Unlike
* {@link #toBytes(String)}, the text is interpreted as an encoded description of bytes.
* NOTE(review): presumably the format is the one accepted by HBase's Bytes.toBytesBinary
* (e.g. "\x00\x01" escapes) - confirm before relying on it in other implementations.
*
|
||||||
|
* @param s a binary encoded string
|
||||||
|
* @return the bytes described by the binary formatted string
|
||||||
|
*/
|
||||||
|
byte[] toBytesBinary(String s);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,22 +21,22 @@ package org.apache.nifi.hbase.put;
|
||||||
*/
|
*/
|
||||||
public class PutColumn {
|
public class PutColumn {
|
||||||
|
|
||||||
private final String columnFamily;
|
private final byte[] columnFamily;
|
||||||
private final String columnQualifier;
|
private final byte[] columnQualifier;
|
||||||
private final byte[] buffer;
|
private final byte[] buffer;
|
||||||
|
|
||||||
|
|
||||||
public PutColumn(final String columnFamily, final String columnQualifier, final byte[] buffer) {
|
public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer) {
|
||||||
this.columnFamily = columnFamily;
|
this.columnFamily = columnFamily;
|
||||||
this.columnQualifier = columnQualifier;
|
this.columnQualifier = columnQualifier;
|
||||||
this.buffer = buffer;
|
this.buffer = buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getColumnFamily() {
|
public byte[] getColumnFamily() {
|
||||||
return columnFamily;
|
return columnFamily;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getColumnQualifier() {
|
public byte[] getColumnQualifier() {
|
||||||
return columnQualifier;
|
return columnQualifier;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,11 +27,11 @@ import java.util.Collection;
|
||||||
public class PutFlowFile {
|
public class PutFlowFile {
|
||||||
|
|
||||||
private final String tableName;
|
private final String tableName;
|
||||||
private final String row;
|
private final byte[] row;
|
||||||
private final Collection<PutColumn> columns;
|
private final Collection<PutColumn> columns;
|
||||||
private final FlowFile flowFile;
|
private final FlowFile flowFile;
|
||||||
|
|
||||||
public PutFlowFile(String tableName, String row, Collection<PutColumn> columns, FlowFile flowFile) {
|
public PutFlowFile(String tableName, byte[] row, Collection<PutColumn> columns, FlowFile flowFile) {
|
||||||
this.tableName = tableName;
|
this.tableName = tableName;
|
||||||
this.row = row;
|
this.row = row;
|
||||||
this.columns = columns;
|
this.columns = columns;
|
||||||
|
@ -42,7 +42,7 @@ public class PutFlowFile {
|
||||||
return tableName;
|
return tableName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getRow() {
|
public byte[] getRow() {
|
||||||
return row;
|
return row;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,12 +55,12 @@ public class PutFlowFile {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isValid() {
|
public boolean isValid() {
|
||||||
if (StringUtils.isBlank(tableName) || StringUtils.isBlank(row) || flowFile == null || columns == null || columns.isEmpty()) {
|
if (StringUtils.isBlank(tableName) || null == row || flowFile == null || columns == null || columns.isEmpty()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (PutColumn column : columns) {
|
for (PutColumn column : columns) {
|
||||||
if (StringUtils.isBlank(column.getColumnQualifier()) || StringUtils.isBlank(column.getColumnFamily()) || column.getBuffer() == null) {
|
if (null == column.getColumnQualifier() || null == column.getColumnFamily() || column.getBuffer() == null) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -271,16 +271,18 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
||||||
// Create one Put per row....
|
// Create one Put per row....
|
||||||
final Map<String, Put> rowPuts = new HashMap<>();
|
final Map<String, Put> rowPuts = new HashMap<>();
|
||||||
for (final PutFlowFile putFlowFile : puts) {
|
for (final PutFlowFile putFlowFile : puts) {
|
||||||
Put put = rowPuts.get(putFlowFile.getRow());
|
//this is used for the map key as a byte[] does not work as a key.
|
||||||
|
final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8);
|
||||||
|
Put put = rowPuts.get(rowKeyString);
|
||||||
if (put == null) {
|
if (put == null) {
|
||||||
put = new Put(putFlowFile.getRow().getBytes(StandardCharsets.UTF_8));
|
put = new Put(putFlowFile.getRow());
|
||||||
rowPuts.put(putFlowFile.getRow(), put);
|
rowPuts.put(rowKeyString, put);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final PutColumn column : putFlowFile.getColumns()) {
|
for (final PutColumn column : putFlowFile.getColumns()) {
|
||||||
put.addColumn(
|
put.addColumn(
|
||||||
column.getColumnFamily().getBytes(StandardCharsets.UTF_8),
|
column.getColumnFamily(),
|
||||||
column.getColumnQualifier().getBytes(StandardCharsets.UTF_8),
|
column.getColumnQualifier(),
|
||||||
column.getBuffer());
|
column.getBuffer());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -290,13 +292,13 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void put(final String tableName, final String rowId, final Collection<PutColumn> columns) throws IOException {
|
public void put(final String tableName, final byte[] rowId, final Collection<PutColumn> columns) throws IOException {
|
||||||
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
|
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
|
||||||
Put put = new Put(rowId.getBytes(StandardCharsets.UTF_8));
|
Put put = new Put(rowId);
|
||||||
for (final PutColumn column : columns) {
|
for (final PutColumn column : columns) {
|
||||||
put.addColumn(
|
put.addColumn(
|
||||||
column.getColumnFamily().getBytes(StandardCharsets.UTF_8),
|
column.getColumnFamily(),
|
||||||
column.getColumnQualifier().getBytes(StandardCharsets.UTF_8),
|
column.getColumnQualifier(),
|
||||||
column.getBuffer());
|
column.getBuffer());
|
||||||
}
|
}
|
||||||
table.put(put);
|
table.put(put);
|
||||||
|
@ -428,4 +430,8 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
||||||
return Bytes.toBytes(s);
|
return Bytes.toBytes(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Converts a binary formatted string to bytes by delegating to HBase's
 * {@code Bytes.toBytesBinary}; the result is returned unchanged.
 *
 * @param s the binary formatted string to convert
 * @return the bytes produced by {@code Bytes.toBytesBinary(s)}
 */
@Override
|
||||||
|
public byte[] toBytesBinary(String s) {
|
||||||
|
return Bytes.toBytesBinary(s);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -195,9 +195,9 @@ public class TestHBase_1_1_2_ClientService {
|
||||||
final String columnQualifier = "qualifier1";
|
final String columnQualifier = "qualifier1";
|
||||||
final String content = "content1";
|
final String content = "content1";
|
||||||
|
|
||||||
final Collection<PutColumn> columns = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
|
final Collection<PutColumn> columns = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8), columnQualifier.getBytes(StandardCharsets.UTF_8),
|
||||||
content.getBytes(StandardCharsets.UTF_8)));
|
content.getBytes(StandardCharsets.UTF_8)));
|
||||||
final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, columns, null);
|
final PutFlowFile putFlowFile = new PutFlowFile(tableName, row.getBytes(StandardCharsets.UTF_8), columns, null);
|
||||||
|
|
||||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||||
|
|
||||||
|
@ -234,13 +234,15 @@ public class TestHBase_1_1_2_ClientService {
|
||||||
final String content1 = "content1";
|
final String content1 = "content1";
|
||||||
final String content2 = "content2";
|
final String content2 = "content2";
|
||||||
|
|
||||||
final Collection<PutColumn> columns1 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
|
final Collection<PutColumn> columns1 = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
|
||||||
|
columnQualifier.getBytes(StandardCharsets.UTF_8),
|
||||||
content1.getBytes(StandardCharsets.UTF_8)));
|
content1.getBytes(StandardCharsets.UTF_8)));
|
||||||
final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row, columns1, null);
|
final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row.getBytes(StandardCharsets.UTF_8), columns1, null);
|
||||||
|
|
||||||
final Collection<PutColumn> columns2 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
|
final Collection<PutColumn> columns2 = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
|
||||||
|
columnQualifier.getBytes(StandardCharsets.UTF_8),
|
||||||
content2.getBytes(StandardCharsets.UTF_8)));
|
content2.getBytes(StandardCharsets.UTF_8)));
|
||||||
final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row, columns2, null);
|
final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row.getBytes(StandardCharsets.UTF_8), columns2, null);
|
||||||
|
|
||||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||||
|
|
||||||
|
@ -282,13 +284,15 @@ public class TestHBase_1_1_2_ClientService {
|
||||||
final String content1 = "content1";
|
final String content1 = "content1";
|
||||||
final String content2 = "content2";
|
final String content2 = "content2";
|
||||||
|
|
||||||
final Collection<PutColumn> columns1 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
|
final Collection<PutColumn> columns1 = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
|
||||||
|
columnQualifier.getBytes(StandardCharsets.UTF_8),
|
||||||
content1.getBytes(StandardCharsets.UTF_8)));
|
content1.getBytes(StandardCharsets.UTF_8)));
|
||||||
final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row1, columns1, null);
|
final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row1.getBytes(StandardCharsets.UTF_8), columns1, null);
|
||||||
|
|
||||||
final Collection<PutColumn> columns2 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
|
final Collection<PutColumn> columns2 = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
|
||||||
|
columnQualifier.getBytes(StandardCharsets.UTF_8),
|
||||||
content2.getBytes(StandardCharsets.UTF_8)));
|
content2.getBytes(StandardCharsets.UTF_8)));
|
||||||
final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row2, columns2, null);
|
final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row2.getBytes(StandardCharsets.UTF_8), columns2, null);
|
||||||
|
|
||||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue