diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
index 64c2aae31e..9f52a69c8e 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
@@ -44,6 +44,14 @@
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-processor-utils</artifactId>
        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
@@ -82,5 +90,10 @@
            <scope>test</scope>
        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
index 2dc92aec0f..3aa905426b 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
@@ -107,11 +107,11 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
.defaultValue("25")
.build();
- protected static final Relationship REL_SUCCESS = new Relationship.Builder()
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after it has been successfully stored in HBase")
.build();
- protected static final Relationship REL_FAILURE = new Relationship.Builder()
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if it cannot be sent to HBase")
.build();
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
new file mode 100755
index 0000000000..66f95e024e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hbase.put.PutColumn;
+import org.apache.nifi.hbase.put.PutFlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hadoop", "hbase", "put", "record"})
+@CapabilityDescription("Adds rows to HBase based on the contents of a FlowFile, using a configured record reader.")
+@ReadsAttribute(attribute = "restart.index", description = "Reads restart.index when it needs to replay part of a record set that did not get into HBase.")
+@WritesAttribute(attribute = "restart.index", description = "Writes restart.index when a batch fails to be inserted into HBase")
+public class PutHBaseRecord extends AbstractPutHBase {
+
+ protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
+ .name("Row Identifier Field Name")
+ .description("Specifies the name of a record field whose value should be used as the row id for the given record.")
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ protected static final String FAIL_VALUE = "Fail";
+ protected static final String WARN_VALUE = "Warn";
+ protected static final String IGNORE_VALUE = "Ignore";
+ protected static final String TEXT_VALUE = "Text";
+
+ protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values.");
+ protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase.");
+ protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase.");
+ protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column.");
+
+ static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder()
+ .name("Complex Field Strategy")
+ .description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.")
+ .expressionLanguageSupported(false)
+ .required(true)
+ .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
+ .defaultValue(COMPLEX_FIELD_TEXT.getValue())
+ .build();
+
+
+ protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
+ "Stores the value of each field as a UTF-8 String.");
+ protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
+ "Stores the value of each field as the byte representation of the type derived from the record.");
+
+ protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder()
+ .name("Field Encoding Strategy")
+ .description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " +
+ "record to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from " +
+ "the record, and convert the value to the byte representation of that type, meaning an integer will be stored as the " +
+ "byte representation of that integer."))
+ .required(true)
+ .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
+ .defaultValue(FIELD_ENCODING_STRING.getValue())
+ .build();
+
+ protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("Batch Size")
+ .description("The maximum number of records to be sent to HBase at any one time from the record set.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("1000")
+ .build();
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(RECORD_READER_FACTORY);
+ properties.add(HBASE_CLIENT_SERVICE);
+ properties.add(TABLE_NAME);
+ properties.add(ROW_FIELD_NAME);
+ properties.add(ROW_ID_ENCODING_STRATEGY);
+ properties.add(COLUMN_FAMILY);
+ properties.add(BATCH_SIZE);
+ properties.add(COMPLEX_FIELD_STRATEGY);
+ properties.add(FIELD_ENCODING_STRATEGY);
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_SUCCESS);
+ rels.add(REL_FAILURE);
+ return rels;
+ }
+
+ private int addBatch(String tableName, List<PutFlowFile> flowFiles) throws IOException {
+ int columns = 0;
+ clientService.put(tableName, flowFiles);
+ for (PutFlowFile put : flowFiles) {
+ columns += put.getColumns().size();
+ }
+
+ return columns;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
+ .asControllerService(RecordReaderFactory.class);
+ List<PutFlowFile> flowFiles = new ArrayList<>();
+ final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
+ final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
+ final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
+ final String rowEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue();
+
+ final long start = System.nanoTime();
+ int index = 0;
+ int columns = 0;
+ boolean failed = false;
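+ // restart.index records how many leading records were stored by a previous,
+ // partially-successful attempt, so a retried FlowFile can resume mid-record-set.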
+ String startIndexStr = flowFile.getAttribute("restart.index");
+ int startIndex = -1;
+ if (startIndexStr != null) {
+ startIndex = Integer.parseInt(startIndexStr);
+ }
+
+ PutFlowFile last = null;
+ try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) {
+ Record record;
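+ // Replay support: skip past the records that were persisted before the previous failure.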
+ if (startIndex >= 0) {
+ while (index++ < startIndex && reader.nextRecord() != null) {}
+ }
+
+ while ((record = reader.nextRecord()) != null) {
+ PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy);
+ flowFiles.add(putFlowFile);
+ index++;
+
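+ // A full batch is ready: send it and start accumulating the next one ('last' feeds provenance).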
+ if (flowFiles.size() == batchSize) {
+ columns += addBatch(tableName, flowFiles);
+ last = flowFiles.get(flowFiles.size() - 1);
+ flowFiles = new ArrayList<>();
+ }
+ }
+ if (flowFiles.size() > 0) {
+ columns += addBatch(tableName, flowFiles);
+ last = flowFiles.get(flowFiles.size() - 1);
+ }
+ } catch (Exception ex) {
+ getLogger().error("Failed to put records to HBase.", ex);
+ failed = true;
+ }
+
+ if (!failed) {
+ sendProvenance(session, flowFile, columns, System.nanoTime() - start, last);
+ flowFile = session.removeAttribute(flowFile, "restart.index");
+ session.transfer(flowFile, REL_SUCCESS);
+ } else {
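+ // Records in the still-unsent batch never reached HBase; everything read before them did.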
+ String restartIndex = Integer.toString(index - flowFiles.size());
+ flowFile = session.putAttribute(flowFile, "restart.index", restartIndex);
+ if (columns > 0) {
+ sendProvenance(session, flowFile, columns, System.nanoTime() - start, last);
+ }
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+
+
+ session.commit();
+ }
+
+ private void sendProvenance(ProcessSession session, FlowFile flowFile, int columns, long time, PutFlowFile pff) {
+ if (pff == null) {
+ return; // Nothing was sent (e.g. an empty record set), so there is nothing to report.
+ }
+ final String details = String.format("Put %d cells to HBase.", columns);
+ session.getProvenanceReporter().send(flowFile, getTransitUri(pff), details, time);
+ }
+
+ @Override
+ protected String getTransitUri(PutFlowFile putFlowFile) {
+ return "hbase://" + putFlowFile.getTableName();
+ }
+
+
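+ // The single-FlowFile createPut inherited from AbstractPutHBase is not used by this
+ // record-oriented processor; puts are built per record by the overload further below.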
+ @Override
+ protected PutFlowFile createPut(ProcessSession session, ProcessContext context, FlowFile flowFile) {
+ return null;
+ }
+
+ protected byte[] asBytes(String field, RecordFieldType fieldType, Record record, boolean asString, String complexFieldStrategy) throws PutCreationFailedInvokedException {
+
+ byte[] retVal;
+
+ if (asString) {
+ retVal = clientService.toBytes(record.getAsString(field));
+ } else {
+ switch (fieldType) {
+ case BOOLEAN:
+ retVal = clientService.toBytes(record.getAsBoolean(field));
+ break;
+ case CHAR:
+ retVal = clientService.toBytes(record.getAsString(field));
+ break;
+ case DOUBLE:
+ retVal = clientService.toBytes(record.getAsDouble(field));
+ break;
+ case FLOAT:
+ retVal = clientService.toBytes(record.getAsFloat(field));
+ break;
+ case INT:
+ retVal = clientService.toBytes(record.getAsInt(field));
+ break;
+ case LONG:
+ retVal = clientService.toBytes(record.getAsLong(field));
+ break;
+ default:
+ retVal = null;
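+ // Non-scalar (complex) field: fall back to the configured complex-field strategy.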
+ switch (complexFieldStrategy) {
+ case FAIL_VALUE:
+ getLogger().error("Complex value found for {}; routing to failure", new Object[]{field});
+ throw new PutCreationFailedInvokedException(String.format("Complex value found for %s; routing to failure", field));
+ case WARN_VALUE:
+ getLogger().warn("Complex value found for {}; skipping", new Object[]{field});
+ break;
+ case TEXT_VALUE:
+ retVal = clientService.toBytes(record.getAsString(field));
+ break;
+ case IGNORE_VALUE:
+ // silently skip
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ return retVal;
+ }
+
+ protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile,
+ String rowFieldName, String columnFamily, String fieldEncodingStrategy, String rowEncodingStrategy, String complexFieldStrategy) throws PutCreationFailedInvokedException {
+ PutFlowFile retVal = null;
+ final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+ boolean asString = STRING_ENCODING_VALUE.equals(fieldEncodingStrategy);
+
+ final byte[] fam = clientService.toBytes(columnFamily);
+
+ if (record != null) {
+ List<PutColumn> columns = new ArrayList<>();
+ for (String name : schema.getFieldNames()) {
+ if (name.equals(rowFieldName)) {
+ continue;
+ }
+ columns.add(new PutColumn(fam, clientService.toBytes(name), asBytes(name,
+ schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy)));
+ }
+ String rowIdValue = record.getAsString(rowFieldName);
+ if (rowIdValue == null) {
+ throw new PutCreationFailedInvokedException(String.format("Row ID was null for flowfile with ID %s", flowFile.getAttribute("uuid")));
+ }
+ byte[] rowId = getRow(rowIdValue, rowEncodingStrategy);
+
+ retVal = new PutFlowFile(tableName, rowId, columns, flowFile);
+ }
+
+ return retVal;
+ }
+
+ static class PutCreationFailedInvokedException extends Exception {
+ PutCreationFailedInvokedException(String msg) {
+ super(msg);
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 8af8cd551b..21c827cc7a 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -16,4 +16,5 @@
org.apache.nifi.hbase.GetHBase
org.apache.nifi.hbase.PutHBaseCell
org.apache.nifi.hbase.PutHBaseJSON
-org.apache.nifi.hbase.FetchHBaseRow
\ No newline at end of file
+org.apache.nifi.hbase.PutHBaseRecord
+org.apache.nifi.hbase.FetchHBaseRow
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
index 90d8838c68..f86e611549 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
@@ -28,6 +28,8 @@ import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
public class HBaseTestUtil {
@@ -85,4 +87,12 @@ public class HBaseTestUtil {
}
assertTrue(foundEvent);
}
+
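+ // Shared helper: registers and enables a mock HBase client service on the given runner.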
+ public static MockHBaseClientService getHBaseClientService(final TestRunner runner) throws InitializationException {
+ final MockHBaseClientService hBaseClient = new MockHBaseClientService();
+ runner.addControllerService("hbaseClient", hBaseClient);
+ runner.enableControllerService(hBaseClient);
+ runner.setProperty(PutHBaseCell.HBASE_CLIENT_SERVICE, "hbaseClient");
+ return hBaseClient;
+ }
}
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index f23e956959..f62102a8e1 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -40,13 +40,19 @@ public class MockHBaseClientService extends AbstractControllerService implements
private Map<String, List<PutFlowFile>> flowFilePuts = new HashMap<>();
private boolean throwException = false;
private int numScans = 0;
-
+ private int numPuts = 0;
@Override
public void put(String tableName, Collection<PutFlowFile> puts) throws IOException {
if (throwException) {
throw new IOException("exception");
}
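+ // Simulate a failure on the Nth call to put() so tests can exercise restart.index handling.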
+ if (testFailure) {
+ if (++numPuts == failureThreshold) {
+ throw new IOException();
+ }
+ }
+
this.flowFilePuts.put(tableName, new ArrayList<>(puts));
}
@@ -165,6 +171,16 @@ public class MockHBaseClientService extends AbstractControllerService implements
return new byte[] { b ? (byte) -1 : (byte) 0 };
}
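+ // The mock widens float to double and int to long so values round-trip through the existing codecs.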
+ @Override
+ public byte[] toBytes(float f) {
+ return toBytes((double)f);
+ }
+
+ @Override
+ public byte[] toBytes(int i) {
+ return toBytes((long)i);
+ }
+
@Override
public byte[] toBytes(long l) {
byte [] b = new byte[8];
@@ -190,4 +206,14 @@ public class MockHBaseClientService extends AbstractControllerService implements
public byte[] toBytesBinary(String s) {
return Bytes.toBytesBinary(s);
}
+
+ private boolean testFailure = false;
+ public void setTestFailure(boolean testFailure) {
+ this.testFailure = testFailure;
+ }
+
+ private int failureThreshold = 1;
+ public void setFailureThreshold(int failureThreshold) {
+ this.failureThreshold = failureThreshold;
+ }
}
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
index 28d9105d4c..d20d3549a1 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
@@ -19,6 +19,8 @@ package org.apache.nifi.hbase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.apache.nifi.hbase.HBaseTestUtil.getHBaseClientService;
+
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
@@ -447,12 +449,4 @@ public class TestPutHBaseJSON {
return runner;
}
- private MockHBaseClientService getHBaseClientService(final TestRunner runner) throws InitializationException {
- final MockHBaseClientService hBaseClient = new MockHBaseClientService();
- runner.addControllerService("hbaseClient", hBaseClient);
- runner.enableControllerService(hBaseClient);
- runner.setProperty(PutHBaseCell.HBASE_CLIENT_SERVICE, "hbaseClient");
- return hBaseClient;
- }
-
}
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java
new file mode 100644
index 0000000000..7bc5f96750
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.nifi.hbase.put.PutColumn;
+import org.apache.nifi.hbase.put.PutFlowFile;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.nifi.hbase.HBaseTestUtil.getHBaseClientService;
+
+public class TestPutHBaseRecord {
+
+ public static final String DEFAULT_TABLE_NAME = "nifi";
+ public static final String DEFAULT_COLUMN_FAMILY = "family1";
+
+ private TestRunner getTestRunner(String table, String columnFamily, String batchSize) {
+ final TestRunner runner = TestRunners.newTestRunner(PutHBaseRecord.class);
+ runner.enforceReadStreamsClosed(false);
+ runner.setProperty(PutHBaseRecord.TABLE_NAME, table);
+ runner.setProperty(PutHBaseRecord.COLUMN_FAMILY, columnFamily);
+ runner.setProperty(PutHBaseRecord.BATCH_SIZE, batchSize);
+ return runner;
+ }
+
+ private static final List<Integer> KEYS = Arrays.asList(1, 2, 3, 4);
+ private static final List<String> NAMES = Arrays.asList("rec1", "rec2", "rec3", "rec4");
+ private static final List<Long> CODES = Arrays.asList(101L, 102L, 103L, 104L);
+
+ private void generateTestData(TestRunner runner) throws IOException {
+
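+ // MockRecordParser is a test reader that serves the records added below, regardless of the FlowFile's content.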
+ final MockRecordParser parser = new MockRecordParser();
+ try {
+ runner.addControllerService("parser", parser);
+ } catch (InitializationException e) {
+ throw new IOException(e);
+ }
+ runner.enableControllerService(parser);
+ runner.setProperty(PutHBaseRecord.RECORD_READER_FACTORY, "parser");
+
+ parser.addSchemaField("id", RecordFieldType.INT);
+ parser.addSchemaField("name", RecordFieldType.STRING);
+ parser.addSchemaField("code", RecordFieldType.LONG);
+
+ for (int x = 0; x < KEYS.size(); x++) {
+ parser.addRecord(KEYS.get(x), NAMES.get(x), CODES.get(x));
+ }
+ }
+
+ private void basicPutSetup(String encodingStrategy, PutValidator validator) throws Exception {
+ basicPutSetup(encodingStrategy, validator, "1000", 4);
+ }
+
+ private void basicPutSetup(String encodingStrategy, PutValidator validator, String batchSize, int expectedPuts) throws Exception {
+ TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, batchSize);
+ runner.setProperty(PutHBaseRecord.ROW_FIELD_NAME, "id");
+ runner.setProperty(PutHBaseRecord.FIELD_ENCODING_STRATEGY, encodingStrategy);
+ MockHBaseClientService client = getHBaseClientService(runner);
+ generateTestData(runner);
+ runner.enqueue("Test".getBytes("UTF-8")); // This is to coax the processor into reading the data in the reader.l
+ runner.run();
+
+ List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutHBaseRecord.REL_SUCCESS);
+ Assert.assertEquals("Wrong count", 1, results.size());
+
+ Assert.assertEquals("Wrong number of PutFlowFiles", expectedPuts, client.getFlowFilePuts().get("nifi").size());
+ for (PutFlowFile putFlowFile : client.getFlowFilePuts().get("nifi")) {
+ Iterator<PutColumn> columnIterator = putFlowFile.getColumns().iterator();
+ PutColumn name = columnIterator.next();
+ PutColumn code = columnIterator.next();
+ Assert.assertNotNull("Name was null", name);
+ Assert.assertNotNull("Code was null", code);
+
+ String nFamName = new String(name.getColumnFamily());
+ String cFamName = new String(code.getColumnFamily());
+ String nQual = new String(name.getColumnQualifier());
+ String cQual = new String(code.getColumnQualifier());
+
+ Assert.assertEquals("Name column family didn't match", nFamName, DEFAULT_COLUMN_FAMILY);
+ Assert.assertEquals("Code column family didn't match", cFamName, DEFAULT_COLUMN_FAMILY);
+ Assert.assertEquals("Name qualifier didn't match", nQual, "name");
+ Assert.assertEquals("Code qualifier didn't match", cQual, "code");
+
+ validator.handle(name, code);
+ }
+ }
+
+ @Test
+ public void testByteEncodedPut() throws Exception {
+ basicPutSetup(PutHBaseRecord.BYTES_ENCODING_VALUE, (PutColumn[] columns) -> {
+ PutColumn name = columns[0];
+ PutColumn code = columns[1];
+ String nameVal = Bytes.toString(name.getBuffer());
+ Long codeVal = Bytes.toLong(code.getBuffer());
+ Assert.assertTrue("Name was not found", NAMES.contains(nameVal));
+ Assert.assertTrue("Code was not found ", CODES.contains(codeVal));
+ });
+ }
+
+ private void innerTest(PutColumn[] columns) {
+ PutColumn name = columns[0];
+ PutColumn code = columns[1];
+ String nameVal = Bytes.toString(name.getBuffer());
+ String codeVal = Bytes.toString(code.getBuffer());
+ Assert.assertTrue("Name was not found", NAMES.contains(nameVal));
+ Assert.assertTrue("Code was not found", CODES.contains(Long.valueOf(codeVal)));
+ }
+
+ @Test
+ public void testStringEncodedPut() throws Exception {
+ basicPutSetup(PutHBaseRecord.STRING_ENCODING_VALUE, (PutColumn[] columns) -> {
+ innerTest(columns);
+ });
+ }
+
+ @Test
+ public void testBatchOfOne() throws Exception {
+ basicPutSetup(PutHBaseRecord.STRING_ENCODING_VALUE, (PutColumn[] columns) -> {
+ innerTest(columns);
+ }, "1", 1);
+ }
+
+ @Test
+ public void testBatchOfTwo() throws Exception {
+ basicPutSetup(PutHBaseRecord.STRING_ENCODING_VALUE, (PutColumn[] columns) -> {
+ innerTest(columns);
+ }, "2", 2);
+ }
+
+ @Test
+ public void testFailure() throws Exception {
+ TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "2");
+ runner.setProperty(PutHBaseRecord.ROW_FIELD_NAME, "id");
+ runner.setProperty(PutHBaseRecord.FIELD_ENCODING_STRATEGY, PutHBaseRecord.STRING_ENCODING_VALUE);
+ MockHBaseClientService client = getHBaseClientService(runner);
+ client.setTestFailure(true);
+ client.setFailureThreshold(2);
+ generateTestData(runner);
+ runner.enqueue("Test".getBytes("UTF-8")); // This is to coax the processor into reading the data in the reader.
+ runner.run();
+ List<MockFlowFile> result = runner.getFlowFilesForRelationship(PutHBaseRecord.REL_FAILURE);
+ Assert.assertEquals("Size was wrong", 1, result.size());
+ Assert.assertEquals("Wrong # of PutFlowFiles", 2, client.getFlowFilePuts().get("nifi").size());
+ Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutHBaseRecord.REL_SUCCESS).size());
+
+ MockFlowFile mff = result.get(0);
+ Assert.assertNotNull("Missing restart index attribute", mff.getAttribute("restart.index"));
+ List<PutFlowFile> old = client.getFlowFilePuts().get("nifi");
+ client.setTestFailure(false);
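+ // Re-run without simulated failures and compare the latest puts against those captured before the failure.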
+ runner.enqueue("test");
+ runner.run();
+
+ Assert.assertEquals("Size was wrong", result.size(), 1);
+ Assert.assertEquals("Wrong # of PutFlowFiles", client.getFlowFilePuts().get("nifi").size(), 2);
+
+ List<PutFlowFile> newPFF = client.getFlowFilePuts().get("nifi");
+ for (PutFlowFile putFlowFile : old) {
+ Assert.assertFalse("Duplication", newPFF.contains(putFlowFile));
+ }
+ }
+
+ interface PutValidator {
+ void handle(PutColumn... columns);
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
index 80b8961434..c67b0cafc9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
@@ -148,6 +148,23 @@ public interface HBaseClientService extends ControllerService {
*/
byte[] toBytes(boolean b);
+ /**
+ * Converts the given float to its byte representation.
+ *
+ * @param f a float
+ * @return the float represented as bytes
+ */
+ byte[] toBytes(float f);
+
+ /**
+ * Converts the given int to its byte representation.
+ *
+ * @param i an int
+ * @return the int represented as bytes
+ */
+ byte[] toBytes(int i);
+
/**
* Converts the given long to it's byte representation.
*
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
index 2566ebdeec..025ed50b0f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
@@ -67,4 +67,16 @@ public class PutFlowFile {
return true;
}
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof PutFlowFile) {
+ PutFlowFile pff = (PutFlowFile)obj;
+ // row is a byte[]; Object.equals would compare references, so Arrays.equals is required.
+ return this.tableName.equals(pff.tableName)
+ && java.util.Arrays.equals(this.row, pff.row)
+ && this.columns.equals(pff.columns)
+ && this.flowFile.equals(pff.flowFile);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ // Keep the equals/hashCode contract so PutFlowFile behaves correctly in hash-based collections.
+ int result = tableName.hashCode();
+ result = 31 * result + java.util.Arrays.hashCode(row);
+ result = 31 * result + columns.hashCode();
+ result = 31 * result + flowFile.hashCode();
+ return result;
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index 2d1166c01f..f6ac852008 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -489,6 +489,16 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
return Bytes.toBytes(b);
}
+ @Override
+ public byte[] toBytes(float f) {
+ return Bytes.toBytes(f);
+ }
+
+ @Override
+ public byte[] toBytes(int i) {
+ return Bytes.toBytes(i);
+ }
+
@Override
public byte[] toBytes(long l) {
return Bytes.toBytes(l);