NIFI-4024 Added org.apache.nifi.hbase.PutHBaseRecord

Signed-off-by: Bryan Bende <bbende@apache.org>
Mike Thomsen 2017-06-23 07:50:26 -04:00 committed by Bryan Bende
parent afd4f9e034
commit 496a32e12c
11 changed files with 620 additions and 12 deletions

View File

@@ -44,6 +44,14 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -82,5 +90,10 @@
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -107,11 +107,11 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
.defaultValue("25")
.build();
- protected static final Relationship REL_SUCCESS = new Relationship.Builder()
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after it has been successfully stored in HBase")
.build();
- protected static final Relationship REL_FAILURE = new Relationship.Builder()
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if it cannot be sent to HBase")
.build();

View File

@@ -0,0 +1,331 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@EventDriven
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"hadoop", "hbase", "put", "record"})
@CapabilityDescription("Adds rows to HBase based on the contents of a flowfile using a configured record reader.")
@ReadsAttribute(attribute = "restart.index", description = "Reads restart.index when it needs to replay part of a record set that did not get into HBase.")
@WritesAttribute(attribute = "restart.index", description = "Writes restart.index when a batch fails to be inserted into HBase.")
public class PutHBaseRecord extends AbstractPutHBase {
protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
.name("Row Identifier Field Name")
.description("Specifies the name of a record field whose value should be used as the row id for the given record.")
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final String FAIL_VALUE = "Fail";
protected static final String WARN_VALUE = "Warn";
protected static final String IGNORE_VALUE = "Ignore";
protected static final String TEXT_VALUE = "Text";
protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values.");
protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase.");
protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase.");
protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column.");
static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder()
.name("Complex Field Strategy")
.description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.")
.expressionLanguageSupported(false)
.required(true)
.allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
.defaultValue(COMPLEX_FIELD_TEXT.getValue())
.build();
protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
"Stores the value of each field as a UTF-8 String.");
protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
"Stores the value of each field as the byte representation of the type derived from the record.");
protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder()
.name("Field Encoding Strategy")
.description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " +
"record to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from " +
"the record, and convert the value to the byte representation of that type, meaning an integer will be stored as the " +
"byte representation of that integer."))
.required(true)
.allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
.defaultValue(FIELD_ENCODING_STRING.getValue())
.build();
protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The maximum number of records to be sent to HBase at any one time from the record set.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("1000")
.build();
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(RECORD_READER_FACTORY);
properties.add(HBASE_CLIENT_SERVICE);
properties.add(TABLE_NAME);
properties.add(ROW_FIELD_NAME);
properties.add(ROW_ID_ENCODING_STRATEGY);
properties.add(COLUMN_FAMILY);
properties.add(BATCH_SIZE);
properties.add(COMPLEX_FIELD_STRATEGY);
properties.add(FIELD_ENCODING_STRATEGY);
return properties;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_FAILURE);
return rels;
}
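// Sends one batch of puts to HBase and returns the number of cells written, which onTrigger accumulates for provenance reporting.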
private int addBatch(String tableName, List<PutFlowFile> flowFiles) throws IOException {
int columns = 0;
clientService.put(tableName, flowFiles);
for (PutFlowFile put : flowFiles) {
columns += put.getColumns().size();
}
return columns;
}
private RecordReaderFactory recordParserFactory;
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
.asControllerService(RecordReaderFactory.class);
List<PutFlowFile> flowFiles = new ArrayList<>();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
final String rowEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue();
final long start = System.nanoTime();
int index = 0;
int columns = 0;
boolean failed = false;
String startIndexStr = flowFile.getAttribute("restart.index");
int startIndex = -1;
if (startIndexStr != null) {
startIndex = Integer.parseInt(startIndexStr);
}
PutFlowFile last = null;
try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) {
Record record;
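// If restart.index is set (from a previous partial failure), skip the records that were already written before resuming.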
if (startIndex >= 0) {
while ( index++ < startIndex && (reader.nextRecord()) != null) {}
}
while ((record = reader.nextRecord()) != null) {
PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy);
flowFiles.add(putFlowFile);
index++;
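// Once a full batch has accumulated, send it and remember the last put so the provenance transit URI can be derived from it.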
if (flowFiles.size() == batchSize) {
columns += addBatch(tableName, flowFiles);
last = flowFiles.get(flowFiles.size() - 1);
flowFiles = new ArrayList<>();
}
}
if (flowFiles.size() > 0) {
columns += addBatch(tableName, flowFiles);
last = flowFiles.get(flowFiles.size() - 1);
}
} catch (Exception ex) {
getLogger().error("Failed to put records to HBase.", ex);
failed = true;
}
if (!failed) {
sendProvenance(session, flowFile, columns, System.nanoTime() - start, last);
flowFile = session.removeAttribute(flowFile, "restart.index");
session.transfer(flowFile, REL_SUCCESS);
} else {
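// Records consumed minus the unsent batch gives the position to resume from; store it so a retried FlowFile can pick up where this attempt stopped.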
String restartIndex = Integer.toString(index - flowFiles.size());
flowFile = session.putAttribute(flowFile, "restart.index", restartIndex);
if (columns > 0) {
sendProvenance(session, flowFile, columns, System.nanoTime() - start, last);
}
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
session.commit();
}
private void sendProvenance(ProcessSession session, FlowFile flowFile, int columns, long time, PutFlowFile pff) {
final String details = String.format("Put %d cells to HBase.", columns);
session.getProvenanceReporter().send(flowFile, getTransitUri(pff), details, time);
}
@Override
protected String getTransitUri(PutFlowFile putFlowFile) {
return "hbase://" + putFlowFile.getTableName();
}
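// Not used: this processor builds puts per record in createPut(ProcessContext, Record, ...) rather than per FlowFile.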
@Override
protected PutFlowFile createPut(ProcessSession session, ProcessContext context, FlowFile flowFile) {
return null;
}
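// Converts a single record field to bytes, either as a UTF-8 string or as the typed byte representation, depending on the Field Encoding Strategy.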
protected byte[] asBytes(String field, RecordFieldType fieldType, Record record, boolean asString, String complexFieldStrategy) throws PutCreationFailedInvokedException {
byte[] retVal;
if (asString) {
retVal = clientService.toBytes(record.getAsString(field));
} else {
switch (fieldType) {
case BOOLEAN:
retVal = clientService.toBytes(record.getAsBoolean(field));
break;
case CHAR:
retVal = clientService.toBytes(record.getAsString(field));
break;
case DOUBLE:
retVal = clientService.toBytes(record.getAsDouble(field));
break;
case FLOAT:
retVal = clientService.toBytes(record.getAsFloat(field));
break;
case INT:
retVal = clientService.toBytes(record.getAsInt(field));
break;
case LONG:
retVal = clientService.toBytes(record.getAsLong(field));
break;
default:
retVal = null;
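// Not a simple scalar type; fall back to the configured Complex Field Strategy.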
switch (complexFieldStrategy) {
case FAIL_VALUE:
getLogger().error("Complex value found for {}; routing to failure", new Object[]{field});
throw new PutCreationFailedInvokedException(String.format("Complex value found for %s; routing to failure", field));
case WARN_VALUE:
getLogger().warn("Complex value found for {}; skipping", new Object[]{field});
break;
case TEXT_VALUE:
retVal = clientService.toBytes(record.getAsString(field));
break;
case IGNORE_VALUE:
// silently skip
break;
default:
break;
}
}
}
return retVal;
}
protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile,
String rowFieldName, String columnFamily, String fieldEncodingStrategy, String rowEncodingStrategy, String complexFieldStrategy) throws PutCreationFailedInvokedException {
PutFlowFile retVal = null;
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
boolean asString = STRING_ENCODING_VALUE.equals(fieldEncodingStrategy);
final byte[] fam = clientService.toBytes(columnFamily);
//try {
if (record != null) {
List<PutColumn> columns = new ArrayList<>();
for (String name : schema.getFieldNames()) {
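// The row id field becomes the row key and is not written out as a column.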
if (name.equals(rowFieldName)) {
continue;
}
columns.add(new PutColumn(fam, clientService.toBytes(name), asBytes(name,
schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy)));
}
String rowIdValue = record.getAsString(rowFieldName);
if (rowIdValue == null) {
throw new PutCreationFailedInvokedException(String.format("Row ID was null for flowfile with ID %s", flowFile.getAttribute("uuid")));
}
byte[] rowId = getRow(rowIdValue, rowEncodingStrategy);
retVal = new PutFlowFile(tableName, rowId, columns, flowFile);
}
/* } catch (Exception ex) {
getLogger().error("Error running createPuts", ex);
throw new RuntimeException(ex);
}*/
return retVal;
}
static class PutCreationFailedInvokedException extends Exception {
PutCreationFailedInvokedException(String msg) {
super(msg);
}
}
}

View File

@@ -16,4 +16,5 @@
org.apache.nifi.hbase.GetHBase
org.apache.nifi.hbase.PutHBaseCell
org.apache.nifi.hbase.PutHBaseJSON
org.apache.nifi.hbase.PutHBaseRecord
org.apache.nifi.hbase.FetchHBaseRow

View File

@@ -28,6 +28,8 @@ import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
public class HBaseTestUtil {
@@ -85,4 +87,12 @@ public class HBaseTestUtil {
}
assertTrue(foundEvent);
}
public static MockHBaseClientService getHBaseClientService(final TestRunner runner) throws InitializationException {
final MockHBaseClientService hBaseClient = new MockHBaseClientService();
runner.addControllerService("hbaseClient", hBaseClient);
runner.enableControllerService(hBaseClient);
runner.setProperty(PutHBaseCell.HBASE_CLIENT_SERVICE, "hbaseClient");
return hBaseClient;
}
}

View File

@@ -40,13 +40,19 @@ public class MockHBaseClientService extends AbstractControllerService implements
private Map<String, List<PutFlowFile>> flowFilePuts = new HashMap<>();
private boolean throwException = false;
private int numScans = 0;
private int numPuts = 0;
@Override
public void put(String tableName, Collection<PutFlowFile> puts) throws IOException {
if (throwException) {
throw new IOException("exception");
}
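// When enabled, simulate a failure on the Nth call to put() so tests can exercise PutHBaseRecord's restart.index handling.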
if (testFailure) {
if (++numPuts == failureThreshold) {
throw new IOException();
}
}
this.flowFilePuts.put(tableName, new ArrayList<>(puts));
}
@@ -165,6 +171,16 @@ public class MockHBaseClientService extends AbstractControllerService implements
return new byte[] { b ? (byte) -1 : (byte) 0 };
}
@Override
public byte[] toBytes(float f) {
return toBytes((double)f);
}
@Override
public byte[] toBytes(int i) {
return toBytes((long)i);
}
@Override
public byte[] toBytes(long l) {
byte [] b = new byte[8];
@@ -190,4 +206,14 @@ public class MockHBaseClientService extends AbstractControllerService implements
public byte[] toBytesBinary(String s) {
return Bytes.toBytesBinary(s);
}
private boolean testFailure = false;
public void setTestFailure(boolean testFailure) {
this.testFailure = testFailure;
}
private int failureThreshold = 1;
public void setFailureThreshold(int failureThreshold) {
this.failureThreshold = failureThreshold;
}
}

View File

@@ -19,6 +19,8 @@ package org.apache.nifi.hbase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.apache.nifi.hbase.HBaseTestUtil.getHBaseClientService;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
@@ -447,12 +449,4 @@ public class TestPutHBaseJSON {
return runner;
}
- private MockHBaseClientService getHBaseClientService(final TestRunner runner) throws InitializationException {
- final MockHBaseClientService hBaseClient = new MockHBaseClientService();
- runner.addControllerService("hbaseClient", hBaseClient);
- runner.enableControllerService(hBaseClient);
- runner.setProperty(PutHBaseCell.HBASE_CLIENT_SERVICE, "hbaseClient");
- return hBaseClient;
- }
}

View File

@@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import static org.apache.nifi.hbase.HBaseTestUtil.getHBaseClientService;
public class TestPutHBaseRecord {
public static final String DEFAULT_TABLE_NAME = "nifi";
public static final String DEFAULT_COLUMN_FAMILY = "family1";
private TestRunner getTestRunner(String table, String columnFamily, String batchSize) {
final TestRunner runner = TestRunners.newTestRunner(PutHBaseRecord.class);
runner.enforceReadStreamsClosed(false);
runner.setProperty(PutHBaseJSON.TABLE_NAME, table);
runner.setProperty(PutHBaseJSON.COLUMN_FAMILY, columnFamily);
runner.setProperty(PutHBaseJSON.BATCH_SIZE, batchSize);
return runner;
}
private static final List<Integer> KEYS = Arrays.asList(1, 2, 3, 4);
private static final List<String> NAMES = Arrays.asList("rec1", "rec2", "rec3", "rec4");
private static final List<Long> CODES = Arrays.asList(101L, 102L, 103L, 104L);
private void generateTestData(TestRunner runner) throws IOException {
final MockRecordParser parser = new MockRecordParser();
try {
runner.addControllerService("parser", parser);
} catch (InitializationException e) {
throw new IOException(e);
}
runner.enableControllerService(parser);
runner.setProperty(PutHBaseRecord.RECORD_READER_FACTORY, "parser");
parser.addSchemaField("id", RecordFieldType.INT);
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("code", RecordFieldType.LONG);
for (int x = 0; x < KEYS.size(); x++) {
parser.addRecord(KEYS.get(x), NAMES.get(x), CODES.get(x));
}
}
private void basicPutSetup(String encodingStrategy, PutValidator validator) throws Exception {
basicPutSetup(encodingStrategy, validator, "1000", 4);
}
private void basicPutSetup(String encodingStrategy, PutValidator validator, String batchSize, int expectedPuts) throws Exception {
Assert.assertEquals(1L, 1L);
TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, batchSize);
runner.setProperty(PutHBaseRecord.ROW_FIELD_NAME, "id");
runner.setProperty(PutHBaseRecord.FIELD_ENCODING_STRATEGY, encodingStrategy);
MockHBaseClientService client = getHBaseClientService(runner);
generateTestData(runner);
runner.enqueue("Test".getBytes("UTF-8")); // This is to coax the processor into reading the data in the reader.l
runner.run();
List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutHBaseRecord.REL_SUCCESS);
Assert.assertTrue("Wrong count", results.size() == 1);
Assert.assertEquals("Wrong number of PutFlowFiles ", client.getFlowFilePuts().get("nifi").size(), expectedPuts);
for (PutFlowFile putFlowFile : client.getFlowFilePuts().get("nifi")) {
Iterator<PutColumn> columnIterator = putFlowFile.getColumns().iterator();
PutColumn name = columnIterator.next();
PutColumn code = columnIterator.next();
Assert.assertNotNull("Name was null", name);
Assert.assertNotNull("Code was null", code);
String nFamName = new String(name.getColumnFamily());
String cFamName = new String(code.getColumnFamily());
String nQual = new String(name.getColumnQualifier());
String cQual = new String(code.getColumnQualifier());
Assert.assertEquals("Name column family didn't match", nFamName, DEFAULT_COLUMN_FAMILY);
Assert.assertEquals("Code column family didn't match", cFamName, DEFAULT_COLUMN_FAMILY);
Assert.assertEquals("Name qualifier didn't match", nQual, "name");
Assert.assertEquals("Code qualifier didn't match", cQual, "code");
validator.handle(name, code);
}
}
@Test
public void testByteEncodedPut() throws Exception {
basicPutSetup(PutHBaseRecord.BYTES_ENCODING_VALUE, (PutColumn[] columns) -> {
PutColumn name = columns[0];
PutColumn code = columns[1];
String nameVal = Bytes.toString(name.getBuffer());
Long codeVal = Bytes.toLong(code.getBuffer());
Assert.assertTrue("Name was not found", NAMES.contains(nameVal));
Assert.assertTrue("Code was not found ", CODES.contains(codeVal));
});
}
private void innerTest(PutColumn[] columns) {
PutColumn name = columns[0];
PutColumn code = columns[1];
String nameVal = Bytes.toString(name.getBuffer());
String codeVal = Bytes.toString(code.getBuffer());
Assert.assertTrue("Name was not found", NAMES.contains(nameVal));
Assert.assertTrue("Code was not found ", CODES.contains(Long.valueOf(codeVal)));
}
@Test
public void testStringEncodedPut() throws Exception {
basicPutSetup(PutHBaseRecord.STRING_ENCODING_VALUE, (PutColumn[] columns) -> {
innerTest(columns);
});
}
@Test
public void testBatchOfOne() throws Exception {
basicPutSetup(PutHBaseRecord.STRING_ENCODING_VALUE, (PutColumn[] columns) -> {
innerTest(columns);
}, "1", 1);
}
@Test
public void testBatchOfTwo() throws Exception {
basicPutSetup(PutHBaseRecord.STRING_ENCODING_VALUE, (PutColumn[] columns) -> {
innerTest(columns);
}, "2", 2);
}
@Test
public void testFailure() throws Exception {
Assert.assertEquals(1L, 1L);
TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "2");
runner.setProperty(PutHBaseRecord.ROW_FIELD_NAME, "id");
runner.setProperty(PutHBaseRecord.FIELD_ENCODING_STRATEGY, PutHBaseRecord.STRING_ENCODING_VALUE);
MockHBaseClientService client = getHBaseClientService(runner);
client.setTestFailure(true);
client.setFailureThreshold(2);
generateTestData(runner);
runner.enqueue("Test".getBytes("UTF-8")); // This is to coax the processor into reading the data in the reader.
runner.run();
List<MockFlowFile> result = runner.getFlowFilesForRelationship(PutHBaseRecord.REL_FAILURE);
Assert.assertEquals("Size was wrong", result.size(), 1);
Assert.assertEquals("Wrong # of PutFlowFiles", client.getFlowFilePuts().get("nifi").size(), 2);
Assert.assertTrue(runner.getFlowFilesForRelationship(PutHBaseRecord.REL_SUCCESS).size() == 0);
MockFlowFile mff = result.get(0);
Assert.assertNotNull("Missing restart index attribute", mff.getAttribute("restart.index"));
List<PutFlowFile> old = client.getFlowFilePuts().get("nifi");
client.setTestFailure(false);
runner.enqueue("test");
runner.run();
Assert.assertEquals("Size was wrong", result.size(), 1);
Assert.assertEquals("Wrong # of PutFlowFiles", client.getFlowFilePuts().get("nifi").size(), 2);
List<PutFlowFile> newPFF = client.getFlowFilePuts().get("nifi");
for (PutFlowFile putFlowFile : old) {
Assert.assertFalse("Duplication", newPFF.contains(putFlowFile));
}
}
interface PutValidator {
void handle(PutColumn... columns);
}
}

View File

@@ -148,6 +148,23 @@ public interface HBaseClientService extends ControllerService {
*/
byte[] toBytes(boolean b);
/**
* Converts the given float to its byte representation.
*
* @param f a float
* @return the float represented as bytes
*/
byte[] toBytes(float f);
/**
* Converts the given int to its byte representation.
*
* @param i an int
* @return the int represented as bytes
*/
byte[] toBytes(int i);
/**
* Converts the given long to its byte representation.
*

View File

@@ -67,4 +67,16 @@ public class PutFlowFile {
return true;
}
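// Value equality over table name, row, columns and source FlowFile; the restart test relies on this to detect duplicated puts.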
@Override
public boolean equals(Object obj) {
if (obj instanceof PutFlowFile) {
PutFlowFile pff = (PutFlowFile)obj;
return this.tableName.equals(pff.tableName)
&& this.row.equals(pff.row)
&& this.columns.equals(pff.columns)
&& this.flowFile.equals(pff.flowFile);
} else {
return false;
}
}
}

View File

@@ -489,6 +489,16 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
return Bytes.toBytes(b);
}
@Override
public byte[] toBytes(float f) {
return Bytes.toBytes(f);
}
@Override
public byte[] toBytes(int i) {
return Bytes.toBytes(i);
}
@Override
public byte[] toBytes(long l) {
return Bytes.toBytes(l);