From 496a32e12c4da869996ccee4c1dcf797b229cec3 Mon Sep 17 00:00:00 2001
From: Mike Thomsen
Date: Fri, 23 Jun 2017 07:50:26 -0400
Subject: [PATCH] NIFI-4024 Added org.apache.nifi.hbase.PutHBaseRecord

Signed-off-by: Bryan Bende
---
 .../nifi-hbase-processors/pom.xml             |  13 +
 .../apache/nifi/hbase/AbstractPutHBase.java   |   4 +-
 .../org/apache/nifi/hbase/PutHBaseRecord.java | 331 ++++++++++++++++++
 .../org.apache.nifi.processor.Processor       |   3 +-
 .../org/apache/nifi/hbase/HBaseTestUtil.java  |  10 +
 .../nifi/hbase/MockHBaseClientService.java    |  28 +-
 .../apache/nifi/hbase/TestPutHBaseJSON.java   |  10 +-
 .../apache/nifi/hbase/TestPutHBaseRecord.java | 194 ++++++++++
 .../apache/nifi/hbase/HBaseClientService.java |  17 +
 .../apache/nifi/hbase/put/PutFlowFile.java    |  12 +
 .../nifi/hbase/HBase_1_1_2_ClientService.java |  10 +
 11 files changed, 620 insertions(+), 12 deletions(-)
 create mode 100755 nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
 create mode 100644 nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java

diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
index 64c2aae31e..9f52a69c8e 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
@@ -44,6 +44,14 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-processor-utils</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
@@ -82,5 +90,10 @@
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
index 2dc92aec0f..3aa905426b 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
@@ -107,11 +107,11 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
             .defaultValue("25")
             .build();
 
-    protected static final Relationship REL_SUCCESS = new Relationship.Builder()
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("A FlowFile is routed to this relationship after it has been successfully stored in HBase")
             .build();
-    protected static final Relationship REL_FAILURE = new Relationship.Builder()
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
             .description("A FlowFile is routed to this relationship if it cannot be sent to HBase")
             .build();
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
new file mode 100755
index 0000000000..66f95e024e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hbase.put.PutColumn;
+import org.apache.nifi.hbase.put.PutFlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hadoop", "hbase", "put", "record"})
+@CapabilityDescription("Adds rows to HBase based on the contents of a FlowFile using a configured record reader.")
+@ReadsAttribute(attribute = "restart.index", description = "Reads restart.index when it needs to replay part of a record set that did not get into HBase.")
+@WritesAttribute(attribute = "restart.index", description = "Writes restart.index when a batch fails to be inserted into HBase.")
+public class PutHBaseRecord extends AbstractPutHBase {
+
+    protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
+            .name("Row Identifier Field Name")
+            .description("Specifies the name of a record field whose value should be used as the row id for the given record.")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final String FAIL_VALUE = "Fail";
+    protected static final String WARN_VALUE = "Warn";
+    protected static final String IGNORE_VALUE = "Ignore";
+    protected static final String TEXT_VALUE = "Text";
+
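+    // Strategies for record fields that do not reduce to a single scalar
+    // value, e.g. nested records, arrays, or maps.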
"Provide a warning and do not include field in row sent to HBase."); + protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase."); + protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column."); + + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder() + .name("Complex Field Strategy") + .description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.") + .expressionLanguageSupported(false) + .required(true) + .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT) + .defaultValue(COMPLEX_FIELD_TEXT.getValue()) + .build(); + + + protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE, + "Stores the value of each field as a UTF-8 String."); + protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE, + "Stores the value of each field as the byte representation of the type derived from the record."); + + protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder() + .name("Field Encoding Strategy") + .description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " + + "record to a String, and store the UTF-8 bytes. 
+    protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
+            "Stores the value of each field as a UTF-8 String.");
+    protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
+            "Stores the value of each field as the byte representation of the type derived from the record.");
+
+    protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder()
+            .name("Field Encoding Strategy")
+            .description("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the "
+                    + "record to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from "
+                    + "the record, and convert the value to the byte representation of that type, meaning an integer will be stored as the "
+                    + "byte representation of that integer.")
+            .required(true)
+            .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
+            .defaultValue(FIELD_ENCODING_STRING.getValue())
+            .build();
+
+    protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("The maximum number of records to be sent to HBase at any one time from the record set.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1000")
+            .build();
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(RECORD_READER_FACTORY);
+        properties.add(HBASE_CLIENT_SERVICE);
+        properties.add(TABLE_NAME);
+        properties.add(ROW_FIELD_NAME);
+        properties.add(ROW_ID_ENCODING_STRATEGY);
+        properties.add(COLUMN_FAMILY);
+        properties.add(BATCH_SIZE);
+        properties.add(COMPLEX_FIELD_STRATEGY);
+        properties.add(FIELD_ENCODING_STRATEGY);
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        return rels;
+    }
+
+    private int addBatch(String tableName, List<PutFlowFile> flowFiles) throws IOException {
+        int columns = 0;
+        clientService.put(tableName, flowFiles);
+        for (PutFlowFile put : flowFiles) {
+            columns += put.getColumns().size();
+        }
+
+        return columns;
+    }
+
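+    // Records are sent in batches of up to Batch Size puts. When a batch
+    // fails, restart.index records how many records were already stored so
+    // a retry of the same FlowFile can skip past them.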
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
+                .asControllerService(RecordReaderFactory.class);
+        List<PutFlowFile> flowFiles = new ArrayList<>();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
+        final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
+        final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
+        final String rowEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue();
+
+        final long start = System.nanoTime();
+        int index = 0;
+        int columns = 0;
+        boolean failed = false;
+        String startIndexStr = flowFile.getAttribute("restart.index");
+        int startIndex = -1;
+        if (startIndexStr != null) {
+            startIndex = Integer.parseInt(startIndexStr);
+        }
+
+        PutFlowFile last = null;
+        try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) {
+            Record record;
+            // Skip over the records that were already sent before a previous failure.
+            if (startIndex >= 0) {
+                while (index++ < startIndex && reader.nextRecord() != null) {}
+            }
+
+            while ((record = reader.nextRecord()) != null) {
+                PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy);
+                flowFiles.add(putFlowFile);
+                index++;
+
+                if (flowFiles.size() == batchSize) {
+                    columns += addBatch(tableName, flowFiles);
+                    last = flowFiles.get(flowFiles.size() - 1);
+                    flowFiles = new ArrayList<>();
+                }
+            }
+            if (flowFiles.size() > 0) {
+                columns += addBatch(tableName, flowFiles);
+                last = flowFiles.get(flowFiles.size() - 1);
+            }
+        } catch (Exception ex) {
+            getLogger().error("Failed to put records to HBase.", ex);
+            failed = true;
+        }
+
+        if (!failed) {
+            sendProvenance(session, flowFile, columns, System.nanoTime() - start, last);
+            flowFile = session.removeAttribute(flowFile, "restart.index");
+            session.transfer(flowFile, REL_SUCCESS);
+        } else {
+            String restartIndex = Integer.toString(index - flowFiles.size());
+            flowFile = session.putAttribute(flowFile, "restart.index", restartIndex);
+            if (columns > 0) {
+                sendProvenance(session, flowFile, columns, System.nanoTime() - start, last);
+            }
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        session.commit();
+    }
+
+    private void sendProvenance(ProcessSession session, FlowFile flowFile, int columns, long time, PutFlowFile pff) {
+        final String details = String.format("Put %d cells to HBase.", columns);
+        session.getProvenanceReporter().send(flowFile, getTransitUri(pff), details, time);
+    }
+
+    @Override
+    protected String getTransitUri(PutFlowFile putFlowFile) {
+        return "hbase://" + putFlowFile.getTableName();
+    }
+
+    @Override
+    protected PutFlowFile createPut(ProcessSession session, ProcessContext context, FlowFile flowFile) {
+        // Not used; this processor builds its puts per record in createPut(ProcessContext, Record, ...).
+        return null;
+    }
+
+    protected byte[] asBytes(String field, RecordFieldType fieldType, Record record, boolean asString, String complexFieldStrategy) throws PutCreationFailedInvokedException {
+        byte[] retVal;
+
+        if (asString) {
+            retVal = clientService.toBytes(record.getAsString(field));
+        } else {
+            switch (fieldType) {
+                case BOOLEAN:
+                    retVal = clientService.toBytes(record.getAsBoolean(field));
+                    break;
+                case CHAR:
+                    retVal = clientService.toBytes(record.getAsString(field));
+                    break;
+                case DOUBLE:
+                    retVal = clientService.toBytes(record.getAsDouble(field));
+                    break;
+                case FLOAT:
+                    retVal = clientService.toBytes(record.getAsFloat(field));
+                    break;
+                case INT:
+                    retVal = clientService.toBytes(record.getAsInt(field));
+                    break;
+                case LONG:
+                    retVal = clientService.toBytes(record.getAsLong(field));
+                    break;
+                default:
+                    retVal = null;
+                    switch (complexFieldStrategy) {
+                        case FAIL_VALUE:
+                            getLogger().error("Complex value found for {}; routing to failure", new Object[]{field});
+                            throw new PutCreationFailedInvokedException(String.format("Complex value found for %s; routing to failure", field));
+                        case WARN_VALUE:
+                            getLogger().warn("Complex value found for {}; skipping", new Object[]{field});
+                            break;
+                        case TEXT_VALUE:
+                            retVal = clientService.toBytes(record.getAsString(field));
+                            break;
+                        case IGNORE_VALUE:
+                            // silently skip
+                            break;
+                        default:
+                            break;
+                    }
+            }
+        }
+
+        return retVal;
+    }
+
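+    // Builds a single PutFlowFile for one record: every field except the row
+    // id field becomes a PutColumn under the configured column family.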
+    protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile,
+                                    String rowFieldName, String columnFamily, String fieldEncodingStrategy, String rowEncodingStrategy, String complexFieldStrategy) throws PutCreationFailedInvokedException {
+        PutFlowFile retVal = null;
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+        boolean asString = STRING_ENCODING_VALUE.equals(fieldEncodingStrategy);
+
+        final byte[] fam = clientService.toBytes(columnFamily);
+
+        if (record != null) {
+            List<PutColumn> columns = new ArrayList<>();
+            for (String name : schema.getFieldNames()) {
+                if (name.equals(rowFieldName)) {
+                    continue;
+                }
+                columns.add(new PutColumn(fam, clientService.toBytes(name), asBytes(name,
+                    schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy)));
+            }
+            String rowIdValue = record.getAsString(rowFieldName);
+            if (rowIdValue == null) {
+                throw new PutCreationFailedInvokedException(String.format("Row ID was null for flowfile with ID %s", flowFile.getAttribute("uuid")));
+            }
+            byte[] rowId = getRow(rowIdValue, rowEncodingStrategy);
+
+            retVal = new PutFlowFile(tableName, rowId, columns, flowFile);
+        }
+
+        return retVal;
+    }
+
+    static class PutCreationFailedInvokedException extends Exception {
+        PutCreationFailedInvokedException(String msg) {
+            super(msg);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 8af8cd551b..21c827cc7a 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -16,4 +16,5 @@
 org.apache.nifi.hbase.GetHBase
 org.apache.nifi.hbase.PutHBaseCell
 org.apache.nifi.hbase.PutHBaseJSON
-org.apache.nifi.hbase.FetchHBaseRow
\ No newline at end of file
+org.apache.nifi.hbase.PutHBaseRecord
+org.apache.nifi.hbase.FetchHBaseRow
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
index 90d8838c68..f86e611549 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
@@ -28,6 +28,8 @@ import org.apache.nifi.hbase.put.PutColumn;
 import org.apache.nifi.hbase.put.PutFlowFile;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
 
 public class HBaseTestUtil {
 
@@ -85,4 +87,12 @@ public class HBaseTestUtil {
         }
         assertTrue(foundEvent);
     }
+
+    public static MockHBaseClientService getHBaseClientService(final TestRunner runner) throws InitializationException {
+        final MockHBaseClientService hBaseClient = new MockHBaseClientService();
+        runner.addControllerService("hbaseClient", hBaseClient);
+        runner.enableControllerService(hBaseClient);
+        runner.setProperty(PutHBaseCell.HBASE_CLIENT_SERVICE, "hbaseClient");
+        return hBaseClient;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index f23e956959..f62102a8e1 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -40,13 +40,19 @@ public class MockHBaseClientService extends AbstractControllerService implements
     private Map<String, List<PutFlowFile>> flowFilePuts = new HashMap<>();
     private boolean throwException = false;
     private int numScans = 0;
-
+    private int numPuts = 0;
 
     @Override
     public void put(String tableName, Collection<PutFlowFile> puts) throws IOException {
         if (throwException) {
             throw new IOException("exception");
         }
 
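+        // Simulated transient failure: once numPuts reaches the configured
+        // failureThreshold, the put throws so tests can exercise the
+        // restart.index retry path.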
+        if (testFailure) {
+            if (++numPuts == failureThreshold) {
+                throw new IOException();
+            }
+        }
+
         this.flowFilePuts.put(tableName, new ArrayList<>(puts));
     }
 
@@ -165,6 +171,16 @@ public class MockHBaseClientService extends AbstractControllerService implements
         return new byte[] { b ? (byte) -1 : (byte) 0 };
     }
 
+    @Override
+    public byte[] toBytes(float f) {
+        return toBytes((double)f);
+    }
+
+    @Override
+    public byte[] toBytes(int i) {
+        return toBytes((long)i);
+    }
+
     @Override
     public byte[] toBytes(long l) {
         byte [] b = new byte[8];
@@ -190,4 +206,14 @@ public class MockHBaseClientService extends AbstractControllerService implements
     public byte[] toBytesBinary(String s) {
         return Bytes.toBytesBinary(s);
     }
+
+    private boolean testFailure = false;
+    public void setTestFailure(boolean testFailure) {
+        this.testFailure = testFailure;
+    }
+
+    private int failureThreshold = 1;
+    public void setFailureThreshold(int failureThreshold) {
+        this.failureThreshold = failureThreshold;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
index 28d9105d4c..d20d3549a1 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
@@ -19,6 +19,8 @@ package org.apache.nifi.hbase;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.apache.nifi.hbase.HBaseTestUtil.getHBaseClientService;
+
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
@@ -447,12 +449,4 @@ public class TestPutHBaseJSON {
         return runner;
     }
 
-    private MockHBaseClientService getHBaseClientService(final TestRunner runner) throws InitializationException {
-        final MockHBaseClientService hBaseClient = new MockHBaseClientService();
-        runner.addControllerService("hbaseClient", hBaseClient);
-        runner.enableControllerService(hBaseClient);
-        runner.setProperty(PutHBaseCell.HBASE_CLIENT_SERVICE, "hbaseClient");
-        return hBaseClient;
-    }
-
 }
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java
new file mode 100644
index 0000000000..7bc5f96750
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.nifi.hbase.put.PutColumn;
+import org.apache.nifi.hbase.put.PutFlowFile;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.nifi.hbase.HBaseTestUtil.getHBaseClientService;
+
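+/**
+ * Tests PutHBaseRecord against MockHBaseClientService, using records
+ * generated by MockRecordParser.
+ */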
+public class TestPutHBaseRecord {
+
+    public static final String DEFAULT_TABLE_NAME = "nifi";
+    public static final String DEFAULT_COLUMN_FAMILY = "family1";
+
+    private TestRunner getTestRunner(String table, String columnFamily, String batchSize) {
+        final TestRunner runner = TestRunners.newTestRunner(PutHBaseRecord.class);
+        runner.enforceReadStreamsClosed(false);
+        runner.setProperty(PutHBaseRecord.TABLE_NAME, table);
+        runner.setProperty(PutHBaseRecord.COLUMN_FAMILY, columnFamily);
+        runner.setProperty(PutHBaseRecord.BATCH_SIZE, batchSize);
+        return runner;
+    }
+
+    private static final List<Integer> KEYS = Arrays.asList(1, 2, 3, 4);
+    private static final List<String> NAMES = Arrays.asList("rec1", "rec2", "rec3", "rec4");
+    private static final List<Long> CODES = Arrays.asList(101L, 102L, 103L, 104L);
+
+    private void generateTestData(TestRunner runner) throws IOException {
+        final MockRecordParser parser = new MockRecordParser();
+        try {
+            runner.addControllerService("parser", parser);
+        } catch (InitializationException e) {
+            throw new IOException(e);
+        }
+        runner.enableControllerService(parser);
+        runner.setProperty(PutHBaseRecord.RECORD_READER_FACTORY, "parser");
+
+        parser.addSchemaField("id", RecordFieldType.INT);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("code", RecordFieldType.LONG);
+
+        for (int x = 0; x < KEYS.size(); x++) {
+            parser.addRecord(KEYS.get(x), NAMES.get(x), CODES.get(x));
+        }
+    }
+
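+    // Shared harness: runs the processor over the four generated records and
+    // hands each record's name/code columns to the supplied validator.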
+    private void basicPutSetup(String encodingStrategy, PutValidator validator) throws Exception {
+        basicPutSetup(encodingStrategy, validator, "1000", 4);
+    }
+
+    private void basicPutSetup(String encodingStrategy, PutValidator validator, String batchSize, int expectedPuts) throws Exception {
+        TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, batchSize);
+        runner.setProperty(PutHBaseRecord.ROW_FIELD_NAME, "id");
+        runner.setProperty(PutHBaseRecord.FIELD_ENCODING_STRATEGY, encodingStrategy);
+        MockHBaseClientService client = getHBaseClientService(runner);
+        generateTestData(runner);
+        runner.enqueue("Test".getBytes("UTF-8")); // This is to coax the processor into reading the data in the reader.
+        runner.run();
+
+        List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutHBaseRecord.REL_SUCCESS);
+        Assert.assertEquals("Wrong count", 1, results.size());
+
+        Assert.assertEquals("Wrong number of PutFlowFiles", expectedPuts, client.getFlowFilePuts().get("nifi").size());
+        for (PutFlowFile putFlowFile : client.getFlowFilePuts().get("nifi")) {
+            Iterator<PutColumn> columnIterator = putFlowFile.getColumns().iterator();
+            PutColumn name = columnIterator.next();
+            PutColumn code = columnIterator.next();
+            Assert.assertNotNull("Name was null", name);
+            Assert.assertNotNull("Code was null", code);
+
+            String nFamName = new String(name.getColumnFamily());
+            String cFamName = new String(code.getColumnFamily());
+            String nQual = new String(name.getColumnQualifier());
+            String cQual = new String(code.getColumnQualifier());
+
+            Assert.assertEquals("Name column family didn't match", DEFAULT_COLUMN_FAMILY, nFamName);
+            Assert.assertEquals("Code column family didn't match", DEFAULT_COLUMN_FAMILY, cFamName);
+            Assert.assertEquals("Name qualifier didn't match", "name", nQual);
+            Assert.assertEquals("Code qualifier didn't match", "code", cQual);
+
+            validator.handle(name, code);
+        }
+    }
+
+    @Test
+    public void testByteEncodedPut() throws Exception {
+        basicPutSetup(PutHBaseRecord.BYTES_ENCODING_VALUE, (PutColumn[] columns) -> {
+            PutColumn name = columns[0];
+            PutColumn code = columns[1];
+            String nameVal = Bytes.toString(name.getBuffer());
+            Long codeVal = Bytes.toLong(code.getBuffer());
+            Assert.assertTrue("Name was not found", NAMES.contains(nameVal));
+            Assert.assertTrue("Code was not found", CODES.contains(codeVal));
+        });
+    }
+
+    private void innerTest(PutColumn[] columns) {
+        PutColumn name = columns[0];
+        PutColumn code = columns[1];
+        String nameVal = Bytes.toString(name.getBuffer());
+        String codeVal = Bytes.toString(code.getBuffer());
+        Assert.assertTrue("Name was not found", NAMES.contains(nameVal));
+        Assert.assertTrue("Code was not found", CODES.contains(Long.valueOf(codeVal)));
+    }
+
+    @Test
+    public void testStringEncodedPut() throws Exception {
+        basicPutSetup(PutHBaseRecord.STRING_ENCODING_VALUE, (PutColumn[] columns) -> {
+            innerTest(columns);
+        });
+    }
+
+    @Test
+    public void testBatchOfOne() throws Exception {
+        basicPutSetup(PutHBaseRecord.STRING_ENCODING_VALUE, (PutColumn[] columns) -> {
+            innerTest(columns);
+        }, "1", 1);
+    }
+
+    @Test
+    public void testBatchOfTwo() throws Exception {
+        basicPutSetup(PutHBaseRecord.STRING_ENCODING_VALUE, (PutColumn[] columns) -> {
+            innerTest(columns);
+        }, "2", 2);
+    }
+
+    @Test
+    public void testFailure() throws Exception {
+        TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "2");
+        runner.setProperty(PutHBaseRecord.ROW_FIELD_NAME, "id");
+        runner.setProperty(PutHBaseRecord.FIELD_ENCODING_STRATEGY, PutHBaseRecord.STRING_ENCODING_VALUE);
+        MockHBaseClientService client = getHBaseClientService(runner);
+        client.setTestFailure(true);
+        client.setFailureThreshold(2);
+        generateTestData(runner);
+        runner.enqueue("Test".getBytes("UTF-8")); // This is to coax the processor into reading the data in the reader.
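+        // Batch size 2 with failure threshold 2: the first batch of two
+        // records succeeds, the second batch throws, so the FlowFile should
+        // be routed to failure with restart.index marking the first unsent
+        // record.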
+        runner.run();
+
+        List<MockFlowFile> result = runner.getFlowFilesForRelationship(PutHBaseRecord.REL_FAILURE);
+        Assert.assertEquals("Size was wrong", 1, result.size());
+        Assert.assertEquals("Wrong # of PutFlowFiles", 2, client.getFlowFilePuts().get("nifi").size());
+        Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutHBaseRecord.REL_SUCCESS).size());
+
+        MockFlowFile mff = result.get(0);
+        Assert.assertNotNull("Missing restart index attribute", mff.getAttribute("restart.index"));
+
+        List<PutFlowFile> old = client.getFlowFilePuts().get("nifi");
+        client.setTestFailure(false);
+        runner.enqueue("test");
+        runner.run();
+
+        Assert.assertEquals("Size was wrong", 1, result.size());
+        Assert.assertEquals("Wrong # of PutFlowFiles", 2, client.getFlowFilePuts().get("nifi").size());
+
+        List<PutFlowFile> newPFF = client.getFlowFilePuts().get("nifi");
+        // None of the puts from the failed run should be resent verbatim.
+        for (PutFlowFile putFlowFile : old) {
+            Assert.assertFalse("Duplication", newPFF.contains(putFlowFile));
+        }
+    }
+
+    interface PutValidator {
+        void handle(PutColumn... columns);
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
index 80b8961434..c67b0cafc9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
@@ -148,6 +148,23 @@ public interface HBaseClientService extends ControllerService {
      */
     byte[] toBytes(boolean b);
 
+    /**
+     * Converts the given float to its byte representation.
+     *
+     * @param f a float
+     * @return the float represented as bytes
+     */
+    byte[] toBytes(float f);
+
+    /**
+     * Converts the given int to its byte representation.
+     *
+     * @param i an int
+     * @return the int represented as bytes
+     */
+    byte[] toBytes(int i);
+
     /**
      * Converts the given long to its byte representation.
      *
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
index 2566ebdeec..025ed50b0f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutFlowFile.java
@@ -67,4 +67,16 @@
         return true;
     }
 
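+    // Note: row is a byte[], so it must be compared with Arrays.equals;
+    // byte[].equals would only test reference identity.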
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof PutFlowFile) {
+            PutFlowFile pff = (PutFlowFile)obj;
+            return this.tableName.equals(pff.tableName)
+                    && java.util.Arrays.equals(this.row, pff.row)
+                    && this.columns.equals(pff.columns)
+                    && this.flowFile.equals(pff.flowFile);
+        } else {
+            return false;
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index 2d1166c01f..f6ac852008 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -489,6 +489,16 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
         return Bytes.toBytes(b);
     }
 
+    @Override
+    public byte[] toBytes(float f) {
+        return Bytes.toBytes(f);
+    }
+
+    @Override
+    public byte[] toBytes(int i) {
+        return Bytes.toBytes(i);
+    }
+
     @Override
     public byte[] toBytes(long l) {
         return Bytes.toBytes(l);