NIFI-4275 Adding support for specifying the timestamp on PutHBase processors

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2070.
This commit is contained in:
Bryan Bende 2017-08-09 14:01:19 -04:00 committed by Pierre Villard
parent 760fd75bee
commit f7da7e67f4
9 changed files with 248 additions and 24 deletions

View File

@ -98,6 +98,13 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor TIMESTAMP = new PropertyDescriptor.Builder()
.name("timestamp")
.displayName("Timestamp")
.description("The timestamp for the cells being created in HBase. This field can be left blank and HBase will use the current time.")
.expressionLanguageSupported(true)
.addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
.build();
protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The maximum number of FlowFiles to process in a single execution. The FlowFiles will be " +

View File

@ -30,6 +30,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.io.InputStream;
@ -57,6 +58,7 @@ public class PutHBaseCell extends AbstractPutHBase {
properties.add(ROW_ID_ENCODING_STRATEGY);
properties.add(COLUMN_FAMILY);
properties.add(COLUMN_QUALIFIER);
properties.add(TIMESTAMP);
properties.add(BATCH_SIZE);
return properties;
}
@ -75,6 +77,20 @@ public class PutHBaseCell extends AbstractPutHBase {
final String row = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
final String columnQualifier = context.getProperty(COLUMN_QUALIFIER).evaluateAttributeExpressions(flowFile).getValue();
final String timestampValue = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
final Long timestamp;
if (!StringUtils.isBlank(timestampValue)) {
try {
timestamp = Long.valueOf(timestampValue);
} catch (Exception e) {
getLogger().error("Invalid timestamp value: " + timestampValue, e);
return null;
}
} else {
timestamp = null;
}
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@ -86,7 +102,7 @@ public class PutHBaseCell extends AbstractPutHBase {
final Collection<PutColumn> columns = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
columnQualifier.getBytes(StandardCharsets.UTF_8), buffer));
columnQualifier.getBytes(StandardCharsets.UTF_8), buffer, timestamp));
byte[] rowKeyBytes = getRow(row,context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue());

View File

@ -117,6 +117,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
properties.add(ROW_FIELD_NAME);
properties.add(ROW_ID_ENCODING_STRATEGY);
properties.add(COLUMN_FAMILY);
properties.add(TIMESTAMP);
properties.add(BATCH_SIZE);
properties.add(COMPLEX_FIELD_STRATEGY);
properties.add(FIELD_ENCODING_STRATEGY);
@ -161,11 +162,24 @@ public class PutHBaseJSON extends AbstractPutHBase {
final String rowId = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
final String timestampValue = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
final boolean extractRowId = !StringUtils.isBlank(rowFieldName);
final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
final String rowIdEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue();
final Long timestamp;
if (!StringUtils.isBlank(timestampValue)) {
try {
timestamp = Long.valueOf(timestampValue);
} catch (Exception e) {
getLogger().error("Invalid timestamp value: " + timestampValue, e);
return null;
}
} else {
timestamp = null;
}
// Parse the JSON document
final ObjectMapper mapper = new ObjectMapper();
final AtomicReference<JsonNode> rootNodeRef = new AtomicReference<>(null);
@ -238,7 +252,10 @@ public class PutHBaseJSON extends AbstractPutHBase {
if (extractRowId && fieldName.equals(rowFieldName)) {
rowIdHolder.set(fieldNode.asText());
} else {
columns.add(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8), fieldName.getBytes(StandardCharsets.UTF_8), fieldValueHolder.get()));
final byte[] colFamBytes = columnFamily.getBytes(StandardCharsets.UTF_8);
final byte[] colQualBytes = fieldName.getBytes(StandardCharsets.UTF_8);
final byte[] colValBytes = fieldValueHolder.get();
columns.add(new PutColumn(colFamBytes, colQualBytes, colValBytes, timestamp));
}
}
}
@ -253,7 +270,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
final String putRowId = (extractRowId ? rowIdHolder.get() : rowId);
byte[] rowKeyBytes = getRow(putRowId,context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue());
byte[] rowKeyBytes = getRow(putRowId, rowIdEncodingStrategy);
return new PutFlowFile(tableName, rowKeyBytes, columns, flowFile);
}

View File

