From f7da7e67f49fe9a22eb5357410f0bffd9f589276 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Wed, 9 Aug 2017 14:01:19 -0400 Subject: [PATCH] NIFI-4275 Adding support for specifying the timestamp on PutHBase processors Signed-off-by: Pierre Villard This closes #2070. --- .../apache/nifi/hbase/AbstractPutHBase.java | 7 ++ .../org/apache/nifi/hbase/PutHBaseCell.java | 18 +++- .../org/apache/nifi/hbase/PutHBaseJSON.java | 21 ++++- .../org/apache/nifi/hbase/PutHBaseRecord.java | 46 ++++++++-- .../org/apache/nifi/hbase/HBaseTestUtil.java | 8 +- .../apache/nifi/hbase/TestPutHBaseCell.java | 87 ++++++++++++++++--- .../apache/nifi/hbase/TestPutHBaseJSON.java | 59 +++++++++++++ .../org/apache/nifi/hbase/put/PutColumn.java | 10 +++ .../nifi/hbase/HBase_1_1_2_ClientService.java | 16 +++- 9 files changed, 248 insertions(+), 24 deletions(-) diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java index 3aa905426b..237bc03d35 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java @@ -98,6 +98,13 @@ public abstract class AbstractPutHBase extends AbstractProcessor { .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + protected static final PropertyDescriptor TIMESTAMP = new PropertyDescriptor.Builder() + .name("timestamp") + .displayName("Timestamp") + .description("The timestamp for the cells being created in HBase. This field can be left blank and HBase will use the current time.") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR) + .build(); protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() .name("Batch Size") .description("The maximum number of FlowFiles to process in a single execution. The FlowFiles will be " + diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java index eb1f636c41..122e38d21e 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java @@ -30,6 +30,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; import java.io.IOException; import java.io.InputStream; @@ -57,6 +58,7 @@ public class PutHBaseCell extends AbstractPutHBase { properties.add(ROW_ID_ENCODING_STRATEGY); properties.add(COLUMN_FAMILY); properties.add(COLUMN_QUALIFIER); + properties.add(TIMESTAMP); properties.add(BATCH_SIZE); return properties; } @@ -75,6 +77,20 @@ public class PutHBaseCell extends AbstractPutHBase { final String row = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue(); final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue(); final String columnQualifier = context.getProperty(COLUMN_QUALIFIER).evaluateAttributeExpressions(flowFile).getValue(); + final String timestampValue = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue(); + + final Long timestamp; + if (!StringUtils.isBlank(timestampValue)) { + try { + timestamp = Long.valueOf(timestampValue); + } catch (Exception e) { + getLogger().error("Invalid timestamp value: " + timestampValue, e); + return null; + } + } else { + timestamp = null; + } + final byte[] buffer = new byte[(int) flowFile.getSize()]; session.read(flowFile, new InputStreamCallback() { @@ -86,7 +102,7 @@ public class PutHBaseCell extends AbstractPutHBase { final Collection columns = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8), - columnQualifier.getBytes(StandardCharsets.UTF_8), buffer)); + columnQualifier.getBytes(StandardCharsets.UTF_8), buffer, timestamp)); byte[] rowKeyBytes = getRow(row,context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue()); diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java index 1294d9b164..2cb5a136cf 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java @@ -117,6 +117,7 @@ public class PutHBaseJSON extends AbstractPutHBase { properties.add(ROW_FIELD_NAME); properties.add(ROW_ID_ENCODING_STRATEGY); properties.add(COLUMN_FAMILY); + properties.add(TIMESTAMP); properties.add(BATCH_SIZE); properties.add(COMPLEX_FIELD_STRATEGY); properties.add(FIELD_ENCODING_STRATEGY); @@ -161,11 +162,24 @@ public class PutHBaseJSON extends AbstractPutHBase { final String rowId = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue(); final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue(); final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue(); + final String timestampValue = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue(); final boolean extractRowId = !StringUtils.isBlank(rowFieldName); final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue(); final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue(); final String rowIdEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue(); + final Long timestamp; + if (!StringUtils.isBlank(timestampValue)) { + try { + timestamp = Long.valueOf(timestampValue); + } catch (Exception e) { + getLogger().error("Invalid timestamp value: " + timestampValue, e); + return null; + } + } else { + timestamp = null; + } + // Parse the JSON document final ObjectMapper mapper = new ObjectMapper(); final AtomicReference rootNodeRef = new AtomicReference<>(null); @@ -238,7 +252,10 @@ public class PutHBaseJSON extends AbstractPutHBase { if (extractRowId && fieldName.equals(rowFieldName)) { rowIdHolder.set(fieldNode.asText()); } else { - columns.add(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8), fieldName.getBytes(StandardCharsets.UTF_8), fieldValueHolder.get())); + final byte[] colFamBytes = columnFamily.getBytes(StandardCharsets.UTF_8); + final byte[] colQualBytes = fieldName.getBytes(StandardCharsets.UTF_8); + final byte[] colValBytes = fieldValueHolder.get(); + columns.add(new PutColumn(colFamBytes, colQualBytes, colValBytes, timestamp)); } } } @@ -253,7 +270,7 @@ public class PutHBaseJSON extends AbstractPutHBase { final String putRowId = (extractRowId ? rowIdHolder.get() : rowId); - byte[] rowKeyBytes = getRow(putRowId,context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue()); + byte[] rowKeyBytes = getRow(putRowId, rowIdEncodingStrategy); return new PutFlowFile(tableName, rowKeyBytes, columns, flowFile); } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java index 8aa84ea4fd..90b2fdbeea 100755 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java @@ -38,6 +38,8 @@ import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; +import org.apache.nifi.util.StringUtils; import java.io.IOException; import java.io.InputStream; @@ -58,6 +60,17 @@ public class PutHBaseRecord extends AbstractPutHBase { protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder() .name("Row Identifier Field Name") .description("Specifies the name of a record field whose value should be used as the row id for the given record.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final PropertyDescriptor TIMESTAMP_FIELD_NAME = new PropertyDescriptor.Builder() + .name("timestamp-field-name") + .displayName("Timestamp Field Name") + .description("Specifies the name of a record field whose value should be used as the timestamp for the cells in HBase. " + + "The value of this field must be a number, string, or date that can be converted to a long. " + + "If this field is left blank, HBase will use the current time.") .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -123,6 +136,7 @@ public class PutHBaseRecord extends AbstractPutHBase { properties.add(ROW_FIELD_NAME); properties.add(ROW_ID_ENCODING_STRATEGY); properties.add(COLUMN_FAMILY); + properties.add(TIMESTAMP_FIELD_NAME); properties.add(BATCH_SIZE); properties.add(COMPLEX_FIELD_STRATEGY); properties.add(FIELD_ENCODING_STRATEGY); @@ -161,6 +175,7 @@ public class PutHBaseRecord extends AbstractPutHBase { final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue(); final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue(); + final String timestampFieldName = context.getProperty(TIMESTAMP_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue(); final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue(); final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue(); final String rowEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue(); @@ -184,7 +199,8 @@ public class PutHBaseRecord extends AbstractPutHBase { } while ((record = reader.nextRecord()) != null) { - PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy); + PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, + timestampFieldName, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy); flowFiles.add(putFlowFile); index++; @@ -307,8 +323,9 @@ public class PutHBaseRecord extends AbstractPutHBase { } } - protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile, - String rowFieldName, String columnFamily, String fieldEncodingStrategy, String rowEncodingStrategy, String complexFieldStrategy) + protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile, String rowFieldName, + String columnFamily, String timestampFieldName, String fieldEncodingStrategy, String rowEncodingStrategy, + String complexFieldStrategy) throws PutCreationFailedInvokedException { PutFlowFile retVal = null; final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); @@ -318,17 +335,33 @@ public class PutHBaseRecord extends AbstractPutHBase { final byte[] fam = clientService.toBytes(columnFamily); if (record != null) { + final Long timestamp; + if (!StringUtils.isBlank(timestampFieldName)) { + try { + timestamp = record.getAsLong(timestampFieldName); + } catch (IllegalTypeConversionException e) { + throw new PutCreationFailedInvokedException("Could not convert " + timestampFieldName + " to a long", e); + } + + if (timestamp == null) { + getLogger().warn("The value of timestamp field " + timestampFieldName + " was null, record will be inserted with latest timestamp"); + } + } else { + timestamp = null; + } + List columns = new ArrayList<>(); for (String name : schema.getFieldNames()) { - if (name.equals(rowFieldName)) { + if (name.equals(rowFieldName) || name.equals(timestampFieldName)) { continue; } final byte[] fieldValueBytes = asBytes(name, schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy); if (fieldValueBytes != null) { - columns.add(new PutColumn(fam, clientService.toBytes(name), fieldValueBytes)); + columns.add(new PutColumn(fam, clientService.toBytes(name), fieldValueBytes, timestamp)); } } + String rowIdValue = record.getAsString(rowFieldName); if (rowIdValue == null) { throw new PutCreationFailedInvokedException(String.format("Row ID was null for flowfile with ID %s", flowFile.getAttribute("uuid"))); @@ -345,5 +378,8 @@ public class PutHBaseRecord extends AbstractPutHBase { PutCreationFailedInvokedException(String msg) { super(msg); } + PutCreationFailedInvokedException(String msg, Exception e) { + super(msg, e); + } } } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java index f86e611549..fa598d0220 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java @@ -34,6 +34,10 @@ import org.apache.nifi.util.TestRunner; public class HBaseTestUtil { public static void verifyPut(final String row, final String columnFamily, final Map columns, final List puts) { + verifyPut(row, columnFamily, null, columns, puts); + } + + public static void verifyPut(final String row, final String columnFamily, final Long timestamp, final Map columns, final List puts) { boolean foundPut = false; for (final PutFlowFile put : puts) { @@ -54,7 +58,9 @@ public class HBaseTestUtil { for (PutColumn putColumn : put.getColumns()) { if (columnFamily.equals(new String(putColumn.getColumnFamily(), StandardCharsets.UTF_8)) && entry.getKey().equals(new String(putColumn.getColumnQualifier(), StandardCharsets.UTF_8)) - && Arrays.equals(entry.getValue(), putColumn.getBuffer())) { + && Arrays.equals(entry.getValue(), putColumn.getBuffer()) + && ((timestamp == null && putColumn.getTimestamp() == null) + || (timestamp != null && timestamp.equals(putColumn.getTimestamp())) )) { foundColumn = true; break; } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java index ee6a53f5be..f3fb4343bf 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java @@ -36,7 +36,7 @@ import static org.junit.Assert.assertNotNull; public class TestPutHBaseCell { @Test - public void testSingleFlowFile() throws IOException, InitializationException { + public void testSingleFlowFileNoTimestamp() throws IOException, InitializationException { final String tableName = "nifi"; final String row = "row1"; final String columnFamily = "family1"; @@ -64,26 +64,89 @@ public class TestPutHBaseCell { List puts = hBaseClient.getFlowFilePuts().get(tableName); assertEquals(1, puts.size()); - verifyPut(row, columnFamily, columnQualifier, content, puts.get(0)); + verifyPut(row, columnFamily, columnQualifier, null, content, puts.get(0)); assertEquals(1, runner.getProvenanceEvents().size()); } + @Test + public void testSingleFlowFileWithTimestamp() throws IOException, InitializationException { + final String tableName = "nifi"; + final String row = "row1"; + final String columnFamily = "family1"; + final String columnQualifier = "qualifier1"; + final Long timestamp = 1L; + + final TestRunner runner = TestRunners.newTestRunner(PutHBaseCell.class); + runner.setProperty(PutHBaseCell.TABLE_NAME, tableName); + runner.setProperty(PutHBaseCell.ROW_ID, row); + runner.setProperty(PutHBaseCell.COLUMN_FAMILY, columnFamily); + runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, columnQualifier); + runner.setProperty(PutHBaseCell.TIMESTAMP, timestamp.toString()); + runner.setProperty(PutHBaseCell.BATCH_SIZE, "1"); + + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + + final String content = "some content"; + runner.enqueue(content.getBytes("UTF-8")); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS); + + final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); + + List puts = hBaseClient.getFlowFilePuts().get(tableName); + assertEquals(1, puts.size()); + verifyPut(row, columnFamily, columnQualifier, timestamp, content, puts.get(0)); + + assertEquals(1, runner.getProvenanceEvents().size()); + } + + @Test + public void testSingleFlowFileWithInvalidTimestamp() throws IOException, InitializationException { + final String tableName = "nifi"; + final String row = "row1"; + final String columnFamily = "family1"; + final String columnQualifier = "qualifier1"; + final String timestamp = "not-a-timestamp"; + + final PutHBaseCell proc = new PutHBaseCell(); + final TestRunner runner = getTestRunnerWithEL(proc); + runner.setProperty(PutHBaseCell.TIMESTAMP, "${hbase.timestamp}"); + runner.setProperty(PutHBaseCell.BATCH_SIZE, "1"); + + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + + final String content = "some content"; + final Map attributes = getAttributeMapWithEL(tableName, row, columnFamily, columnQualifier); + attributes.put("hbase.timestamp", timestamp); + runner.enqueue(content.getBytes("UTF-8"), attributes); + + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1); + } + @Test public void testSingleFlowFileWithEL() throws IOException, InitializationException { final String tableName = "nifi"; final String row = "row1"; final String columnFamily = "family1"; final String columnQualifier = "qualifier1"; + final Long timestamp = 1L; final PutHBaseCell proc = new PutHBaseCell(); final TestRunner runner = getTestRunnerWithEL(proc); + runner.setProperty(PutHBaseCell.TIMESTAMP, "${hbase.timestamp}"); runner.setProperty(PutHBaseCell.BATCH_SIZE, "1"); final MockHBaseClientService hBaseClient = getHBaseClientService(runner); final String content = "some content"; final Map attributes = getAttributeMapWithEL(tableName, row, columnFamily, columnQualifier); + attributes.put("hbase.timestamp", timestamp.toString()); runner.enqueue(content.getBytes("UTF-8"), attributes); runner.run(); @@ -97,7 +160,7 @@ public class TestPutHBaseCell { List puts = hBaseClient.getFlowFilePuts().get(tableName); assertEquals(1, puts.size()); - verifyPut(row, columnFamily, columnQualifier, content, puts.get(0)); + verifyPut(row, columnFamily, columnQualifier, timestamp, content, puts.get(0)); assertEquals(1, runner.getProvenanceEvents().size()); } @@ -185,8 +248,8 @@ public class TestPutHBaseCell { List puts = hBaseClient.getFlowFilePuts().get(tableName); assertEquals(2, puts.size()); - verifyPut(row1, columnFamily, columnQualifier, content1, puts.get(0)); - verifyPut(row2, columnFamily, columnQualifier, content2, puts.get(1)); + verifyPut(row1, columnFamily, columnQualifier, null, content1, puts.get(0)); + verifyPut(row2, columnFamily, columnQualifier, null, content2, puts.get(1)); assertEquals(2, runner.getProvenanceEvents().size()); } @@ -247,8 +310,8 @@ public class TestPutHBaseCell { List puts = hBaseClient.getFlowFilePuts().get(tableName); assertEquals(2, puts.size()); - verifyPut(row, columnFamily, columnQualifier, content1, puts.get(0)); - verifyPut(row, columnFamily, columnQualifier, content2, puts.get(1)); + verifyPut(row, columnFamily, columnQualifier, null, content1, puts.get(0)); + verifyPut(row, columnFamily, columnQualifier, null, content2, puts.get(1)); assertEquals(2, runner.getProvenanceEvents().size()); } @@ -295,10 +358,11 @@ public class TestPutHBaseCell { List puts = hBaseClient.getFlowFilePuts().get(tableName); assertEquals(1, puts.size()); - verifyPut(expectedRowKey, columnFamily.getBytes(StandardCharsets.UTF_8), columnQualifier.getBytes(StandardCharsets.UTF_8), content, puts.get(0)); + verifyPut(expectedRowKey, columnFamily.getBytes(StandardCharsets.UTF_8), columnQualifier.getBytes(StandardCharsets.UTF_8), null, content, puts.get(0)); assertEquals(1, runner.getProvenanceEvents().size()); } + private Map getAttributeMapWithEL(String tableName, String row, String columnFamily, String columnQualifier) { final Map attributes1 = new HashMap<>(); attributes1.put("hbase.tableName", tableName); @@ -325,11 +389,11 @@ public class TestPutHBaseCell { return hBaseClient; } - private void verifyPut(String row, String columnFamily, String columnQualifier, String content, PutFlowFile put) { + private void verifyPut(String row, String columnFamily, String columnQualifier, Long timestamp, String content, PutFlowFile put) { verifyPut(row.getBytes(StandardCharsets.UTF_8),columnFamily.getBytes(StandardCharsets.UTF_8), - columnQualifier.getBytes(StandardCharsets.UTF_8),content,put); + columnQualifier.getBytes(StandardCharsets.UTF_8), timestamp, content, put); } - private void verifyPut(byte[] row, byte[] columnFamily, byte[] columnQualifier, String content, PutFlowFile put) { + private void verifyPut(byte[] row, byte[] columnFamily, byte[] columnQualifier, Long timestamp, String content, PutFlowFile put) { assertEquals(new String(row, StandardCharsets.UTF_8), new String(put.getRow(), StandardCharsets.UTF_8)); assertNotNull(put.getColumns()); @@ -339,6 +403,7 @@ public class TestPutHBaseCell { assertEquals(new String(columnFamily, StandardCharsets.UTF_8), new String(column.getColumnFamily(), StandardCharsets.UTF_8)); assertEquals(new String(columnQualifier, StandardCharsets.UTF_8), new String(column.getColumnQualifier(), StandardCharsets.UTF_8)); assertEquals(content, new String(column.getBuffer(), StandardCharsets.UTF_8)); + assertEquals(timestamp, column.getTimestamp()); } } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java index d20d3549a1..ee799e4dd0 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull; import static org.apache.nifi.hbase.HBaseTestUtil.getHBaseClientService; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; @@ -41,6 +42,7 @@ public class TestPutHBaseJSON { public static final String DEFAULT_TABLE_NAME = "nifi"; public static final String DEFAULT_ROW = "row1"; public static final String DEFAULT_COLUMN_FAMILY = "family1"; + public static final Long DEFAULT_TIMESTAMP = 1L; @Test public void testCustomValidate() throws InitializationException { @@ -441,6 +443,63 @@ public class TestPutHBaseJSON { runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1); } + @Test + public void testTimestamp() throws UnsupportedEncodingException, InitializationException { + final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW); + runner.setProperty(PutHBaseJSON.TIMESTAMP, DEFAULT_TIMESTAMP.toString()); + + final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }"; + runner.enqueue(content.getBytes("UTF-8")); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS); + + final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); + + final List puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME); + assertEquals(1, puts.size()); + + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", hBaseClient.toBytes("value1")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); + HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, DEFAULT_TIMESTAMP, expectedColumns, puts); + } + + @Test + public void testTimestampWithEL() throws UnsupportedEncodingException, InitializationException { + final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW); + runner.setProperty(PutHBaseJSON.TIMESTAMP, "${hbase.timestamp}"); + + final Map attributes = new HashMap<>(); + attributes.put("hbase.timestamp", DEFAULT_TIMESTAMP.toString()); + + final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }"; + runner.enqueue(content.getBytes("UTF-8"), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS); + + final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); + + final List puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME); + assertEquals(1, puts.size()); + + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", hBaseClient.toBytes("value1")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); + HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, DEFAULT_TIMESTAMP, expectedColumns, puts); + } + private TestRunner getTestRunner(String table, String columnFamily, String batchSize) { final TestRunner runner = TestRunners.newTestRunner(PutHBaseJSON.class); runner.setProperty(PutHBaseJSON.TABLE_NAME, table); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java index 7921bc2554..b29e032712 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java @@ -24,12 +24,18 @@ public class PutColumn { private final byte[] columnFamily; private final byte[] columnQualifier; private final byte[] buffer; + private final Long timestamp; public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer) { + this(columnFamily, columnQualifier, buffer, null); + } + + public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer, final Long timestamp) { this.columnFamily = columnFamily; this.columnQualifier = columnQualifier; this.buffer = buffer; + this.timestamp = timestamp; } public byte[] getColumnFamily() { @@ -44,4 +50,8 @@ public class PutColumn { return buffer; } + public Long getTimestamp() { + return timestamp; + } + } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index f6ac852008..53f5834d6f 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -284,10 +284,18 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme } for (final PutColumn column : putFlowFile.getColumns()) { - put.addColumn( - column.getColumnFamily(), - column.getColumnQualifier(), - column.getBuffer()); + if (column.getTimestamp() != null) { + put.addColumn( + column.getColumnFamily(), + column.getColumnQualifier(), + column.getTimestamp(), + column.getBuffer()); + } else { + put.addColumn( + column.getColumnFamily(), + column.getColumnQualifier(), + column.getBuffer()); + } } }