NIFI-1784 This closes #1349. Initial commit for FetchHBaseRow processor

This commit is contained in:
Bryan Bende 2016-12-20 16:18:06 -05:00 committed by joewitt
parent a90fa9c285
commit b207397a11
16 changed files with 1609 additions and 37 deletions

View File

@ -0,0 +1,408 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.hbase.io.JsonFullRowSerializer;
import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
import org.apache.nifi.hbase.io.RowSerializer;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"hbase", "scan", "fetch", "get", "enrich"})
@CapabilityDescription("Fetches a row from an HBase table. The Destination property controls whether the cells are added as flow file attributes, " +
"or the row is written to the flow file content as JSON. This processor may be used to fetch a fixed row on a interval by specifying the " +
"table and row id directly in the processor, or it may be used to dynamically fetch rows by referencing the table and row id from " +
"incoming flow files.")
@WritesAttributes({
@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"),
@WritesAttribute(attribute = "hbase.row", description = "A JSON document representing the row. This property is only written when a Destination of flowfile-attributes is selected."),
@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise")
})
public class FetchHBaseRow extends AbstractProcessor {
static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Client Service")
.description("Specifies the Controller Service to use for accessing HBase.")
.required(true)
.identifiesControllerService(HBaseClientService.class)
.build();
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("Table Name")
.description("The name of the HBase Table to fetch from.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor ROW_ID = new PropertyDescriptor.Builder()
.name("Row Identifier")
.description("The identifier of the row to fetch.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor COLUMNS = new PropertyDescriptor.Builder()
.name("Columns")
.description("An optional comma-separated list of \"<colFamily>:<colQualifier>\" pairs to fetch. To return all columns " +
"for a given family, leave off the qualifier such as \"<colFamily1>,<colFamily2>\".")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.createRegexMatchingValidator(COLUMNS_PATTERN))
.build();
static final AllowableValue DESTINATION_ATTRIBUTES = new AllowableValue("flowfile-attributes", "flowfile-attributes",
"Adds the JSON document representing the row that was fetched as an attribute named hbase.row. " +
"The format of the JSON document is determined by the JSON Format property. " +
"NOTE: Fetching many large rows into attributes may have a negative impact on performance.");
static final AllowableValue DESTINATION_CONTENT = new AllowableValue("flowfile-content", "flowfile-content",
"Overwrites the FlowFile content with a JSON document representing the row that was fetched. " +
"The format of the JSON document is determined by the JSON Format property.");
static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
.name("Destination")
.description("Indicates whether the row fetched from HBase is written to FlowFile content or FlowFile Attributes.")
.required(true)
.allowableValues(DESTINATION_ATTRIBUTES, DESTINATION_CONTENT)
.defaultValue(DESTINATION_ATTRIBUTES.getValue())
.build();
static final AllowableValue JSON_FORMAT_FULL_ROW = new AllowableValue("full-row", "full-row",
"Creates a JSON document with the format: {\"row\":<row-id>, \"cells\":[{\"fam\":<col-fam>, \"qual\":<col-val>, \"val\":<value>, \"ts\":<timestamp>}]}.");
static final AllowableValue JSON_FORMAT_QUALIFIER_AND_VALUE = new AllowableValue("col-qual-and-val", "col-qual-and-val",
"Creates a JSON document with the format: {\"<col-qual>\":\"<value>\", \"<col-qual>\":\"<value>\".");
static final PropertyDescriptor JSON_FORMAT = new PropertyDescriptor.Builder()
.name("JSON Format")
.description("Specifies how to represent the HBase row as a JSON document.")
.required(true)
.allowableValues(JSON_FORMAT_FULL_ROW, JSON_FORMAT_QUALIFIER_AND_VALUE)
.defaultValue(JSON_FORMAT_FULL_ROW.getValue())
.build();
static final AllowableValue ENCODING_NONE = new AllowableValue("none", "none", "Creates a String using the bytes of given data and the given Character Set.");
static final AllowableValue ENCODING_BASE64 = new AllowableValue("base64", "base64", "Creates a Base64 encoded String of the given data.");
static final PropertyDescriptor JSON_VALUE_ENCODING = new PropertyDescriptor.Builder()
.name("JSON Value Encoding")
.description("Specifies how to represent row ids, column families, column qualifiers, and values when stored in FlowFile attributes, or written to JSON.")
.required(true)
.allowableValues(ENCODING_NONE, ENCODING_BASE64)
.defaultValue(ENCODING_NONE.getValue())
.build();
static final PropertyDescriptor DECODE_CHARSET = new PropertyDescriptor.Builder()
.name("Decode Character Set")
.description("The character set used to decode data from HBase.")
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
static final PropertyDescriptor ENCODE_CHARSET = new PropertyDescriptor.Builder()
.name("Encode Character Set")
.description("The character set used to encode the JSON representation of the row.")
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All successful fetches are routed to this relationship.")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("All failed fetches are routed to this relationship.")
.build();
static final Relationship REL_NOT_FOUND = new Relationship.Builder()
.name("not found")
.description("All fetches where the row id is not found are routed to this relationship.")
.build();
static final String HBASE_TABLE_ATTR = "hbase.table";
static final String HBASE_ROW_ATTR = "hbase.row";
static final List<PropertyDescriptor> properties;
static {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(HBASE_CLIENT_SERVICE);
props.add(TABLE_NAME);
props.add(ROW_ID);
props.add(COLUMNS);
props.add(DESTINATION);
props.add(JSON_FORMAT);
props.add(JSON_VALUE_ENCODING);
props.add(ENCODE_CHARSET);
props.add(DECODE_CHARSET);
properties = Collections.unmodifiableList(props);
}
static final Set<Relationship> relationships;
static {
Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_FAILURE);
rels.add(REL_NOT_FOUND);
relationships = Collections.unmodifiableSet(rels);
}
private volatile Charset decodeCharset;
private volatile Charset encodeCharset;
private volatile RowSerializer regularRowSerializer;
private volatile RowSerializer base64RowSerializer;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@OnScheduled
public void onScheduled(ProcessContext context) {
this.decodeCharset = Charset.forName(context.getProperty(DECODE_CHARSET).getValue());
this.encodeCharset = Charset.forName(context.getProperty(ENCODE_CHARSET).getValue());
final String jsonFormat = context.getProperty(JSON_FORMAT).getValue();
if (jsonFormat.equals(JSON_FORMAT_FULL_ROW.getValue())) {
this.regularRowSerializer = new JsonFullRowSerializer(decodeCharset, encodeCharset);
this.base64RowSerializer = new JsonFullRowSerializer(decodeCharset, encodeCharset, true);
} else {
this.regularRowSerializer = new JsonQualifierAndValueRowSerializer(decodeCharset, encodeCharset);
this.base64RowSerializer = new JsonQualifierAndValueRowSerializer(decodeCharset, encodeCharset, true);
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isBlank(tableName)) {
getLogger().error("Table Name is blank or null for {}, transferring to failure", new Object[] {flowFile});
session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
}
final String rowId = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isBlank(rowId)) {
getLogger().error("Row Identifier is blank or null for {}, transferring to failure", new Object[] {flowFile});
session.transfer(session.penalize(flowFile), REL_FAILURE);
return;
}
final List<Column> columns = getColumns(context.getProperty(COLUMNS).evaluateAttributeExpressions(flowFile).getValue());
final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
final String destination = context.getProperty(DESTINATION).getValue();
final boolean base64Encode = context.getProperty(JSON_VALUE_ENCODING).getValue().equals(ENCODING_BASE64.getValue());
final RowSerializer rowSerializer = base64Encode ? base64RowSerializer : regularRowSerializer;
final FetchHBaseRowHandler handler = destination.equals(DESTINATION_CONTENT.getValue())
? new FlowFileContentHandler(flowFile, session, rowSerializer) : new FlowFileAttributeHandler(flowFile, session, rowSerializer);
final byte[] rowIdBytes = rowId.getBytes(StandardCharsets.UTF_8);
try {
hBaseClientService.scan(tableName, rowIdBytes, rowIdBytes, columns, handler);
} catch (Exception e) {
getLogger().error("Unable to fetch row {} from {} due to {}", new Object[] {rowId, tableName, e});
session.transfer(handler.getFlowFile(), REL_FAILURE);
return;
}
FlowFile handlerFlowFile = handler.getFlowFile();
if (!handler.handledRow()) {
getLogger().error("Row {} not found in {}, transferring to not found", new Object[] {rowId, tableName});
session.transfer(handlerFlowFile, REL_NOT_FOUND);
return;
}
if (getLogger().isDebugEnabled()) {
getLogger().debug("Fetched {} from {} with row id {}", new Object[]{handlerFlowFile, tableName, rowId});
}
final Map<String, String> attributes = new HashMap<>();
attributes.put(HBASE_TABLE_ATTR, tableName);
if (destination.equals(DESTINATION_CONTENT.getValue())) {
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
}
handlerFlowFile = session.putAllAttributes(handlerFlowFile, attributes);
final String transitUri = "hbase://" + tableName + "/" + rowId;
if (destination.equals(DESTINATION_CONTENT.getValue())) {
session.getProvenanceReporter().fetch(handlerFlowFile, transitUri);
} else {
session.getProvenanceReporter().modifyAttributes(handlerFlowFile, "Added attributes to FlowFile from " + transitUri);
}
session.transfer(handlerFlowFile, REL_SUCCESS);
}
/**
* @param columnsValue a String in the form colFam:colQual,colFam:colQual
* @return a list of Columns based on parsing the given String
*/
private List<Column> getColumns(final String columnsValue) {
final String[] columns = (columnsValue == null || columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
List<Column> columnsList = new ArrayList<>(columns.length);
for (final String column : columns) {
if (column.contains(":")) {
final String[] parts = column.split(":");
final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8);
final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8);
columnsList.add(new Column(cf, cq));
} else {
final byte[] cf = column.getBytes(StandardCharsets.UTF_8);
columnsList.add(new Column(cf, null));
}
}
return columnsList;
}
/**
* A ResultHandler that also provides access to a resulting FlowFile reference.
*/
private interface FetchHBaseRowHandler extends ResultHandler {
/**
* @return returns the flow file reference that was used by this handler
*/
FlowFile getFlowFile();
/**
* @return returns true if this handler handled a row
*/
boolean handledRow();
}
/**
* A FetchHBaseRowHandler that writes the resulting row to the FlowFile content.
*/
private static class FlowFileContentHandler implements FetchHBaseRowHandler {
private FlowFile flowFile;
private final ProcessSession session;
private final RowSerializer serializer;
private boolean handledRow = false;
public FlowFileContentHandler(final FlowFile flowFile, final ProcessSession session, final RowSerializer serializer) {
this.flowFile = flowFile;
this.session = session;
this.serializer = serializer;
}
@Override
public void handle(byte[] row, ResultCell[] resultCells) {
flowFile = session.write(flowFile, (out) -> {
serializer.serialize(row, resultCells, out);
});
handledRow = true;
}
@Override
public FlowFile getFlowFile() {
return flowFile;
}
@Override
public boolean handledRow() {
return handledRow;
}
}
/**
* A FetchHBaseRowHandler that writes the resulting row to FlowFile attributes.
*/
private static class FlowFileAttributeHandler implements FetchHBaseRowHandler {
private FlowFile flowFile;
private final ProcessSession session;
private final RowSerializer rowSerializer;
private boolean handledRow = false;
public FlowFileAttributeHandler(final FlowFile flowFile, final ProcessSession session, final RowSerializer serializer) {
this.flowFile = flowFile;
this.session = session;
this.rowSerializer = serializer;
}
@Override
public void handle(byte[] row, ResultCell[] resultCells) {
final String serializedRow = rowSerializer.serialize(row, resultCells);
flowFile = session.putAttribute(flowFile, HBASE_ROW_ATTR, serializedRow);
handledRow = true;
}
@Override
public FlowFile getFlowFile() {
return flowFile;
}
@Override
public boolean handledRow() {
return handledRow;
}
}
}

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase.io;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.util.RowSerializerUtil;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
/**
* Serializes a row from HBase to a JSON document of the form:
*
* {
* "row" : "row1",
* "cells": [
* {
* "family" : "fam1",
* "qualifier" : "qual1"
* "value" : "val1"
* "timestamp" : 123456789
* },
* {
* "family" : "fam1",
* "qualifier" : "qual2"
* "value" : "val2"
* "timestamp" : 123456789
* }
* ]
* }
*
* If base64encode is true, the row id, family, qualifier, and value will be represented as base 64 encoded strings.
*/
public class JsonFullRowSerializer implements RowSerializer {
private final Charset decodeCharset;
private final Charset encodeCharset;
private final boolean base64encode;
public JsonFullRowSerializer(final Charset decodeCharset, final Charset encodeCharset) {
this(decodeCharset, encodeCharset, false);
}
public JsonFullRowSerializer(final Charset decodeCharset, final Charset encodeCharset, final boolean base64encode) {
this.decodeCharset = decodeCharset;
this.encodeCharset = encodeCharset;
this.base64encode = base64encode;
}
@Override
public String serialize(byte[] rowKey, ResultCell[] cells) {
final String rowId = RowSerializerUtil.getRowId(rowKey, decodeCharset, base64encode);
final StringBuilder jsonBuilder = new StringBuilder();
jsonBuilder.append("{");
jsonBuilder.append("\"row\":");
appendString(jsonBuilder, rowId, base64encode);
jsonBuilder.append(", \"cells\": [");
int i = 0;
for (final ResultCell cell : cells) {
final String cellFamily = RowSerializerUtil.getCellFamily(cell, decodeCharset, base64encode);
final String cellQualifier = RowSerializerUtil.getCellQualifier(cell, decodeCharset, base64encode);
final String cellValue = RowSerializerUtil.getCellValue(cell, decodeCharset, base64encode);
if (i > 0) {
jsonBuilder.append(", ");
}
// start cell
jsonBuilder.append("{");
jsonBuilder.append("\"fam\":");
appendString(jsonBuilder, cellFamily, base64encode);
jsonBuilder.append(",\"qual\":");
appendString(jsonBuilder, cellQualifier, base64encode);
jsonBuilder.append(",\"val\":");
appendString(jsonBuilder, cellValue, base64encode);
jsonBuilder.append(",\"ts\":");
jsonBuilder.append(String.valueOf(cell.getTimestamp()));
// end cell
jsonBuilder.append("}");
i++;
}
// end cell array
jsonBuilder.append("]");
// end overall document
jsonBuilder.append("}");
return jsonBuilder.toString();
}
@Override
public void serialize(final byte[] rowKey, final ResultCell[] cells, final OutputStream out) throws IOException {
final String json = serialize(rowKey, cells);
out.write(json.getBytes(encodeCharset));
}
private void appendString(final StringBuilder jsonBuilder, final String str, final boolean base64encode) {
jsonBuilder.append("\"");
// only escape the value when not doing base64
if (!base64encode) {
jsonBuilder.append(StringEscapeUtils.escapeJson(str));
} else {
jsonBuilder.append(str);
}
jsonBuilder.append("\"");
}
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase.io;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.util.RowSerializerUtil;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
/**
* Serializes an HBase row to a JSON document of the form:
*
* {
* "qual1" : "val1",
* "qual2" : "val2"
* }
*
* If base64encode is true, the qualifiers and values will be represented as base 64 encoded strings.
*/
public class JsonQualifierAndValueRowSerializer implements RowSerializer {
private final Charset decodeCharset;
private final Charset encodeCharset;
private final boolean base64encode;
public JsonQualifierAndValueRowSerializer(final Charset decodeCharset, final Charset encodeCharset) {
this(decodeCharset, encodeCharset, false);
}
public JsonQualifierAndValueRowSerializer(final Charset decodeCharset, final Charset encodeCharset, final boolean base64encode) {
this.decodeCharset = decodeCharset;
this.encodeCharset = encodeCharset;
this.base64encode = base64encode;
}
@Override
public String serialize(byte[] rowKey, ResultCell[] cells) {
final StringBuilder jsonBuilder = new StringBuilder();
jsonBuilder.append("{");
int i = 0;
for (final ResultCell cell : cells) {
final String cellQualifier = RowSerializerUtil.getCellQualifier(cell, decodeCharset, base64encode);
final String cellValue = RowSerializerUtil.getCellValue(cell, decodeCharset, base64encode);
if (i > 0) {
jsonBuilder.append(", ");
}
appendString(jsonBuilder, cellQualifier, base64encode);
jsonBuilder.append(":");
appendString(jsonBuilder, cellValue, base64encode);
i++;
}
jsonBuilder.append("}");
return jsonBuilder.toString();
}
@Override
public void serialize(final byte[] rowKey, final ResultCell[] cells, final OutputStream out) throws IOException {
final String json = serialize(rowKey, cells);
out.write(json.getBytes(encodeCharset));
}
private void appendString(final StringBuilder jsonBuilder, final String str, final boolean base64encode) {
jsonBuilder.append("\"");
// only escape the value when not doing base64
if (!base64encode) {
jsonBuilder.append(StringEscapeUtils.escapeJson(str));
} else {
jsonBuilder.append(str);
}
jsonBuilder.append("\"");
}
}

View File

@ -23,6 +23,18 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
/**
* Serializes a row from HBase to a JSON document of the form:
*
* {
* "row" : "row1",
* "cells": {
* "fam1:qual1" : "val1",
* "fam1:qual2" : "val2"
* }
* }
*
*/
public class JsonRowSerializer implements RowSerializer {
private final Charset charset;
@ -32,7 +44,7 @@ public class JsonRowSerializer implements RowSerializer {
}
@Override
public void serialize(final byte[] rowKey, final ResultCell[] cells, final OutputStream out) throws IOException {
public String serialize(byte[] rowKey, ResultCell[] cells) {
final StringBuilder jsonBuilder = new StringBuilder();
jsonBuilder.append("{");
@ -62,7 +74,12 @@ public class JsonRowSerializer implements RowSerializer {
}
jsonBuilder.append("}}");
final String json = jsonBuilder.toString();
return jsonBuilder.toString();
}
@Override
public void serialize(final byte[] rowKey, final ResultCell[] cells, final OutputStream out) throws IOException {
final String json = serialize(rowKey, cells);
out.write(json.getBytes(charset));
}

View File

@ -32,4 +32,13 @@ public interface RowSerializer {
* @throws IOException if unable to serialize the row
*/
void serialize(byte[] rowKey, ResultCell[] cells, OutputStream out) throws IOException;
/**
*
* @param rowKey the row key of the row being serialized
* @param cells the cells of the row being serialized
* @return the serialized string representing the row
*/
String serialize(byte[] rowKey, ResultCell[] cells);
}

View File

@ -20,9 +20,9 @@ import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@ -37,7 +37,7 @@ public class ObjectSerDe implements Serializer<Object>, Deserializer<Object> {
}
try (final ByteArrayInputStream in = new ByteArrayInputStream(input);
final ObjectInputStream objIn = new ObjectInputStream(in)) {
final ObjectInputStream objIn = new ObjectInputStream(in)) {
return objIn.readObject();
} catch (ClassNotFoundException e) {
throw new DeserializationException("Could not deserialize object due to ClassNotFoundException", e);
@ -47,7 +47,7 @@ public class ObjectSerDe implements Serializer<Object>, Deserializer<Object> {
@Override
public void serialize(Object value, OutputStream output) throws SerializationException, IOException {
try (final ByteArrayOutputStream bOut = new ByteArrayOutputStream();
final ObjectOutputStream objOut = new ObjectOutputStream(bOut)) {
final ObjectOutputStream objOut = new ObjectOutputStream(bOut)) {
objOut.writeObject(value);
output.write(bOut.toByteArray());
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase.util;
import org.apache.nifi.hbase.scan.ResultCell;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
public class RowSerializerUtil {
/**
* @param rowId the row id to get the string from
* @param charset the charset that was used to encode the cell's row
* @param base64encodeValues whether or not to base64 encode the returned string
*
* @return the String representation of the cell's row
*/
public static String getRowId(final byte[] rowId, final Charset charset, final boolean base64encodeValues) {
if (base64encodeValues) {
ByteBuffer cellRowBuffer = ByteBuffer.wrap(rowId);
ByteBuffer base64Buffer = Base64.getEncoder().encode(cellRowBuffer);
return new String(base64Buffer.array(), StandardCharsets.UTF_8);
} else {
return new String(rowId, charset);
}
}
/**
* @param cell the cell to get the family from
* @param charset the charset that was used to encode the cell's family
* @param base64encodeValues whether or not to base64 encode the returned string
*
* @return the String representation of the cell's family
*/
public static String getCellFamily(final ResultCell cell, final Charset charset, final boolean base64encodeValues) {
if (base64encodeValues) {
ByteBuffer cellFamilyBuffer = ByteBuffer.wrap(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
ByteBuffer base64Buffer = Base64.getEncoder().encode(cellFamilyBuffer);
return new String(base64Buffer.array(), StandardCharsets.UTF_8);
} else {
return new String(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), charset);
}
}
/**
* @param cell the cell to get the qualifier from
* @param charset the charset that was used to encode the cell's qualifier
* @param base64encodeValues whether or not to base64 encode the returned string
*
* @return the String representation of the cell's qualifier
*/
public static String getCellQualifier(final ResultCell cell, final Charset charset, final boolean base64encodeValues) {
if (base64encodeValues) {
ByteBuffer cellQualifierBuffer = ByteBuffer.wrap(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
ByteBuffer base64Buffer = Base64.getEncoder().encode(cellQualifierBuffer);
return new String(base64Buffer.array(), StandardCharsets.UTF_8);
} else {
return new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), charset);
}
}
/**
* @param cell the cell to get the value from
* @param charset the charset that was used to encode the cell's value
* @param base64encodeValues whether or not to base64 encode the returned string
*
* @return the String representation of the cell's value
*/
public static String getCellValue(final ResultCell cell, final Charset charset, final boolean base64encodeValues) {
if (base64encodeValues) {
ByteBuffer cellValueBuffer = ByteBuffer.wrap(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
ByteBuffer base64Buffer = Base64.getEncoder().encode(cellValueBuffer);
return new String(base64Buffer.array(), StandardCharsets.UTF_8);
} else {
return new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), charset);
}
}
}

View File

@ -16,3 +16,4 @@
org.apache.nifi.hbase.GetHBase
org.apache.nifi.hbase.PutHBaseCell
org.apache.nifi.hbase.PutHBaseJSON
org.apache.nifi.hbase.FetchHBaseRow

View File

@ -28,6 +28,7 @@ import org.apache.nifi.hbase.scan.ResultHandler;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@ -38,6 +39,7 @@ public class MockHBaseClientService extends AbstractControllerService implements
private Map<String,ResultCell[]> results = new HashMap<>();
private Map<String, List<PutFlowFile>> flowFilePuts = new HashMap<>();
private boolean throwException = false;
private int numScans = 0;
@Override
public void put(String tableName, Collection<PutFlowFile> puts) throws IOException {
@ -49,10 +51,44 @@ public class MockHBaseClientService extends AbstractControllerService implements
}
@Override
public void put(String tableName, byte[] rowId, Collection<PutColumn> columns) throws IOException {
public void put(String tableName, byte[] startRow, Collection<PutColumn> columns) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void scan(String tableName, byte[] startRow, byte[] endRow, Collection<Column> columns, ResultHandler handler) throws IOException {
if (throwException) {
throw new IOException("exception");
}
for (final Map.Entry<String,ResultCell[]> entry : results.entrySet()) {
List<ResultCell> matchedCells = new ArrayList<>();
if (columns == null || columns.isEmpty()) {
Arrays.stream(entry.getValue()).forEach(e -> matchedCells.add(e));
} else {
for (Column column : columns) {
String colFam = new String(column.getFamily(), StandardCharsets.UTF_8);
String colQual = new String(column.getQualifier(), StandardCharsets.UTF_8);
for (ResultCell cell : entry.getValue()) {
String cellFam = new String(cell.getFamilyArray(), StandardCharsets.UTF_8);
String cellQual = new String(cell.getQualifierArray(), StandardCharsets.UTF_8);
if (colFam.equals(cellFam) && colQual.equals(cellQual)) {
matchedCells.add(cell);
}
}
}
}
handler.handle(entry.getKey().getBytes(StandardCharsets.UTF_8), matchedCells.toArray(new ResultCell[matchedCells.size()]));
}
numScans++;
}
@Override
public void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, ResultHandler handler) throws IOException {
if (throwException) {
@ -63,6 +99,8 @@ public class MockHBaseClientService extends AbstractControllerService implements
for (final Map.Entry<String,ResultCell[]> entry : results.entrySet()) {
handler.handle(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue());
}
numScans++;
}
public void addResult(final String rowKey, final Map<String, String> cells, final long timestamp) {
@ -108,6 +146,10 @@ public class MockHBaseClientService extends AbstractControllerService implements
this.throwException = throwException;
}
public int getNumScans() {
return numScans;
}
@Override
public byte[] toBytes(final boolean b) {
return new byte[] { b ? (byte) -1 : (byte) 0 };

View File

@ -0,0 +1,403 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.commons.codec.binary.Base64;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class TestFetchHBaseRow {
private FetchHBaseRow proc;
private MockHBaseClientService hBaseClientService;
private TestRunner runner;
@Before
public void setup() throws InitializationException {
proc = new FetchHBaseRow();
runner = TestRunners.newTestRunner(proc);
hBaseClientService = new MockHBaseClientService();
runner.addControllerService("hbaseClient", hBaseClientService);
runner.enableControllerService(hBaseClientService);
runner.setProperty(FetchHBaseRow.HBASE_CLIENT_SERVICE, "hbaseClient");
}
@Test
public void testColumnsValidation() {
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.assertValid();
runner.setProperty(FetchHBaseRow.COLUMNS, "cf1:cq1");
runner.assertValid();
runner.setProperty(FetchHBaseRow.COLUMNS, "cf1");
runner.assertValid();
runner.setProperty(FetchHBaseRow.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3");
runner.assertValid();
runner.setProperty(FetchHBaseRow.COLUMNS, "cf1,cf2:cq1,cf3");
runner.assertValid();
runner.setProperty(FetchHBaseRow.COLUMNS, "cf1 cf2,cf3");
runner.assertNotValid();
runner.setProperty(FetchHBaseRow.COLUMNS, "cf1:,cf2,cf3");
runner.assertNotValid();
runner.setProperty(FetchHBaseRow.COLUMNS, "cf1:cq1,");
runner.assertNotValid();
}
@Test
public void testNoIncomingFlowFile() {
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.run();
runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0);
runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 0);
runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0);
Assert.assertEquals(0, hBaseClientService.getNumScans());
}
@Test
public void testInvalidTableName() {
runner.setProperty(FetchHBaseRow.TABLE_NAME, "${hbase.table}");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.enqueue("trigger flow file");
runner.run();
runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 1);
runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 0);
runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0);
Assert.assertEquals(0, hBaseClientService.getNumScans());
}
@Test
public void testInvalidRowId() {
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "${hbase.row}");
runner.enqueue("trigger flow file");
runner.run();
runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 1);
runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 0);
runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0);
Assert.assertEquals(0, hBaseClientService.getNumScans());
}
@Test
public void testFetchToAttributesWithStringValues() {
final Map<String, String> cells = new HashMap<>();
cells.put("cq1", "val1");
cells.put("cq2", "val2");
final long ts1 = 123456789;
hBaseClientService.addResult("row1", cells, ts1);
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_ATTRIBUTES);
runner.enqueue("trigger flow file");
runner.run();
runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0);
runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 1);
runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchHBaseRow.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(FetchHBaseRow.HBASE_ROW_ATTR,
"{\"row\":\"row1\", \"cells\": [" +
"{\"fam\":\"nifi\",\"qual\":\"cq1\",\"val\":\"val1\",\"ts\":" + ts1 + "}, " +
"{\"fam\":\"nifi\",\"qual\":\"cq2\",\"val\":\"val2\",\"ts\":" + ts1 + "}]}");
Assert.assertEquals(1, hBaseClientService.getNumScans());
}
@Test
public void testFetchSpecificColumnsToAttributesWithStringValues() {
final Map<String, String> cells = new HashMap<>();
cells.put("cq1", "val1");
cells.put("cq2", "val2");
final long ts1 = 123456789;
hBaseClientService.addResult("row1", cells, ts1);
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.setProperty(FetchHBaseRow.COLUMNS, "nifi:cq2");
runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_ATTRIBUTES);
runner.enqueue("trigger flow file");
runner.run();
runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0);
runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 1);
runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchHBaseRow.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(FetchHBaseRow.HBASE_ROW_ATTR,
"{\"row\":\"row1\", \"cells\": [{\"fam\":\"nifi\",\"qual\":\"cq2\",\"val\":\"val2\",\"ts\":" + ts1 + "}]}");
Assert.assertEquals(1, hBaseClientService.getNumScans());
}
@Test
public void testFetchToAttributesWithBase64Values() {
final Map<String, String> cells = new HashMap<>();
cells.put("cq1", "val1");
cells.put("cq2", "val2");
final long ts1 = 123456789;
hBaseClientService.addResult("row1", cells, ts1);
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_ATTRIBUTES);
runner.setProperty(FetchHBaseRow.JSON_VALUE_ENCODING, FetchHBaseRow.ENCODING_BASE64);
runner.enqueue("trigger flow file");
runner.run();
runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0);
runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 1);
runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0);
final String rowBase64 = Base64.encodeBase64String("row1".getBytes(StandardCharsets.UTF_8));
final String fam1Base64 = Base64.encodeBase64String("nifi".getBytes(StandardCharsets.UTF_8));
final String qual1Base64 = Base64.encodeBase64String("cq1".getBytes(StandardCharsets.UTF_8));
final String val1Base64 = Base64.encodeBase64String("val1".getBytes(StandardCharsets.UTF_8));
final String fam2Base64 = Base64.encodeBase64String("nifi".getBytes(StandardCharsets.UTF_8));
final String qual2Base64 = Base64.encodeBase64String("cq2".getBytes(StandardCharsets.UTF_8));
final String val2Base64 = Base64.encodeBase64String("val2".getBytes(StandardCharsets.UTF_8));
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchHBaseRow.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(FetchHBaseRow.HBASE_ROW_ATTR,
"{\"row\":\"" + rowBase64 + "\", \"cells\": [" +
"{\"fam\":\"" + fam1Base64 + "\",\"qual\":\"" + qual1Base64 + "\",\"val\":\"" + val1Base64 + "\",\"ts\":" + ts1 + "}, " +
"{\"fam\":\"" + fam2Base64 + "\",\"qual\":\"" + qual2Base64 + "\",\"val\":\"" + val2Base64 + "\",\"ts\":" + ts1 + "}]}");
Assert.assertEquals(1, hBaseClientService.getNumScans());
}
@Test
public void testFetchToAttributesNoResults() {
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_ATTRIBUTES);
runner.enqueue("trigger flow file");
runner.run();
runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0);
runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 0);
runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 1);
Assert.assertEquals(1, hBaseClientService.getNumScans());
}
@Test
public void testFetchToContentWithStringValues() {
final Map<String, String> cells = new HashMap<>();
cells.put("cq1", "val1");
cells.put("cq2", "val2");
final long ts1 = 123456789;
hBaseClientService.addResult("row1", cells, ts1);
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_CONTENT);
runner.enqueue("trigger flow file");
runner.run();
runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0);
runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 1);
runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchHBaseRow.REL_SUCCESS).get(0);
flowFile.assertContentEquals("{\"row\":\"row1\", \"cells\": [" +
"{\"fam\":\"nifi\",\"qual\":\"cq1\",\"val\":\"val1\",\"ts\":" + ts1 + "}, " +
"{\"fam\":\"nifi\",\"qual\":\"cq2\",\"val\":\"val2\",\"ts\":" + ts1 + "}]}");
Assert.assertEquals(1, hBaseClientService.getNumScans());
}
@Test
public void testFetchSpecificColumnsToContentWithStringValues() {
final Map<String, String> cells = new HashMap<>();
cells.put("cq1", "val1");
cells.put("cq2", "val2");
final long ts1 = 123456789;
hBaseClientService.addResult("row1", cells, ts1);
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_CONTENT);
runner.setProperty(FetchHBaseRow.COLUMNS, "nifi:cq2");
runner.enqueue("trigger flow file");
runner.run();
runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0);
runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 1);
runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchHBaseRow.REL_SUCCESS).get(0);
flowFile.assertContentEquals("{\"row\":\"row1\", \"cells\": [{\"fam\":\"nifi\",\"qual\":\"cq2\",\"val\":\"val2\",\"ts\":" + ts1 + "}]}");
Assert.assertEquals(1, hBaseClientService.getNumScans());
}
@Test
public void testFetchSpecificColumnsToContentWithBase64() {
final Map<String, String> cells = new HashMap<>();
cells.put("cq1", "val1");
cells.put("cq2", "val2");
final long ts1 = 123456789;
hBaseClientService.addResult("row1", cells, ts1);
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_CONTENT);
runner.setProperty(FetchHBaseRow.JSON_VALUE_ENCODING, FetchHBaseRow.ENCODING_BASE64);
runner.enqueue("trigger flow file");
runner.run();
runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0);
runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 1);
runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0);
final String rowBase64 = Base64.encodeBase64String("row1".getBytes(StandardCharsets.UTF_8));
final String fam1Base64 = Base64.encodeBase64String("nifi".getBytes(StandardCharsets.UTF_8));
final String qual1Base64 = Base64.encodeBase64String("cq1".getBytes(StandardCharsets.UTF_8));
final String val1Base64 = Base64.encodeBase64String("val1".getBytes(StandardCharsets.UTF_8));
final String fam2Base64 = Base64.encodeBase64String("nifi".getBytes(StandardCharsets.UTF_8));
final String qual2Base64 = Base64.encodeBase64String("cq2".getBytes(StandardCharsets.UTF_8));
final String val2Base64 = Base64.encodeBase64String("val2".getBytes(StandardCharsets.UTF_8));
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchHBaseRow.REL_SUCCESS).get(0);
flowFile.assertContentEquals("{\"row\":\"" + rowBase64 + "\", \"cells\": [" +
"{\"fam\":\"" + fam1Base64 + "\",\"qual\":\"" + qual1Base64 + "\",\"val\":\"" + val1Base64 + "\",\"ts\":" + ts1 + "}, " +
"{\"fam\":\"" + fam2Base64 + "\",\"qual\":\"" + qual2Base64 + "\",\"val\":\"" + val2Base64 + "\",\"ts\":" + ts1 + "}]}");
Assert.assertEquals(1, hBaseClientService.getNumScans());
}
@Test
public void testFetchToContentWithQualifierAndValueJSON() {
final Map<String, String> cells = new HashMap<>();
cells.put("cq1", "val1");
cells.put("cq2", "val2");
hBaseClientService.addResult("row1", cells, System.currentTimeMillis());
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_CONTENT);
runner.setProperty(FetchHBaseRow.JSON_FORMAT, FetchHBaseRow.JSON_FORMAT_QUALIFIER_AND_VALUE);
runner.enqueue("trigger flow file");
runner.run();
runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0);
runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 1);
runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchHBaseRow.REL_SUCCESS).get(0);
flowFile.assertContentEquals("{\"cq1\":\"val1\", \"cq2\":\"val2\"}");
Assert.assertEquals(1, hBaseClientService.getNumScans());
}
@Test
public void testFetchWithExpressionLanguage() {
final Map<String, String> cells = new HashMap<>();
cells.put("cq1", "val1");
cells.put("cq2", "val2");
final long ts1 = 123456789;
hBaseClientService.addResult("row1", cells, ts1);
runner.setProperty(FetchHBaseRow.TABLE_NAME, "${hbase.table}");
runner.setProperty(FetchHBaseRow.ROW_ID, "${hbase.row}");
runner.setProperty(FetchHBaseRow.COLUMNS, "${hbase.cols}");
runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_CONTENT);
final Map<String,String> attributes = new HashMap<>();
attributes.put("hbase.table", "table1");
attributes.put("hbase.row", "row1");
attributes.put("hbase.cols", "nifi:cq2");
runner.enqueue("trigger flow file", attributes);
runner.run();
runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0);
runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 1);
runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchHBaseRow.REL_SUCCESS).get(0);
flowFile.assertContentEquals("{\"row\":\"row1\", \"cells\": [{\"fam\":\"nifi\",\"qual\":\"cq2\",\"val\":\"val2\",\"ts\":" + ts1 + "}]}");
Assert.assertEquals(1, hBaseClientService.getNumScans());
}
@Test
public void testFetchWhenScanThrowsException() {
hBaseClientService.setThrowException(true);
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_ATTRIBUTES);
runner.enqueue("trigger flow file");
runner.run();
runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 1);
runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 0);
runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0);
Assert.assertEquals(0, hBaseClientService.getNumScans());
}
}

