diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/FetchHBaseRow.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/FetchHBaseRow.java new file mode 100644 index 0000000000..54dcec68c7 --- /dev/null +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/FetchHBaseRow.java @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", "fetch", "get", "enrich"}) +@CapabilityDescription("Fetches a row from an HBase table. The Destination property controls whether the cells are added as flow file attributes, " + + "or the row is written to the flow file content as JSON. This processor may be used to fetch a fixed row on a interval by specifying the " + + "table and row id directly in the processor, or it may be used to dynamically fetch rows by referencing the table and row id from " + + "incoming flow files.") +@WritesAttributes({ + @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), + @WritesAttribute(attribute = "hbase.row", description = "A JSON document representing the row. This property is only written when a Destination of flowfile-attributes is selected."), + @WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise") +}) +public class FetchHBaseRow extends AbstractProcessor { + + static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*"); + + static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("HBase Client Service") + .description("Specifies the Controller Service to use for accessing HBase.") + .required(true) + .identifiesControllerService(HBaseClientService.class) + .build(); + + static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .name("Table Name") + .description("The name of the HBase Table to fetch from.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor ROW_ID = new PropertyDescriptor.Builder() + .name("Row Identifier") + .description("The identifier of the row to fetch.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor COLUMNS = new PropertyDescriptor.Builder() + .name("Columns") + .description("An optional comma-separated list of \":\" pairs to fetch. To return all columns " + + "for a given family, leave off the qualifier such as \",\".") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.createRegexMatchingValidator(COLUMNS_PATTERN)) + .build(); + + static final AllowableValue DESTINATION_ATTRIBUTES = new AllowableValue("flowfile-attributes", "flowfile-attributes", + "Adds the JSON document representing the row that was fetched as an attribute named hbase.row. " + + "The format of the JSON document is determined by the JSON Format property. " + + "NOTE: Fetching many large rows into attributes may have a negative impact on performance."); + + static final AllowableValue DESTINATION_CONTENT = new AllowableValue("flowfile-content", "flowfile-content", + "Overwrites the FlowFile content with a JSON document representing the row that was fetched. " + + "The format of the JSON document is determined by the JSON Format property."); + + static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() + .name("Destination") + .description("Indicates whether the row fetched from HBase is written to FlowFile content or FlowFile Attributes.") + .required(true) + .allowableValues(DESTINATION_ATTRIBUTES, DESTINATION_CONTENT) + .defaultValue(DESTINATION_ATTRIBUTES.getValue()) + .build(); + + static final AllowableValue JSON_FORMAT_FULL_ROW = new AllowableValue("full-row", "full-row", + "Creates a JSON document with the format: {\"row\":, \"cells\":[{\"fam\":, \"qual\":, \"val\":, \"ts\":}]}."); + static final AllowableValue JSON_FORMAT_QUALIFIER_AND_VALUE = new AllowableValue("col-qual-and-val", "col-qual-and-val", + "Creates a JSON document with the format: {\"\":\"\", \"\":\"\"."); + + static final PropertyDescriptor JSON_FORMAT = new PropertyDescriptor.Builder() + .name("JSON Format") + .description("Specifies how to represent the HBase row as a JSON document.") + .required(true) + .allowableValues(JSON_FORMAT_FULL_ROW, JSON_FORMAT_QUALIFIER_AND_VALUE) + .defaultValue(JSON_FORMAT_FULL_ROW.getValue()) + .build(); + + static final AllowableValue ENCODING_NONE = new AllowableValue("none", "none", "Creates a String using the bytes of given data and the given Character Set."); + static final AllowableValue ENCODING_BASE64 = new AllowableValue("base64", "base64", "Creates a Base64 encoded String of the given data."); + + static final PropertyDescriptor JSON_VALUE_ENCODING = new PropertyDescriptor.Builder() + .name("JSON Value Encoding") + .description("Specifies how to represent row ids, column families, column qualifiers, and values when stored in FlowFile attributes, or written to JSON.") + .required(true) + .allowableValues(ENCODING_NONE, ENCODING_BASE64) + .defaultValue(ENCODING_NONE.getValue()) + .build(); + + static final PropertyDescriptor DECODE_CHARSET = new PropertyDescriptor.Builder() + .name("Decode Character Set") + .description("The character set used to decode data from HBase.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + static final PropertyDescriptor ENCODE_CHARSET = new PropertyDescriptor.Builder() + .name("Encode Character Set") + .description("The character set used to encode the JSON representation of the row.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All successful fetches are routed to this relationship.") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("All failed fetches are routed to this relationship.") + .build(); + static final Relationship REL_NOT_FOUND = new Relationship.Builder() + .name("not found") + .description("All fetches where the row id is not found are routed to this relationship.") + .build(); + + static final String HBASE_TABLE_ATTR = "hbase.table"; + static final String HBASE_ROW_ATTR = "hbase.row"; + + static final List properties; + static { + List props = new ArrayList<>(); + props.add(HBASE_CLIENT_SERVICE); + props.add(TABLE_NAME); + props.add(ROW_ID); + props.add(COLUMNS); + props.add(DESTINATION); + props.add(JSON_FORMAT); + props.add(JSON_VALUE_ENCODING); + props.add(ENCODE_CHARSET); + props.add(DECODE_CHARSET); + properties = Collections.unmodifiableList(props); + } + + static final Set relationships; + static { + Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_FAILURE); + rels.add(REL_NOT_FOUND); + relationships = Collections.unmodifiableSet(rels); + } + + private volatile Charset decodeCharset; + private volatile Charset encodeCharset; + private volatile RowSerializer regularRowSerializer; + private volatile RowSerializer base64RowSerializer; + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set getRelationships() { + return relationships; + } + + @OnScheduled + public void onScheduled(ProcessContext context) { + this.decodeCharset = Charset.forName(context.getProperty(DECODE_CHARSET).getValue()); + this.encodeCharset = Charset.forName(context.getProperty(ENCODE_CHARSET).getValue()); + + final String jsonFormat = context.getProperty(JSON_FORMAT).getValue(); + if (jsonFormat.equals(JSON_FORMAT_FULL_ROW.getValue())) { + this.regularRowSerializer = new JsonFullRowSerializer(decodeCharset, encodeCharset); + this.base64RowSerializer = new JsonFullRowSerializer(decodeCharset, encodeCharset, true); + } else { + this.regularRowSerializer = new JsonQualifierAndValueRowSerializer(decodeCharset, encodeCharset); + this.base64RowSerializer = new JsonQualifierAndValueRowSerializer(decodeCharset, encodeCharset, true); + } + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + if (StringUtils.isBlank(tableName)) { + getLogger().error("Table Name is blank or null for {}, transferring to failure", new Object[] {flowFile}); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + final String rowId = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue(); + if (StringUtils.isBlank(rowId)) { + getLogger().error("Row Identifier is blank or null for {}, transferring to failure", new Object[] {flowFile}); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + final List columns = getColumns(context.getProperty(COLUMNS).evaluateAttributeExpressions(flowFile).getValue()); + final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); + final String destination = context.getProperty(DESTINATION).getValue(); + final boolean base64Encode = context.getProperty(JSON_VALUE_ENCODING).getValue().equals(ENCODING_BASE64.getValue()); + + final RowSerializer rowSerializer = base64Encode ? base64RowSerializer : regularRowSerializer; + + final FetchHBaseRowHandler handler = destination.equals(DESTINATION_CONTENT.getValue()) + ? new FlowFileContentHandler(flowFile, session, rowSerializer) : new FlowFileAttributeHandler(flowFile, session, rowSerializer); + + final byte[] rowIdBytes = rowId.getBytes(StandardCharsets.UTF_8); + + try { + hBaseClientService.scan(tableName, rowIdBytes, rowIdBytes, columns, handler); + } catch (Exception e) { + getLogger().error("Unable to fetch row {} from {} due to {}", new Object[] {rowId, tableName, e}); + session.transfer(handler.getFlowFile(), REL_FAILURE); + return; + } + + FlowFile handlerFlowFile = handler.getFlowFile(); + if (!handler.handledRow()) { + getLogger().error("Row {} not found in {}, transferring to not found", new Object[] {rowId, tableName}); + session.transfer(handlerFlowFile, REL_NOT_FOUND); + return; + } + + if (getLogger().isDebugEnabled()) { + getLogger().debug("Fetched {} from {} with row id {}", new Object[]{handlerFlowFile, tableName, rowId}); + } + + final Map attributes = new HashMap<>(); + attributes.put(HBASE_TABLE_ATTR, tableName); + if (destination.equals(DESTINATION_CONTENT.getValue())) { + attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); + } + + handlerFlowFile = session.putAllAttributes(handlerFlowFile, attributes); + + final String transitUri = "hbase://" + tableName + "/" + rowId; + if (destination.equals(DESTINATION_CONTENT.getValue())) { + session.getProvenanceReporter().fetch(handlerFlowFile, transitUri); + } else { + session.getProvenanceReporter().modifyAttributes(handlerFlowFile, "Added attributes to FlowFile from " + transitUri); + } + + session.transfer(handlerFlowFile, REL_SUCCESS); + } + + /** + * @param columnsValue a String in the form colFam:colQual,colFam:colQual + * @return a list of Columns based on parsing the given String + */ + private List getColumns(final String columnsValue) { + final String[] columns = (columnsValue == null || columnsValue.isEmpty() ? new String[0] : columnsValue.split(",")); + + List columnsList = new ArrayList<>(columns.length); + + for (final String column : columns) { + if (column.contains(":")) { + final String[] parts = column.split(":"); + final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8); + final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8); + columnsList.add(new Column(cf, cq)); + } else { + final byte[] cf = column.getBytes(StandardCharsets.UTF_8); + columnsList.add(new Column(cf, null)); + } + } + + return columnsList; + } + + /** + * A ResultHandler that also provides access to a resulting FlowFile reference. + */ + private interface FetchHBaseRowHandler extends ResultHandler { + + /** + * @return returns the flow file reference that was used by this handler + */ + FlowFile getFlowFile(); + + /** + * @return returns true if this handler handled a row + */ + boolean handledRow(); + + } + + /** + * A FetchHBaseRowHandler that writes the resulting row to the FlowFile content. + */ + private static class FlowFileContentHandler implements FetchHBaseRowHandler { + + private FlowFile flowFile; + private final ProcessSession session; + private final RowSerializer serializer; + private boolean handledRow = false; + + public FlowFileContentHandler(final FlowFile flowFile, final ProcessSession session, final RowSerializer serializer) { + this.flowFile = flowFile; + this.session = session; + this.serializer = serializer; + } + + @Override + public void handle(byte[] row, ResultCell[] resultCells) { + flowFile = session.write(flowFile, (out) -> { + serializer.serialize(row, resultCells, out); + }); + handledRow = true; + } + + @Override + public FlowFile getFlowFile() { + return flowFile; + } + + @Override + public boolean handledRow() { + return handledRow; + } + } + + /** + * A FetchHBaseRowHandler that writes the resulting row to FlowFile attributes. + */ + private static class FlowFileAttributeHandler implements FetchHBaseRowHandler { + + private FlowFile flowFile; + private final ProcessSession session; + private final RowSerializer rowSerializer; + private boolean handledRow = false; + + public FlowFileAttributeHandler(final FlowFile flowFile, final ProcessSession session, final RowSerializer serializer) { + this.flowFile = flowFile; + this.session = session; + this.rowSerializer = serializer; + } + + @Override + public void handle(byte[] row, ResultCell[] resultCells) { + final String serializedRow = rowSerializer.serialize(row, resultCells); + flowFile = session.putAttribute(flowFile, HBASE_ROW_ATTR, serializedRow); + handledRow = true; + } + + @Override + public FlowFile getFlowFile() { + return flowFile; + } + + @Override + public boolean handledRow() { + return handledRow; + } + + } +} diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/JsonFullRowSerializer.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/JsonFullRowSerializer.java new file mode 100644 index 0000000000..837f14d220 --- /dev/null +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/JsonFullRowSerializer.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hbase.io; + +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.util.RowSerializerUtil; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; + +/** + * Serializes a row from HBase to a JSON document of the form: + * + * { + * "row" : "row1", + * "cells": [ + * { + * "family" : "fam1", + * "qualifier" : "qual1" + * "value" : "val1" + * "timestamp" : 123456789 + * }, + * { + * "family" : "fam1", + * "qualifier" : "qual2" + * "value" : "val2" + * "timestamp" : 123456789 + * } + * ] + * } + * + * If base64encode is true, the row id, family, qualifier, and value will be represented as base 64 encoded strings. + */ +public class JsonFullRowSerializer implements RowSerializer { + + private final Charset decodeCharset; + private final Charset encodeCharset; + private final boolean base64encode; + + public JsonFullRowSerializer(final Charset decodeCharset, final Charset encodeCharset) { + this(decodeCharset, encodeCharset, false); + } + + public JsonFullRowSerializer(final Charset decodeCharset, final Charset encodeCharset, final boolean base64encode) { + this.decodeCharset = decodeCharset; + this.encodeCharset = encodeCharset; + this.base64encode = base64encode; + } + + @Override + public String serialize(byte[] rowKey, ResultCell[] cells) { + final String rowId = RowSerializerUtil.getRowId(rowKey, decodeCharset, base64encode); + + final StringBuilder jsonBuilder = new StringBuilder(); + jsonBuilder.append("{"); + + jsonBuilder.append("\"row\":"); + appendString(jsonBuilder, rowId, base64encode); + + jsonBuilder.append(", \"cells\": ["); + int i = 0; + for (final ResultCell cell : cells) { + final String cellFamily = RowSerializerUtil.getCellFamily(cell, decodeCharset, base64encode); + final String cellQualifier = RowSerializerUtil.getCellQualifier(cell, decodeCharset, base64encode); + final String cellValue = RowSerializerUtil.getCellValue(cell, decodeCharset, base64encode); + + if (i > 0) { + jsonBuilder.append(", "); + } + + // start cell + jsonBuilder.append("{"); + + jsonBuilder.append("\"fam\":"); + appendString(jsonBuilder, cellFamily, base64encode); + + jsonBuilder.append(",\"qual\":"); + appendString(jsonBuilder, cellQualifier, base64encode); + + jsonBuilder.append(",\"val\":"); + appendString(jsonBuilder, cellValue, base64encode); + + jsonBuilder.append(",\"ts\":"); + jsonBuilder.append(String.valueOf(cell.getTimestamp())); + + // end cell + jsonBuilder.append("}"); + i++; + } + + // end cell array + jsonBuilder.append("]"); + + // end overall document + jsonBuilder.append("}"); + return jsonBuilder.toString(); + } + + @Override + public void serialize(final byte[] rowKey, final ResultCell[] cells, final OutputStream out) throws IOException { + final String json = serialize(rowKey, cells); + out.write(json.getBytes(encodeCharset)); + } + + private void appendString(final StringBuilder jsonBuilder, final String str, final boolean base64encode) { + jsonBuilder.append("\""); + + // only escape the value when not doing base64 + if (!base64encode) { + jsonBuilder.append(StringEscapeUtils.escapeJson(str)); + } else { + jsonBuilder.append(str); + } + + jsonBuilder.append("\""); + } + +} diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/JsonQualifierAndValueRowSerializer.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/JsonQualifierAndValueRowSerializer.java new file mode 100644 index 0000000000..0eb18ffbf8 --- /dev/null +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/JsonQualifierAndValueRowSerializer.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hbase.io; + +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.util.RowSerializerUtil; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; + +/** + * Serializes an HBase row to a JSON document of the form: + * + * { + * "qual1" : "val1", + * "qual2" : "val2" + * } + * + * If base64encode is true, the qualifiers and values will be represented as base 64 encoded strings. + */ +public class JsonQualifierAndValueRowSerializer implements RowSerializer { + + private final Charset decodeCharset; + private final Charset encodeCharset; + private final boolean base64encode; + + public JsonQualifierAndValueRowSerializer(final Charset decodeCharset, final Charset encodeCharset) { + this(decodeCharset, encodeCharset, false); + } + + public JsonQualifierAndValueRowSerializer(final Charset decodeCharset, final Charset encodeCharset, final boolean base64encode) { + this.decodeCharset = decodeCharset; + this.encodeCharset = encodeCharset; + this.base64encode = base64encode; + } + + @Override + public String serialize(byte[] rowKey, ResultCell[] cells) { + final StringBuilder jsonBuilder = new StringBuilder(); + jsonBuilder.append("{"); + + int i = 0; + for (final ResultCell cell : cells) { + final String cellQualifier = RowSerializerUtil.getCellQualifier(cell, decodeCharset, base64encode); + final String cellValue = RowSerializerUtil.getCellValue(cell, decodeCharset, base64encode); + + if (i > 0) { + jsonBuilder.append(", "); + } + + appendString(jsonBuilder, cellQualifier, base64encode); + jsonBuilder.append(":"); + appendString(jsonBuilder, cellValue, base64encode); + + i++; + } + + jsonBuilder.append("}"); + return jsonBuilder.toString(); + } + + @Override + public void serialize(final byte[] rowKey, final ResultCell[] cells, final OutputStream out) throws IOException { + final String json = serialize(rowKey, cells); + out.write(json.getBytes(encodeCharset)); + } + + private void appendString(final StringBuilder jsonBuilder, final String str, final boolean base64encode) { + jsonBuilder.append("\""); + + // only escape the value when not doing base64 + if (!base64encode) { + jsonBuilder.append(StringEscapeUtils.escapeJson(str)); + } else { + jsonBuilder.append(str); + } + + jsonBuilder.append("\""); + } + + +} diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/JsonRowSerializer.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/JsonRowSerializer.java index b624853f33..0ea0804b80 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/JsonRowSerializer.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/JsonRowSerializer.java @@ -23,6 +23,18 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.charset.Charset; +/** + * Serializes a row from HBase to a JSON document of the form: + * + * { + * "row" : "row1", + * "cells": { + * "fam1:qual1" : "val1", + * "fam1:qual2" : "val2" + * } + * } + * + */ public class JsonRowSerializer implements RowSerializer { private final Charset charset; @@ -32,7 +44,7 @@ public class JsonRowSerializer implements RowSerializer { } @Override - public void serialize(final byte[] rowKey, final ResultCell[] cells, final OutputStream out) throws IOException { + public String serialize(byte[] rowKey, ResultCell[] cells) { final StringBuilder jsonBuilder = new StringBuilder(); jsonBuilder.append("{"); @@ -62,7 +74,12 @@ public class JsonRowSerializer implements RowSerializer { } jsonBuilder.append("}}"); - final String json = jsonBuilder.toString(); + return jsonBuilder.toString(); + } + + @Override + public void serialize(final byte[] rowKey, final ResultCell[] cells, final OutputStream out) throws IOException { + final String json = serialize(rowKey, cells); out.write(json.getBytes(charset)); } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/RowSerializer.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/RowSerializer.java index 292b9b6a85..e4e2c1d654 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/RowSerializer.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/io/RowSerializer.java @@ -32,4 +32,13 @@ public interface RowSerializer { * @throws IOException if unable to serialize the row */ void serialize(byte[] rowKey, ResultCell[] cells, OutputStream out) throws IOException; + + /** + * + * @param rowKey the row key of the row being serialized + * @param cells the cells of the row being serialized + * @return the serialized string representing the row + */ + String serialize(byte[] rowKey, ResultCell[] cells); + } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/util/ObjectSerDe.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/util/ObjectSerDe.java index 9c6e329bd4..feace95664 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/util/ObjectSerDe.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/util/ObjectSerDe.java @@ -20,9 +20,9 @@ import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.distributed.cache.client.exception.DeserializationException; import org.apache.nifi.distributed.cache.client.exception.SerializationException; -import org.apache.nifi.stream.io.ByteArrayInputStream; -import org.apache.nifi.stream.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; @@ -37,7 +37,7 @@ public class ObjectSerDe implements Serializer, Deserializer { } try (final ByteArrayInputStream in = new ByteArrayInputStream(input); - final ObjectInputStream objIn = new ObjectInputStream(in)) { + final ObjectInputStream objIn = new ObjectInputStream(in)) { return objIn.readObject(); } catch (ClassNotFoundException e) { throw new DeserializationException("Could not deserialize object due to ClassNotFoundException", e); @@ -47,7 +47,7 @@ public class ObjectSerDe implements Serializer, Deserializer { @Override public void serialize(Object value, OutputStream output) throws SerializationException, IOException { try (final ByteArrayOutputStream bOut = new ByteArrayOutputStream(); - final ObjectOutputStream objOut = new ObjectOutputStream(bOut)) { + final ObjectOutputStream objOut = new ObjectOutputStream(bOut)) { objOut.writeObject(value); output.write(bOut.toByteArray()); } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/util/RowSerializerUtil.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/util/RowSerializerUtil.java new file mode 100644 index 0000000000..0e93c99c15 --- /dev/null +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/util/RowSerializerUtil.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hbase.util; + +import org.apache.nifi.hbase.scan.ResultCell; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Base64; + +public class RowSerializerUtil { + + /** + * @param rowId the row id to get the string from + * @param charset the charset that was used to encode the cell's row + * @param base64encodeValues whether or not to base64 encode the returned string + * + * @return the String representation of the cell's row + */ + public static String getRowId(final byte[] rowId, final Charset charset, final boolean base64encodeValues) { + if (base64encodeValues) { + ByteBuffer cellRowBuffer = ByteBuffer.wrap(rowId); + ByteBuffer base64Buffer = Base64.getEncoder().encode(cellRowBuffer); + return new String(base64Buffer.array(), StandardCharsets.UTF_8); + } else { + return new String(rowId, charset); + } + } + + /** + * @param cell the cell to get the family from + * @param charset the charset that was used to encode the cell's family + * @param base64encodeValues whether or not to base64 encode the returned string + * + * @return the String representation of the cell's family + */ + public static String getCellFamily(final ResultCell cell, final Charset charset, final boolean base64encodeValues) { + if (base64encodeValues) { + ByteBuffer cellFamilyBuffer = ByteBuffer.wrap(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); + ByteBuffer base64Buffer = Base64.getEncoder().encode(cellFamilyBuffer); + return new String(base64Buffer.array(), StandardCharsets.UTF_8); + } else { + return new String(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), charset); + } + } + + /** + * @param cell the cell to get the qualifier from + * @param charset the charset that was used to encode the cell's qualifier + * @param base64encodeValues whether or not to base64 encode the returned string + * + * @return the String representation of the cell's qualifier + */ + public static String getCellQualifier(final ResultCell cell, final Charset charset, final boolean base64encodeValues) { + if (base64encodeValues) { + ByteBuffer cellQualifierBuffer = ByteBuffer.wrap(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + ByteBuffer base64Buffer = Base64.getEncoder().encode(cellQualifierBuffer); + return new String(base64Buffer.array(), StandardCharsets.UTF_8); + } else { + return new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), charset); + } + } + + /** + * @param cell the cell to get the value from + * @param charset the charset that was used to encode the cell's value + * @param base64encodeValues whether or not to base64 encode the returned string + * + * @return the String representation of the cell's value + */ + public static String getCellValue(final ResultCell cell, final Charset charset, final boolean base64encodeValues) { + if (base64encodeValues) { + ByteBuffer cellValueBuffer = ByteBuffer.wrap(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + ByteBuffer base64Buffer = Base64.getEncoder().encode(cellValueBuffer); + return new String(base64Buffer.array(), StandardCharsets.UTF_8); + } else { + return new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), charset); + } + } + +} diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 6e2af81381..8af8cd551b 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -15,4 +15,5 @@ org.apache.nifi.hbase.GetHBase org.apache.nifi.hbase.PutHBaseCell -org.apache.nifi.hbase.PutHBaseJSON \ No newline at end of file +org.apache.nifi.hbase.PutHBaseJSON +org.apache.nifi.hbase.FetchHBaseRow \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java index 71304e5d5a..1056f580b1 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -28,6 +28,7 @@ import org.apache.nifi.hbase.scan.ResultHandler; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -38,6 +39,7 @@ public class MockHBaseClientService extends AbstractControllerService implements private Map results = new HashMap<>(); private Map> flowFilePuts = new HashMap<>(); private boolean throwException = false; + private int numScans = 0; @Override public void put(String tableName, Collection puts) throws IOException { @@ -49,10 +51,44 @@ public class MockHBaseClientService extends AbstractControllerService implements } @Override - public void put(String tableName, byte[] rowId, Collection columns) throws IOException { + public void put(String tableName, byte[] startRow, Collection columns) throws IOException { throw new UnsupportedOperationException(); } + @Override + public void scan(String tableName, byte[] startRow, byte[] endRow, Collection columns, ResultHandler handler) throws IOException { + if (throwException) { + throw new IOException("exception"); + } + + for (final Map.Entry entry : results.entrySet()) { + + List matchedCells = new ArrayList<>(); + + if (columns == null || columns.isEmpty()) { + Arrays.stream(entry.getValue()).forEach(e -> matchedCells.add(e)); + } else { + for (Column column : columns) { + String colFam = new String(column.getFamily(), StandardCharsets.UTF_8); + String colQual = new String(column.getQualifier(), StandardCharsets.UTF_8); + + for (ResultCell cell : entry.getValue()) { + String cellFam = new String(cell.getFamilyArray(), StandardCharsets.UTF_8); + String cellQual = new String(cell.getQualifierArray(), StandardCharsets.UTF_8); + + if (colFam.equals(cellFam) && colQual.equals(cellQual)) { + matchedCells.add(cell); + } + } + } + } + + handler.handle(entry.getKey().getBytes(StandardCharsets.UTF_8), matchedCells.toArray(new ResultCell[matchedCells.size()])); + } + + numScans++; + } + @Override public void scan(String tableName, Collection columns, String filterExpression, long minTime, ResultHandler handler) throws IOException { if (throwException) { @@ -63,6 +99,8 @@ public class MockHBaseClientService extends AbstractControllerService implements for (final Map.Entry entry : results.entrySet()) { handler.handle(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue()); } + + numScans++; } public void addResult(final String rowKey, final Map cells, final long timestamp) { @@ -108,6 +146,10 @@ public class MockHBaseClientService extends AbstractControllerService implements this.throwException = throwException; } + public int getNumScans() { + return numScans; + } + @Override public byte[] toBytes(final boolean b) { return new byte[] { b ? (byte) -1 : (byte) 0 }; diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestFetchHBaseRow.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestFetchHBaseRow.java new file mode 100644 index 0000000000..b44c6b094f --- /dev/null +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestFetchHBaseRow.java @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hbase; + +import org.apache.commons.codec.binary.Base64; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +public class TestFetchHBaseRow { + + private FetchHBaseRow proc; + private MockHBaseClientService hBaseClientService; + private TestRunner runner; + + @Before + public void setup() throws InitializationException { + proc = new FetchHBaseRow(); + runner = TestRunners.newTestRunner(proc); + + hBaseClientService = new MockHBaseClientService(); + runner.addControllerService("hbaseClient", hBaseClientService); + runner.enableControllerService(hBaseClientService); + runner.setProperty(FetchHBaseRow.HBASE_CLIENT_SERVICE, "hbaseClient"); + } + + @Test + public void testColumnsValidation() { + runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1"); + runner.setProperty(FetchHBaseRow.ROW_ID, "row1"); + runner.assertValid(); + + runner.setProperty(FetchHBaseRow.COLUMNS, "cf1:cq1"); + runner.assertValid(); + + runner.setProperty(FetchHBaseRow.COLUMNS, "cf1"); + runner.assertValid(); + + runner.setProperty(FetchHBaseRow.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3"); + runner.assertValid(); + + runner.setProperty(FetchHBaseRow.COLUMNS, "cf1,cf2:cq1,cf3"); + runner.assertValid(); + + runner.setProperty(FetchHBaseRow.COLUMNS, "cf1 cf2,cf3"); + runner.assertNotValid(); + + runner.setProperty(FetchHBaseRow.COLUMNS, "cf1:,cf2,cf3"); + runner.assertNotValid(); + + runner.setProperty(FetchHBaseRow.COLUMNS, "cf1:cq1,"); + runner.assertNotValid(); + } + + @Test + public void testNoIncomingFlowFile() { + runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1"); + runner.setProperty(FetchHBaseRow.ROW_ID, "row1"); + + runner.run(); + runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0); + runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 0); + runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0); + + Assert.assertEquals(0, hBaseClientService.getNumScans()); + } + + @Test + public void testInvalidTableName() { + runner.setProperty(FetchHBaseRow.TABLE_NAME, "${hbase.table}"); + runner.setProperty(FetchHBaseRow.ROW_ID, "row1"); + + runner.enqueue("trigger flow file"); + runner.run(); + + runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 1); + runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 0); + runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0); + + Assert.assertEquals(0, hBaseClientService.getNumScans()); + } + + @Test + public void testInvalidRowId() { + runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1"); + runner.setProperty(FetchHBaseRow.ROW_ID, "${hbase.row}"); + + runner.enqueue("trigger flow file"); + runner.run(); + + runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 1); + runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 0); + runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0); + + Assert.assertEquals(0, hBaseClientService.getNumScans()); + } + + @Test + public void testFetchToAttributesWithStringValues() { + final Map cells = new HashMap<>(); + cells.put("cq1", "val1"); + cells.put("cq2", "val2"); + + final long ts1 = 123456789; + hBaseClientService.addResult("row1", cells, ts1); + + runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1"); + runner.setProperty(FetchHBaseRow.ROW_ID, "row1"); + runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_ATTRIBUTES); + + runner.enqueue("trigger flow file"); + runner.run(); + + runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0); + runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 1); + runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchHBaseRow.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(FetchHBaseRow.HBASE_ROW_ATTR, + "{\"row\":\"row1\", \"cells\": [" + + "{\"fam\":\"nifi\",\"qual\":\"cq1\",\"val\":\"val1\",\"ts\":" + ts1 + "}, " + + "{\"fam\":\"nifi\",\"qual\":\"cq2\",\"val\":\"val2\",\"ts\":" + ts1 + "}]}"); + + Assert.assertEquals(1, hBaseClientService.getNumScans()); + } + + @Test + public void testFetchSpecificColumnsToAttributesWithStringValues() { + final Map cells = new HashMap<>(); + cells.put("cq1", "val1"); + cells.put("cq2", "val2"); + + final long ts1 = 123456789; + hBaseClientService.addResult("row1", cells, ts1); + + runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1"); + runner.setProperty(FetchHBaseRow.ROW_ID, "row1"); + runner.setProperty(FetchHBaseRow.COLUMNS, "nifi:cq2"); + runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_ATTRIBUTES); + + runner.enqueue("trigger flow file"); + runner.run(); + + runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0); + runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 1); + runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchHBaseRow.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(FetchHBaseRow.HBASE_ROW_ATTR, + "{\"row\":\"row1\", \"cells\": [{\"fam\":\"nifi\",\"qual\":\"cq2\",\"val\":\"val2\",\"ts\":" + ts1 + "}]}"); + + Assert.assertEquals(1, hBaseClientService.getNumScans()); + } + + @Test + public void testFetchToAttributesWithBase64Values() { + final Map cells = new HashMap<>(); + cells.put("cq1", "val1"); + cells.put("cq2", "val2"); + + final long ts1 = 123456789; + hBaseClientService.addResult("row1", cells, ts1); + + runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1"); + runner.setProperty(FetchHBaseRow.ROW_ID, "row1"); + runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_ATTRIBUTES); + runner.setProperty(FetchHBaseRow.JSON_VALUE_ENCODING, FetchHBaseRow.ENCODING_BASE64); + + runner.enqueue("trigger flow file"); + runner.run(); + + runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0); + runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 1); + runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0); + + final String rowBase64 = Base64.encodeBase64String("row1".getBytes(StandardCharsets.UTF_8)); + + final String fam1Base64 = Base64.encodeBase64String("nifi".getBytes(StandardCharsets.UTF_8)); + final String qual1Base64 = Base64.encodeBase64String("cq1".getBytes(StandardCharsets.UTF_8)); + final String val1Base64 = Base64.encodeBase64String("val1".getBytes(StandardCharsets.UTF_8)); + + final String fam2Base64 = Base64.encodeBase64String("nifi".getBytes(StandardCharsets.UTF_8)); + final String qual2Base64 = Base64.encodeBase64String("cq2".getBytes(StandardCharsets.UTF_8)); + final String val2Base64 = Base64.encodeBase64String("val2".getBytes(StandardCharsets.UTF_8)); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchHBaseRow.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(FetchHBaseRow.HBASE_ROW_ATTR, + "{\"row\":\"" + rowBase64 + "\", \"cells\": [" + + "{\"fam\":\"" + fam1Base64 + "\",\"qual\":\"" + qual1Base64 + "\",\"val\":\"" + val1Base64 + "\",\"ts\":" + ts1 + "}, " + + "{\"fam\":\"" + fam2Base64 + "\",\"qual\":\"" + qual2Base64 + "\",\"val\":\"" + val2Base64 + "\",\"ts\":" + ts1 + "}]}"); + + Assert.assertEquals(1, hBaseClientService.getNumScans()); + } + + @Test + public void testFetchToAttributesNoResults() { + runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1"); + runner.setProperty(FetchHBaseRow.ROW_ID, "row1"); + runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_ATTRIBUTES); + + runner.enqueue("trigger flow file"); + runner.run(); + + runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0); + runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 0); + runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 1); + + Assert.assertEquals(1, hBaseClientService.getNumScans()); + } + + @Test + public void testFetchToContentWithStringValues() { + final Map cells = new HashMap<>(); + cells.put("cq1", "val1"); + cells.put("cq2", "val2"); + + final long ts1 = 123456789; + hBaseClientService.addResult("row1", cells, ts1); + + runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1"); + runner.setProperty(FetchHBaseRow.ROW_ID, "row1"); + runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_CONTENT); + + runner.enqueue("trigger flow file"); + runner.run(); + + runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0); + runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 1); + runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchHBaseRow.REL_SUCCESS).get(0); + flowFile.assertContentEquals("{\"row\":\"row1\", \"cells\": [" + + "{\"fam\":\"nifi\",\"qual\":\"cq1\",\"val\":\"val1\",\"ts\":" + ts1 + "}, " + + "{\"fam\":\"nifi\",\"qual\":\"cq2\",\"val\":\"val2\",\"ts\":" + ts1 + "}]}"); + + Assert.assertEquals(1, hBaseClientService.getNumScans()); + } + + @Test + public void testFetchSpecificColumnsToContentWithStringValues() { + final Map cells = new HashMap<>(); + cells.put("cq1", "val1"); + cells.put("cq2", "val2"); + + final long ts1 = 123456789; + hBaseClientService.addResult("row1", cells, ts1); + + runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1"); + runner.setProperty(FetchHBaseRow.ROW_ID, "row1"); + runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_CONTENT); + runner.setProperty(FetchHBaseRow.COLUMNS, "nifi:cq2"); + + runner.enqueue("trigger flow file"); + runner.run(); + + runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0); + runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 1); + runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchHBaseRow.REL_SUCCESS).get(0); + flowFile.assertContentEquals("{\"row\":\"row1\", \"cells\": [{\"fam\":\"nifi\",\"qual\":\"cq2\",\"val\":\"val2\",\"ts\":" + ts1 + "}]}"); + + Assert.assertEquals(1, hBaseClientService.getNumScans()); + } + + @Test + public void testFetchSpecificColumnsToContentWithBase64() { + final Map cells = new HashMap<>(); + cells.put("cq1", "val1"); + cells.put("cq2", "val2"); + + final long ts1 = 123456789; + hBaseClientService.addResult("row1", cells, ts1); + + runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1"); + runner.setProperty(FetchHBaseRow.ROW_ID, "row1"); + runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_CONTENT); + runner.setProperty(FetchHBaseRow.JSON_VALUE_ENCODING, FetchHBaseRow.ENCODING_BASE64); + + runner.enqueue("trigger flow file"); + runner.run(); + + runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0); + runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 1); + runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0); + + final String rowBase64 = Base64.encodeBase64String("row1".getBytes(StandardCharsets.UTF_8)); + + final String fam1Base64 = Base64.encodeBase64String("nifi".getBytes(StandardCharsets.UTF_8)); + final String qual1Base64 = Base64.encodeBase64String("cq1".getBytes(StandardCharsets.UTF_8)); + final String val1Base64 = Base64.encodeBase64String("val1".getBytes(StandardCharsets.UTF_8)); + + final String fam2Base64 = Base64.encodeBase64String("nifi".getBytes(StandardCharsets.UTF_8)); + final String qual2Base64 = Base64.encodeBase64String("cq2".getBytes(StandardCharsets.UTF_8)); + final String val2Base64 = Base64.encodeBase64String("val2".getBytes(StandardCharsets.UTF_8)); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchHBaseRow.REL_SUCCESS).get(0); + flowFile.assertContentEquals("{\"row\":\"" + rowBase64 + "\", \"cells\": [" + + "{\"fam\":\"" + fam1Base64 + "\",\"qual\":\"" + qual1Base64 + "\",\"val\":\"" + val1Base64 + "\",\"ts\":" + ts1 + "}, " + + "{\"fam\":\"" + fam2Base64 + "\",\"qual\":\"" + qual2Base64 + "\",\"val\":\"" + val2Base64 + "\",\"ts\":" + ts1 + "}]}"); + + Assert.assertEquals(1, hBaseClientService.getNumScans()); + } + + @Test + public void testFetchToContentWithQualifierAndValueJSON() { + final Map cells = new HashMap<>(); + cells.put("cq1", "val1"); + cells.put("cq2", "val2"); + + hBaseClientService.addResult("row1", cells, System.currentTimeMillis()); + + runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1"); + runner.setProperty(FetchHBaseRow.ROW_ID, "row1"); + runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_CONTENT); + runner.setProperty(FetchHBaseRow.JSON_FORMAT, FetchHBaseRow.JSON_FORMAT_QUALIFIER_AND_VALUE); + + runner.enqueue("trigger flow file"); + runner.run(); + + runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0); + runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 1); + runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchHBaseRow.REL_SUCCESS).get(0); + flowFile.assertContentEquals("{\"cq1\":\"val1\", \"cq2\":\"val2\"}"); + + Assert.assertEquals(1, hBaseClientService.getNumScans()); + } + + @Test + public void testFetchWithExpressionLanguage() { + final Map cells = new HashMap<>(); + cells.put("cq1", "val1"); + cells.put("cq2", "val2"); + + final long ts1 = 123456789; + hBaseClientService.addResult("row1", cells, ts1); + + runner.setProperty(FetchHBaseRow.TABLE_NAME, "${hbase.table}"); + runner.setProperty(FetchHBaseRow.ROW_ID, "${hbase.row}"); + runner.setProperty(FetchHBaseRow.COLUMNS, "${hbase.cols}"); + runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_CONTENT); + + final Map attributes = new HashMap<>(); + attributes.put("hbase.table", "table1"); + attributes.put("hbase.row", "row1"); + attributes.put("hbase.cols", "nifi:cq2"); + + runner.enqueue("trigger flow file", attributes); + runner.run(); + + runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0); + runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 1); + runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchHBaseRow.REL_SUCCESS).get(0); + flowFile.assertContentEquals("{\"row\":\"row1\", \"cells\": [{\"fam\":\"nifi\",\"qual\":\"cq2\",\"val\":\"val2\",\"ts\":" + ts1 + "}]}"); + + Assert.assertEquals(1, hBaseClientService.getNumScans()); + } + + @Test + public void testFetchWhenScanThrowsException() { + hBaseClientService.setThrowException(true); + + runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1"); + runner.setProperty(FetchHBaseRow.ROW_ID, "row1"); + runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_ATTRIBUTES); + + runner.enqueue("trigger flow file"); + runner.run(); + + runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 1); + runner.assertTransferCount(FetchHBaseRow.REL_SUCCESS, 0); + runner.assertTransferCount(FetchHBaseRow.REL_NOT_FOUND, 0); + + Assert.assertEquals(0, hBaseClientService.getNumScans()); + } + +} diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/io/TestJsonFullRowSerializer.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/io/TestJsonFullRowSerializer.java new file mode 100644 index 0000000000..f613532768 --- /dev/null +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/io/TestJsonFullRowSerializer.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hbase.io; + +import org.apache.commons.codec.binary.Base64; +import org.apache.nifi.hbase.scan.ResultCell; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class TestJsonFullRowSerializer { + + static final String ROW = "row1"; + + static final String FAM1 = "colFam1"; + static final String QUAL1 = "colQual1"; + static final String VAL1 = "val1"; + static final long TS1 = 1111111111; + + static final String FAM2 = "colFam2"; + static final String QUAL2 = "colQual2"; + static final String VAL2 = "val2"; + static final long TS2 = 222222222; + + private final byte[] rowKey = ROW.getBytes(StandardCharsets.UTF_8); + private ResultCell[] cells; + + @Before + public void setup() { + final byte[] cell1Fam = FAM1.getBytes(StandardCharsets.UTF_8); + final byte[] cell1Qual = QUAL1.getBytes(StandardCharsets.UTF_8); + final byte[] cell1Val = VAL1.getBytes(StandardCharsets.UTF_8); + + final byte[] cell2Fam = FAM2.getBytes(StandardCharsets.UTF_8); + final byte[] cell2Qual = QUAL2.getBytes(StandardCharsets.UTF_8); + final byte[] cell2Val = VAL2.getBytes(StandardCharsets.UTF_8); + + final ResultCell cell1 = getResultCell(cell1Fam, cell1Qual, cell1Val, TS1); + final ResultCell cell2 = getResultCell(cell2Fam, cell2Qual, cell2Val, TS2); + + cells = new ResultCell[] { cell1, cell2 }; + } + + @Test + public void testSerializeRegular() throws IOException { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final RowSerializer rowSerializer = new JsonFullRowSerializer(StandardCharsets.UTF_8, StandardCharsets.UTF_8); + rowSerializer.serialize(rowKey, cells, out); + + final String json = out.toString(StandardCharsets.UTF_8.name()); + Assert.assertEquals("{\"row\":\"row1\", \"cells\": [" + + "{\"fam\":\"" + FAM1 + "\",\"qual\":\"" + QUAL1 + "\",\"val\":\"" + VAL1 + "\",\"ts\":" + TS1 + "}, " + + "{\"fam\":\"" + FAM2 + "\",\"qual\":\"" + QUAL2 + "\",\"val\":\"" + VAL2 + "\",\"ts\":" + TS2 + "}]}", + json); + } + + @Test + public void testSerializeWithBase64() throws IOException { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final RowSerializer rowSerializer = new JsonFullRowSerializer(StandardCharsets.UTF_8, StandardCharsets.UTF_8, true); + rowSerializer.serialize(rowKey, cells, out); + + final String rowBase64 = Base64.encodeBase64String(ROW.getBytes(StandardCharsets.UTF_8)); + + final String fam1Base64 = Base64.encodeBase64String(FAM1.getBytes(StandardCharsets.UTF_8)); + final String qual1Base64 = Base64.encodeBase64String(QUAL1.getBytes(StandardCharsets.UTF_8)); + final String val1Base64 = Base64.encodeBase64String(VAL1.getBytes(StandardCharsets.UTF_8)); + + final String fam2Base64 = Base64.encodeBase64String(FAM2.getBytes(StandardCharsets.UTF_8)); + final String qual2Base64 = Base64.encodeBase64String(QUAL2.getBytes(StandardCharsets.UTF_8)); + final String val2Base64 = Base64.encodeBase64String(VAL2.getBytes(StandardCharsets.UTF_8)); + + final String json = out.toString(StandardCharsets.UTF_8.name()); + Assert.assertEquals("{\"row\":\"" + rowBase64 + "\", \"cells\": [" + + "{\"fam\":\"" + fam1Base64 + "\",\"qual\":\"" + qual1Base64 + "\",\"val\":\"" + val1Base64 + "\",\"ts\":" + TS1 + "}, " + + "{\"fam\":\"" + fam2Base64 + "\",\"qual\":\"" + qual2Base64 + "\",\"val\":\"" + val2Base64 + "\",\"ts\":" + TS2 + "}]}", json); + } + + private ResultCell getResultCell(byte[] fam, byte[] qual, byte[] val, long timestamp) { + final ResultCell cell = new ResultCell(); + + cell.setFamilyArray(fam); + cell.setFamilyOffset(0); + cell.setFamilyLength((byte)fam.length); + + cell.setQualifierArray(qual); + cell.setQualifierOffset(0); + cell.setQualifierLength(qual.length); + + cell.setValueArray(val); + cell.setValueOffset(0); + cell.setValueLength(val.length); + + cell.setTimestamp(timestamp); + + return cell; + } + +} diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/io/TestJsonQualifierAndValueRowSerializer.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/io/TestJsonQualifierAndValueRowSerializer.java new file mode 100644 index 0000000000..c13ed536dc --- /dev/null +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/io/TestJsonQualifierAndValueRowSerializer.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hbase.io; + +import org.apache.commons.codec.binary.Base64; +import org.apache.nifi.hbase.scan.ResultCell; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class TestJsonQualifierAndValueRowSerializer { + + static final String ROW = "row1"; + + static final String FAM1 = "colFam1"; + static final String QUAL1 = "colQual1"; + static final String VAL1 = "val1"; + static final long TS1 = 1111111111; + + static final String FAM2 = "colFam2"; + static final String QUAL2 = "colQual2"; + static final String VAL2 = "val2"; + static final long TS2 = 222222222; + + private final byte[] rowKey = ROW.getBytes(StandardCharsets.UTF_8); + private ResultCell[] cells; + + @Before + public void setup() { + final byte[] cell1Fam = FAM1.getBytes(StandardCharsets.UTF_8); + final byte[] cell1Qual = QUAL1.getBytes(StandardCharsets.UTF_8); + final byte[] cell1Val = VAL1.getBytes(StandardCharsets.UTF_8); + + final byte[] cell2Fam = FAM2.getBytes(StandardCharsets.UTF_8); + final byte[] cell2Qual = QUAL2.getBytes(StandardCharsets.UTF_8); + final byte[] cell2Val = VAL2.getBytes(StandardCharsets.UTF_8); + + final ResultCell cell1 = getResultCell(cell1Fam, cell1Qual, cell1Val, TS1); + final ResultCell cell2 = getResultCell(cell2Fam, cell2Qual, cell2Val, TS2); + + cells = new ResultCell[] { cell1, cell2 }; + } + + @Test + public void testSerializeRegular() throws IOException { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final RowSerializer rowSerializer = new JsonQualifierAndValueRowSerializer(StandardCharsets.UTF_8, StandardCharsets.UTF_8); + rowSerializer.serialize(rowKey, cells, out); + + final String json = out.toString(StandardCharsets.UTF_8.name()); + Assert.assertEquals("{\"" + QUAL1 + "\":\"" + VAL1 + "\", \"" + QUAL2 + "\":\"" + VAL2 + "\"}", json); + } + + @Test + public void testSerializeWithBase64() throws IOException { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final RowSerializer rowSerializer = new JsonQualifierAndValueRowSerializer(StandardCharsets.UTF_8, StandardCharsets.UTF_8, true); + rowSerializer.serialize(rowKey, cells, out); + + final String qual1Base64 = Base64.encodeBase64String(QUAL1.getBytes(StandardCharsets.UTF_8)); + final String val1Base64 = Base64.encodeBase64String(VAL1.getBytes(StandardCharsets.UTF_8)); + + final String qual2Base64 = Base64.encodeBase64String(QUAL2.getBytes(StandardCharsets.UTF_8)); + final String val2Base64 = Base64.encodeBase64String(VAL2.getBytes(StandardCharsets.UTF_8)); + + final String json = out.toString(StandardCharsets.UTF_8.name()); + Assert.assertEquals("{\"" + qual1Base64 + "\":\"" + val1Base64 + "\", \"" + qual2Base64 + "\":\"" + val2Base64 + "\"}", json); + } + + private ResultCell getResultCell(byte[] fam, byte[] qual, byte[] val, long timestamp) { + final ResultCell cell = new ResultCell(); + + cell.setFamilyArray(fam); + cell.setFamilyOffset(0); + cell.setFamilyLength((byte)fam.length); + + cell.setQualifierArray(qual); + cell.setQualifierOffset(0); + cell.setQualifierLength(qual.length); + + cell.setValueArray(val); + cell.setValueOffset(0); + cell.setValueLength(val.length); + + cell.setTimestamp(timestamp); + + return cell; + } + +} diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/io/TestJsonRowSerializer.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/io/TestJsonRowSerializer.java new file mode 100644 index 0000000000..289304e3fe --- /dev/null +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/io/TestJsonRowSerializer.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hbase.io; + +import org.apache.nifi.hbase.scan.ResultCell; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class TestJsonRowSerializer { + + private final byte[] rowKey = "row1".getBytes(StandardCharsets.UTF_8); + private ResultCell[] cells; + + @Before + public void setup() { + final byte[] cell1Fam = "colFam1".getBytes(StandardCharsets.UTF_8); + final byte[] cell1Qual = "colQual1".getBytes(StandardCharsets.UTF_8); + final byte[] cell1Val = "val1".getBytes(StandardCharsets.UTF_8); + + final byte[] cell2Fam = "colFam2".getBytes(StandardCharsets.UTF_8); + final byte[] cell2Qual = "colQual2".getBytes(StandardCharsets.UTF_8); + final byte[] cell2Val = "val2".getBytes(StandardCharsets.UTF_8); + + final ResultCell cell1 = getResultCell(cell1Fam, cell1Qual, cell1Val); + final ResultCell cell2 = getResultCell(cell2Fam, cell2Qual, cell2Val); + + cells = new ResultCell[] { cell1, cell2 }; + } + + @Test + public void testSerializeRegular() throws IOException { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final RowSerializer rowSerializer = new JsonRowSerializer(StandardCharsets.UTF_8); + rowSerializer.serialize(rowKey, cells, out); + + final String json = out.toString(StandardCharsets.UTF_8.name()); + Assert.assertEquals("{\"row\":\"row1\", \"cells\": {\"colFam1:colQual1\":\"val1\", \"colFam2:colQual2\":\"val2\"}}", json); + } + + private ResultCell getResultCell(byte[] fam, byte[] qual, byte[] val) { + final ResultCell cell = new ResultCell(); + + cell.setFamilyArray(fam); + cell.setFamilyOffset(0); + cell.setFamilyLength((byte)fam.length); + + cell.setQualifierArray(qual); + cell.setQualifierOffset(0); + cell.setQualifierLength(qual.length); + + cell.setValueArray(val); + cell.setValueOffset(0); + cell.setValueLength(val.length); + + return cell; + } + +} diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/util/TestObjectSerDe.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/util/TestObjectSerDe.java index c2badb872e..456452a986 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/util/TestObjectSerDe.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/util/TestObjectSerDe.java @@ -16,11 +16,11 @@ */ package org.apache.nifi.hbase.util; -import org.apache.nifi.stream.io.ByteArrayInputStream; -import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.junit.Assert; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java index 9408b174e4..f7718f6ef0 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java @@ -106,6 +106,18 @@ public interface HBaseClientService extends ControllerService { */ void scan(String tableName, Collection columns, String filterExpression, long minTime, ResultHandler handler) throws IOException; + /** + * Scans the given table for the given rowId and passes the result to the handler. + * + * @param tableName the name of an HBase table to scan + * @param startRow the row identifier to start scanning at + * @param endRow the row identifier to end scanning at + * @param columns optional columns to return, if not specified all columns are returned + * @param handler a handler to process rows of the result + * @throws IOException thrown when there are communication errors with HBase + */ + void scan(String tableName, byte[] startRow, byte[] endRow, Collection columns, ResultHandler handler) throws IOException; + /** * Converts the given boolean to it's byte representation. * diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index 4a9fc0efd6..af3776fa50 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -331,35 +331,9 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme // convert HBase cells to NiFi cells final ResultCell[] resultCells = new ResultCell[cells.length]; - for (int i=0; i < cells.length; i++) { final Cell cell = cells[i]; - - final ResultCell resultCell = new ResultCell(); - resultCell.setRowArray(cell.getRowArray()); - resultCell.setRowOffset(cell.getRowOffset()); - resultCell.setRowLength(cell.getRowLength()); - - resultCell.setFamilyArray(cell.getFamilyArray()); - resultCell.setFamilyOffset(cell.getFamilyOffset()); - resultCell.setFamilyLength(cell.getFamilyLength()); - - resultCell.setQualifierArray(cell.getQualifierArray()); - resultCell.setQualifierOffset(cell.getQualifierOffset()); - resultCell.setQualifierLength(cell.getQualifierLength()); - - resultCell.setTimestamp(cell.getTimestamp()); - resultCell.setTypeByte(cell.getTypeByte()); - resultCell.setSequenceId(cell.getSequenceId()); - - resultCell.setValueArray(cell.getValueArray()); - resultCell.setValueOffset(cell.getValueOffset()); - resultCell.setValueLength(cell.getValueLength()); - - resultCell.setTagsArray(cell.getTagsArray()); - resultCell.setTagsOffset(cell.getTagsOffset()); - resultCell.setTagsLength(cell.getTagsLength()); - + final ResultCell resultCell = getResultCell(cell); resultCells[i] = resultCell; } @@ -369,6 +343,54 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme } } + @Override + public void scan(final String tableName, final byte[] startRow, final byte[] endRow, final Collection columns, final ResultHandler handler) + throws IOException { + + try (final Table table = connection.getTable(TableName.valueOf(tableName)); + final ResultScanner scanner = getResults(table, startRow, endRow, columns)) { + + for (final Result result : scanner) { + final byte[] rowKey = result.getRow(); + final Cell[] cells = result.rawCells(); + + if (cells == null) { + continue; + } + + // convert HBase cells to NiFi cells + final ResultCell[] resultCells = new ResultCell[cells.length]; + for (int i=0; i < cells.length; i++) { + final Cell cell = cells[i]; + final ResultCell resultCell = getResultCell(cell); + resultCells[i] = resultCell; + } + + // delegate to the handler + handler.handle(rowKey, resultCells); + } + } + } + + // protected and extracted into separate method for testing + protected ResultScanner getResults(final Table table, final byte[] startRow, final byte[] endRow, final Collection columns) throws IOException { + final Scan scan = new Scan(); + scan.setStartRow(startRow); + scan.setStopRow(endRow); + + if (columns != null) { + for (Column col : columns) { + if (col.getQualifier() == null) { + scan.addFamily(col.getFamily()); + } else { + scan.addColumn(col.getFamily(), col.getQualifier()); + } + } + } + + return table.getScanner(scan); + } + // protected and extracted into separate method for testing protected ResultScanner getResults(final Table table, final Collection columns, final Filter filter, final long minTime) throws IOException { // Create a new scan. We will set the min timerange as the latest timestamp that @@ -395,6 +417,34 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme return table.getScanner(scan); } + private ResultCell getResultCell(Cell cell) { + final ResultCell resultCell = new ResultCell(); + resultCell.setRowArray(cell.getRowArray()); + resultCell.setRowOffset(cell.getRowOffset()); + resultCell.setRowLength(cell.getRowLength()); + + resultCell.setFamilyArray(cell.getFamilyArray()); + resultCell.setFamilyOffset(cell.getFamilyOffset()); + resultCell.setFamilyLength(cell.getFamilyLength()); + + resultCell.setQualifierArray(cell.getQualifierArray()); + resultCell.setQualifierOffset(cell.getQualifierOffset()); + resultCell.setQualifierLength(cell.getQualifierLength()); + + resultCell.setTimestamp(cell.getTimestamp()); + resultCell.setTypeByte(cell.getTypeByte()); + resultCell.setSequenceId(cell.getSequenceId()); + + resultCell.setValueArray(cell.getValueArray()); + resultCell.setValueOffset(cell.getValueOffset()); + resultCell.setValueLength(cell.getValueLength()); + + resultCell.setTagsArray(cell.getTagsArray()); + resultCell.setTagsOffset(cell.getTagsOffset()); + resultCell.setTagsLength(cell.getTagsLength()); + return resultCell; + } + static protected class ValidationResources { private final String configResources; private final Configuration configuration;