NIFI-4637 Added support for visibility labels to the HBase processors.

NIFI-4637 Removed integration test and updated the hbase client from 1.1.2 to 1.1.13 which is the final version for 1.1.X

NIFI-4637 Fixed EL support issue w/ tests.

NIFI-4637 Added more documentation to DeleteHBaseCells.

NIFI-4637 changed PutHBaseCell/JSON to use dynamic properties instead of a 'default visibility string.'

NIFI-4637 Added changes requested in a code review.

NIFI-4637 Moved pickVisibilityString to a utility class to make testing easier.

NIFI-4637 Added additionalDetails.html for PutHBaseRecord.

NIFI-4637 Added additional documentation and testing.

NIFI-4637 Added documentation for DeleteHBaseCells.

NIFI-4637 Added pickVisibilityLabel support to PutHBaseRecord and updated documentation to reflect that.

NIFI-4637 Reverted version bump to hbase client.

This closes #2518.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Mike Thomsen 2018-02-22 16:15:00 -05:00 committed by Koji Kawamura
parent 6e0be8e641
commit 0b851910f3
35 changed files with 1392 additions and 215 deletions

View File

@ -51,6 +51,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@ -78,11 +79,17 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<version>1.7.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-path</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -17,14 +17,9 @@
package org.apache.nifi.hbase;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
@ -38,9 +33,26 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Base class for processors that put data to HBase.
*/
@DynamicProperties({
@DynamicProperty(name = "visibility.<COLUMN FAMILY>", description = "Visibility label for everything under that column family " +
"when a specific label for a particular column qualifier is not available.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
value = "visibility label for <COLUMN FAMILY>"
),
@DynamicProperty(name = "visibility.<COLUMN FAMILY>.<COLUMN QUALIFIER>", description = "Visibility label for the specified column qualifier " +
"qualified by a configured column family.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
value = "visibility label for <COLUMN FAMILY>:<COLUMN QUALIFIER>."
)
})
public abstract class AbstractPutHBase extends AbstractProcessor {
protected static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
@ -131,6 +143,36 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
clientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
if (propertyDescriptorName.startsWith("visibility.")) {
String[] parts = propertyDescriptorName.split("\\.");
String displayName;
String description;
if (parts.length == 2) {
displayName = String.format("Column Family %s Default Visibility", parts[1]);
description = String.format("Default visibility setting for %s", parts[1]);
} else if (parts.length == 3) {
displayName = String.format("Column Qualifier %s.%s Default Visibility", parts[1], parts[2]);
description = String.format("Default visibility setting for %s.%s", parts[1], parts[2]);
} else {
return null;
}
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.displayName(displayName)
.description(description)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.dynamic(true)
.build();
}
return null;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttributes({
@WritesAttribute(attribute = "error.line", description = "The line number of the error."),
@WritesAttribute(attribute = "error.msg", description = "The message explaining the error.")
})
@Tags({"hbase", "delete", "cell", "cells", "visibility"})
@CapabilityDescription("This processor allows the user to delete individual HBase cells by specifying one or more lines " +
"in the flowfile content that are a sequence composed of row ID, column family, column qualifier and associated visibility labels " +
"if visibility labels are enabled and in use. A user-defined separator is used to separate each of these pieces of data on each " +
"line, with :::: being the default separator.")
public class DeleteHBaseCells extends AbstractDeleteHBase {
static final PropertyDescriptor SEPARATOR = new PropertyDescriptor.Builder()
.name("delete-hbase-cell-separator")
.displayName("Separator")
.description("Each line of the flowfile content is separated into components for building a delete using this" +
"separator. It should be something other than a single colon or a comma because these are values that " +
"are associated with columns and visibility labels respectively. To delete a row with ID xyz, column family abc, " +
"column qualifier def and visibility label PII&PHI, one would specify xyz::::abc::::def::::PII&PHI given the default " +
"value")
.required(true)
.defaultValue("::::")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final String ERROR_LINE = "error.line";
static final String ERROR_MSG = "error.msg";
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(HBASE_CLIENT_SERVICE);
properties.add(TABLE_NAME);
properties.add(SEPARATOR);
return properties;
}
private FlowFile writeErrorAttributes(int line, String msg, FlowFile file, ProcessSession session) {
file = session.putAttribute(file, ERROR_LINE, String.valueOf(line));
file = session.putAttribute(file, ERROR_MSG, msg != null ? msg : "");
return file;
}
private void logCell(String rowId, String family, String column, String visibility) {
StringBuilder sb = new StringBuilder()
.append("Assembling cell delete for...\t")
.append(String.format("Row ID: %s\t", rowId))
.append(String.format("Column Family: %s\t", family))
.append(String.format("Column Qualifier: %s\t", column))
.append(String.format("Visibility Label: %s", visibility));
getLogger().debug(sb.toString());
}
@Override
protected void doDelete(ProcessContext context, ProcessSession session) throws Exception {
FlowFile input = session.get();
if (input == null) {
return;
}
final String separator = context.getProperty(SEPARATOR).evaluateAttributeExpressions(input).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(input).getValue();
List<String> rowKeys = new ArrayList<>();
int lineNum = 1;
try (InputStream is = session.read(input)) {
Scanner scanner = new Scanner(is);
List<DeleteRequest> deletes = new ArrayList<>();
while (scanner.hasNextLine()) {
String line = scanner.nextLine().trim();
if (line.equals("")) {
continue;
}
String[] parts = line.split(separator);
if (parts.length < 3 || parts.length > 4) {
final String msg = String.format("Invalid line length. It must have 3 or 4 components. It had %d.", parts.length);
input = writeErrorAttributes(lineNum, msg, input, session);
session.transfer(input, REL_FAILURE);
getLogger().error(msg);
return;
}
String rowId = parts[0];
String family = parts[1];
String column = parts[2];
String visibility = parts.length == 4 ? parts[3] : null;
DeleteRequest request = new DeleteRequest(rowId.getBytes(), family.getBytes(), column.getBytes(), visibility);
deletes.add(request);
if (!rowKeys.contains(rowId)) {
rowKeys.add(rowId);
}
if (getLogger().isDebugEnabled()) {
logCell(rowId, family, column, visibility);
}
lineNum++;
}
is.close();
clientService.deleteCells(tableName, deletes);
for (int index = 0; index < rowKeys.size(); index++) { //Could be many row keys in one flowfile.
session.getProvenanceReporter().invokeRemoteProcess(input, clientService.toTransitUri(tableName, rowKeys.get(index)));
}
session.transfer(input, REL_SUCCESS);
} catch (Exception ex) {
input = writeErrorAttributes(lineNum, ex.getMessage(), input, session);
session.transfer(input, REL_FAILURE);
}
}
}

View File