@ -38,6 +38,8 @@ import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.io.InputStream;
@ -58,6 +60,17 @@ public class PutHBaseRecord extends AbstractPutHBase {
protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
.name("Row Identifier Field Name")
.description("Specifies the name of a record field whose value should be used as the row id for the given record.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor TIMESTAMP_FIELD_NAME = new PropertyDescriptor.Builder()
.name("timestamp-field-name")
.displayName("Timestamp Field Name")
.description("Specifies the name of a record field whose value should be used as the timestamp for the cells in HBase. " +
"The value of this field must be a number, string, or date that can be converted to a long. " +
"If this field is left blank, HBase will use the current time.")
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@ -123,6 +136,7 @@ public class PutHBaseRecord extends AbstractPutHBase {
properties.add(ROW_FIELD_NAME);
properties.add(ROW_ID_ENCODING_STRATEGY);
properties.add(COLUMN_FAMILY);
properties.add(TIMESTAMP_FIELD_NAME);
properties.add(BATCH_SIZE);
properties.add(COMPLEX_FIELD_STRATEGY);
properties.add(FIELD_ENCODING_STRATEGY);
@ -161,6 +175,7 @@ public class PutHBaseRecord extends AbstractPutHBase {
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
final String timestampFieldName = context.getProperty(TIMESTAMP_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
final String rowEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue();
@ -184,7 +199,8 @@ public class PutHBaseRecord extends AbstractPutHBase {
}
while ((record = reader.nextRecord()) != null) {
PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy);
PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily,
timestampFieldName, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy);
flowFiles.add(putFlowFile);
index++;
@ -307,8 +323,9 @@ public class PutHBaseRecord extends AbstractPutHBase {
}
}
protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile,
String rowFieldName, String columnFamily, String fieldEncodingStrategy, String rowEncodingStrategy, String complexFieldStrategy)
protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile, String rowFieldName,
String columnFamily, String timestampFieldName, String fieldEncodingStrategy, String rowEncodingStrategy,
String complexFieldStrategy)
throws PutCreationFailedInvokedException {
PutFlowFile retVal = null;
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
@ -318,17 +335,33 @@ public class PutHBaseRecord extends AbstractPutHBase {
final byte[] fam = clientService.toBytes(columnFamily);
if (record != null) {
final Long timestamp;
if (!StringUtils.isBlank(timestampFieldName)) {
try {
timestamp = record.getAsLong(timestampFieldName);
} catch (IllegalTypeConversionException e) {
throw new PutCreationFailedInvokedException("Could not convert " + timestampFieldName + " to a long", e);
}
if (timestamp == null) {
getLogger().warn("The value of timestamp field " + timestampFieldName + " was null, record will be inserted with latest timestamp");
}
} else {
timestamp = null;
}
List<PutColumn> columns = new ArrayList<>();
for (String name : schema.getFieldNames()) {
if (name.equals(rowFieldName)) {
if (name.equals(rowFieldName) || name.equals(timestampFieldName)) {
continue;
}
final byte[] fieldValueBytes = asBytes(name, schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy);
if (fieldValueBytes != null) {
columns.add(new PutColumn(fam, clientService.toBytes(name), fieldValueBytes));
columns.add(new PutColumn(fam, clientService.toBytes(name), fieldValueBytes, timestamp));
}
}
String rowIdValue = record.getAsString(rowFieldName);
if (rowIdValue == null) {
throw new PutCreationFailedInvokedException(String.format("Row ID was null for flowfile with ID %s", flowFile.getAttribute("uuid")));
@ -345,5 +378,8 @@ public class PutHBaseRecord extends AbstractPutHBase {
PutCreationFailedInvokedException(String msg) {
super(msg);
}
PutCreationFailedInvokedException(String msg, Exception e) {
super(msg, e);
}
}
}

View File

@ -34,6 +34,10 @@ import org.apache.nifi.util.TestRunner;
public class HBaseTestUtil {
public static void verifyPut(final String row, final String columnFamily, final Map<String,byte[]> columns, final List<PutFlowFile> puts) {
verifyPut(row, columnFamily, null, columns, puts);
}
public static void verifyPut(final String row, final String columnFamily, final Long timestamp, final Map<String,byte[]> columns, final List<PutFlowFile> puts) {
boolean foundPut = false;
for (final PutFlowFile put : puts) {
@ -54,7 +58,9 @@ public class HBaseTestUtil {
for (PutColumn putColumn : put.getColumns()) {
if (columnFamily.equals(new String(putColumn.getColumnFamily(), StandardCharsets.UTF_8))
&& entry.getKey().equals(new String(putColumn.getColumnQualifier(), StandardCharsets.UTF_8))
&& Arrays.equals(entry.getValue(), putColumn.getBuffer())) {
&& Arrays.equals(entry.getValue(), putColumn.getBuffer())
&& ((timestamp == null && putColumn.getTimestamp() == null)
|| (timestamp != null && timestamp.equals(putColumn.getTimestamp())) )) {
foundColumn = true;
break;
}

View File

@ -36,7 +36,7 @@ import static org.junit.Assert.assertNotNull;
public class TestPutHBaseCell {
@Test
public void testSingleFlowFile() throws IOException, InitializationException {
public void testSingleFlowFileNoTimestamp() throws IOException, InitializationException {
final String tableName = "nifi";
final String row = "row1";
final String columnFamily = "family1";
@ -64,26 +64,89 @@ public class TestPutHBaseCell {
List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
assertEquals(1, puts.size());
verifyPut(row, columnFamily, columnQualifier, content, puts.get(0));
verifyPut(row, columnFamily, columnQualifier, null, content, puts.get(0));
assertEquals(1, runner.getProvenanceEvents().size());
}
@Test
public void testSingleFlowFileWithTimestamp() throws IOException, InitializationException {
final String tableName = "nifi";
final String row = "row1";
final String columnFamily = "family1";
final String columnQualifier = "qualifier1";
final Long timestamp = 1L;
final TestRunner runner = TestRunners.newTestRunner(PutHBaseCell.class);
runner.setProperty(PutHBaseCell.TABLE_NAME, tableName);
runner.setProperty(PutHBaseCell.ROW_ID, row);
runner.setProperty(PutHBaseCell.COLUMN_FAMILY, columnFamily);
runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, columnQualifier);
runner.setProperty(PutHBaseCell.TIMESTAMP, timestamp.toString());
runner.setProperty(PutHBaseCell.BATCH_SIZE, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
final String content = "some content";
runner.enqueue(content.getBytes("UTF-8"));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
assertEquals(1, puts.size());
verifyPut(row, columnFamily, columnQualifier, timestamp, content, puts.get(0));
assertEquals(1, runner.getProvenanceEvents().size());
}
@Test
public void testSingleFlowFileWithInvalidTimestamp() throws IOException, InitializationException {
final String tableName = "nifi";
final String row = "row1";
final String columnFamily = "family1";
final String columnQualifier = "qualifier1";
final String timestamp = "not-a-timestamp";
final PutHBaseCell proc = new PutHBaseCell();
final TestRunner runner = getTestRunnerWithEL(proc);
runner.setProperty(PutHBaseCell.TIMESTAMP, "${hbase.timestamp}");
runner.setProperty(PutHBaseCell.BATCH_SIZE, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
final String content = "some content";
final Map<String, String> attributes = getAttributeMapWithEL(tableName, row, columnFamily, columnQualifier);
attributes.put("hbase.timestamp", timestamp);
runner.enqueue(content.getBytes("UTF-8"), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
}
@Test
public void testSingleFlowFileWithEL() throws IOException, InitializationException {
final String tableName = "nifi";
final String row = "row1";
final String columnFamily = "family1";
final String columnQualifier = "qualifier1";
final Long timestamp = 1L;
final PutHBaseCell proc = new PutHBaseCell();
final TestRunner runner = getTestRunnerWithEL(proc);
runner.setProperty(PutHBaseCell.TIMESTAMP, "${hbase.timestamp}");
runner.setProperty(PutHBaseCell.BATCH_SIZE, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
final String content = "some content";
final Map<String, String> attributes = getAttributeMapWithEL(tableName, row, columnFamily, columnQualifier);
attributes.put("hbase.timestamp", timestamp.toString());
runner.enqueue(content.getBytes("UTF-8"), attributes);
runner.run();
@ -97,7 +160,7 @@ public class TestPutHBaseCell {
List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
assertEquals(1, puts.size());
verifyPut(row, columnFamily, columnQualifier, content, puts.get(0));
verifyPut(row, columnFamily, columnQualifier, timestamp, content, puts.get(0));
assertEquals(1, runner.getProvenanceEvents().size());
}
@ -185,8 +248,8 @@ public class TestPutHBaseCell {
List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
assertEquals(2, puts.size());
verifyPut(row1, columnFamily, columnQualifier, content1, puts.get(0));
verifyPut(row2, columnFamily, columnQualifier, content2, puts.get(1));
verifyPut(row1, columnFamily, columnQualifier, null, content1, puts.get(0));
verifyPut(row2, columnFamily, columnQualifier, null, content2, puts.get(1));
assertEquals(2, runner.getProvenanceEvents().size());
}
@ -247,8 +310,8 @@ public class TestPutHBaseCell {
List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
assertEquals(2, puts.size());
verifyPut(row, columnFamily, columnQualifier, content1, puts.get(0));
verifyPut(row, columnFamily, columnQualifier, content2, puts.get(1));
verifyPut(row, columnFamily, columnQualifier, null, content1, puts.get(0));
verifyPut(row, columnFamily, columnQualifier, null, content2, puts.get(1));
assertEquals(2, runner.getProvenanceEvents().size());
}
@ -295,10 +358,11 @@ public class TestPutHBaseCell {
List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
assertEquals(1, puts.size());
verifyPut(expectedRowKey, columnFamily.getBytes(StandardCharsets.UTF_8), columnQualifier.getBytes(StandardCharsets.UTF_8), content, puts.get(0));
verifyPut(expectedRowKey, columnFamily.getBytes(StandardCharsets.UTF_8), columnQualifier.getBytes(StandardCharsets.UTF_8), null, content, puts.get(0));
assertEquals(1, runner.getProvenanceEvents().size());
}
private Map<String, String> getAttributeMapWithEL(String tableName, String row, String columnFamily, String columnQualifier) {
final Map<String,String> attributes1 = new HashMap<>();
attributes1.put("hbase.tableName", tableName);
@ -325,11 +389,11 @@ public class TestPutHBaseCell {
return hBaseClient;
}
private void verifyPut(String row, String columnFamily, String columnQualifier, String content, PutFlowFile put) {
private void verifyPut(String row, String columnFamily, String columnQualifier, Long timestamp, String content, PutFlowFile put) {
verifyPut(row.getBytes(StandardCharsets.UTF_8),columnFamily.getBytes(StandardCharsets.UTF_8),
columnQualifier.getBytes(StandardCharsets.UTF_8),content,put);
columnQualifier.getBytes(StandardCharsets.UTF_8), timestamp, content, put);
}
private void verifyPut(byte[] row, byte[] columnFamily, byte[] columnQualifier, String content, PutFlowFile put) {
private void verifyPut(byte[] row, byte[] columnFamily, byte[] columnQualifier, Long timestamp, String content, PutFlowFile put) {
assertEquals(new String(row, StandardCharsets.UTF_8), new String(put.getRow(), StandardCharsets.UTF_8));
assertNotNull(put.getColumns());
@ -339,6 +403,7 @@ public class TestPutHBaseCell {
assertEquals(new String(columnFamily, StandardCharsets.UTF_8), new String(column.getColumnFamily(), StandardCharsets.UTF_8));
assertEquals(new String(columnQualifier, StandardCharsets.UTF_8), new String(column.getColumnQualifier(), StandardCharsets.UTF_8));
assertEquals(content, new String(column.getBuffer(), StandardCharsets.UTF_8));
assertEquals(timestamp, column.getTimestamp());
}
}

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull;
import static org.apache.nifi.hbase.HBaseTestUtil.getHBaseClientService;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
@ -41,6 +42,7 @@ public class TestPutHBaseJSON {
public static final String DEFAULT_TABLE_NAME = "nifi";
public static final String DEFAULT_ROW = "row1";
public static final String DEFAULT_COLUMN_FAMILY = "family1";
public static final Long DEFAULT_TIMESTAMP = 1L;
@Test
public void testCustomValidate() throws InitializationException {
@ -441,6 +443,63 @@ public class TestPutHBaseJSON {
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
}
@Test
public void testTimestamp() throws UnsupportedEncodingException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.TIMESTAMP, DEFAULT_TIMESTAMP.toString());
final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes("UTF-8"));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field1", hBaseClient.toBytes("value1"));
expectedColumns.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, DEFAULT_TIMESTAMP, expectedColumns, puts);
}
@Test
public void testTimestampWithEL() throws UnsupportedEncodingException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.TIMESTAMP, "${hbase.timestamp}");
final Map<String,String> attributes = new HashMap<>();
attributes.put("hbase.timestamp", DEFAULT_TIMESTAMP.toString());
final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes("UTF-8"), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field1", hBaseClient.toBytes("value1"));
expectedColumns.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, DEFAULT_TIMESTAMP, expectedColumns, puts);
}
private TestRunner getTestRunner(String table, String columnFamily, String batchSize) {
final TestRunner runner = TestRunners.newTestRunner(PutHBaseJSON.class);
runner.setProperty(PutHBaseJSON.TABLE_NAME, table);

View File

@ -24,12 +24,18 @@ public class PutColumn {
private final byte[] columnFamily;
private final byte[] columnQualifier;
private final byte[] buffer;
private final Long timestamp;
public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer) {
this(columnFamily, columnQualifier, buffer, null);
}
public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer, final Long timestamp) {
this.columnFamily = columnFamily;
this.columnQualifier = columnQualifier;
this.buffer = buffer;
this.timestamp = timestamp;
}
public byte[] getColumnFamily() {
@ -44,4 +50,8 @@ public class PutColumn {
return buffer;
}
public Long getTimestamp() {
return timestamp;
}
}

View File

@ -284,12 +284,20 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
for (final PutColumn column : putFlowFile.getColumns()) {
if (column.getTimestamp() != null) {
put.addColumn(
column.getColumnFamily(),
column.getColumnQualifier(),
column.getTimestamp(),
column.getBuffer());
} else {
put.addColumn(
column.getColumnFamily(),
column.getColumnQualifier(),
column.getBuffer());
}
}
}
table.put(new ArrayList<>(rowPuts.values()));
}