NIFI-1174 Refactoring the HBase client API and adding a PutHBaseJSON processor which can write a whole row from a single JSON document
- Adding a Complex Field Strategy to PutHBaseJSON to allow more control over complex fields
- Improving error messages to indicate what the problem was with an invalid row

Signed-off-by: Bryan Bende <bbende@apache.org>
Bryan Bende 2015-11-18 17:24:49 -05:00
parent 8c2323dc8d
commit 40dd8a0a84
15 changed files with 1099 additions and 192 deletions

View File: pom.xml

@@ -50,6 +50,10 @@
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>

View File: AbstractPutHBase.java

@@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Base class for processors that put data to HBase.
*/
public abstract class AbstractPutHBase extends AbstractProcessor {
protected static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Client Service")
.description("Specifies the Controller Service to use for accessing HBase.")
.required(true)
.identifiesControllerService(HBaseClientService.class)
.build();
protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("Table Name")
.description("The name of the HBase Table to put data into")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor ROW_ID = new PropertyDescriptor.Builder()
.name("Row Identifier")
.description("Specifies the Row ID to use when inserting data into HBase")
.required(false) // not all sub-classes will require this
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder()
.name("Column Family")
.description("The Column Family to use when inserting data into HBase")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor COLUMN_QUALIFIER = new PropertyDescriptor.Builder()
.name("Column Qualifier")
.description("The Column Qualifier to use when inserting data into HBase")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The maximum number of FlowFiles to process in a single execution. The FlowFiles will be " +
"grouped by table, and a single Put per table will be performed.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("25")
.build();
protected static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after it has been successfully stored in HBase")
.build();
protected static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if it cannot be sent to HBase")
.build();
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
List<FlowFile> flowFiles = session.get(batchSize);
if (flowFiles == null || flowFiles.size() == 0) {
return;
}
final Map<String,List<PutFlowFile>> tablePuts = new HashMap<>();
// Group FlowFiles by HBase Table
for (final FlowFile flowFile : flowFiles) {
final PutFlowFile putFlowFile = createPut(session, context, flowFile);
if (putFlowFile == null) {
// sub-classes should log appropriate error messages before returning null
session.transfer(flowFile, REL_FAILURE);
} else if (!putFlowFile.isValid()) {
if (StringUtils.isBlank(putFlowFile.getTableName())) {
getLogger().error("Missing table name for FlowFile {}; routing to failure", new Object[]{flowFile});
} else if (StringUtils.isBlank(putFlowFile.getRow())) {
getLogger().error("Missing row id for FlowFile {}; routing to failure", new Object[]{flowFile});
} else if (putFlowFile.getColumns() == null || putFlowFile.getColumns().isEmpty()) {
getLogger().error("No columns provided for FlowFile {}; routing to failure", new Object[]{flowFile});
} else {
// really shouldn't get here, but just in case
getLogger().error("Failed to produce a put for FlowFile {}; routing to failure", new Object[]{flowFile});
}
session.transfer(flowFile, REL_FAILURE);
} else {
List<PutFlowFile> putFlowFiles = tablePuts.get(putFlowFile.getTableName());
if (putFlowFiles == null) {
putFlowFiles = new ArrayList<>();
tablePuts.put(putFlowFile.getTableName(), putFlowFiles);
}
putFlowFiles.add(putFlowFile);
}
}
getLogger().debug("Sending {} FlowFiles to HBase in {} put operations", new Object[]{flowFiles.size(), tablePuts.size()});
final long start = System.nanoTime();
final List<PutFlowFile> successes = new ArrayList<>();
final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
for (Map.Entry<String, List<PutFlowFile>> entry : tablePuts.entrySet()) {
try {
hBaseClientService.put(entry.getKey(), entry.getValue());
successes.addAll(entry.getValue());
} catch (Exception e) {
getLogger().error(e.getMessage(), e);
for (PutFlowFile putFlowFile : entry.getValue()) {
getLogger().error("Failed to send {} to HBase due to {}; routing to failure", new Object[]{putFlowFile.getFlowFile(), e});
final FlowFile failure = session.penalize(putFlowFile.getFlowFile());
session.transfer(failure, REL_FAILURE);
}
}
}
final long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
getLogger().debug("Sent {} FlowFiles to HBase successfully in {} milliseconds", new Object[]{successes.size(), sendMillis});
for (PutFlowFile putFlowFile : successes) {
session.transfer(putFlowFile.getFlowFile(), REL_SUCCESS);
final String details = "Put " + putFlowFile.getColumns().size() + " cells to HBase";
session.getProvenanceReporter().send(putFlowFile.getFlowFile(), getTransitUri(putFlowFile), details, sendMillis);
}
}
protected String getTransitUri(PutFlowFile putFlowFile) {
return "hbase://" + putFlowFile.getTableName() + "/" + putFlowFile.getRow();
}
/**
* Sub-classes provide the implementation to create a put from a FlowFile.
*
* @param session
* the current session
* @param context
* the current context
* @param flowFile
* the FlowFile to create a Put from
*
* @return a PutFlowFile instance for the given FlowFile
*/
protected abstract PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile);
}
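
To make the createPut contract concrete, here is a minimal hypothetical subclass. The processor name and the hbase.value attribute are invented for illustration; it assumes the same imports as AbstractPutHBase plus java.nio.charset.StandardCharsets, java.util.Collections, and org.apache.nifi.hbase.put.PutColumn.

public class PutHBaseAttributeExample extends AbstractPutHBase {

    @Override
    protected PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile) {
        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String row = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
        final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();

        // hypothetical source of the cell value: a FlowFile attribute rather than the content
        final String value = flowFile.getAttribute("hbase.value");
        if (value == null) {
            // returning null tells the base class to route the FlowFile to failure
            getLogger().error("Missing hbase.value attribute for {}; routing to failure", new Object[]{flowFile});
            return null;
        }

        final Collection<PutColumn> columns = Collections.singletonList(
                new PutColumn(columnFamily, "value", value.getBytes(StandardCharsets.UTF_8)));
        return new PutFlowFile(tableName, row, columns, flowFile);
    }
}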

View File: GetHBase.java

@@ -41,6 +41,7 @@ import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.hbase.util.ObjectSerDe;
import org.apache.nifi.hbase.util.StringSerDe;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@@ -83,7 +84,7 @@ import java.util.regex.Pattern;
@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the data was pulled from"),
@WritesAttribute(attribute = "mime.type", description = "Set to application/json to indicate that output is JSON")
})
public class GetHBase extends AbstractHBaseProcessor {
public class GetHBase extends AbstractProcessor {
static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");

View File: PutHBaseCell.java

@@ -16,7 +16,6 @@
*/
package org.apache.nifi.hbase;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -24,91 +23,36 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@EventDriven
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"hadoop", "hbase"})
@CapabilityDescription("Adds the Contents of a FlowFile to HBase as the value of a single cell")
public class PutHBaseCell extends AbstractProcessor {
protected static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Client Service")
.description("Specifies the Controller Service to use for accessing HBase.")
.required(true)
.identifiesControllerService(HBaseClientService.class)
.build();
protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("Table Name")
.description("The name of the HBase Table to put data into")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor ROW = new PropertyDescriptor.Builder()
.name("Row Identifier")
.description("Specifies the Row ID to use when inserting data into HBase")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder()
.name("Column Family")
.description("The Column Family to use when inserting data into HBase")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor COLUMN_QUALIFIER = new PropertyDescriptor.Builder()
.name("Column Qualifier")
.description("The Column Qualifier to use when inserting data into HBase")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The maximum number of FlowFiles to process in a single execution. The FlowFiles will be " +
"grouped by table, and a single Put per table will be performed.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("25")
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after it has been successfully stored in HBase")
.build();
static final Relationship FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if it cannot be sent to HBase")
.build();
public class PutHBaseCell extends AbstractPutHBase {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(HBASE_CLIENT_SERVICE);
properties.add(TABLE_NAME);
properties.add(ROW);
properties.add(ROW_ID);
properties.add(COLUMN_FAMILY);
properties.add(COLUMN_QUALIFIER);
properties.add(BATCH_SIZE);
@@ -119,31 +63,17 @@ public class PutHBaseCell extends AbstractProcessor {
public Set<Relationship> getRelationships() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(FAILURE);
rels.add(REL_FAILURE);
return rels;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
List<FlowFile> flowFiles = session.get(batchSize);
if (flowFiles == null || flowFiles.size() == 0) {
return;
}
final Map<String,List<PutFlowFile>> tablePuts = new HashMap<>();
// Group FlowFiles by HBase Table
for (final FlowFile flowFile : flowFiles) {
protected PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile) {
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String row = context.getProperty(ROW).evaluateAttributeExpressions(flowFile).getValue();
final String row = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
final String columnQualifier = context.getProperty(COLUMN_QUALIFIER).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isBlank(tableName) || StringUtils.isBlank(row) || StringUtils.isBlank(columnFamily) || StringUtils.isBlank(columnQualifier)) {
getLogger().error("Invalid FlowFile {} missing table, row, column familiy, or column qualifier; routing to failure", new Object[]{flowFile});
session.transfer(flowFile, FAILURE);
} else {
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
@@ -152,51 +82,8 @@ public class PutHBaseCell extends AbstractProcessor {
}
});
final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, columnFamily, columnQualifier, buffer, flowFile);
List<PutFlowFile> putFlowFiles = tablePuts.get(tableName);
if (putFlowFiles == null) {
putFlowFiles = new ArrayList<>();
tablePuts.put(tableName, putFlowFiles);
}
putFlowFiles.add(putFlowFile);
}
}
getLogger().debug("Sending {} FlowFiles to HBase in {} put operations", new Object[] {flowFiles.size(), tablePuts.size()});
final long start = System.nanoTime();
final List<PutFlowFile> successes = new ArrayList<>();
final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
for (Map.Entry<String, List<PutFlowFile>> entry : tablePuts.entrySet()) {
try {
hBaseClientService.put(entry.getKey(), entry.getValue());
successes.addAll(entry.getValue());
} catch (Exception e) {
getLogger().error(e.getMessage(), e);
for (PutFlowFile putFlowFile : entry.getValue()) {
getLogger().error("Failed to send {} to HBase due to {}; routing to failure", new Object[]{putFlowFile.getFlowFile(), e});
final FlowFile failure = session.penalize(putFlowFile.getFlowFile());
session.transfer(failure, FAILURE);
}
}
}
final long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
getLogger().debug("Sent {} FlowFiles to HBase successfully in {} milliseconds", new Object[] {successes.size(), sendMillis});
for (PutFlowFile putFlowFile : successes) {
session.transfer(putFlowFile.getFlowFile(), REL_SUCCESS);
session.getProvenanceReporter().send(putFlowFile.getFlowFile(), getTransitUri(putFlowFile), sendMillis);
}
}
protected String getTransitUri(PutFlowFile putFlowFile) {
return "hbase://" + putFlowFile.getTableName() + "/" + putFlowFile.getRow() + "/" + putFlowFile.getColumnFamily()
+ ":" + putFlowFile.getColumnQualifier();
final Collection<PutColumn> columns = Collections.singletonList(new PutColumn(columnFamily, columnQualifier, buffer));
return new PutFlowFile(tableName, row, columns, flowFile);
}
}
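
With batching, grouping, and provenance handled by the base class, PutHBaseCell reduces to the conversion shown above: one FlowFile's content becomes a PutFlowFile carrying a single column. A minimal sketch with hypothetical values:

// one FlowFile's content becomes exactly one column of one row
final byte[] content = "some value".getBytes(StandardCharsets.UTF_8);
final Collection<PutColumn> columns = Collections.singletonList(new PutColumn("family1", "qualifier1", content));
final PutFlowFile put = new PutFlowFile("nifi", "row1", columns, flowFile);
// the base class now reports a per-row transit URI such as hbase://nifi/row1,
// replacing the old per-cell URI that appended family1:qualifier1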

View File: PutHBaseJSON.java

@@ -0,0 +1,230 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.ObjectHolder;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@EventDriven
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"hadoop", "hbase", "put", "json"})
@CapabilityDescription("Adds rows to HBase based on the contents of incoming JSON documents. Each FlowFile must contain a single " +
"UTF-8 encoded JSON document, and any FlowFiles where the root element is not a single document will be routed to failure. " +
"Each JSON field name and value will become a column qualifier and value of the HBase row. Any fields with a null value " +
"will be skipped, and fields with a complex value will be handled according to the Complex Field Strategy. " +
"The row id can be specified either directly on the processor through the Row Identifier property, or can be extracted from the JSON " +
"document by specifying the Row Identifier Field Name property. This processor will hold the contents of all FlowFiles for the given batch " +
"in memory at one time.")
public class PutHBaseJSON extends AbstractPutHBase {
protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
.name("Row Identifier Field Name")
.description("Specifies the name of a JSON element whose value should be used as the row id for the given JSON document.")
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final String FAIL_VALUE = "Fail";
protected static final String WARN_VALUE = "Warn";
protected static final String IGNORE_VALUE = "Ignore";
protected static final String TEXT_VALUE = "Text";
protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route the entire FlowFile to failure if any element contains a complex value.");
protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Log a warning and do not include the field in the row sent to HBase.");
protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently skip the field and do not include it in the row sent to HBase.");
protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column.");
protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder()
.name("Complex Field Strategy")
.description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.")
.expressionLanguageSupported(false)
.required(true)
.allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
.defaultValue(COMPLEX_FIELD_TEXT.getValue())
.build();
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(HBASE_CLIENT_SERVICE);
properties.add(TABLE_NAME);
properties.add(ROW_ID);
properties.add(ROW_FIELD_NAME);
properties.add(COLUMN_FAMILY);
properties.add(BATCH_SIZE);
properties.add(COMPLEX_FIELD_STRATEGY);
return properties;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_FAILURE);
return rels;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final Collection<ValidationResult> results = new ArrayList<>();
final String rowId = validationContext.getProperty(ROW_ID).getValue();
final String rowFieldName = validationContext.getProperty(ROW_FIELD_NAME).getValue();
if (StringUtils.isBlank(rowId) && StringUtils.isBlank(rowFieldName)) {
results.add(new ValidationResult.Builder()
.subject(this.getClass().getSimpleName())
.explanation("Row Identifier or Row Identifier Field Name is required")
.valid(false)
.build());
} else if (!StringUtils.isBlank(rowId) && !StringUtils.isBlank(rowFieldName)) {
results.add(new ValidationResult.Builder()
.subject(this.getClass().getSimpleName())
.explanation("Row Identifier and Row Identifier Field Name can not be used together")
.valid(false)
.build());
}
return results;
}
@Override
protected PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile) {
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String rowId = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
final boolean extractRowId = !StringUtils.isBlank(rowFieldName);
final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
// Parse the JSON document
final ObjectMapper mapper = new ObjectMapper();
final ObjectHolder<JsonNode> rootNodeRef = new ObjectHolder<>(null);
try {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
try (final InputStream bufferedIn = new BufferedInputStream(in)) {
rootNodeRef.set(mapper.readTree(bufferedIn));
}
}
});
} catch (final ProcessException pe) {
getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[]{flowFile, pe.toString()}, pe);
return null;
}
final JsonNode rootNode = rootNodeRef.get();
if (rootNode.isArray()) {
getLogger().error("Root node of JSON must be a single document, found array for {}; routing to failure", new Object[]{flowFile});
return null;
}
final Collection<PutColumn> columns = new ArrayList<>();
final ObjectHolder<String> rowIdHolder = new ObjectHolder<>(null);
// convert each field/value to a column for the put; nulls are skipped and complex values are handled per the Complex Field Strategy
final Iterator<String> fieldNames = rootNode.getFieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
final ObjectHolder<String> fieldValueHolder = new ObjectHolder<>(null);
final JsonNode fieldNode = rootNode.get(fieldName);
if (fieldNode.isNull()) {
getLogger().debug("Skipping {} because value was null", new Object[]{fieldName});
} else if (fieldNode.isValueNode()) {
fieldValueHolder.set(fieldNode.asText());
} else {
// for non-null, non-value nodes, determine what to do based on the handling strategy
switch (complexFieldStrategy) {
case FAIL_VALUE:
getLogger().error("Complex value found for {}; routing to failure", new Object[]{fieldName});
return null;
case WARN_VALUE:
getLogger().warn("Complex value found for {}; skipping", new Object[]{fieldName});
break;
case TEXT_VALUE:
// use toString() here because asText() is only guaranteed to be supported on value nodes
// some other types of nodes, like ArrayNode, provide toString implementations
fieldValueHolder.set(fieldNode.toString());
break;
case IGNORE_VALUE:
// silently skip
break;
default:
break;
}
}
// if we have a field value, then see if this is the row id field, if so store the value for later
// otherwise add a new column where the fieldName and fieldValue are the column qualifier and value
if (fieldValueHolder.get() != null) {
if (extractRowId && fieldName.equals(rowFieldName)) {
rowIdHolder.set(fieldValueHolder.get());
} else {
columns.add(new PutColumn(columnFamily, fieldName, fieldValueHolder.get().getBytes(StandardCharsets.UTF_8)));
}
}
}
// if we are expecting a field name to use for the row id and the incoming document doesn't have it
// log an error message so the user can see what the field names were and return null so it gets routed to failure
if (extractRowId && rowIdHolder.get() == null) {
final String fieldNameStr = StringUtils.join(rootNode.getFieldNames(), ",");
getLogger().error("Row ID field named '{}' not found in field names '{}'; routing to failure", new Object[] {rowFieldName, fieldNameStr});
return null;
}
final String putRowId = (extractRowId ? rowIdHolder.get() : rowId);
return new PutFlowFile(tableName, putRowId, columns, flowFile);
}
}
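
To make the field handling concrete, here is a sketch of what createPut produces for a hypothetical document, assuming Row Identifier Field Name is set to "id", Column Family to "family1", and the default Text strategy (exception handling elided):

// given a FlowFile whose content is the document below
final ObjectMapper mapper = new ObjectMapper();
final JsonNode root = mapper.readTree(
        "{ \"id\" : \"row1\", \"name\" : \"nifi\", \"count\" : 4, \"nested\" : { \"a\" : \"b\" }, \"empty\" : null }");
// createPut yields a PutFlowFile with:
//   row id         -> "row1"        ("id" is extracted and not written as a column)
//   family1:name   -> "nifi"        (value nodes are written via asText())
//   family1:count  -> "4"
//   family1:nested -> {"a":"b"}     (complex value written as its JSON text under the Text strategy)
// "empty" is skipped entirely because its value is null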

View File: META-INF/services/org.apache.nifi.processor.Processor

@@ -15,3 +15,4 @@
org.apache.nifi.hbase.GetHBase
org.apache.nifi.hbase.PutHBaseCell
org.apache.nifi.hbase.PutHBaseJSON

View File: HBaseTestUtil.java

@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertTrue;
public class HBaseTestUtil {
public static void verifyPut(final String row, final String columnFamily, final Map<String,String> columns, final List<PutFlowFile> puts) {
boolean foundPut = false;
for (final PutFlowFile put : puts) {
if (!row.equals(put.getRow())) {
continue;
}
if (put.getColumns() == null || put.getColumns().size() != columns.size()) {
continue;
}
// start off assuming we have all the columns
boolean foundAllColumns = true;
for (Map.Entry<String, String> entry : columns.entrySet()) {
// determine if we have the current expected column
boolean foundColumn = false;
for (PutColumn putColumn : put.getColumns()) {
final String colVal = new String(putColumn.getBuffer(), StandardCharsets.UTF_8);
if (columnFamily.equals(putColumn.getColumnFamily()) && entry.getKey().equals(putColumn.getColumnQualifier())
&& entry.getValue().equals(colVal)) {
foundColumn = true;
break;
}
}
// if we didn't have the current expected column we know we don't have all expected columns
if (!foundColumn) {
foundAllColumns = false;
break;
}
}
// if we found all the expected columns this was a match so we can break
if (foundAllColumns) {
foundPut = true;
break;
}
}
assertTrue(foundPut);
}
public static void verifyEvent(final List<ProvenanceEventRecord> events, final String uri, final ProvenanceEventType eventType) {
boolean foundEvent = false;
for (final ProvenanceEventRecord event : events) {
if (event.getTransitUri().equals(uri) && event.getEventType().equals(eventType)) {
foundEvent = true;
break;
}
}
assertTrue(foundEvent);
}
}
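
A short usage sketch, assuming a MockHBaseClientService named hBaseClient and a table named "nifi":

// asserts that some put in the list targets row1 with exactly these family1 columns
final Map<String, String> expected = new HashMap<>();
expected.put("field1", "value1");
expected.put("field2", "value2");
HBaseTestUtil.verifyPut("row1", "family1", expected, hBaseClient.getFlowFilePuts().get("nifi"));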

View File: MockHBaseClientService.java

@@ -17,6 +17,7 @@
package org.apache.nifi.hbase;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
@@ -33,7 +34,7 @@ import java.util.Map;
public class MockHBaseClientService extends AbstractControllerService implements HBaseClientService {
private Map<String,ResultCell[]> results = new HashMap<>();
private Map<String, List<PutFlowFile>> puts = new HashMap<>();
private Map<String, List<PutFlowFile>> flowFilePuts = new HashMap<>();
private boolean throwException = false;
@Override
@@ -42,7 +43,12 @@ public class MockHBaseClientService extends AbstractControllerService implements
throw new IOException("exception");
}
this.puts.put(tableName, new ArrayList<>(puts));
this.flowFilePuts.put(tableName, new ArrayList<>(puts));
}
@Override
public void put(String tableName, String rowId, Collection<PutColumn> columns) throws IOException {
throw new UnsupportedOperationException();
}
@Override
@@ -92,8 +98,8 @@ public class MockHBaseClientService extends AbstractControllerService implements
results.put(rowKey, cellArray);
}
public Map<String, List<PutFlowFile>> getPuts() {
return puts;
public Map<String, List<PutFlowFile>> getFlowFilePuts() {
return flowFilePuts;
}
public void setThrowException(boolean throwException) {

View File: TestPutHBaseCell.java

@@ -16,6 +16,7 @@
*/
package org.apache.nifi.hbase;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
@@ -43,7 +44,7 @@ public class TestPutHBaseCell {
final TestRunner runner = TestRunners.newTestRunner(PutHBaseCell.class);
runner.setProperty(PutHBaseCell.TABLE_NAME, tableName);
runner.setProperty(PutHBaseCell.ROW, row);
runner.setProperty(PutHBaseCell.ROW_ID, row);
runner.setProperty(PutHBaseCell.COLUMN_FAMILY, columnFamily);
runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, columnQualifier);
runner.setProperty(PutHBaseCell.BATCH_SIZE, "1");
@@ -58,12 +59,14 @@ public class TestPutHBaseCell {
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getPuts());
assertEquals(1, hBaseClient.getPuts().size());
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
assertEquals(1, puts.size());
verifyPut(row, columnFamily, columnQualifier, content, puts.get(0));
assertEquals(1, runner.getProvenanceEvents().size());
}
@Test
@@ -89,12 +92,14 @@ public class TestPutHBaseCell {
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getPuts());
assertEquals(1, hBaseClient.getPuts().size());
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
assertEquals(1, puts.size());
verifyPut(row, columnFamily, columnQualifier, content, puts.get(0));
assertEquals(1, runner.getProvenanceEvents().size());
}
@Test
@@ -115,7 +120,9 @@ public class TestPutHBaseCell {
runner.run();
runner.assertTransferCount(PutHBaseCell.REL_SUCCESS, 0);
runner.assertTransferCount(PutHBaseCell.FAILURE, 1);
runner.assertTransferCount(PutHBaseCell.REL_FAILURE, 1);
assertEquals(0, runner.getProvenanceEvents().size());
}
@Test
@@ -142,7 +149,9 @@ public class TestPutHBaseCell {
runner.run();
runner.assertTransferCount(PutHBaseCell.REL_SUCCESS, 1);
runner.assertTransferCount(PutHBaseCell.FAILURE, 1);
runner.assertTransferCount(PutHBaseCell.REL_FAILURE, 1);
assertEquals(1, runner.getProvenanceEvents().size());
}
@Test
@@ -171,13 +180,15 @@ public class TestPutHBaseCell {
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content1);
assertNotNull(hBaseClient.getPuts());
assertEquals(1, hBaseClient.getPuts().size());
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
assertEquals(2, puts.size());
verifyPut(row1, columnFamily, columnQualifier, content1, puts.get(0));
verifyPut(row2, columnFamily, columnQualifier, content2, puts.get(1));
assertEquals(2, runner.getProvenanceEvents().size());
}
@Test
@@ -202,7 +213,9 @@ public class TestPutHBaseCell {
runner.enqueue(content2.getBytes("UTF-8"), attributes2);
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.FAILURE, 2);
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 2);
assertEquals(0, runner.getProvenanceEvents().size());
}
@Test
@@ -229,13 +242,15 @@ public class TestPutHBaseCell {
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content1);
assertNotNull(hBaseClient.getPuts());
assertEquals(1, hBaseClient.getPuts().size());
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
assertEquals(2, puts.size());
verifyPut(row, columnFamily, columnQualifier, content1, puts.get(0));
verifyPut(row, columnFamily, columnQualifier, content2, puts.get(1));
assertEquals(2, runner.getProvenanceEvents().size());
}
private Map<String, String> getAttributeMapWithEL(String tableName, String row, String columnFamily, String columnQualifier) {
@@ -250,7 +265,7 @@ public class TestPutHBaseCell {
private TestRunner getTestRunnerWithEL(PutHBaseCell proc) {
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutHBaseCell.TABLE_NAME, "${hbase.tableName}");
runner.setProperty(PutHBaseCell.ROW, "${hbase.row}");
runner.setProperty(PutHBaseCell.ROW_ID, "${hbase.row}");
runner.setProperty(PutHBaseCell.COLUMN_FAMILY, "${hbase.columnFamily}");
runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, "${hbase.columnQualifier}");
return runner;
@@ -266,9 +281,14 @@ public class TestPutHBaseCell {
private void verifyPut(String row, String columnFamily, String columnQualifier, String content, PutFlowFile put) {
assertEquals(row, put.getRow());
assertEquals(columnFamily, put.getColumnFamily());
assertEquals(columnQualifier, put.getColumnQualifier());
assertEquals(content, new String(put.getBuffer(), StandardCharsets.UTF_8));
assertNotNull(put.getColumns());
assertEquals(1, put.getColumns().size());
final PutColumn column = put.getColumns().iterator().next();
assertEquals(columnFamily, column.getColumnFamily());
assertEquals(columnQualifier, column.getColumnQualifier());
assertEquals(content, new String(column.getBuffer(), StandardCharsets.UTF_8));
}
}

View File: TestPutHBaseJSON.java

@@ -0,0 +1,423 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class TestPutHBaseJSON {
public static final String DEFAULT_TABLE_NAME = "nifi";
public static final String DEFAULT_ROW = "row1";
public static final String DEFAULT_COLUMN_FAMILY = "family1";
@Test
public void testCustomValidate() throws InitializationException {
// missing row id and row id field name should be invalid
TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
getHBaseClientService(runner);
runner.assertNotValid();
// setting both properties should still be invalid
runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, "rowId");
runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowFieldName");
runner.assertNotValid();
// only a row id field name should make it valid
runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowFieldName");
runner.assertValid();
// only a row id should make it valid
runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, "rowId");
runner.assertValid();
}
@Test
public void testSingleJsonDocAndProvidedRowId() throws IOException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes("UTF-8"));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
final Map<String,String> expectedColumns = new HashMap<>();
expectedColumns.put("field1", "value1");
expectedColumns.put("field2", "value2");
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
assertEquals("hbase://" + DEFAULT_TABLE_NAME + "/" + DEFAULT_ROW, event.getTransitUri());
}
@Test
public void testSingleJsonDocAndExtractedRowId() throws IOException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowField");
final String content = "{ \"rowField\" : \"myRowId\", \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
// should be a put with row id of myRowId, and rowField shouldn't end up in the columns
final Map<String,String> expectedColumns1 = new HashMap<>();
expectedColumns1.put("field1", "value1");
expectedColumns1.put("field2", "value2");
HBaseTestUtil.verifyPut("myRowId", DEFAULT_COLUMN_FAMILY, expectedColumns1, puts);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
HBaseTestUtil.verifyEvent(runner.getProvenanceEvents(), "hbase://" + DEFAULT_TABLE_NAME + "/myRowId", ProvenanceEventType.SEND);
}
@Test
public void testSingleJsonDocAndExtractedRowIdMissingField() throws IOException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowField");
final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0);
outFile.assertContentEquals(content);
// should be no provenance events
assertEquals(0, runner.getProvenanceEvents().size());
// no puts should have made it to the client
assertEquals(0, hBaseClient.getFlowFilePuts().size());
}
@Test
public void testMultipleJsonDocsRouteToFailure() throws IOException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
final String content1 = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
final String content2 = "{ \"field3\" : \"value3\", \"field4\" : \"value4\" }";
final String content = "[ " + content1 + " , " + content2 + " ]";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0);
outFile.assertContentEquals(content);
// should be no provenance events
assertEquals(0, runner.getProvenanceEvents().size());
// no puts should have made it to the client
assertEquals(0, hBaseClient.getFlowFilePuts().size());
}
@Test
public void testELWithProvidedRowId() throws IOException, InitializationException {
final TestRunner runner = getTestRunner("${hbase.table}", "${hbase.colFamily}", "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, "${hbase.rowId}");
final Map<String,String> attributes = new HashMap<>();
attributes.put("hbase.table", "myTable");
attributes.put("hbase.colFamily", "myColFamily");
attributes.put("hbase.rowId", "myRowId");
final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes("UTF-8"), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get("myTable");
assertEquals(1, puts.size());
final Map<String,String> expectedColumns = new HashMap<>();
expectedColumns.put("field1", "value1");
expectedColumns.put("field2", "value2");
HBaseTestUtil.verifyPut("myRowId", "myColFamily", expectedColumns, puts);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
HBaseTestUtil.verifyEvent(runner.getProvenanceEvents(), "hbase://myTable/myRowId", ProvenanceEventType.SEND);
}
@Test
public void testELWithExtractedRowId() throws IOException, InitializationException {
final TestRunner runner = getTestRunner("${hbase.table}", "${hbase.colFamily}", "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "${hbase.rowField}");
final Map<String,String> attributes = new HashMap<>();
attributes.put("hbase.table", "myTable");
attributes.put("hbase.colFamily", "myColFamily");
attributes.put("hbase.rowField", "field1");
final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes("UTF-8"), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get("myTable");
assertEquals(1, puts.size());
final Map<String,String> expectedColumns = new HashMap<>();
expectedColumns.put("field2", "value2");
HBaseTestUtil.verifyPut("value1", "myColFamily", expectedColumns, puts);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
HBaseTestUtil.verifyEvent(runner.getProvenanceEvents(), "hbase://myTable/value1", ProvenanceEventType.SEND);
}
@Test
public void testNullAndArrayElementsWithWarnStrategy() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_WARN.getValue());
// should route to success because there is at least one valid field
final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
// should have skipped field1 and field3
final Map<String,String> expectedColumns = new HashMap<>();
expectedColumns.put("field2", "value2");
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
}
@Test
public void testNullAndArrayElementsWithIgnoreStrategy() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_IGNORE.getValue());
// should route to success because there is at least one valid field
final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
// should have skipped field1 and field3
final Map<String,String> expectedColumns = new HashMap<>();
expectedColumns.put("field2", "value2");
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
}
@Test
public void testNullAndArrayElementsWithFailureStrategy() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_FAIL.getValue());
// should route to failure because the Fail strategy rejects the whole document when a complex field is present
final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0);
outFile.assertContentEquals(content);
// should be no provenance events
assertEquals(0, runner.getProvenanceEvents().size());
// no puts should have made it to the client
assertEquals(0, hBaseClient.getFlowFilePuts().size());
}
@Test
public void testNullAndArrayElementsWithTextStrategy() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_TEXT.getValue());
// should route to success, with the complex field1 written as its JSON text
final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
// field1 should be written as its JSON text, and field3 skipped because it was null
final Map<String,String> expectedColumns = new HashMap<>();
expectedColumns.put("field1", "[{\"child_field1\":\"child_value1\"}]");
expectedColumns.put("field2", "value2");
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
}
@Test
public void testNestedDocWithTextStrategy() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_TEXT.getValue());
// should route to success, with the nested field1 document written as its JSON text
final String content = "{ \"field1\" : { \"child_field1\" : \"child_value1\" }, \"field2\" : \"value2\", \"field3\" : null }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
// field1 should be written as its JSON text, and field3 skipped because it was null
final Map<String,String> expectedColumns = new HashMap<>();
expectedColumns.put("field1", "{\"child_field1\":\"child_value1\"}");
expectedColumns.put("field2", "value2");
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
}
@Test
public void testAllElementsAreNullOrArrays() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_WARN.getValue());
// should route to failure since it would produce a put with no columns
final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : null }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0);
outFile.assertContentEquals(content);
// should be no provenance events
assertEquals(0, runner.getProvenanceEvents().size());
// no puts should have made it to the client
assertEquals(0, hBaseClient.getFlowFilePuts().size());
}
@Test
public void testInvalidJson() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
final String content = "NOT JSON";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
}
private TestRunner getTestRunner(String table, String columnFamily, String batchSize) {
final TestRunner runner = TestRunners.newTestRunner(PutHBaseJSON.class);
runner.setProperty(PutHBaseJSON.TABLE_NAME, table);
runner.setProperty(PutHBaseJSON.COLUMN_FAMILY, columnFamily);
runner.setProperty(PutHBaseJSON.BATCH_SIZE, batchSize);
return runner;
}
private MockHBaseClientService getHBaseClientService(final TestRunner runner) throws InitializationException {
final MockHBaseClientService hBaseClient = new MockHBaseClientService();
runner.addControllerService("hbaseClient", hBaseClient);
runner.enableControllerService(hBaseClient);
runner.setProperty(PutHBaseCell.HBASE_CLIENT_SERVICE, "hbaseClient");
return hBaseClient;
}
}

View File: HBaseClientService.java

@@ -20,6 +20,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultHandler;
@@ -73,6 +74,16 @@ public interface HBaseClientService extends ControllerService {
*/
void put(String tableName, Collection<PutFlowFile> puts) throws IOException;
/**
* Puts the given row to HBase with the provided columns.
*
* @param tableName the name of an HBase table
* @param rowId the id of the row to put
* @param columns the columns of the row to put
* @throws IOException thrown when there are communication errors with HBase
*/
void put(String tableName, String rowId, Collection<PutColumn> columns) throws IOException;
/**
* Scans the given table using the optional filter criteria and passing each result to the provided handler.
*

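For orientation, here is a minimal, hypothetical sketch of a caller using the new row-level put added above (the table name, row id, and column values are invented for illustration; any HBaseClientService implementation would serve):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;

import org.apache.nifi.hbase.HBaseClientService;
import org.apache.nifi.hbase.put.PutColumn;

public class RowPutExample {

    // Writes two columns to one row with a single call to the new API.
    static void writeUserRow(final HBaseClientService client) throws IOException {
        final Collection<PutColumn> columns = Arrays.asList(
                new PutColumn("cf", "name", "alice".getBytes(StandardCharsets.UTF_8)),
                new PutColumn("cf", "age", "42".getBytes(StandardCharsets.UTF_8)));
        client.put("users", "user-1", columns); // one HBase Put covering the whole row
    }
}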
View File

@ -14,10 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.hbase;
+package org.apache.nifi.hbase.put;
-import org.apache.nifi.processor.AbstractProcessor;
-public abstract class AbstractHBaseProcessor extends AbstractProcessor {
+/**
+ * Encapsulates the information for one column of a put operation.
+ */
+public class PutColumn {
private final String columnFamily;
private final String columnQualifier;
private final byte[] buffer;
public PutColumn(final String columnFamily, final String columnQualifier, final byte[] buffer) {
this.columnFamily = columnFamily;
this.columnQualifier = columnQualifier;
this.buffer = buffer;
}
public String getColumnFamily() {
return columnFamily;
}
public String getColumnQualifier() {
return columnQualifier;
}
public byte[] getBuffer() {
return buffer;
}
}
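Design note: folding family, qualifier, and value into PutColumn lets one put carry an arbitrary number of columns for a single row, which is what allows PutHBaseJSON to write a whole JSON document as one row instead of emitting one put per cell.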

View File

@ -16,8 +16,11 @@
*/
package org.apache.nifi.hbase.put;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.flowfile.FlowFile;
import java.util.Collection;
/**
* Wrapper to encapsulate all of the information for the Put along with the FlowFile.
*/
@ -25,18 +28,13 @@ public class PutFlowFile {
private final String tableName;
private final String row;
-private final String columnFamily;
-private final String columnQualifier;
-private final byte[] buffer;
+private final Collection<PutColumn> columns;
private final FlowFile flowFile;
-public PutFlowFile(String tableName, String row, String columnFamily, String columnQualifier,
-byte[] buffer, FlowFile flowFile) {
+public PutFlowFile(String tableName, String row, Collection<PutColumn> columns, FlowFile flowFile) {
this.tableName = tableName;
this.row = row;
-this.columnFamily = columnFamily;
-this.columnQualifier = columnQualifier;
-this.buffer = buffer;
+this.columns = columns;
this.flowFile = flowFile;
}
@ -48,20 +46,26 @@ public class PutFlowFile {
return row;
}
-public String getColumnFamily() {
-return columnFamily;
-}
-public String getColumnQualifier() {
-return columnQualifier;
-}
-public byte[] getBuffer() {
-return buffer;
+public Collection<PutColumn> getColumns() {
+return columns;
}
public FlowFile getFlowFile() {
return flowFile;
}
public boolean isValid() {
if (StringUtils.isBlank(tableName) || StringUtils.isBlank(row) || flowFile == null || columns == null || columns.isEmpty()) {
return false;
}
for (PutColumn column : columns) {
if (StringUtils.isBlank(column.getColumnQualifier()) || StringUtils.isBlank(column.getColumnFamily()) || column.getBuffer() == null) {
return false;
}
}
return true;
}
}
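A minimal sketch of the validity contract above (the values are hypothetical; a real put would also carry a non-null FlowFile):

import java.util.Collections;

import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;

public class ValidityExample {

    public static void main(final String[] args) {
        // Fails two checks at once: the column collection is empty and the FlowFile
        // is null, so isValid() returns false and a processor would route to failure.
        final PutFlowFile put = new PutFlowFile("users", "row-1", Collections.<PutColumn>emptyList(), null);
        System.out.println(put.isValid()); // prints: false
    }
}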

View File

@ -43,6 +43,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
@ -195,15 +196,33 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
put = new Put(putFlowFile.getRow().getBytes(StandardCharsets.UTF_8));
rowPuts.put(putFlowFile.getRow(), put);
}
-put.addColumn(putFlowFile.getColumnFamily().getBytes(StandardCharsets.UTF_8),
-putFlowFile.getColumnQualifier().getBytes(StandardCharsets.UTF_8),
-putFlowFile.getBuffer());
+for (final PutColumn column : putFlowFile.getColumns()) {
+put.addColumn(
+column.getColumnFamily().getBytes(StandardCharsets.UTF_8),
+column.getColumnQualifier().getBytes(StandardCharsets.UTF_8),
+column.getBuffer());
+}
}
table.put(new ArrayList<>(rowPuts.values()));
}
}
@Override
public void put(final String tableName, final String rowId, final Collection<PutColumn> columns) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
Put put = new Put(rowId.getBytes(StandardCharsets.UTF_8));
for (final PutColumn column : columns) {
put.addColumn(
column.getColumnFamily().getBytes(StandardCharsets.UTF_8),
column.getColumnQualifier().getBytes(StandardCharsets.UTF_8),
column.getBuffer());
}
table.put(put);
}
}
@Override
public void scan(final String tableName, final Collection<Column> columns, final String filterExpression, final long minTime, final ResultHandler handler)
throws IOException {

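Design note: the two write paths differ deliberately. The FlowFile-batch variant groups PutFlowFiles by row key so each distinct row collapses into one HBase Put, and the whole batch is sent with a single table.put(List) call; the new single-row variant builds one Put from the supplied columns and sends it directly. Both obtain the Table from the service's shared connection inside try-with-resources, so the table handle is closed after each call.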
View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
@ -41,6 +42,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@ -130,8 +132,9 @@ public class TestHBase_1_1_2_ClientService {
final String columnQualifier = "qualifier1";
final String content = "content1";
-final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, columnFamily, columnQualifier,
-content.getBytes(StandardCharsets.UTF_8), null);
+final Collection<PutColumn> columns = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
+content.getBytes(StandardCharsets.UTF_8)));
+final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, columns, null);
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
@ -168,11 +171,13 @@ public class TestHBase_1_1_2_ClientService {
final String content1 = "content1";
final String content2 = "content2";
-final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row, columnFamily, columnQualifier,
-content1.getBytes(StandardCharsets.UTF_8), null);
+final Collection<PutColumn> columns1 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
+content1.getBytes(StandardCharsets.UTF_8)));
+final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row, columns1, null);
-final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row, columnFamily, columnQualifier,
-content2.getBytes(StandardCharsets.UTF_8), null);
+final Collection<PutColumn> columns2 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
+content2.getBytes(StandardCharsets.UTF_8)));
+final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row, columns2, null);
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
@ -214,11 +219,13 @@ public class TestHBase_1_1_2_ClientService {
final String content1 = "content1";
final String content2 = "content2";
-final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row1, columnFamily, columnQualifier,
-content1.getBytes(StandardCharsets.UTF_8), null);
+final Collection<PutColumn> columns1 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
+content1.getBytes(StandardCharsets.UTF_8)));
+final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row1, columns1, null);
-final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row2, columnFamily, columnQualifier,
-content2.getBytes(StandardCharsets.UTF_8), null);
+final Collection<PutColumn> columns2 = Collections.singletonList(new PutColumn(columnFamily, columnQualifier,
+content2.getBytes(StandardCharsets.UTF_8)));
+final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row2, columns2, null);
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);