@ -104,6 +104,17 @@ public class DeleteHBaseRow extends AbstractDeleteHBase {
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
static final PropertyDescriptor VISIBLITY_LABEL = new PropertyDescriptor.Builder()
.name("delete-visibility-label")
.displayName("Visibility Label")
.description("If visibility labels are enabled, a row cannot be deleted without supplying its visibility label(s) in the delete " +
"request. Note: this visibility label will be applied to all cells within the row that is specified. If some cells have " +
"different visibility labels, they will not be deleted. When that happens, the failure to delete will be considered a success " +
"because HBase does not report it as a failure.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -112,6 +123,7 @@ public class DeleteHBaseRow extends AbstractDeleteHBase {
properties.add(FLOWFILE_FETCH_COUNT);
properties.add(BATCH_SIZE);
properties.add(KEY_SEPARATOR);
properties.add(VISIBLITY_LABEL);
properties.add(CHARSET);
return properties;
@ -128,10 +140,12 @@ public class DeleteHBaseRow extends AbstractDeleteHBase {
if (flowFiles != null && flowFiles.size() > 0) {
for (int index = 0; index < flowFiles.size(); index++) {
FlowFile flowFile = flowFiles.get(index);
final String visibility = context.getProperty(VISIBLITY_LABEL).isSet()
? context.getProperty(VISIBLITY_LABEL).evaluateAttributeExpressions(flowFile).getValue() : null;
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
try {
if (location.equals(ROW_ID_CONTENT.getValue())) {
flowFile = doDeleteFromContent(flowFile, context, session, tableName, batchSize, charset);
flowFile = doDeleteFromContent(flowFile, context, session, tableName, batchSize, charset, visibility);
if (flowFile.getAttribute(RESTART_INDEX) != null) {
session.transfer(flowFile, REL_FAILURE);
} else {
@ -140,7 +154,7 @@ public class DeleteHBaseRow extends AbstractDeleteHBase {
session.getProvenanceReporter().invokeRemoteProcess(flowFile, transitUrl);
}
} else {
String transitUrl = doDeleteFromAttribute(flowFile, context, tableName, charset);
String transitUrl = doDeleteFromAttribute(flowFile, context, tableName, charset, visibility);
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().invokeRemoteProcess(flowFile, transitUrl);
}
@ -152,14 +166,14 @@ public class DeleteHBaseRow extends AbstractDeleteHBase {
}
}
private String doDeleteFromAttribute(FlowFile flowFile, ProcessContext context, String tableName, String charset) throws Exception {
private String doDeleteFromAttribute(FlowFile flowFile, ProcessContext context, String tableName, String charset, String visibility) throws Exception {
String rowKey = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
clientService.delete(tableName, rowKey.getBytes(charset));
clientService.delete(tableName, rowKey.getBytes(charset), visibility);
return clientService.toTransitUri(tableName, rowKey);
}
private FlowFile doDeleteFromContent(FlowFile flowFile, ProcessContext context, ProcessSession session, String tableName, int batchSize, String charset) throws Exception {
private FlowFile doDeleteFromContent(FlowFile flowFile, ProcessContext context, ProcessSession session, String tableName, int batchSize, String charset, String visibility) throws Exception {
String keySeparator = context.getProperty(KEY_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
final String restartIndex = flowFile.getAttribute(RESTART_INDEX);
@ -192,13 +206,13 @@ public class DeleteHBaseRow extends AbstractDeleteHBase {
batch.add(parts[index].getBytes(charset));
if (batch.size() == batchSize) {
clientService.delete(tableName, batch);
clientService.delete(tableName, batch, visibility);
batch = new ArrayList<>();
}
last = parts[index];
}
if (batch.size() > 0) {
clientService.delete(tableName, batch);
clientService.delete(tableName, batch, visibility);
}
flowFile = session.removeAttribute(flowFile, RESTART_INDEX);

View File

@ -63,7 +63,7 @@ import java.util.regex.Pattern;
@WritesAttribute(attribute = "hbase.row", description = "A JSON document representing the row. This property is only written when a Destination of flowfile-attributes is selected."),
@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise")
})
public class FetchHBaseRow extends AbstractProcessor {
public class FetchHBaseRow extends AbstractProcessor implements VisibilityFetchSupport {
static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
@ -179,6 +179,7 @@ public class FetchHBaseRow extends AbstractProcessor {
props.add(TABLE_NAME);
props.add(ROW_ID);
props.add(COLUMNS);
props.add(AUTHORIZATIONS);
props.add(DESTINATION);
props.add(JSON_FORMAT);
props.add(JSON_VALUE_ENCODING);
@ -252,6 +253,8 @@ public class FetchHBaseRow extends AbstractProcessor {
final String destination = context.getProperty(DESTINATION).getValue();
final boolean base64Encode = context.getProperty(JSON_VALUE_ENCODING).getValue().equals(ENCODING_BASE64.getValue());
List<String> authorizations = getAuthorizations(context, flowFile);
final RowSerializer rowSerializer = base64Encode ? base64RowSerializer : regularRowSerializer;
final FetchHBaseRowHandler handler = destination.equals(DESTINATION_CONTENT.getValue())
@ -260,7 +263,7 @@ public class FetchHBaseRow extends AbstractProcessor {
final byte[] rowIdBytes = rowId.getBytes(StandardCharsets.UTF_8);
try {
hBaseClientService.scan(tableName, rowIdBytes, rowIdBytes, columns, handler);
hBaseClientService.scan(tableName, rowIdBytes, rowIdBytes, columns, authorizations, handler);
} catch (Exception e) {
getLogger().error("Unable to fetch row {} from {} due to {}", new Object[] {rowId, tableName, e});
session.transfer(handler.getFlowFile(), REL_FAILURE);

View File

@ -21,7 +21,6 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@ -55,6 +54,7 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
@ -65,7 +65,6 @@ import org.apache.nifi.hbase.io.JsonRowSerializer;
import org.apache.nifi.hbase.io.RowSerializer;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.hbase.util.ObjectSerDe;
import org.apache.nifi.hbase.util.StringSerDe;
import org.apache.nifi.processor.AbstractProcessor;
@ -73,7 +72,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
@TriggerWhenEmpty
@ -91,7 +89,7 @@ import org.apache.nifi.processor.util.StandardValidators;
@Stateful(scopes = Scope.CLUSTER, description = "After performing a fetching from HBase, stores a timestamp of the last-modified cell that was found. In addition, it stores the ID of the row(s) "
+ "and the value of each cell that has that timestamp as its modification date. This is stored across the cluster and allows the next fetch to avoid duplicating data, even if this Processor is "
+ "run on Primary Node only and the Primary Node changes.")
public class GetHBase extends AbstractProcessor {
public class GetHBase extends AbstractProcessor implements VisibilityFetchSupport {
static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
@ -149,6 +147,14 @@ public class GetHBase extends AbstractProcessor {
.allowableValues(NONE, CURRENT_TIME)
.defaultValue(NONE.getValue())
.build();
static final PropertyDescriptor AUTHORIZATIONS = new PropertyDescriptor.Builder()
.name("hbase-fetch-row-authorizations")
.displayName("Authorizations")
.description("The list of authorizations to pass to the scanner. This will be ignored if cell visibility labels are not in use.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(Validator.VALID)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -172,6 +178,7 @@ public class GetHBase extends AbstractProcessor {
properties.add(DISTRIBUTED_CACHE_SERVICE);
properties.add(TABLE_NAME);
properties.add(COLUMNS);
properties.add(AUTHORIZATIONS);
properties.add(FILTER_EXPRESSION);
properties.add(INITIAL_TIMERANGE);
properties.add(CHARSET);
@ -253,6 +260,9 @@ public class GetHBase extends AbstractProcessor {
final String tableName = context.getProperty(TABLE_NAME).getValue();
final String initialTimeRange = context.getProperty(INITIAL_TIMERANGE).getValue();
final String filterExpression = context.getProperty(FILTER_EXPRESSION).getValue();
List<String> authorizations = getAuthorizations(context, null);
final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
// if the table was changed then remove any previous state
@ -279,105 +289,97 @@ public class GetHBase extends AbstractProcessor {
final AtomicReference<Long> latestTimestampHolder = new AtomicReference<>(minTime);
hBaseClientService.scan(tableName, columns, filterExpression, minTime, new ResultHandler() {
@Override
public void handle(final byte[] rowKey, final ResultCell[] resultCells) {
hBaseClientService.scan(tableName, columns, filterExpression, minTime, authorizations, (rowKey, resultCells) -> {
final String rowKeyString = new String(rowKey, StandardCharsets.UTF_8);
final String rowKeyString = new String(rowKey, StandardCharsets.UTF_8);
// check if latest cell timestamp is equal to our cutoff.
// if any of the cells have a timestamp later than our cutoff, then we
// want the row. But if the cell with the latest timestamp is equal to
// our cutoff, then we want to check if that's one of the cells that
// we have already seen.
long latestCellTimestamp = 0L;
// check if latest cell timestamp is equal to our cutoff.
// if any of the cells have a timestamp later than our cutoff, then we
// want the row. But if the cell with the latest timestamp is equal to
// our cutoff, then we want to check if that's one of the cells that
// we have already seen.
long latestCellTimestamp = 0L;
for (final ResultCell cell : resultCells) {
if (cell.getTimestamp() > latestCellTimestamp) {
latestCellTimestamp = cell.getTimestamp();
}
}
// we've already seen this.
if (latestCellTimestamp < minTime) {
getLogger().debug("latest cell timestamp for row {} is {}, which is earlier than the minimum time of {}",
new Object[] {rowKeyString, latestCellTimestamp, minTime});
return;
}
if (latestCellTimestamp == minTime) {
// latest cell timestamp is equal to our minimum time. Check if all cells that have
// that timestamp are in our list of previously seen cells.
boolean allSeen = true;
for (final ResultCell cell : resultCells) {
if (cell.getTimestamp() > latestCellTimestamp) {
latestCellTimestamp = cell.getTimestamp();
if (cell.getTimestamp() == latestCellTimestamp) {
if (lastResult == null || !lastResult.contains(cell)) {
allSeen = false;
break;
}
}
}
// we've already seen this.
if (latestCellTimestamp < minTime) {
getLogger().debug("latest cell timestamp for row {} is {}, which is earlier than the minimum time of {}",
new Object[] {rowKeyString, latestCellTimestamp, minTime});
if (allSeen) {
// we have already seen all of the cells for this row. We do not want to
// include this cell in our output.
getLogger().debug("all cells for row {} have already been seen", new Object[] { rowKeyString });
return;
}
}
if (latestCellTimestamp == minTime) {
// latest cell timestamp is equal to our minimum time. Check if all cells that have
// that timestamp are in our list of previously seen cells.
boolean allSeen = true;
for (final ResultCell cell : resultCells) {
if (cell.getTimestamp() == latestCellTimestamp) {
if (lastResult == null || !lastResult.contains(cell)) {
allSeen = false;
break;
}
// If the latest timestamp of the cell is later than the latest timestamp we have already seen,
// we want to keep track of the cells that match this timestamp so that the next time we scan,
// we can ignore these cells.
if (latestCellTimestamp >= latestTimestampHolder.get()) {
// new timestamp, so clear all of the 'matching cells'
if (latestCellTimestamp > latestTimestampHolder.get()) {
latestTimestampHolder.set(latestCellTimestamp);
cellsMatchingTimestamp.clear();
}
for (final ResultCell cell : resultCells) {
final long ts = cell.getTimestamp();
if (ts == latestCellTimestamp) {
final byte[] rowValue = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength() + cell.getRowOffset());
final byte[] cellValue = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset());
final String rowHash = new String(rowValue, StandardCharsets.UTF_8);
Set<String> cellHashes = cellsMatchingTimestamp.get(rowHash);
if (cellHashes == null) {
cellHashes = new HashSet<>();
cellsMatchingTimestamp.put(rowHash, cellHashes);
}
}
if (allSeen) {
// we have already seen all of the cells for this row. We do not want to
// include this cell in our output.
getLogger().debug("all cells for row {} have already been seen", new Object[] { rowKeyString });
return;
cellHashes.add(new String(cellValue, StandardCharsets.UTF_8));
}
}
}
// If the latest timestamp of the cell is later than the latest timestamp we have already seen,
// we want to keep track of the cells that match this timestamp so that the next time we scan,
// we can ignore these cells.
if (latestCellTimestamp >= latestTimestampHolder.get()) {
// new timestamp, so clear all of the 'matching cells'
if (latestCellTimestamp > latestTimestampHolder.get()) {
latestTimestampHolder.set(latestCellTimestamp);
cellsMatchingTimestamp.clear();
}
// write the row to a new FlowFile.
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, out -> serializer.serialize(rowKey, resultCells, out));
for (final ResultCell cell : resultCells) {
final long ts = cell.getTimestamp();
if (ts == latestCellTimestamp) {
final byte[] rowValue = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength() + cell.getRowOffset());
final byte[] cellValue = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset());
final Map<String, String> attributes = new HashMap<>();
attributes.put("hbase.table", tableName);
attributes.put("mime.type", "application/json");
flowFile = session.putAllAttributes(flowFile, attributes);
final String rowHash = new String(rowValue, StandardCharsets.UTF_8);
Set<String> cellHashes = cellsMatchingTimestamp.get(rowHash);
if (cellHashes == null) {
cellHashes = new HashSet<>();
cellsMatchingTimestamp.put(rowHash, cellHashes);
}
cellHashes.add(new String(cellValue, StandardCharsets.UTF_8));
}
}
}
session.getProvenanceReporter().receive(flowFile, hBaseClientService.toTransitUri(tableName, rowKeyString));
session.transfer(flowFile, REL_SUCCESS);
getLogger().debug("Received {} from HBase with row key {}", new Object[]{flowFile, rowKeyString});
// write the row to a new FlowFile.
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
serializer.serialize(rowKey, resultCells, out);
}
});
// we could potentially have a huge number of rows. If we get to 500, go ahead and commit the
// session so that we can avoid buffering tons of FlowFiles without ever sending any out.
long rowsPulled = rowsPulledHolder.get();
rowsPulledHolder.set(++rowsPulled);
final Map<String, String> attributes = new HashMap<>();
attributes.put("hbase.table", tableName);
attributes.put("mime.type", "application/json");
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().receive(flowFile, hBaseClientService.toTransitUri(tableName, rowKeyString));
session.transfer(flowFile, REL_SUCCESS);
getLogger().debug("Received {} from HBase with row key {}", new Object[]{flowFile, rowKeyString});
// we could potentially have a huge number of rows. If we get to 500, go ahead and commit the
// session so that we can avoid buffering tons of FlowFiles without ever sending any out.
long rowsPulled = rowsPulledHolder.get();
rowsPulledHolder.set(++rowsPulled);
if (++rowsPulled % getBatchSize() == 0) {
session.commit();
}
if (++rowsPulled % getBatchSize() == 0) {
session.commit();
}
});
@ -636,5 +638,4 @@ public class GetHBase extends AbstractProcessor {
return new ScanResult(timestamp, matchingCellHashes);
}
}
}

View File

@ -17,10 +17,10 @@
package org.apache.nifi.hbase;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
@ -30,12 +30,9 @@ import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
@ -44,6 +41,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.apache.nifi.hbase.util.VisibilityUtil.pickVisibilityString;
@EventDriven
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@ -82,6 +81,8 @@ public class PutHBaseCell extends AbstractPutHBase {
final String columnQualifier = context.getProperty(COLUMN_QUALIFIER).evaluateAttributeExpressions(flowFile).getValue();
final String timestampValue = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
final String visibilityStringToUse = pickVisibilityString(columnFamily, columnQualifier, flowFile, context);
final Long timestamp;
if (!StringUtils.isBlank(timestampValue)) {
try {
@ -96,16 +97,15 @@ public class PutHBaseCell extends AbstractPutHBase {
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer);
}
});
session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
PutColumn column = StringUtils.isEmpty(visibilityStringToUse)
? new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
columnQualifier.getBytes(StandardCharsets.UTF_8), buffer, timestamp)
: new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
columnQualifier.getBytes(StandardCharsets.UTF_8), buffer, timestamp, visibilityStringToUse);
final Collection<PutColumn> columns = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
columnQualifier.getBytes(StandardCharsets.UTF_8), buffer, timestamp));
final Collection<PutColumn> columns = Collections.singletonList(column);
byte[] rowKeyBytes = getRow(row,context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue());

View File

@ -37,11 +37,9 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -52,6 +50,8 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.nifi.hbase.util.VisibilityUtil.pickVisibilityString;
@EventDriven
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@ -185,12 +185,9 @@ public class PutHBaseJSON extends AbstractPutHBase {
final ObjectMapper mapper = new ObjectMapper();
final AtomicReference<JsonNode> rootNodeRef = new AtomicReference<>(null);
try {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
try (final InputStream bufferedIn = new BufferedInputStream(in)) {
rootNodeRef.set(mapper.readTree(bufferedIn));
}
session.read(flowFile, in -> {
try (final InputStream bufferedIn = new BufferedInputStream(in)) {
rootNodeRef.set(mapper.readTree(bufferedIn));
}
});
} catch (final ProcessException pe) {
@ -256,7 +253,13 @@ public class PutHBaseJSON extends AbstractPutHBase {
final byte[] colFamBytes = columnFamily.getBytes(StandardCharsets.UTF_8);
final byte[] colQualBytes = fieldName.getBytes(StandardCharsets.UTF_8);
final byte[] colValBytes = fieldValueHolder.get();
columns.add(new PutColumn(colFamBytes, colQualBytes, colValBytes, timestamp));
final String visibilityStringToUse = pickVisibilityString(columnFamily, fieldName, flowFile, context);
PutColumn column = StringUtils.isEmpty(visibilityStringToUse)
? new PutColumn(colFamBytes, colQualBytes, colValBytes, timestamp)
: new PutColumn(colFamBytes, colQualBytes, colValBytes, timestamp, visibilityStringToUse);
columns.add(column);
}
}
}

View File

@ -23,20 +23,28 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.util.VisibilityUtil;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
@ -47,6 +55,7 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@EventDriven
@ -75,6 +84,17 @@ public class PutHBaseRecord extends AbstractPutHBase {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor DEFAULT_VISIBILITY_STRING = new PropertyDescriptor.Builder()
.name("hbase-default-vis-string")
.displayName("Default Visibility String")
.description("When using visibility labels, any value set in this field will be applied to all cells that are written unless " +
"an attribute with the convention \"visibility.COLUMN_FAMILY.COLUMN_QUALIFIER\" is present on the flowfile. If this field " +
"is left blank, it will be assumed that no visibility is to be set unless visibility-related attributes are set. NOTE: " +
"this configuration will have no effect on your data if you have not enabled visibility labels in the HBase cluster.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(Validator.VALID)
.build();
protected static final String FAIL_VALUE = "Fail";
protected static final String WARN_VALUE = "Warn";
@ -142,6 +162,16 @@ public class PutHBaseRecord extends AbstractPutHBase {
.allowableValues(NULL_FIELD_EMPTY, NULL_FIELD_SKIP)
.build();
protected static final PropertyDescriptor VISIBILITY_RECORD_PATH = new PropertyDescriptor.Builder()
.name("put-hb-rec-visibility-record-path")
.displayName("Visibility String Record Path Root")
.description("A record path that points to part of the record which contains a path to a mapping of visibility strings to record paths")
.required(false)
.addValidator(Validator.VALID)
.build();
protected RecordPathCache recordPathCache;
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
@ -152,6 +182,8 @@ public class PutHBaseRecord extends AbstractPutHBase {
properties.add(ROW_ID_ENCODING_STRATEGY);
properties.add(NULL_FIELD_STRATEGY);
properties.add(COLUMN_FAMILY);
properties.add(DEFAULT_VISIBILITY_STRING);
properties.add(VISIBILITY_RECORD_PATH);
properties.add(TIMESTAMP_FIELD_NAME);
properties.add(BATCH_SIZE);
properties.add(COMPLEX_FIELD_STRATEGY);
@ -177,6 +209,12 @@ public class PutHBaseRecord extends AbstractPutHBase {
return columns;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
recordPathCache = new RecordPathCache(4);
super.onScheduled(context);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
@ -195,6 +233,12 @@ public class PutHBaseRecord extends AbstractPutHBase {
final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
final String rowEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue();
final String recordPathText = context.getProperty(VISIBILITY_RECORD_PATH).getValue();
RecordPath recordPath = null;
if (recordPathCache != null && !StringUtils.isEmpty(recordPathText)) {
recordPath = recordPathCache.getCompiled(recordPathText);
}
final long start = System.nanoTime();
int index = 0;
@ -215,7 +259,7 @@ public class PutHBaseRecord extends AbstractPutHBase {
}
while ((record = reader.nextRecord()) != null) {
PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily,
PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), recordPath, flowFile, rowFieldName, columnFamily,
timestampFieldName, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy);
if (putFlowFile.getColumns().size() == 0) {
continue;
@ -340,13 +384,14 @@ public class PutHBaseRecord extends AbstractPutHBase {
static final byte[] EMPTY = "".getBytes();
protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile, String rowFieldName,
protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, RecordPath recordPath, FlowFile flowFile, String rowFieldName,
String columnFamily, String timestampFieldName, String fieldEncodingStrategy, String rowEncodingStrategy,
String complexFieldStrategy)
throws PutCreationFailedInvokedException {
PutFlowFile retVal = null;
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String nullStrategy = context.getProperty(NULL_FIELD_STRATEGY).getValue();
final String defaultVisibility = context.getProperty(DEFAULT_VISIBILITY_STRING).evaluateAttributeExpressions(flowFile).getValue();
boolean asString = STRING_ENCODING_VALUE.equals(fieldEncodingStrategy);
@ -368,9 +413,18 @@ public class PutHBaseRecord extends AbstractPutHBase {
timestamp = null;
}
RecordField visField = null;
Map visSettings = null;
if (recordPath != null) {
final RecordPathResult result = recordPath.evaluate(record);
FieldValue fv = result.getSelectedFields().findFirst().get();
visField = fv.getField();
visSettings = (Map)fv.getValue();
}
List<PutColumn> columns = new ArrayList<>();
for (String name : schema.getFieldNames()) {
if (name.equals(rowFieldName) || name.equals(timestampFieldName)) {
if (name.equals(rowFieldName) || name.equals(timestampFieldName) || (visField != null && name.equals(visField.getFieldName()))) {
continue;
}
@ -386,7 +440,20 @@ public class PutHBaseRecord extends AbstractPutHBase {
if (fieldValueBytes != null) {
columns.add(new PutColumn(fam, clientService.toBytes(name), fieldValueBytes, timestamp));
String visString = (visField != null && visSettings != null && visSettings.containsKey(name))
? (String)visSettings.get(name) : defaultVisibility;
//TODO: factor this into future enhancements to how complex records are handled.
if (StringUtils.isBlank(visString)) {
visString = VisibilityUtil.pickVisibilityString(columnFamily, name, flowFile, context);
}
PutColumn column = !StringUtils.isEmpty(visString)
? new PutColumn(fam, clientService.toBytes(name), fieldValueBytes, timestamp, visString)
: new PutColumn(fam, clientService.toBytes(name), fieldValueBytes, timestamp);
columns.add(column);
}
}

View File

@ -69,7 +69,7 @@ import org.apache.nifi.processor.util.StandardValidators;
+ "Could be null (not present) if transfered to FAILURE")
})
public class ScanHBase extends AbstractProcessor {
public class ScanHBase extends AbstractProcessor implements VisibilityFetchSupport {
//enhanced regex for columns to allow "-" in column qualifier names
static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*");
static final String nl = System.lineSeparator();
@ -231,6 +231,7 @@ public class ScanHBase extends AbstractProcessor {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(HBASE_CLIENT_SERVICE);
props.add(TABLE_NAME);
props.add(AUTHORIZATIONS);
props.add(START_ROW);
props.add(END_ROW);
props.add(TIME_RANGE_MIN);
@ -326,6 +327,8 @@ public class ScanHBase extends AbstractProcessor {
return;
}
final List<String> authorizations = getAuthorizations(context, flowFile);
final String startRow = context.getProperty(START_ROW).evaluateAttributeExpressions(flowFile).getValue();
final String endRow = context.getProperty(END_ROW).evaluateAttributeExpressions(flowFile).getValue();
@ -381,6 +384,7 @@ public class ScanHBase extends AbstractProcessor {
limitRows,
isReversed,
columns,
authorizations,
handler);
} catch (Exception e) {
if (handler.getFlowFile() != null){

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
public interface VisibilityFetchSupport {
PropertyDescriptor AUTHORIZATIONS = new PropertyDescriptor.Builder()
.name("hbase-fetch-row-authorizations")
.displayName("Authorizations")
.description("The list of authorizations to pass to the scanner. This will be ignored if cell visibility labels are not in use.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(Validator.VALID)
.build();
default List<String> getAuthorizations(ProcessContext context, FlowFile flowFile) {
final String authorizationString = context.getProperty(AUTHORIZATIONS).isSet()
? context.getProperty(AUTHORIZATIONS).evaluateAttributeExpressions(flowFile).getValue().trim()
: "";
List<String> authorizations = new ArrayList<>();
if (!StringUtils.isBlank(authorizationString)) {
String[] parts = authorizationString.split(",");
for (String part : parts) {
authorizations.add(part.trim());
}
}
return authorizations;
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase.util;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
public class VisibilityUtil {
public static String pickVisibilityString(String columnFamily, String columnQualifier, FlowFile flowFile, ProcessContext context) {
if (StringUtils.isBlank(columnFamily)) {
return null;
}
String lookupKey = String.format("visibility.%s%s%s", columnFamily, !StringUtils.isBlank(columnQualifier) ? "." : "", columnQualifier);
String fromAttribute = flowFile.getAttribute(lookupKey);
if (fromAttribute == null && !StringUtils.isBlank(columnQualifier)) {
String lookupKeyFam = String.format("visibility.%s", columnFamily);
fromAttribute = flowFile.getAttribute(lookupKeyFam);
}
if (fromAttribute != null) {
return fromAttribute;
} else {
PropertyValue descriptor = context.getProperty(lookupKey);
if (descriptor == null || !descriptor.isSet()) {
descriptor = context.getProperty(String.format("visibility.%s", columnFamily));
}
String retVal = descriptor != null ? descriptor.evaluateAttributeExpressions(flowFile).getValue() : null;
return retVal;
}
}
}

View File

@ -0,0 +1,39 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>DeleteHBaseCells</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Overview</h2>
<p>
This processor provides the ability to do deletes against one or more HBase cells, without having to delete the entire row. It should
be used as the primary delete method when visibility labels are in use and the cells have different visibility labels. Each line in
the flowfile body is a fully qualified cell (row id, column family, column qualifier and visibility labels if applicable). The separator
that separates each piece of the fully qualified cell is configurable, but <strong>::::</strong> is the default value.
</p>
<h2>Example FlowFile</h2>
<pre>
row1::::user::::name
row1::::user::::address::::PII
row1::::user::::billing_code_1::::PII&&amp;BILLING
</pre>
</body>
</html>

View File

@ -0,0 +1,41 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>PutHBaseCell</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Visibility Labels</h2>
<p>
This processor provides the ability to attach visibility labels to HBase Puts that it generates, if visibility labels
are enabled on the HBase cluster. There are two ways to enable this:
</p>
<ul>
<li>Attributes on the flowfile.</li>
<li>Dynamic properties added to the processor.</li>
</ul>
<p>When the dynamic properties are defined on the processor, they will be the default value, but can be overridden by
attributes set on the flowfile. The naming convention for both (property name and attribute name) is:</p>
<ul>
<li>visibility.COLUMN_FAMILY - every column qualifier under the column family will get this.</li>
<li>visibility.COLUMN_FAMILY.COLUMN_VISIBILITY - the qualified column qualifier will be assigned this value.</li>
</ul>
</body>
</html>

View File

@ -0,0 +1,41 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>PutHBaseJSON</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Visibility Labels</h2>
<p>
This processor provides the ability to attach visibility labels to HBase Puts that it generates, if visibility labels
are enabled on the HBase cluster. There are two ways to enable this:
</p>
<ul>
<li>Attributes on the flowfile.</li>
<li>Dynamic properties added to the processor.</li>
</ul>
<p>When the dynamic properties are defined on the processor, they will be the default value, but can be overridden by
attributes set on the flowfile. The naming convention for both (property name and attribute name) is:</p>
<ul>
<li>visibility.COLUMN_FAMILY - every column qualifier under the column family will get this.</li>
<li>visibility.COLUMN_FAMILY.COLUMN_VISIBILITY - the qualified column qualifier will be assigned this value.</li>
</ul>
</body>
</html>

View File

@ -0,0 +1,113 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>PutHBaseRecord</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Visibility Labels:</h2>
<p>
PutHBaseRecord provides the ability to define a branch of the record as a map which contains an association between
column qualifiers and the visibility label that they should have assigned to them.
</p>
<h3>Example Schema</h3>
<pre>
{
"type": "record",
"name": "SampleRecord",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "address", "type": "string" },
{ "name": "dob", "type": "string" },
{ "name": "attendingPhysician", "type": "string" },
{ "name": "accountNumber", "type": "string" },
{ "name": "visibility_labels", "type": "map", "values": "string" }
]
}
</pre>
<h3>Example Record</h3>
<pre>
{
"name": "John Smith",
"address": "12345 Main Street",
"dob": "1970-01-01",
"attendingPhysician": "Dr. Jane Doe",
"accountNumber": "1234-567-890-ABC",
"visibility_labels": {
"name": "OPEN",
"address": "PII",
"dob": "PII",
"attendingPhysician": "PII&amp;PHI",
"accountNumber": "PII&amp;BILLING"
}
}
</pre>
<h3>Results in HBase</h3>
<p>Example is for row with ID <em>patient-1</em> and column family <em>patient</em></p>
<table>
<thead>
<tr>
<th>Row</th>
<th>Value</th>
<th>Visibility</th>
</tr>
</thead>
<tbody>
<tr>
<td>patient-1:patient:name</td>
<td>John Smith</td>
<td>OPEN</td>
</tr>
<tr>
<td>patient-1:patient:address</td>
<td>12345 Main Street</td>
<td>PII</td>
</tr>
<tr>
<td>patient-1:patient:</td>
<td>1970-01-01</td>
<td>PII</td>
</tr>
<tr>
<td>patient-1:patient:attendingPhysician</td>
<td>Dr. Jane Doe</td>
<td>PII&amp;PHI</td>
</tr>
<tr>
<td>patient-1:patient:accountNumber</td>
<td>1234-567-890-ABC</td>
<td>PII&amp;BILLING</td>
</tr>
</tbody>
</table>
<p>In addition to the branch for visibility labels, the same methods used for PutHBaseCell and PutHBaseJSON can be used.
They are:</p>
<ul>
<li>Attributes on the flowfile.</li>
<li>Dynamic properties added to the processor.</li>
</ul>
<p>When the dynamic properties are defined on the processor, they will be the default value, but can be overridden by
attributes set on the flowfile. The naming convention for both (property name and attribute name) is:</p>
<ul>
<li>visibility.COLUMN_FAMILY - every column qualifier under the column family will get this.</li>
<li>visibility.COLUMN_FAMILY.COLUMN_VISIBILITY - the qualified column qualifier will be assigned this value.</li>
</ul>
</body>
</html>

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.hbase.DeleteHBaseCells
org.apache.nifi.hbase.DeleteHBaseRow
org.apache.nifi.hbase.GetHBase
org.apache.nifi.hbase.PutHBaseCell

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
public class DeleteTestBase {
protected TestRunner runner;
protected MockHBaseClientService hBaseClient;
public void setup(Class clz) throws InitializationException {
runner = TestRunners.newTestRunner(clz);
hBaseClient = new MockHBaseClientService();
runner.addControllerService("hbaseClient", hBaseClient);
runner.enableControllerService(hBaseClient);
runner.setProperty(DeleteHBaseRow.TABLE_NAME, "nifi");
runner.setProperty(DeleteHBaseRow.HBASE_CLIENT_SERVICE, "hbaseClient");
}
List<String> populateTable(int max) {
List<String> ids = new ArrayList<>();
for (int index = 0; index < max; index++) {
String uuid = UUID.randomUUID().toString();
ids.add(uuid);
Map<String, String> cells = new HashMap<>();
cells.put("test", UUID.randomUUID().toString());
hBaseClient.addResult(uuid, cells, System.currentTimeMillis());
}
return ids;
}
}

View File

@ -72,6 +72,9 @@ public class MockHBaseClientService extends AbstractControllerService implements
throw new UnsupportedOperationException();
}
@Override
public void delete(String tableName, byte[] rowId, String visibilityLabel) throws IOException { }
private int deletePoint = 0;
public void setDeletePoint(int deletePoint) {
this.deletePoint = deletePoint;
@ -98,6 +101,18 @@ public class MockHBaseClientService extends AbstractControllerService implements
}
}
@Override
public void deleteCells(String tableName, List<DeleteRequest> deletes) throws IOException {
for (DeleteRequest req : deletes) {
results.remove(new String(req.getRowId()));
}
}
@Override
public void delete(String tableName, List<byte[]> rowIds, String visibilityLabel) throws IOException {
delete(tableName, rowIds);
}
public int size() {
return results.size();
}
@ -107,7 +122,7 @@ public class MockHBaseClientService extends AbstractControllerService implements
}
@Override
public void scan(String tableName, byte[] startRow, byte[] endRow, Collection<Column> columns, ResultHandler handler) throws IOException {
public void scan(String tableName, byte[] startRow, byte[] endRow, Collection<Column> columns, List<String> labels, ResultHandler handler) throws IOException {
if (throwException) {
throw new IOException("exception");
}
@ -154,9 +169,14 @@ public class MockHBaseClientService extends AbstractControllerService implements
numScans++;
}
@Override
public void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, List<String> visibilityLabels, ResultHandler handler) throws IOException {
scan(tableName, columns, filterExpression, minTime, handler);
}
@Override
public void scan(String tableName, String startRow, String endRow, String filterExpression, Long timerangeMin,
Long timerangeMax, Integer limitRows, Boolean isReversed, Collection<Column> columns, ResultHandler handler)
Long timerangeMax, Integer limitRows, Boolean isReversed, Collection<Column> columns, List<String> visibilityLabels, ResultHandler handler)
throws IOException {
if (throwException) {
throw new IOException("exception");
@ -164,8 +184,8 @@ public class MockHBaseClientService extends AbstractControllerService implements
int i = 0;
// pass all the staged data to the handler
for (final Map.Entry<String,ResultCell[]> entry : results.entrySet()) {
if (linesBeforeException>=0 && i++>=linesBeforeException) {
for (final Map.Entry<String, ResultCell[]> entry : results.entrySet()) {
if (linesBeforeException >= 0 && i++ >= linesBeforeException) {
throw new IOException("iterating exception");
}
handler.handle(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue());

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.reporting.InitializationException;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
public class TestDeleteHBaseCells extends DeleteTestBase {
@Before
public void setup() throws InitializationException {
super.setup(DeleteHBaseCells.class);
}
@Test
public void testSimpleDelete() {
final String SEP = "::::";
List<String> ids = populateTable(10000);
runner.setProperty(DeleteHBaseCells.SEPARATOR, SEP);
runner.assertValid();
StringBuilder sb = new StringBuilder();
for (String id : ids) {
sb.append(String.format("%s%sX%sY\n", id, SEP, SEP));
}
runner.enqueue(sb.toString().trim());
runner.run();
runner.assertAllFlowFilesTransferred(DeleteHBaseCells.REL_SUCCESS);
}
}

View File

@ -19,45 +19,19 @@ package org.apache.nifi.hbase;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
public class TestDeleteHBaseRow {
private TestRunner runner;
private MockHBaseClientService hBaseClient;
public class TestDeleteHBaseRow extends DeleteTestBase {
@Before
public void setup() throws InitializationException {
runner = TestRunners.newTestRunner(new DeleteHBaseRow());
hBaseClient = new MockHBaseClientService();
runner.addControllerService("hbaseClient", hBaseClient);
runner.enableControllerService(hBaseClient);
runner.setProperty(DeleteHBaseRow.TABLE_NAME, "nifi");
runner.setProperty(DeleteHBaseRow.HBASE_CLIENT_SERVICE, "hbaseClient");
}
List<String> populateTable(int max) {
List<String> ids = new ArrayList<>();
for (int index = 0; index < max; index++) {
String uuid = UUID.randomUUID().toString();
ids.add(uuid);
Map<String, String> cells = new HashMap<>();
cells.put("test", UUID.randomUUID().toString());
hBaseClient.addResult(uuid, cells, System.currentTimeMillis());
}
return ids;
super.setup(DeleteHBaseRow.class);
}
@Test

View File

@ -39,6 +39,7 @@ public class TestFetchHBaseRow {
public void setup() throws InitializationException {
proc = new FetchHBaseRow();
runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
hBaseClientService = new MockHBaseClientService();
runner.addControllerService("hbaseClient", hBaseClientService);
@ -48,6 +49,7 @@ public class TestFetchHBaseRow {
@Test
public void testColumnsValidation() {
runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.assertValid();
@ -78,6 +80,7 @@ public class TestFetchHBaseRow {
public void testNoIncomingFlowFile() {
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
runner.run();
runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0);
@ -91,6 +94,7 @@ public class TestFetchHBaseRow {
public void testInvalidTableName() {
runner.setProperty(FetchHBaseRow.TABLE_NAME, "${hbase.table}");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
runner.enqueue("trigger flow file");
runner.run();
@ -106,6 +110,7 @@ public class TestFetchHBaseRow {
public void testInvalidRowId() {
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "${hbase.row}");
runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
runner.enqueue("trigger flow file");
runner.run();
@ -125,7 +130,7 @@ public class TestFetchHBaseRow {
final long ts1 = 123456789;
hBaseClientService.addResult("row1", cells, ts1);
runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_ATTRIBUTES);
@ -155,6 +160,7 @@ public class TestFetchHBaseRow {
final long ts1 = 123456789;
hBaseClientService.addResult("row1", cells, ts1);
runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.setProperty(FetchHBaseRow.COLUMNS, "nifi:cq2");

View File

@ -76,6 +76,9 @@ public class TestGetHBase {
runner.setProperty(GetHBase.TABLE_NAME, "nifi");
runner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, "cacheClient");
runner.setProperty(GetHBase.HBASE_CLIENT_SERVICE, "hbaseClient");
runner.setProperty(GetHBase.AUTHORIZATIONS, "");
runner.setValidateExpressionUsage(true);
}
@After

View File

@ -505,6 +505,7 @@ public class TestPutHBaseJSON {
runner.setProperty(PutHBaseJSON.TABLE_NAME, table);
runner.setProperty(PutHBaseJSON.COLUMN_FAMILY, columnFamily);
runner.setProperty(PutHBaseJSON.BATCH_SIZE, batchSize);
return runner;
}

View File

@ -64,6 +64,8 @@ public class TestPutHBaseRecord {
}
runner.enableControllerService(parser);
runner.setProperty(PutHBaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutHBaseRecord.DEFAULT_VISIBILITY_STRING, "");
runner.setProperty(PutHBaseRecord.VISIBILITY_RECORD_PATH, "");
parser.addSchemaField("id", RecordFieldType.INT);
parser.addSchemaField("name", RecordFieldType.STRING);

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.hbase.util.VisibilityUtil;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
public class TestVisibilityUtil {
private TestRunner runner;
@Before
public void setup() throws Exception {
runner = TestRunners.newTestRunner(PutHBaseCell.class);
final MockHBaseClientService hBaseClient = new MockHBaseClientService();
runner.addControllerService("hbaseClient", hBaseClient);
runner.enableControllerService(hBaseClient);
runner.setProperty(PutHBaseCell.HBASE_CLIENT_SERVICE, "hbaseClient");
runner.setProperty(PutHBaseCell.TABLE_NAME, "test");
runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, "test");
runner.setProperty(PutHBaseCell.COLUMN_FAMILY, "test");
runner.assertValid();
}
@Test
public void testAllPresentOnFlowfile() {
runner.setProperty("visibility.test.test", "U&PII");
MockFlowFile ff = new MockFlowFile(System.currentTimeMillis());
ff.putAttributes(new HashMap<String, String>(){{
put("visibility.test.test", "U&PII&PHI");
}});
ProcessContext context = runner.getProcessContext();
String label = VisibilityUtil.pickVisibilityString("test", "test", ff, context);
Assert.assertNotNull(label);
Assert.assertEquals("U&PII&PHI", label);
}
@Test
public void testOnlyColumnFamilyOnFlowfile() {
runner.setProperty("visibility.test", "U&PII");
MockFlowFile ff = new MockFlowFile(System.currentTimeMillis());
ff.putAttributes(new HashMap<String, String>(){{
put("visibility.test", "U&PII&PHI");
}});
ProcessContext context = runner.getProcessContext();
String label = VisibilityUtil.pickVisibilityString("test", "test", ff, context);
Assert.assertNotNull(label);
Assert.assertEquals("U&PII&PHI", label);
}
@Test
public void testInvalidAttributes() {
runner.setProperty("visibility.test", "U&PII");
MockFlowFile ff = new MockFlowFile(System.currentTimeMillis());
ff.putAttributes(new HashMap<String, String>(){{
put("visibility..test", "U&PII&PHI");
}});
ProcessContext context = runner.getProcessContext();
String label = VisibilityUtil.pickVisibilityString("test", "test", ff, context);
Assert.assertNotNull(label);
Assert.assertEquals("U&PII", label);
}
@Test
public void testColumnFamilyAttributeOnly() {
MockFlowFile ff = new MockFlowFile(System.currentTimeMillis());
ff.putAttributes(new HashMap<String, String>(){{
put("visibility.test", "U&PII");
}});
ProcessContext context = runner.getProcessContext();
String label = VisibilityUtil.pickVisibilityString("test", "test", ff, context);
Assert.assertNotNull(label);
Assert.assertEquals("U&PII", label);
}
@Test
public void testNoAttributes() {
runner.setProperty("visibility.test", "U&PII");
MockFlowFile ff = new MockFlowFile(System.currentTimeMillis());
ProcessContext context = runner.getProcessContext();
String label = VisibilityUtil.pickVisibilityString("test", "test", ff, context);
Assert.assertNotNull(label);
Assert.assertEquals("U&PII", label);
runner.setProperty("visibility.test.test", "U&PII&PHI");
label = VisibilityUtil.pickVisibilityString("test", "test", ff, context);
Assert.assertNotNull(label);
Assert.assertEquals("U&PII&PHI", label);
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
/**
* Encapsulates the information for a delete operation.
*/
public class DeleteRequest {
private byte[] rowId;
private byte[] columnFamily;
private byte[] columnQualifier;
private String visibilityLabel;
public DeleteRequest(byte[] rowId, byte[] columnFamily, byte[] columnQualifier, String visibilityLabel) {
this.rowId = rowId;
this.columnFamily = columnFamily;
this.columnQualifier = columnQualifier;
this.visibilityLabel = visibilityLabel;
}
public byte[] getRowId() {
return rowId;
}
public byte[] getColumnFamily() {
return columnFamily;
}
public byte[] getColumnQualifier() {
return columnQualifier;
}
public String getVisibilityLabel() {
return visibilityLabel;
}
@Override
public String toString() {
return new StringBuilder()
.append(String.format("Row ID: %s\n", new String(rowId)))
.append(String.format("Column Family: %s\n", new String(columnFamily)))
.append(String.format("Column Qualifier: %s\n", new String(columnQualifier)))
.append(visibilityLabel != null ? String.format("Visibility Label: %s", visibilityLabel) : "")
.toString();
}
}

View File

@ -118,6 +118,18 @@ public interface HBaseClientService extends ControllerService {
*/
void delete(String tableName, byte[] rowId) throws IOException;
/**
* Deletes the given row on HBase. Uses the supplied visibility label for all cells in the delete.
* It will fail if HBase cannot delete a cell because the visibility label on the cell does not match the specified
* label.
*
* @param tableName the name of an HBase table
* @param rowId the id of the row to delete
* @param visibilityLabel a visibility label to apply to the delete
* @throws IOException thrown when there are communication errors with HBase
*/
void delete(String tableName, byte[] rowId, String visibilityLabel) throws IOException;
/**
* Deletes a list of rows in HBase. All cells are deleted.
*
@ -127,6 +139,25 @@ public interface HBaseClientService extends ControllerService {
void delete(String tableName, List<byte[]> rowIds) throws IOException;
/**
* Deletes a list of cells from HBase. This is intended to be used with granular delete operations.
*
* @param tableName the name of an HBase table.
* @param deletes a list of DeleteRequest objects.
* @throws IOException thrown when there are communication errors with HBase
*/
void deleteCells(String tableName, List<DeleteRequest> deletes) throws IOException;
/**
* Deletes a list of rows in HBase. All cells that match the visibility label are deleted.
*
* @param tableName the name of an HBase table
* @param rowIds a list of rowIds to send in a batch delete
* @param visibilityLabel a visibility label expression
*/
void delete(String tableName, List<byte[]> rowIds, String visibilityLabel) throws IOException;
/**
* Scans the given table using the optional filter criteria and passing each result to the provided handler.
*
@ -139,6 +170,19 @@ public interface HBaseClientService extends ControllerService {
*/
void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, ResultHandler handler) throws IOException;
/**
* Scans the given table using the optional filter criteria and passing each result to the provided handler.
*
* @param tableName the name of an HBase table to scan
* @param columns optional columns to return, if not specified all columns are returned
* @param filterExpression optional filter expression, if not specified no filtering is performed
* @param minTime the minimum timestamp of cells to return, passed to the HBase scanner timeRange
* @param authorizations the visibility labels to apply to the scanner.
* @param handler a handler to process rows of the result set
* @throws IOException thrown when there are communication errors with HBase
*/
void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, List<String> authorizations, ResultHandler handler) throws IOException;
/**
* Scans the given table for the given rowId and passes the result to the handler.
*
@ -149,7 +193,7 @@ public interface HBaseClientService extends ControllerService {
* @param handler a handler to process rows of the result
* @throws IOException thrown when there are communication errors with HBase
*/
void scan(String tableName, byte[] startRow, byte[] endRow, Collection<Column> columns, ResultHandler handler) throws IOException;
void scan(String tableName, byte[] startRow, byte[] endRow, Collection<Column> columns, List<String> authorizations, ResultHandler handler) throws IOException;
/**
* Scans the given table for the given range of row keys or time rage and passes the result to a handler.<br/>
@ -163,10 +207,11 @@ public interface HBaseClientService extends ControllerService {
* @param limitRows the maximum number of rows to be returned by scanner
* @param isReversed whether this scan is a reversed one.
* @param columns optional columns to return, if not specified all columns are returned
* @param authorizations optional list of visibility labels that the user should be able to see when communicating with HBase
* @param handler a handler to process rows of the result
*/
void scan(String tableName, String startRow, String endRow, String filterExpression, Long timerangeMin, Long timerangeMax, Integer limitRows,
Boolean isReversed, Collection<Column> columns, ResultHandler handler) throws IOException;
Boolean isReversed, Collection<Column> columns, List<String> authorizations, ResultHandler handler) throws IOException;
/**
* Converts the given boolean to it's byte representation.

View File

@ -24,18 +24,24 @@ public class PutColumn {
private final byte[] columnFamily;
private final byte[] columnQualifier;
private final byte[] buffer;
private final String visibility;
private final Long timestamp;
public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer) {
this(columnFamily, columnQualifier, buffer, null);
this(columnFamily, columnQualifier, buffer, null, null);
}
public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer, final Long timestamp) {
this(columnFamily, columnQualifier, buffer, timestamp, null);
}
public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer, final Long timestamp, final String visibility) {
this.columnFamily = columnFamily;
this.columnQualifier = columnQualifier;
this.buffer = buffer;
this.timestamp = timestamp;
this.visibility = visibility;
}
public byte[] getColumnFamily() {
@ -50,6 +56,10 @@ public class PutColumn {
return buffer;
}
public String getVisibility() {
return visibility;
}
public Long getTimestamp() {
return timestamp;
}

View File

@ -51,7 +51,7 @@ import org.apache.nifi.processor.util.StandardValidators;
@CapabilityDescription("Provides the ability to use an HBase table as a cache, in place of a DistributedMapCache."
+ " Uses a HBase_1_1_2_ClientService controller to communicate with HBase.")
public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService implements DistributedMapCacheClient {
public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService implements DistributedMapCacheClient, VisibilityLabelService {
static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Client Service")
@ -90,6 +90,7 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(HBASE_CACHE_TABLE_NAME);
descriptors.add(AUTHORIZATIONS);
descriptors.add(HBASE_CLIENT_SERVICE);
descriptors.add(HBASE_COLUMN_FAMILY);
descriptors.add(HBASE_COLUMN_QUALIFIER);
@ -106,6 +107,8 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
private volatile String hBaseColumnQualifier;
private volatile byte[] hBaseColumnQualifierBytes;
private List<String> authorizations;
@OnEnabled
public void onConfigured(final ConfigurationContext context) throws InitializationException{
hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
@ -116,6 +119,8 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
hBaseColumnFamilyBytes = hBaseColumnFamily.getBytes(StandardCharsets.UTF_8);
hBaseColumnQualifierBytes = hBaseColumnQualifier.getBytes(StandardCharsets.UTF_8);
authorizations = getAuthorizations(context);
}
private <T> byte[] serialize(final T value, final Serializer<T> serializer) throws IOException {
@ -158,7 +163,7 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
final List<Column> columnsList = new ArrayList<Column>(0);
hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, handler);
hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, authorizations, handler);
return (handler.numRows() > 0);
}
@ -190,7 +195,7 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
final List<Column> columnsList = new ArrayList<Column>(0);
hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, handler);
hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, authorizations, handler);
if (handler.numRows() > 1) {
throw new IOException("Found multiple rows in HBase for key");
} else if(handler.numRows() == 1) {

View File

@ -25,14 +25,17 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.security.visibility.Authorizations;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.DynamicProperty;
@ -62,6 +65,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@ -115,6 +119,10 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
return connection;
}
protected void setConnection(Connection connection) {
this.connection = connection;
}
@Override
protected void init(ControllerServiceInitializationContext config) throws InitializationException {
kerberosConfigFile = config.getKerberosConfigurationFile();
@ -315,6 +323,8 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
private String principal = null;
protected Configuration getConfigurationFromFiles(final String configFiles) {
final Configuration hbaseConfig = HBaseConfiguration.create();
if (StringUtils.isNotBlank(configFiles)) {
@ -336,51 +346,85 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
}
private static final byte[] EMPTY_VIS_STRING;
static {
try {
EMPTY_VIS_STRING = "".getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
private List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) {
List<Put> retVal = new ArrayList<>();
try {
Put put = null;
for (final PutColumn column : columns) {
if (put == null || (put.getCellVisibility() == null && column.getVisibility() != null) || ( put.getCellVisibility() != null
&& !put.getCellVisibility().getExpression().equals(column.getVisibility())
)) {
put = new Put(rowKey);
if (column.getVisibility() != null) {
put.setCellVisibility(new CellVisibility(column.getVisibility()));
}
retVal.add(put);
}
if (column.getTimestamp() != null) {
put.addColumn(
column.getColumnFamily(),
column.getColumnQualifier(),
column.getTimestamp(),
column.getBuffer());
} else {
put.addColumn(
column.getColumnFamily(),
column.getColumnQualifier(),
column.getBuffer());
}
}
} catch (DeserializationException de) {
getLogger().error("Error writing cell visibility statement.", de);
throw new RuntimeException(de);
}
return retVal;
}
@Override
public void put(final String tableName, final Collection<PutFlowFile> puts) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
// Create one Put per row....
final Map<String, Put> rowPuts = new HashMap<>();
final Map<String, List<PutColumn>> sorted = new HashMap<>();
final List<Put> newPuts = new ArrayList<>();
for (final PutFlowFile putFlowFile : puts) {
//this is used for the map key as a byte[] does not work as a key.
final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8);
Put put = rowPuts.get(rowKeyString);
if (put == null) {
put = new Put(putFlowFile.getRow());
rowPuts.put(rowKeyString, put);
List<PutColumn> columns = sorted.get(rowKeyString);
if (columns == null) {
columns = new ArrayList<>();
sorted.put(rowKeyString, columns);
}
for (final PutColumn column : putFlowFile.getColumns()) {
if (column.getTimestamp() != null) {
put.addColumn(
column.getColumnFamily(),
column.getColumnQualifier(),
column.getTimestamp(),
column.getBuffer());
} else {
put.addColumn(
column.getColumnFamily(),
column.getColumnQualifier(),
column.getBuffer());
}
}
columns.addAll(putFlowFile.getColumns());
}
table.put(new ArrayList<>(rowPuts.values()));
for (final Map.Entry<String, List<PutColumn>> entry : sorted.entrySet()) {
newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue()));
}
table.put(newPuts);
}
}
@Override
public void put(final String tableName, final byte[] rowId, final Collection<PutColumn> columns) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
Put put = new Put(rowId);
for (final PutColumn column : columns) {
put.addColumn(
column.getColumnFamily(),
column.getColumnQualifier(),
column.getBuffer());
}
table.put(put);
table.put(buildPuts(rowId, new ArrayList(columns)));
}
}
@ -398,18 +442,54 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
@Override
public void delete(final String tableName, final byte[] rowId) throws IOException {
delete(tableName, rowId, null);
}
@Override
public void delete(String tableName, byte[] rowId, String visibilityLabel) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
Delete delete = new Delete(rowId);
if (!StringUtils.isEmpty(visibilityLabel)) {
delete.setCellVisibility(new CellVisibility(visibilityLabel));
}
table.delete(delete);
}
}
@Override
public void delete(String tableName, List<byte[]> rowIds) throws IOException {
delete(tableName, rowIds);
}
@Override
public void deleteCells(String tableName, List<DeleteRequest> deletes) throws IOException {
List<Delete> deleteRequests = new ArrayList<>();
for (int index = 0; index < deletes.size(); index++) {
DeleteRequest req = deletes.get(index);
Delete delete = new Delete(req.getRowId())
.addColumn(req.getColumnFamily(), req.getColumnQualifier());
if (!StringUtils.isEmpty(req.getVisibilityLabel())) {
delete.setCellVisibility(new CellVisibility(req.getVisibilityLabel()));
}
deleteRequests.add(delete);
}
batchDelete(tableName, deleteRequests);
}
@Override
public void delete(String tableName, List<byte[]> rowIds, String visibilityLabel) throws IOException {
List<Delete> deletes = new ArrayList<>();
for (int index = 0; index < rowIds.size(); index++) {
deletes.add(new Delete(rowIds.get(index)));
Delete delete = new Delete(rowIds.get(index));
if (!StringUtils.isBlank(visibilityLabel)) {
delete.setCellVisibility(new CellVisibility(visibilityLabel));
}
deletes.add(delete);
}
batchDelete(tableName, deletes);
}
private void batchDelete(String tableName, List<Delete> deletes) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
table.delete(deletes);
}
@ -418,7 +498,11 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
@Override
public void scan(final String tableName, final Collection<Column> columns, final String filterExpression, final long minTime, final ResultHandler handler)
throws IOException {
scan(tableName, columns, filterExpression, minTime, null, handler);
}
@Override
public void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, List<String> visibilityLabels, ResultHandler handler) throws IOException {
Filter filter = null;
if (!StringUtils.isBlank(filterExpression)) {
ParseFilter parseFilter = new ParseFilter();
@ -426,7 +510,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
try (final Table table = connection.getTable(TableName.valueOf(tableName));
final ResultScanner scanner = getResults(table, columns, filter, minTime)) {
final ResultScanner scanner = getResults(table, columns, filter, minTime, visibilityLabels)) {
for (final Result result : scanner) {
final byte[] rowKey = result.getRow();
@ -451,11 +535,11 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
@Override
public void scan(final String tableName, final byte[] startRow, final byte[] endRow, final Collection<Column> columns, final ResultHandler handler)
public void scan(final String tableName, final byte[] startRow, final byte[] endRow, final Collection<Column> columns, List<String> authorizations, final ResultHandler handler)
throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName));
final ResultScanner scanner = getResults(table, startRow, endRow, columns)) {
final ResultScanner scanner = getResults(table, startRow, endRow, columns, authorizations)) {
for (final Result result : scanner) {
final byte[] rowKey = result.getRow();
@ -482,11 +566,11 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
@Override
public void scan(final String tableName, final String startRow, final String endRow, String filterExpression,
final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed,
final Collection<Column> columns, final ResultHandler handler) throws IOException {
final Collection<Column> columns, List<String> visibilityLabels, final ResultHandler handler) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName));
final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin,
timerangeMax, limitRows, isReversed, columns)) {
timerangeMax, limitRows, isReversed, columns, visibilityLabels)) {
int cnt = 0;
final int lim = limitRows != null ? limitRows : 0;
@ -515,12 +599,11 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
handler.handle(rowKey, resultCells);
}
}
}
//
protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax,
final Integer limitRows, final Boolean isReversed, final Collection<Column> columns) throws IOException {
final Integer limitRows, final Boolean isReversed, final Collection<Column> columns, List<String> authorizations) throws IOException {
final Scan scan = new Scan();
if (!StringUtils.isBlank(startRow)){
scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
@ -529,6 +612,9 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
scan.setStopRow( endRow.getBytes(StandardCharsets.UTF_8));
}
if (authorizations != null && authorizations.size() > 0) {
scan.setAuthorizations(new Authorizations(authorizations));
}
Filter filter = null;
if (columns != null) {
@ -565,12 +651,16 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
// protected and extracted into separate method for testing
protected ResultScanner getResults(final Table table, final byte[] startRow, final byte[] endRow, final Collection<Column> columns) throws IOException {
protected ResultScanner getResults(final Table table, final byte[] startRow, final byte[] endRow, final Collection<Column> columns, List<String> authorizations) throws IOException {
final Scan scan = new Scan();
scan.setStartRow(startRow);
scan.setStopRow(endRow);
if (columns != null) {
if (authorizations != null && authorizations.size() > 0) {
scan.setAuthorizations(new Authorizations(authorizations));
}
if (columns != null && columns.size() > 0) {
for (Column col : columns) {
if (col.getQualifier() == null) {
scan.addFamily(col.getFamily());
@ -584,7 +674,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
// protected and extracted into separate method for testing
protected ResultScanner getResults(final Table table, final Collection<Column> columns, final Filter filter, final long minTime) throws IOException {
protected ResultScanner getResults(final Table table, final Collection<Column> columns, final Filter filter, final long minTime, List<String> authorizations) throws IOException {
// Create a new scan. We will set the min timerange as the latest timestamp that
// we have seen so far. The minimum timestamp is inclusive, so we will get duplicates.
// We will record any cells that have the latest timestamp, so that when we scan again,
@ -592,6 +682,10 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
final Scan scan = new Scan();
scan.setTimeRange(minTime, Long.MAX_VALUE);
if (authorizations != null && authorizations.size() > 0) {
scan.setAuthorizations(new Authorizations(authorizations));
}
if (filter != null) {
scan.setFilter(filter);
}

View File

@ -54,7 +54,7 @@ import java.util.Set;
@Tags({"hbase", "record", "lookup", "service"})
@CapabilityDescription("A lookup service that retrieves one or more columns from HBase and returns them as a record. The lookup coordinates " +
"must contain 'rowKey' which will be the HBase row id.")
public class HBase_1_1_2_RecordLookupService extends AbstractControllerService implements LookupService<Record> {
public class HBase_1_1_2_RecordLookupService extends AbstractControllerService implements LookupService<Record>, VisibilityLabelService {
static final String ROW_KEY_KEY = "rowKey";
private static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
@ -98,6 +98,7 @@ public class HBase_1_1_2_RecordLookupService extends AbstractControllerService i
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(HBASE_CLIENT_SERVICE);
props.add(TABLE_NAME);
props.add(AUTHORIZATIONS);
props.add(RETURN_COLUMNS);
props.add(CHARSET);
PROPERTIES = Collections.unmodifiableList(props);
@ -107,6 +108,7 @@ public class HBase_1_1_2_RecordLookupService extends AbstractControllerService i
private List<Column> columns;
private Charset charset;
private HBaseClientService hBaseClientService;
private List<String> authorizations;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -127,7 +129,8 @@ public class HBase_1_1_2_RecordLookupService extends AbstractControllerService i
final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
try {
final Map<String, Object> values = new HashMap<>();
hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, columns, (byte[] row, ResultCell[] resultCells) -> {
hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, columns, authorizations, (byte[] row, ResultCell[] resultCells) -> {
for (final ResultCell cell : resultCells) {
final byte[] qualifier = Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierOffset() + cell.getQualifierLength());
final byte[] value = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
@ -167,6 +170,7 @@ public class HBase_1_1_2_RecordLookupService extends AbstractControllerService i
this.tableName = context.getProperty(TABLE_NAME).getValue();
this.columns = getColumns(context.getProperty(RETURN_COLUMNS).getValue());
this.charset = Charset.forName(context.getProperty(CHARSET).getValue());
this.authorizations = getAuthorizations(context);
}
@OnDisabled

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.reporting.InitializationException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
public class IntegrationTestClientService extends HBase_1_1_2_ClientService {
IntegrationTestClientService(Connection hbaseConnection) {
super();
setConnection(hbaseConnection);
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return new ArrayList<>();
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public interface VisibilityLabelService {
PropertyDescriptor AUTHORIZATIONS = new PropertyDescriptor.Builder()
.name("hb-lu-authorizations")
.displayName("Authorizations")
.description("The list of authorization tokens to be used with cell visibility if it is enabled. These will be used to " +
"override the default authorization list for the user accessing HBase.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
default List<String> getAuthorizations(ConfigurationContext context) {
List<String> tokens = new ArrayList<>();
String authorizationString = context.getProperty(AUTHORIZATIONS).isSet()
? context.getProperty(AUTHORIZATIONS).getValue()
: "";
if (!StringUtils.isEmpty(authorizationString)) {
tokens = Arrays.asList(authorizationString.split(",[\\s]*"));
}
return tokens;
}
}

View File

@ -142,21 +142,19 @@ public class MockHBaseClientService extends HBase_1_1_2_ClientService {
return true;
}
@Override
protected ResultScanner getResults(Table table, byte[] startRow, byte[] endRow, Collection<Column> columns) throws IOException {
protected ResultScanner getResults(Table table, byte[] startRow, byte[] endRow, Collection<Column> columns, List<String> labels) throws IOException {
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
return scanner;
}
@Override
protected ResultScanner getResults(Table table, Collection<Column> columns, Filter filter, long minTime) throws IOException {
protected ResultScanner getResults(Table table, Collection<Column> columns, Filter filter, long minTime, List<String> labels) throws IOException {
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
return scanner;
}
@Override
protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax,
final Integer limitRows, final Boolean isReversed, final Collection<Column> columns) throws IOException {
final ResultScanner scanner = Mockito.mock(ResultScanner.class);