View File

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase.io;
import org.apache.commons.codec.binary.Base64;
import org.apache.nifi.hbase.scan.ResultCell;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class TestJsonFullRowSerializer {
static final String ROW = "row1";
static final String FAM1 = "colFam1";
static final String QUAL1 = "colQual1";
static final String VAL1 = "val1";
static final long TS1 = 1111111111;
static final String FAM2 = "colFam2";
static final String QUAL2 = "colQual2";
static final String VAL2 = "val2";
static final long TS2 = 222222222;
private final byte[] rowKey = ROW.getBytes(StandardCharsets.UTF_8);
private ResultCell[] cells;
@Before
public void setup() {
final byte[] cell1Fam = FAM1.getBytes(StandardCharsets.UTF_8);
final byte[] cell1Qual = QUAL1.getBytes(StandardCharsets.UTF_8);
final byte[] cell1Val = VAL1.getBytes(StandardCharsets.UTF_8);
final byte[] cell2Fam = FAM2.getBytes(StandardCharsets.UTF_8);
final byte[] cell2Qual = QUAL2.getBytes(StandardCharsets.UTF_8);
final byte[] cell2Val = VAL2.getBytes(StandardCharsets.UTF_8);
final ResultCell cell1 = getResultCell(cell1Fam, cell1Qual, cell1Val, TS1);
final ResultCell cell2 = getResultCell(cell2Fam, cell2Qual, cell2Val, TS2);
cells = new ResultCell[] { cell1, cell2 };
}
@Test
public void testSerializeRegular() throws IOException {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
final RowSerializer rowSerializer = new JsonFullRowSerializer(StandardCharsets.UTF_8, StandardCharsets.UTF_8);
rowSerializer.serialize(rowKey, cells, out);
final String json = out.toString(StandardCharsets.UTF_8.name());
Assert.assertEquals("{\"row\":\"row1\", \"cells\": [" +
"{\"fam\":\"" + FAM1 + "\",\"qual\":\"" + QUAL1 + "\",\"val\":\"" + VAL1 + "\",\"ts\":" + TS1 + "}, " +
"{\"fam\":\"" + FAM2 + "\",\"qual\":\"" + QUAL2 + "\",\"val\":\"" + VAL2 + "\",\"ts\":" + TS2 + "}]}",
json);
}
@Test
public void testSerializeWithBase64() throws IOException {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
final RowSerializer rowSerializer = new JsonFullRowSerializer(StandardCharsets.UTF_8, StandardCharsets.UTF_8, true);
rowSerializer.serialize(rowKey, cells, out);
final String rowBase64 = Base64.encodeBase64String(ROW.getBytes(StandardCharsets.UTF_8));
final String fam1Base64 = Base64.encodeBase64String(FAM1.getBytes(StandardCharsets.UTF_8));
final String qual1Base64 = Base64.encodeBase64String(QUAL1.getBytes(StandardCharsets.UTF_8));
final String val1Base64 = Base64.encodeBase64String(VAL1.getBytes(StandardCharsets.UTF_8));
final String fam2Base64 = Base64.encodeBase64String(FAM2.getBytes(StandardCharsets.UTF_8));
final String qual2Base64 = Base64.encodeBase64String(QUAL2.getBytes(StandardCharsets.UTF_8));
final String val2Base64 = Base64.encodeBase64String(VAL2.getBytes(StandardCharsets.UTF_8));
final String json = out.toString(StandardCharsets.UTF_8.name());
Assert.assertEquals("{\"row\":\"" + rowBase64 + "\", \"cells\": [" +
"{\"fam\":\"" + fam1Base64 + "\",\"qual\":\"" + qual1Base64 + "\",\"val\":\"" + val1Base64 + "\",\"ts\":" + TS1 + "}, " +
"{\"fam\":\"" + fam2Base64 + "\",\"qual\":\"" + qual2Base64 + "\",\"val\":\"" + val2Base64 + "\",\"ts\":" + TS2 + "}]}", json);
}
private ResultCell getResultCell(byte[] fam, byte[] qual, byte[] val, long timestamp) {
final ResultCell cell = new ResultCell();
cell.setFamilyArray(fam);
cell.setFamilyOffset(0);
cell.setFamilyLength((byte)fam.length);
cell.setQualifierArray(qual);
cell.setQualifierOffset(0);
cell.setQualifierLength(qual.length);
cell.setValueArray(val);
cell.setValueOffset(0);
cell.setValueLength(val.length);
cell.setTimestamp(timestamp);
return cell;
}
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase.io;
import org.apache.commons.codec.binary.Base64;
import org.apache.nifi.hbase.scan.ResultCell;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class TestJsonQualifierAndValueRowSerializer {
static final String ROW = "row1";
static final String FAM1 = "colFam1";
static final String QUAL1 = "colQual1";
static final String VAL1 = "val1";
static final long TS1 = 1111111111;
static final String FAM2 = "colFam2";
static final String QUAL2 = "colQual2";
static final String VAL2 = "val2";
static final long TS2 = 222222222;
private final byte[] rowKey = ROW.getBytes(StandardCharsets.UTF_8);
private ResultCell[] cells;
@Before
public void setup() {
final byte[] cell1Fam = FAM1.getBytes(StandardCharsets.UTF_8);
final byte[] cell1Qual = QUAL1.getBytes(StandardCharsets.UTF_8);
final byte[] cell1Val = VAL1.getBytes(StandardCharsets.UTF_8);
final byte[] cell2Fam = FAM2.getBytes(StandardCharsets.UTF_8);
final byte[] cell2Qual = QUAL2.getBytes(StandardCharsets.UTF_8);
final byte[] cell2Val = VAL2.getBytes(StandardCharsets.UTF_8);
final ResultCell cell1 = getResultCell(cell1Fam, cell1Qual, cell1Val, TS1);
final ResultCell cell2 = getResultCell(cell2Fam, cell2Qual, cell2Val, TS2);
cells = new ResultCell[] { cell1, cell2 };
}
@Test
public void testSerializeRegular() throws IOException {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
final RowSerializer rowSerializer = new JsonQualifierAndValueRowSerializer(StandardCharsets.UTF_8, StandardCharsets.UTF_8);
rowSerializer.serialize(rowKey, cells, out);
final String json = out.toString(StandardCharsets.UTF_8.name());
Assert.assertEquals("{\"" + QUAL1 + "\":\"" + VAL1 + "\", \"" + QUAL2 + "\":\"" + VAL2 + "\"}", json);
}
@Test
public void testSerializeWithBase64() throws IOException {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
final RowSerializer rowSerializer = new JsonQualifierAndValueRowSerializer(StandardCharsets.UTF_8, StandardCharsets.UTF_8, true);
rowSerializer.serialize(rowKey, cells, out);
final String qual1Base64 = Base64.encodeBase64String(QUAL1.getBytes(StandardCharsets.UTF_8));
final String val1Base64 = Base64.encodeBase64String(VAL1.getBytes(StandardCharsets.UTF_8));
final String qual2Base64 = Base64.encodeBase64String(QUAL2.getBytes(StandardCharsets.UTF_8));
final String val2Base64 = Base64.encodeBase64String(VAL2.getBytes(StandardCharsets.UTF_8));
final String json = out.toString(StandardCharsets.UTF_8.name());
Assert.assertEquals("{\"" + qual1Base64 + "\":\"" + val1Base64 + "\", \"" + qual2Base64 + "\":\"" + val2Base64 + "\"}", json);
}
private ResultCell getResultCell(byte[] fam, byte[] qual, byte[] val, long timestamp) {
final ResultCell cell = new ResultCell();
cell.setFamilyArray(fam);
cell.setFamilyOffset(0);
cell.setFamilyLength((byte)fam.length);
cell.setQualifierArray(qual);
cell.setQualifierOffset(0);
cell.setQualifierLength(qual.length);
cell.setValueArray(val);
cell.setValueOffset(0);
cell.setValueLength(val.length);
cell.setTimestamp(timestamp);
return cell;
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase.io;
import org.apache.nifi.hbase.scan.ResultCell;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class TestJsonRowSerializer {
private final byte[] rowKey = "row1".getBytes(StandardCharsets.UTF_8);
private ResultCell[] cells;
@Before
public void setup() {
final byte[] cell1Fam = "colFam1".getBytes(StandardCharsets.UTF_8);
final byte[] cell1Qual = "colQual1".getBytes(StandardCharsets.UTF_8);
final byte[] cell1Val = "val1".getBytes(StandardCharsets.UTF_8);
final byte[] cell2Fam = "colFam2".getBytes(StandardCharsets.UTF_8);
final byte[] cell2Qual = "colQual2".getBytes(StandardCharsets.UTF_8);
final byte[] cell2Val = "val2".getBytes(StandardCharsets.UTF_8);
final ResultCell cell1 = getResultCell(cell1Fam, cell1Qual, cell1Val);
final ResultCell cell2 = getResultCell(cell2Fam, cell2Qual, cell2Val);
cells = new ResultCell[] { cell1, cell2 };
}
@Test
public void testSerializeRegular() throws IOException {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
final RowSerializer rowSerializer = new JsonRowSerializer(StandardCharsets.UTF_8);
rowSerializer.serialize(rowKey, cells, out);
final String json = out.toString(StandardCharsets.UTF_8.name());
Assert.assertEquals("{\"row\":\"row1\", \"cells\": {\"colFam1:colQual1\":\"val1\", \"colFam2:colQual2\":\"val2\"}}", json);
}
private ResultCell getResultCell(byte[] fam, byte[] qual, byte[] val) {
final ResultCell cell = new ResultCell();
cell.setFamilyArray(fam);
cell.setFamilyOffset(0);
cell.setFamilyLength((byte)fam.length);
cell.setQualifierArray(qual);
cell.setQualifierOffset(0);
cell.setQualifierLength(qual.length);
cell.setValueArray(val);
cell.setValueOffset(0);
cell.setValueLength(val.length);
return cell;
}
}

View File

@ -16,11 +16,11 @@
*/
package org.apache.nifi.hbase.util;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

View File

@ -106,6 +106,18 @@ public interface HBaseClientService extends ControllerService {
*/
void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, ResultHandler handler) throws IOException;
/**
* Scans the given table for the given rowId and passes the result to the handler.
*
* @param tableName the name of an HBase table to scan
* @param startRow the row identifier to start scanning at
* @param endRow the row identifier to end scanning at
* @param columns optional columns to return, if not specified all columns are returned
* @param handler a handler to process rows of the result
* @throws IOException thrown when there are communication errors with HBase
*/
void scan(String tableName, byte[] startRow, byte[] endRow, Collection<Column> columns, ResultHandler handler) throws IOException;
/**
* Converts the given boolean to it's byte representation.
*

View File

@ -331,35 +331,9 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
// convert HBase cells to NiFi cells
final ResultCell[] resultCells = new ResultCell[cells.length];
for (int i=0; i < cells.length; i++) {
final Cell cell = cells[i];
final ResultCell resultCell = new ResultCell();
resultCell.setRowArray(cell.getRowArray());
resultCell.setRowOffset(cell.getRowOffset());
resultCell.setRowLength(cell.getRowLength());
resultCell.setFamilyArray(cell.getFamilyArray());
resultCell.setFamilyOffset(cell.getFamilyOffset());
resultCell.setFamilyLength(cell.getFamilyLength());
resultCell.setQualifierArray(cell.getQualifierArray());
resultCell.setQualifierOffset(cell.getQualifierOffset());
resultCell.setQualifierLength(cell.getQualifierLength());
resultCell.setTimestamp(cell.getTimestamp());
resultCell.setTypeByte(cell.getTypeByte());
resultCell.setSequenceId(cell.getSequenceId());
resultCell.setValueArray(cell.getValueArray());
resultCell.setValueOffset(cell.getValueOffset());
resultCell.setValueLength(cell.getValueLength());
resultCell.setTagsArray(cell.getTagsArray());
resultCell.setTagsOffset(cell.getTagsOffset());
resultCell.setTagsLength(cell.getTagsLength());
final ResultCell resultCell = getResultCell(cell);
resultCells[i] = resultCell;
}
@ -369,6 +343,54 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
}
@Override
public void scan(final String tableName, final byte[] startRow, final byte[] endRow, final Collection<Column> columns, final ResultHandler handler)
throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName));
final ResultScanner scanner = getResults(table, startRow, endRow, columns)) {
for (final Result result : scanner) {
final byte[] rowKey = result.getRow();
final Cell[] cells = result.rawCells();
if (cells == null) {
continue;
}
// convert HBase cells to NiFi cells
final ResultCell[] resultCells = new ResultCell[cells.length];
for (int i=0; i < cells.length; i++) {
final Cell cell = cells[i];
final ResultCell resultCell = getResultCell(cell);
resultCells[i] = resultCell;
}
// delegate to the handler
handler.handle(rowKey, resultCells);
}
}
}
// protected and extracted into separate method for testing
protected ResultScanner getResults(final Table table, final byte[] startRow, final byte[] endRow, final Collection<Column> columns) throws IOException {
final Scan scan = new Scan();
scan.setStartRow(startRow);
scan.setStopRow(endRow);
if (columns != null) {
for (Column col : columns) {
if (col.getQualifier() == null) {
scan.addFamily(col.getFamily());
} else {
scan.addColumn(col.getFamily(), col.getQualifier());
}
}
}
return table.getScanner(scan);
}
// protected and extracted into separate method for testing
protected ResultScanner getResults(final Table table, final Collection<Column> columns, final Filter filter, final long minTime) throws IOException {
// Create a new scan. We will set the min timerange as the latest timestamp that
@ -395,6 +417,34 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
return table.getScanner(scan);
}
private ResultCell getResultCell(Cell cell) {
final ResultCell resultCell = new ResultCell();
resultCell.setRowArray(cell.getRowArray());
resultCell.setRowOffset(cell.getRowOffset());
resultCell.setRowLength(cell.getRowLength());
resultCell.setFamilyArray(cell.getFamilyArray());
resultCell.setFamilyOffset(cell.getFamilyOffset());
resultCell.setFamilyLength(cell.getFamilyLength());
resultCell.setQualifierArray(cell.getQualifierArray());
resultCell.setQualifierOffset(cell.getQualifierOffset());
resultCell.setQualifierLength(cell.getQualifierLength());
resultCell.setTimestamp(cell.getTimestamp());
resultCell.setTypeByte(cell.getTypeByte());
resultCell.setSequenceId(cell.getSequenceId());
resultCell.setValueArray(cell.getValueArray());
resultCell.setValueOffset(cell.getValueOffset());
resultCell.setValueLength(cell.getValueLength());
resultCell.setTagsArray(cell.getTagsArray());
resultCell.setTagsOffset(cell.getTagsOffset());
resultCell.setTagsLength(cell.getTagsLength());
return resultCell;
}
static protected class ValidationResources {
private final String configResources;
private final Configuration configuration;