diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
index a44f4d6a0c..7538ea811e 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml
@@ -51,6 +51,7 @@
org.apache.nifinifi-record
+ ${project.version}org.apache.commons
@@ -78,11 +79,17 @@
mockito-alltest
+
org.apache.nifinifi-mock-record-utils1.7.0-SNAPSHOTtest
+
+ org.apache.nifi
+ nifi-record-path
+ ${project.version}
+
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
index db12936c56..d5c9acc6ff 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
@@ -17,14 +17,9 @@
package org.apache.nifi.hbase;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperties;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
@@ -38,9 +33,26 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
/**
* Base class for processors that put data to HBase.
*/
+@DynamicProperties({
+ @DynamicProperty(name = "visibility.", description = "Visibility label for everything under that column family " +
+ "when a specific label for a particular column qualifier is not available.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+ value = "visibility label for "
+ ),
+ @DynamicProperty(name = "visibility..", description = "Visibility label for the specified column qualifier " +
+ "qualified by a configured column family.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+ value = "visibility label for :."
+ )
+})
public abstract class AbstractPutHBase extends AbstractProcessor {
protected static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
@@ -131,6 +143,36 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
clientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
}
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ if (propertyDescriptorName.startsWith("visibility.")) {
+ String[] parts = propertyDescriptorName.split("\\.");
+ String displayName;
+ String description;
+
+ if (parts.length == 2) {
+ displayName = String.format("Column Family %s Default Visibility", parts[1]);
+ description = String.format("Default visibility setting for %s", parts[1]);
+ } else if (parts.length == 3) {
+ displayName = String.format("Column Qualifier %s.%s Default Visibility", parts[1], parts[2]);
+ description = String.format("Default visibility setting for %s.%s", parts[1], parts[2]);
+ } else {
+ return null;
+ }
+
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .displayName(displayName)
+ .description(description)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .dynamic(true)
+ .build();
+ }
+
+ return null;
+ }
+
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseCells.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseCells.java
new file mode 100644
index 0000000000..4ca47f8b9d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseCells.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Scanner;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttributes({
+ @WritesAttribute(attribute = "error.line", description = "The line number of the error."),
+ @WritesAttribute(attribute = "error.msg", description = "The message explaining the error.")
+})
+@Tags({"hbase", "delete", "cell", "cells", "visibility"})
+@CapabilityDescription("This processor allows the user to delete individual HBase cells by specifying one or more lines " +
+ "in the flowfile content that are a sequence composed of row ID, column family, column qualifier and associated visibility labels " +
+ "if visibility labels are enabled and in use. A user-defined separator is used to separate each of these pieces of data on each " +
+ "line, with :::: being the default separator.")
+public class DeleteHBaseCells extends AbstractDeleteHBase {
+ static final PropertyDescriptor SEPARATOR = new PropertyDescriptor.Builder()
+ .name("delete-hbase-cell-separator")
+ .displayName("Separator")
+ .description("Each line of the flowfile content is separated into components for building a delete using this" +
+ "separator. It should be something other than a single colon or a comma because these are values that " +
+ "are associated with columns and visibility labels respectively. To delete a row with ID xyz, column family abc, " +
+ "column qualifier def and visibility label PII&PHI, one would specify xyz::::abc::::def::::PII&PHI given the default " +
+ "value")
+ .required(true)
+ .defaultValue("::::")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ static final String ERROR_LINE = "error.line";
+ static final String ERROR_MSG = "error.msg";
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ final List properties = new ArrayList<>();
+ properties.add(HBASE_CLIENT_SERVICE);
+ properties.add(TABLE_NAME);
+ properties.add(SEPARATOR);
+
+ return properties;
+ }
+
+ private FlowFile writeErrorAttributes(int line, String msg, FlowFile file, ProcessSession session) {
+ file = session.putAttribute(file, ERROR_LINE, String.valueOf(line));
+ file = session.putAttribute(file, ERROR_MSG, msg != null ? msg : "");
+ return file;
+ }
+
+ private void logCell(String rowId, String family, String column, String visibility) {
+ StringBuilder sb = new StringBuilder()
+ .append("Assembling cell delete for...\t")
+ .append(String.format("Row ID: %s\t", rowId))
+ .append(String.format("Column Family: %s\t", family))
+ .append(String.format("Column Qualifier: %s\t", column))
+ .append(String.format("Visibility Label: %s", visibility));
+ getLogger().debug(sb.toString());
+ }
+
+ @Override
+ protected void doDelete(ProcessContext context, ProcessSession session) throws Exception {
+ FlowFile input = session.get();
+ if (input == null) {
+ return;
+ }
+
+ final String separator = context.getProperty(SEPARATOR).evaluateAttributeExpressions(input).getValue();
+ final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(input).getValue();
+ List rowKeys = new ArrayList<>();
+ int lineNum = 1;
+ try (InputStream is = session.read(input)) {
+ Scanner scanner = new Scanner(is);
+ List deletes = new ArrayList<>();
+ while (scanner.hasNextLine()) {
+ String line = scanner.nextLine().trim();
+ if (line.equals("")) {
+ continue;
+ }
+ String[] parts = line.split(separator);
+ if (parts.length < 3 || parts.length > 4) {
+ final String msg = String.format("Invalid line length. It must have 3 or 4 components. It had %d.", parts.length);
+ input = writeErrorAttributes(lineNum, msg, input, session);
+ session.transfer(input, REL_FAILURE);
+ getLogger().error(msg);
+ return;
+ }
+ String rowId = parts[0];
+ String family = parts[1];
+ String column = parts[2];
+ String visibility = parts.length == 4 ? parts[3] : null;
+
+ DeleteRequest request = new DeleteRequest(rowId.getBytes(), family.getBytes(), column.getBytes(), visibility);
+ deletes.add(request);
+ if (!rowKeys.contains(rowId)) {
+ rowKeys.add(rowId);
+ }
+
+ if (getLogger().isDebugEnabled()) {
+ logCell(rowId, family, column, visibility);
+ }
+
+ lineNum++;
+ }
+ is.close();
+ clientService.deleteCells(tableName, deletes);
+ for (int index = 0; index < rowKeys.size(); index++) { //Could be many row keys in one flowfile.
+ session.getProvenanceReporter().invokeRemoteProcess(input, clientService.toTransitUri(tableName, rowKeys.get(index)));
+ }
+
+ session.transfer(input, REL_SUCCESS);
+ } catch (Exception ex) {
+ input = writeErrorAttributes(lineNum, ex.getMessage(), input, session);
+ session.transfer(input, REL_FAILURE);
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseRow.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseRow.java
index 9fe3882e5a..fb978af6cd 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseRow.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseRow.java
@@ -104,6 +104,17 @@ public class DeleteHBaseRow extends AbstractDeleteHBase {
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
+ static final PropertyDescriptor VISIBLITY_LABEL = new PropertyDescriptor.Builder()
+ .name("delete-visibility-label")
+ .displayName("Visibility Label")
+ .description("If visibility labels are enabled, a row cannot be deleted without supplying its visibility label(s) in the delete " +
+ "request. Note: this visibility label will be applied to all cells within the row that is specified. If some cells have " +
+ "different visibility labels, they will not be deleted. When that happens, the failure to delete will be considered a success " +
+ "because HBase does not report it as a failure.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
@Override
protected List getSupportedPropertyDescriptors() {
@@ -112,6 +123,7 @@ public class DeleteHBaseRow extends AbstractDeleteHBase {
properties.add(FLOWFILE_FETCH_COUNT);
properties.add(BATCH_SIZE);
properties.add(KEY_SEPARATOR);
+ properties.add(VISIBLITY_LABEL);
properties.add(CHARSET);
return properties;
@@ -128,10 +140,12 @@ public class DeleteHBaseRow extends AbstractDeleteHBase {
if (flowFiles != null && flowFiles.size() > 0) {
for (int index = 0; index < flowFiles.size(); index++) {
FlowFile flowFile = flowFiles.get(index);
+ final String visibility = context.getProperty(VISIBLITY_LABEL).isSet()
+ ? context.getProperty(VISIBLITY_LABEL).evaluateAttributeExpressions(flowFile).getValue() : null;
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
try {
if (location.equals(ROW_ID_CONTENT.getValue())) {
- flowFile = doDeleteFromContent(flowFile, context, session, tableName, batchSize, charset);
+ flowFile = doDeleteFromContent(flowFile, context, session, tableName, batchSize, charset, visibility);
if (flowFile.getAttribute(RESTART_INDEX) != null) {
session.transfer(flowFile, REL_FAILURE);
} else {
@@ -140,7 +154,7 @@ public class DeleteHBaseRow extends AbstractDeleteHBase {
session.getProvenanceReporter().invokeRemoteProcess(flowFile, transitUrl);
}
} else {
- String transitUrl = doDeleteFromAttribute(flowFile, context, tableName, charset);
+ String transitUrl = doDeleteFromAttribute(flowFile, context, tableName, charset, visibility);
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().invokeRemoteProcess(flowFile, transitUrl);
}
@@ -152,14 +166,14 @@ public class DeleteHBaseRow extends AbstractDeleteHBase {
}
}
- private String doDeleteFromAttribute(FlowFile flowFile, ProcessContext context, String tableName, String charset) throws Exception {
+ private String doDeleteFromAttribute(FlowFile flowFile, ProcessContext context, String tableName, String charset, String visibility) throws Exception {
String rowKey = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
- clientService.delete(tableName, rowKey.getBytes(charset));
+ clientService.delete(tableName, rowKey.getBytes(charset), visibility);
return clientService.toTransitUri(tableName, rowKey);
}
- private FlowFile doDeleteFromContent(FlowFile flowFile, ProcessContext context, ProcessSession session, String tableName, int batchSize, String charset) throws Exception {
+ private FlowFile doDeleteFromContent(FlowFile flowFile, ProcessContext context, ProcessSession session, String tableName, int batchSize, String charset, String visibility) throws Exception {
String keySeparator = context.getProperty(KEY_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
final String restartIndex = flowFile.getAttribute(RESTART_INDEX);
@@ -192,13 +206,13 @@ public class DeleteHBaseRow extends AbstractDeleteHBase {
batch.add(parts[index].getBytes(charset));
if (batch.size() == batchSize) {
- clientService.delete(tableName, batch);
+ clientService.delete(tableName, batch, visibility);
batch = new ArrayList<>();
}
last = parts[index];
}
if (batch.size() > 0) {
- clientService.delete(tableName, batch);
+ clientService.delete(tableName, batch, visibility);
}
flowFile = session.removeAttribute(flowFile, RESTART_INDEX);
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/FetchHBaseRow.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/FetchHBaseRow.java
index d8e4a9a51c..1a29d50bac 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/FetchHBaseRow.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/FetchHBaseRow.java
@@ -63,7 +63,7 @@ import java.util.regex.Pattern;
@WritesAttribute(attribute = "hbase.row", description = "A JSON document representing the row. This property is only written when a Destination of flowfile-attributes is selected."),
@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise")
})
-public class FetchHBaseRow extends AbstractProcessor {
+public class FetchHBaseRow extends AbstractProcessor implements VisibilityFetchSupport {
static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
@@ -179,6 +179,7 @@ public class FetchHBaseRow extends AbstractProcessor {
props.add(TABLE_NAME);
props.add(ROW_ID);
props.add(COLUMNS);
+ props.add(AUTHORIZATIONS);
props.add(DESTINATION);
props.add(JSON_FORMAT);
props.add(JSON_VALUE_ENCODING);
@@ -252,6 +253,8 @@ public class FetchHBaseRow extends AbstractProcessor {
final String destination = context.getProperty(DESTINATION).getValue();
final boolean base64Encode = context.getProperty(JSON_VALUE_ENCODING).getValue().equals(ENCODING_BASE64.getValue());
+ List authorizations = getAuthorizations(context, flowFile);
+
final RowSerializer rowSerializer = base64Encode ? base64RowSerializer : regularRowSerializer;
final FetchHBaseRowHandler handler = destination.equals(DESTINATION_CONTENT.getValue())
@@ -260,7 +263,7 @@ public class FetchHBaseRow extends AbstractProcessor {
final byte[] rowIdBytes = rowId.getBytes(StandardCharsets.UTF_8);
try {
- hBaseClientService.scan(tableName, rowIdBytes, rowIdBytes, columns, handler);
+ hBaseClientService.scan(tableName, rowIdBytes, rowIdBytes, columns, authorizations, handler);
} catch (Exception e) {
getLogger().error("Unable to fetch row {} from {} due to {}", new Object[] {rowId, tableName, e});
session.transfer(handler.getFlowFile(), REL_FAILURE);
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
index 8170bc6e00..582814ea7a 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
@@ -21,7 +21,6 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
-import java.io.OutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@@ -55,6 +54,7 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
@@ -65,7 +65,6 @@ import org.apache.nifi.hbase.io.JsonRowSerializer;
import org.apache.nifi.hbase.io.RowSerializer;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
-import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.hbase.util.ObjectSerDe;
import org.apache.nifi.hbase.util.StringSerDe;
import org.apache.nifi.processor.AbstractProcessor;
@@ -73,7 +72,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
@TriggerWhenEmpty
@@ -91,7 +89,7 @@ import org.apache.nifi.processor.util.StandardValidators;
@Stateful(scopes = Scope.CLUSTER, description = "After performing a fetching from HBase, stores a timestamp of the last-modified cell that was found. In addition, it stores the ID of the row(s) "
+ "and the value of each cell that has that timestamp as its modification date. This is stored across the cluster and allows the next fetch to avoid duplicating data, even if this Processor is "
+ "run on Primary Node only and the Primary Node changes.")
-public class GetHBase extends AbstractProcessor {
+public class GetHBase extends AbstractProcessor implements VisibilityFetchSupport {
static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
@@ -149,6 +147,14 @@ public class GetHBase extends AbstractProcessor {
.allowableValues(NONE, CURRENT_TIME)
.defaultValue(NONE.getValue())
.build();
+ static final PropertyDescriptor AUTHORIZATIONS = new PropertyDescriptor.Builder()
+ .name("hbase-fetch-row-authorizations")
+ .displayName("Authorizations")
+ .description("The list of authorizations to pass to the scanner. This will be ignored if cell visibility labels are not in use.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(Validator.VALID)
+ .build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -172,6 +178,7 @@ public class GetHBase extends AbstractProcessor {
properties.add(DISTRIBUTED_CACHE_SERVICE);
properties.add(TABLE_NAME);
properties.add(COLUMNS);
+ properties.add(AUTHORIZATIONS);
properties.add(FILTER_EXPRESSION);
properties.add(INITIAL_TIMERANGE);
properties.add(CHARSET);
@@ -253,6 +260,9 @@ public class GetHBase extends AbstractProcessor {
final String tableName = context.getProperty(TABLE_NAME).getValue();
final String initialTimeRange = context.getProperty(INITIAL_TIMERANGE).getValue();
final String filterExpression = context.getProperty(FILTER_EXPRESSION).getValue();
+
+ List authorizations = getAuthorizations(context, null);
+
final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
// if the table was changed then remove any previous state
@@ -279,105 +289,97 @@ public class GetHBase extends AbstractProcessor {
final AtomicReference latestTimestampHolder = new AtomicReference<>(minTime);
- hBaseClientService.scan(tableName, columns, filterExpression, minTime, new ResultHandler() {
- @Override
- public void handle(final byte[] rowKey, final ResultCell[] resultCells) {
+ hBaseClientService.scan(tableName, columns, filterExpression, minTime, authorizations, (rowKey, resultCells) -> {
- final String rowKeyString = new String(rowKey, StandardCharsets.UTF_8);
+ final String rowKeyString = new String(rowKey, StandardCharsets.UTF_8);
- // check if latest cell timestamp is equal to our cutoff.
- // if any of the cells have a timestamp later than our cutoff, then we
- // want the row. But if the cell with the latest timestamp is equal to
- // our cutoff, then we want to check if that's one of the cells that
- // we have already seen.
- long latestCellTimestamp = 0L;
+ // check if latest cell timestamp is equal to our cutoff.
+ // if any of the cells have a timestamp later than our cutoff, then we
+ // want the row. But if the cell with the latest timestamp is equal to
+ // our cutoff, then we want to check if that's one of the cells that
+ // we have already seen.
+ long latestCellTimestamp = 0L;
+ for (final ResultCell cell : resultCells) {
+ if (cell.getTimestamp() > latestCellTimestamp) {
+ latestCellTimestamp = cell.getTimestamp();
+ }
+ }
+
+ // we've already seen this.
+ if (latestCellTimestamp < minTime) {
+ getLogger().debug("latest cell timestamp for row {} is {}, which is earlier than the minimum time of {}",
+ new Object[] {rowKeyString, latestCellTimestamp, minTime});
+ return;
+ }
+
+ if (latestCellTimestamp == minTime) {
+ // latest cell timestamp is equal to our minimum time. Check if all cells that have
+ // that timestamp are in our list of previously seen cells.
+ boolean allSeen = true;
for (final ResultCell cell : resultCells) {
- if (cell.getTimestamp() > latestCellTimestamp) {
- latestCellTimestamp = cell.getTimestamp();
+ if (cell.getTimestamp() == latestCellTimestamp) {
+ if (lastResult == null || !lastResult.contains(cell)) {
+ allSeen = false;
+ break;
+ }
}
}
- // we've already seen this.
- if (latestCellTimestamp < minTime) {
- getLogger().debug("latest cell timestamp for row {} is {}, which is earlier than the minimum time of {}",
- new Object[] {rowKeyString, latestCellTimestamp, minTime});
+ if (allSeen) {
+ // we have already seen all of the cells for this row. We do not want to
+ // include this cell in our output.
+ getLogger().debug("all cells for row {} have already been seen", new Object[] { rowKeyString });
return;
}
+ }
- if (latestCellTimestamp == minTime) {
- // latest cell timestamp is equal to our minimum time. Check if all cells that have
- // that timestamp are in our list of previously seen cells.
- boolean allSeen = true;
- for (final ResultCell cell : resultCells) {
- if (cell.getTimestamp() == latestCellTimestamp) {
- if (lastResult == null || !lastResult.contains(cell)) {
- allSeen = false;
- break;
- }
+ // If the latest timestamp of the cell is later than the latest timestamp we have already seen,
+ // we want to keep track of the cells that match this timestamp so that the next time we scan,
+ // we can ignore these cells.
+ if (latestCellTimestamp >= latestTimestampHolder.get()) {
+ // new timestamp, so clear all of the 'matching cells'
+ if (latestCellTimestamp > latestTimestampHolder.get()) {
+ latestTimestampHolder.set(latestCellTimestamp);
+ cellsMatchingTimestamp.clear();
+ }
+
+ for (final ResultCell cell : resultCells) {
+ final long ts = cell.getTimestamp();
+ if (ts == latestCellTimestamp) {
+ final byte[] rowValue = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength() + cell.getRowOffset());
+ final byte[] cellValue = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset());
+
+ final String rowHash = new String(rowValue, StandardCharsets.UTF_8);
+ Set cellHashes = cellsMatchingTimestamp.get(rowHash);
+ if (cellHashes == null) {
+ cellHashes = new HashSet<>();
+ cellsMatchingTimestamp.put(rowHash, cellHashes);
}
- }
-
- if (allSeen) {
- // we have already seen all of the cells for this row. We do not want to
- // include this cell in our output.
- getLogger().debug("all cells for row {} have already been seen", new Object[] { rowKeyString });
- return;
+ cellHashes.add(new String(cellValue, StandardCharsets.UTF_8));
}
}
+ }
- // If the latest timestamp of the cell is later than the latest timestamp we have already seen,
- // we want to keep track of the cells that match this timestamp so that the next time we scan,
- // we can ignore these cells.
- if (latestCellTimestamp >= latestTimestampHolder.get()) {
- // new timestamp, so clear all of the 'matching cells'
- if (latestCellTimestamp > latestTimestampHolder.get()) {
- latestTimestampHolder.set(latestCellTimestamp);
- cellsMatchingTimestamp.clear();
- }
+ // write the row to a new FlowFile.
+ FlowFile flowFile = session.create();
+ flowFile = session.write(flowFile, out -> serializer.serialize(rowKey, resultCells, out));
- for (final ResultCell cell : resultCells) {
- final long ts = cell.getTimestamp();
- if (ts == latestCellTimestamp) {
- final byte[] rowValue = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength() + cell.getRowOffset());
- final byte[] cellValue = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset());
+ final Map attributes = new HashMap<>();
+ attributes.put("hbase.table", tableName);
+ attributes.put("mime.type", "application/json");
+ flowFile = session.putAllAttributes(flowFile, attributes);
- final String rowHash = new String(rowValue, StandardCharsets.UTF_8);
- Set cellHashes = cellsMatchingTimestamp.get(rowHash);
- if (cellHashes == null) {
- cellHashes = new HashSet<>();
- cellsMatchingTimestamp.put(rowHash, cellHashes);
- }
- cellHashes.add(new String(cellValue, StandardCharsets.UTF_8));
- }
- }
- }
+ session.getProvenanceReporter().receive(flowFile, hBaseClientService.toTransitUri(tableName, rowKeyString));
+ session.transfer(flowFile, REL_SUCCESS);
+ getLogger().debug("Received {} from HBase with row key {}", new Object[]{flowFile, rowKeyString});
- // write the row to a new FlowFile.
- FlowFile flowFile = session.create();
- flowFile = session.write(flowFile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws IOException {
- serializer.serialize(rowKey, resultCells, out);
- }
- });
+ // we could potentially have a huge number of rows. If we get to 500, go ahead and commit the
+ // session so that we can avoid buffering tons of FlowFiles without ever sending any out.
+ long rowsPulled = rowsPulledHolder.get();
+ rowsPulledHolder.set(++rowsPulled);
- final Map attributes = new HashMap<>();
- attributes.put("hbase.table", tableName);
- attributes.put("mime.type", "application/json");
- flowFile = session.putAllAttributes(flowFile, attributes);
-
- session.getProvenanceReporter().receive(flowFile, hBaseClientService.toTransitUri(tableName, rowKeyString));
- session.transfer(flowFile, REL_SUCCESS);
- getLogger().debug("Received {} from HBase with row key {}", new Object[]{flowFile, rowKeyString});
-
- // we could potentially have a huge number of rows. If we get to 500, go ahead and commit the
- // session so that we can avoid buffering tons of FlowFiles without ever sending any out.
- long rowsPulled = rowsPulledHolder.get();
- rowsPulledHolder.set(++rowsPulled);
-
- if (++rowsPulled % getBatchSize() == 0) {
- session.commit();
- }
+ if (++rowsPulled % getBatchSize() == 0) {
+ session.commit();
}
});
@@ -636,5 +638,4 @@ public class GetHBase extends AbstractProcessor {
return new ScanResult(timestamp, matchingCellHashes);
}
}
-
}
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
index b04908023d..8e060b56ab 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
@@ -17,10 +17,10 @@
package org.apache.nifi.hbase;
import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
@@ -30,12 +30,9 @@ import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StringUtils;
-import java.io.IOException;
-import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
@@ -44,6 +41,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import static org.apache.nifi.hbase.util.VisibilityUtil.pickVisibilityString;
+
@EventDriven
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@@ -82,6 +81,8 @@ public class PutHBaseCell extends AbstractPutHBase {
final String columnQualifier = context.getProperty(COLUMN_QUALIFIER).evaluateAttributeExpressions(flowFile).getValue();
final String timestampValue = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
+ final String visibilityStringToUse = pickVisibilityString(columnFamily, columnQualifier, flowFile, context);
+
final Long timestamp;
if (!StringUtils.isBlank(timestampValue)) {
try {
@@ -96,16 +97,15 @@ public class PutHBaseCell extends AbstractPutHBase {
final byte[] buffer = new byte[(int) flowFile.getSize()];
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- StreamUtils.fillBuffer(in, buffer);
- }
- });
+ session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
+ PutColumn column = StringUtils.isEmpty(visibilityStringToUse)
+ ? new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
+ columnQualifier.getBytes(StandardCharsets.UTF_8), buffer, timestamp)
+ : new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
+ columnQualifier.getBytes(StandardCharsets.UTF_8), buffer, timestamp, visibilityStringToUse);
- final Collection columns = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
- columnQualifier.getBytes(StandardCharsets.UTF_8), buffer, timestamp));
+ final Collection columns = Collections.singletonList(column);
byte[] rowKeyBytes = getRow(row,context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue());
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
index bee188a37c..643c5d8866 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
@@ -37,11 +37,9 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.BufferedInputStream;
-import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -52,6 +50,8 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.nifi.hbase.util.VisibilityUtil.pickVisibilityString;
+
@EventDriven
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@@ -185,12 +185,9 @@ public class PutHBaseJSON extends AbstractPutHBase {
final ObjectMapper mapper = new ObjectMapper();
final AtomicReference rootNodeRef = new AtomicReference<>(null);
try {
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- try (final InputStream bufferedIn = new BufferedInputStream(in)) {
- rootNodeRef.set(mapper.readTree(bufferedIn));
- }
+ session.read(flowFile, in -> {
+ try (final InputStream bufferedIn = new BufferedInputStream(in)) {
+ rootNodeRef.set(mapper.readTree(bufferedIn));
}
});
} catch (final ProcessException pe) {
@@ -256,7 +253,13 @@ public class PutHBaseJSON extends AbstractPutHBase {
final byte[] colFamBytes = columnFamily.getBytes(StandardCharsets.UTF_8);
final byte[] colQualBytes = fieldName.getBytes(StandardCharsets.UTF_8);
final byte[] colValBytes = fieldValueHolder.get();
- columns.add(new PutColumn(colFamBytes, colQualBytes, colValBytes, timestamp));
+
+ final String visibilityStringToUse = pickVisibilityString(columnFamily, fieldName, flowFile, context);
+ PutColumn column = StringUtils.isEmpty(visibilityStringToUse)
+ ? new PutColumn(colFamBytes, colQualBytes, colValBytes, timestamp)
+ : new PutColumn(colFamBytes, colQualBytes, colValBytes, timestamp, visibilityStringToUse);
+
+ columns.add(column);
}
}
}
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
index 30701c7ca4..e89c6dc7d6 100755
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
@@ -23,20 +23,28 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
+import org.apache.nifi.hbase.util.VisibilityUtil;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
@@ -47,6 +55,7 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
@EventDriven
@@ -75,6 +84,17 @@ public class PutHBaseRecord extends AbstractPutHBase {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ protected static final PropertyDescriptor DEFAULT_VISIBILITY_STRING = new PropertyDescriptor.Builder()
+ .name("hbase-default-vis-string")
+ .displayName("Default Visibility String")
+ .description("When using visibility labels, any value set in this field will be applied to all cells that are written unless " +
+ "an attribute with the convention \"visibility.COLUMN_FAMILY.COLUMN_QUALIFIER\" is present on the flowfile. If this field " +
+ "is left blank, it will be assumed that no visibility is to be set unless visibility-related attributes are set. NOTE: " +
+ "this configuration will have no effect on your data if you have not enabled visibility labels in the HBase cluster.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(Validator.VALID)
+ .build();
protected static final String FAIL_VALUE = "Fail";
protected static final String WARN_VALUE = "Warn";
@@ -142,6 +162,16 @@ public class PutHBaseRecord extends AbstractPutHBase {
.allowableValues(NULL_FIELD_EMPTY, NULL_FIELD_SKIP)
.build();
+ protected static final PropertyDescriptor VISIBILITY_RECORD_PATH = new PropertyDescriptor.Builder()
+ .name("put-hb-rec-visibility-record-path")
+ .displayName("Visibility String Record Path Root")
+ .description("A record path that points to part of the record which contains a path to a mapping of visibility strings to record paths")
+ .required(false)
+ .addValidator(Validator.VALID)
+ .build();
+
+ protected RecordPathCache recordPathCache;
+
@Override
public final List getSupportedPropertyDescriptors() {
final List properties = new ArrayList<>();
@@ -152,6 +182,8 @@ public class PutHBaseRecord extends AbstractPutHBase {
properties.add(ROW_ID_ENCODING_STRATEGY);
properties.add(NULL_FIELD_STRATEGY);
properties.add(COLUMN_FAMILY);
+ properties.add(DEFAULT_VISIBILITY_STRING);
+ properties.add(VISIBILITY_RECORD_PATH);
properties.add(TIMESTAMP_FIELD_NAME);
properties.add(BATCH_SIZE);
properties.add(COMPLEX_FIELD_STRATEGY);
@@ -177,6 +209,12 @@ public class PutHBaseRecord extends AbstractPutHBase {
return columns;
}
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ recordPathCache = new RecordPathCache(4);
+ super.onScheduled(context);
+ }
+
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
@@ -195,6 +233,12 @@ public class PutHBaseRecord extends AbstractPutHBase {
final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
final String rowEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue();
+ final String recordPathText = context.getProperty(VISIBILITY_RECORD_PATH).getValue();
+
+ RecordPath recordPath = null;
+ if (recordPathCache != null && !StringUtils.isEmpty(recordPathText)) {
+ recordPath = recordPathCache.getCompiled(recordPathText);
+ }
final long start = System.nanoTime();
int index = 0;
@@ -215,7 +259,7 @@ public class PutHBaseRecord extends AbstractPutHBase {
}
while ((record = reader.nextRecord()) != null) {
- PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily,
+ PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), recordPath, flowFile, rowFieldName, columnFamily,
timestampFieldName, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy);
if (putFlowFile.getColumns().size() == 0) {
continue;
@@ -340,13 +384,14 @@ public class PutHBaseRecord extends AbstractPutHBase {
static final byte[] EMPTY = "".getBytes();
- protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile, String rowFieldName,
+ protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, RecordPath recordPath, FlowFile flowFile, String rowFieldName,
String columnFamily, String timestampFieldName, String fieldEncodingStrategy, String rowEncodingStrategy,
String complexFieldStrategy)
throws PutCreationFailedInvokedException {
PutFlowFile retVal = null;
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String nullStrategy = context.getProperty(NULL_FIELD_STRATEGY).getValue();
+ final String defaultVisibility = context.getProperty(DEFAULT_VISIBILITY_STRING).evaluateAttributeExpressions(flowFile).getValue();
boolean asString = STRING_ENCODING_VALUE.equals(fieldEncodingStrategy);
@@ -368,9 +413,18 @@ public class PutHBaseRecord extends AbstractPutHBase {
timestamp = null;
}
+ RecordField visField = null;
+ Map visSettings = null;
+ if (recordPath != null) {
+ final RecordPathResult result = recordPath.evaluate(record);
+ FieldValue fv = result.getSelectedFields().findFirst().get();
+ visField = fv.getField();
+ visSettings = (Map)fv.getValue();
+ }
+
List columns = new ArrayList<>();
for (String name : schema.getFieldNames()) {
- if (name.equals(rowFieldName) || name.equals(timestampFieldName)) {
+ if (name.equals(rowFieldName) || name.equals(timestampFieldName) || (visField != null && name.equals(visField.getFieldName()))) {
continue;
}
@@ -386,7 +440,20 @@ public class PutHBaseRecord extends AbstractPutHBase {
if (fieldValueBytes != null) {
- columns.add(new PutColumn(fam, clientService.toBytes(name), fieldValueBytes, timestamp));
+
+ String visString = (visField != null && visSettings != null && visSettings.containsKey(name))
+ ? (String)visSettings.get(name) : defaultVisibility;
+
+ //TODO: factor this into future enhancements to how complex records are handled.
+ if (StringUtils.isBlank(visString)) {
+ visString = VisibilityUtil.pickVisibilityString(columnFamily, name, flowFile, context);
+ }
+
+ PutColumn column = !StringUtils.isEmpty(visString)
+ ? new PutColumn(fam, clientService.toBytes(name), fieldValueBytes, timestamp, visString)
+ : new PutColumn(fam, clientService.toBytes(name), fieldValueBytes, timestamp);
+
+ columns.add(column);
}
}
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
index 4abd470ef1..251e8acdbc 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
@@ -69,7 +69,7 @@ import org.apache.nifi.processor.util.StandardValidators;
+ "Could be null (not present) if transfered to FAILURE")
})
-public class ScanHBase extends AbstractProcessor {
+public class ScanHBase extends AbstractProcessor implements VisibilityFetchSupport {
//enhanced regex for columns to allow "-" in column qualifier names
static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*");
static final String nl = System.lineSeparator();
@@ -231,6 +231,7 @@ public class ScanHBase extends AbstractProcessor {
List props = new ArrayList<>();
props.add(HBASE_CLIENT_SERVICE);
props.add(TABLE_NAME);
+ props.add(AUTHORIZATIONS);
props.add(START_ROW);
props.add(END_ROW);
props.add(TIME_RANGE_MIN);
@@ -326,6 +327,8 @@ public class ScanHBase extends AbstractProcessor {
return;
}
+ final List authorizations = getAuthorizations(context, flowFile);
+
final String startRow = context.getProperty(START_ROW).evaluateAttributeExpressions(flowFile).getValue();
final String endRow = context.getProperty(END_ROW).evaluateAttributeExpressions(flowFile).getValue();
@@ -381,6 +384,7 @@ public class ScanHBase extends AbstractProcessor {
limitRows,
isReversed,
columns,
+ authorizations,
handler);
} catch (Exception e) {
if (handler.getFlowFile() != null){
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/VisibilityFetchSupport.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/VisibilityFetchSupport.java
new file mode 100644
index 0000000000..3cbc0bfd80
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/VisibilityFetchSupport.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public interface VisibilityFetchSupport {
+ PropertyDescriptor AUTHORIZATIONS = new PropertyDescriptor.Builder()
+ .name("hbase-fetch-row-authorizations")
+ .displayName("Authorizations")
+ .description("The list of authorizations to pass to the scanner. This will be ignored if cell visibility labels are not in use.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(Validator.VALID)
+ .build();
+
+ default List getAuthorizations(ProcessContext context, FlowFile flowFile) {
+ final String authorizationString = context.getProperty(AUTHORIZATIONS).isSet()
+ ? context.getProperty(AUTHORIZATIONS).evaluateAttributeExpressions(flowFile).getValue().trim()
+ : "";
+ List authorizations = new ArrayList<>();
+ if (!StringUtils.isBlank(authorizationString)) {
+ String[] parts = authorizationString.split(",");
+ for (String part : parts) {
+ authorizations.add(part.trim());
+ }
+ }
+
+ return authorizations;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/util/VisibilityUtil.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/util/VisibilityUtil.java
new file mode 100644
index 0000000000..602427c307
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/util/VisibilityUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+
+public class VisibilityUtil {
+ public static String pickVisibilityString(String columnFamily, String columnQualifier, FlowFile flowFile, ProcessContext context) {
+ if (StringUtils.isBlank(columnFamily)) {
+ return null;
+ }
+ String lookupKey = String.format("visibility.%s%s%s", columnFamily, !StringUtils.isBlank(columnQualifier) ? "." : "", columnQualifier);
+ String fromAttribute = flowFile.getAttribute(lookupKey);
+
+ if (fromAttribute == null && !StringUtils.isBlank(columnQualifier)) {
+ String lookupKeyFam = String.format("visibility.%s", columnFamily);
+ fromAttribute = flowFile.getAttribute(lookupKeyFam);
+ }
+
+ if (fromAttribute != null) {
+ return fromAttribute;
+ } else {
+ PropertyValue descriptor = context.getProperty(lookupKey);
+ if (descriptor == null || !descriptor.isSet()) {
+ descriptor = context.getProperty(String.format("visibility.%s", columnFamily));
+ }
+
+ String retVal = descriptor != null ? descriptor.evaluateAttributeExpressions(flowFile).getValue() : null;
+
+ return retVal;
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/docs.org.apache.nifi.hbase.DeleteHBaseCells/additionalDetails.html b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/docs.org.apache.nifi.hbase.DeleteHBaseCells/additionalDetails.html
new file mode 100644
index 0000000000..b1820c087c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/docs.org.apache.nifi.hbase.DeleteHBaseCells/additionalDetails.html
@@ -0,0 +1,39 @@
+
+
+
+
+
+ DeleteHBaseCells
+
+
+
+
+
+
Overview
+
+ This processor provides the ability to do deletes against one or more HBase cells, without having to delete the entire row. It should
+ be used as the primary delete method when visibility labels are in use and the cells have different visibility labels. Each line in
+ the flowfile body is a fully qualified cell (row id, column family, column qualifier and visibility labels if applicable). The separator
+ that separates each piece of the fully qualified cell is configurable, but :::: is the default value.
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/docs.org.apache.nifi.hbase.PutHBaseCell/additionalDetails.html b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/docs.org.apache.nifi.hbase.PutHBaseCell/additionalDetails.html
new file mode 100644
index 0000000000..7fb35d722d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/docs.org.apache.nifi.hbase.PutHBaseCell/additionalDetails.html
@@ -0,0 +1,41 @@
+
+
+
+
+
+ PutHBaseCell
+
+
+
+
+
+
Visibility Labels
+
+ This processor provides the ability to attach visibility labels to HBase Puts that it generates, if visibility labels
+ are enabled on the HBase cluster. There are two ways to enable this:
+
+
+
Attributes on the flowfile.
+
Dynamic properties added to the processor.
+
+
When the dynamic properties are defined on the processor, they will be the default value, but can be overridden by
+attributes set on the flowfile. The naming convention for both (property name and attribute name) is:
+
+
visibility.COLUMN_FAMILY - every column qualifier under the column family will get this.
+
visibility.COLUMN_FAMILY.COLUMN_VISIBILITY - the qualified column qualifier will be assigned this value.
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/docs.org.apache.nifi.hbase.PutHBaseJSON/additionalDetails.html b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/docs.org.apache.nifi.hbase.PutHBaseJSON/additionalDetails.html
new file mode 100644
index 0000000000..09c49957c9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/docs.org.apache.nifi.hbase.PutHBaseJSON/additionalDetails.html
@@ -0,0 +1,41 @@
+
+
+
+
+
+ PutHBaseJSON
+
+
+
+
+
+
Visibility Labels
+
+ This processor provides the ability to attach visibility labels to HBase Puts that it generates, if visibility labels
+ are enabled on the HBase cluster. There are two ways to enable this:
+
+
+
Attributes on the flowfile.
+
Dynamic properties added to the processor.
+
+
When the dynamic properties are defined on the processor, they will be the default value, but can be overridden by
+ attributes set on the flowfile. The naming convention for both (property name and attribute name) is:
+
+
visibility.COLUMN_FAMILY - every column qualifier under the column family will get this.
+
visibility.COLUMN_FAMILY.COLUMN_VISIBILITY - the qualified column qualifier will be assigned this value.
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/docs.org.apache.nifi.hbase.PutHBaseRecord/additionalDetails.html b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/docs.org.apache.nifi.hbase.PutHBaseRecord/additionalDetails.html
new file mode 100644
index 0000000000..975f3ead6e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/docs.org.apache.nifi.hbase.PutHBaseRecord/additionalDetails.html
@@ -0,0 +1,113 @@
+
+
+
+
+
+ PutHBaseRecord
+
+
+
+
+
+
Visibility Labels:
+
+ PutHBaseRecord provides the ability to define a branch of the record as a map which contains an association between
+ column qualifiers and the visibility label that they should have assigned to them.
+
Example is for row with ID patient-1 and column family patient
+
+
+
+
Row
+
Value
+
Visibility
+
+
+
+
+
patient-1:patient:name
+
John Smith
+
OPEN
+
+
+
patient-1:patient:address
+
12345 Main Street
+
PII
+
+
+
patient-1:patient:
+
1970-01-01
+
PII
+
+
+
patient-1:patient:attendingPhysician
+
Dr. Jane Doe
+
PII&PHI
+
+
+
patient-1:patient:accountNumber
+
1234-567-890-ABC
+
PII&BILLING
+
+
+
+
In addition to the branch for visibility labels, the same methods used for PutHBaseCell and PutHBaseJSON can be used.
+They are:
+
+
Attributes on the flowfile.
+
Dynamic properties added to the processor.
+
+
When the dynamic properties are defined on the processor, they will be the default value, but can be overridden by
+ attributes set on the flowfile. The naming convention for both (property name and attribute name) is:
+
+
visibility.COLUMN_FAMILY - every column qualifier under the column family will get this.
+
visibility.COLUMN_FAMILY.COLUMN_VISIBILITY - the qualified column qualifier will be assigned this value.
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 37a668ccde..a55064ba52 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+org.apache.nifi.hbase.DeleteHBaseCells
org.apache.nifi.hbase.DeleteHBaseRow
org.apache.nifi.hbase.GetHBase
org.apache.nifi.hbase.PutHBaseCell
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/DeleteTestBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/DeleteTestBase.java
new file mode 100644
index 0000000000..f0149ce698
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/DeleteTestBase.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class DeleteTestBase {
+ protected TestRunner runner;
+ protected MockHBaseClientService hBaseClient;
+
+ public void setup(Class clz) throws InitializationException {
+ runner = TestRunners.newTestRunner(clz);
+
+ hBaseClient = new MockHBaseClientService();
+ runner.addControllerService("hbaseClient", hBaseClient);
+ runner.enableControllerService(hBaseClient);
+
+ runner.setProperty(DeleteHBaseRow.TABLE_NAME, "nifi");
+ runner.setProperty(DeleteHBaseRow.HBASE_CLIENT_SERVICE, "hbaseClient");
+ }
+
+ List populateTable(int max) {
+ List ids = new ArrayList<>();
+ for (int index = 0; index < max; index++) {
+ String uuid = UUID.randomUUID().toString();
+ ids.add(uuid);
+ Map cells = new HashMap<>();
+ cells.put("test", UUID.randomUUID().toString());
+ hBaseClient.addResult(uuid, cells, System.currentTimeMillis());
+ }
+
+ return ids;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index b3b65f163f..052c5b7c16 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -72,6 +72,9 @@ public class MockHBaseClientService extends AbstractControllerService implements
throw new UnsupportedOperationException();
}
+ @Override
+ public void delete(String tableName, byte[] rowId, String visibilityLabel) throws IOException { }
+
private int deletePoint = 0;
public void setDeletePoint(int deletePoint) {
this.deletePoint = deletePoint;
@@ -98,6 +101,18 @@ public class MockHBaseClientService extends AbstractControllerService implements
}
}
+ @Override
+ public void deleteCells(String tableName, List deletes) throws IOException {
+ for (DeleteRequest req : deletes) {
+ results.remove(new String(req.getRowId()));
+ }
+ }
+
+ @Override
+ public void delete(String tableName, List rowIds, String visibilityLabel) throws IOException {
+ delete(tableName, rowIds);
+ }
+
public int size() {
return results.size();
}
@@ -107,7 +122,7 @@ public class MockHBaseClientService extends AbstractControllerService implements
}
@Override
- public void scan(String tableName, byte[] startRow, byte[] endRow, Collection columns, ResultHandler handler) throws IOException {
+ public void scan(String tableName, byte[] startRow, byte[] endRow, Collection columns, List labels, ResultHandler handler) throws IOException {
if (throwException) {
throw new IOException("exception");
}
@@ -154,9 +169,14 @@ public class MockHBaseClientService extends AbstractControllerService implements
numScans++;
}
+ @Override
+ public void scan(String tableName, Collection columns, String filterExpression, long minTime, List visibilityLabels, ResultHandler handler) throws IOException {
+ scan(tableName, columns, filterExpression, minTime, handler);
+ }
+
@Override
public void scan(String tableName, String startRow, String endRow, String filterExpression, Long timerangeMin,
- Long timerangeMax, Integer limitRows, Boolean isReversed, Collection columns, ResultHandler handler)
+ Long timerangeMax, Integer limitRows, Boolean isReversed, Collection columns, List visibilityLabels, ResultHandler handler)
throws IOException {
if (throwException) {
throw new IOException("exception");
@@ -164,8 +184,8 @@ public class MockHBaseClientService extends AbstractControllerService implements
int i = 0;
// pass all the staged data to the handler
- for (final Map.Entry entry : results.entrySet()) {
- if (linesBeforeException>=0 && i++>=linesBeforeException) {
+ for (final Map.Entry entry : results.entrySet()) {
+ if (linesBeforeException >= 0 && i++ >= linesBeforeException) {
throw new IOException("iterating exception");
}
handler.handle(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue());
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java
new file mode 100644
index 0000000000..ce8e044b02
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseCells.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestDeleteHBaseCells extends DeleteTestBase {
+
+ @Before
+ public void setup() throws InitializationException {
+ super.setup(DeleteHBaseCells.class);
+ }
+
+ @Test
+ public void testSimpleDelete() {
+ final String SEP = "::::";
+ List ids = populateTable(10000);
+ runner.setProperty(DeleteHBaseCells.SEPARATOR, SEP);
+ runner.assertValid();
+ StringBuilder sb = new StringBuilder();
+ for (String id : ids) {
+ sb.append(String.format("%s%sX%sY\n", id, SEP, SEP));
+ }
+ runner.enqueue(sb.toString().trim());
+ runner.run();
+ runner.assertAllFlowFilesTransferred(DeleteHBaseCells.REL_SUCCESS);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java
index fe819dde1d..3315a27179 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java
@@ -19,45 +19,19 @@ package org.apache.nifi.hbase;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
-public class TestDeleteHBaseRow {
- private TestRunner runner;
- private MockHBaseClientService hBaseClient;
+public class TestDeleteHBaseRow extends DeleteTestBase {
@Before
public void setup() throws InitializationException {
- runner = TestRunners.newTestRunner(new DeleteHBaseRow());
-
- hBaseClient = new MockHBaseClientService();
- runner.addControllerService("hbaseClient", hBaseClient);
- runner.enableControllerService(hBaseClient);
-
- runner.setProperty(DeleteHBaseRow.TABLE_NAME, "nifi");
- runner.setProperty(DeleteHBaseRow.HBASE_CLIENT_SERVICE, "hbaseClient");
- }
-
- List populateTable(int max) {
- List ids = new ArrayList<>();
- for (int index = 0; index < max; index++) {
- String uuid = UUID.randomUUID().toString();
- ids.add(uuid);
- Map cells = new HashMap<>();
- cells.put("test", UUID.randomUUID().toString());
- hBaseClient.addResult(uuid, cells, System.currentTimeMillis());
- }
-
- return ids;
+ super.setup(DeleteHBaseRow.class);
}
@Test
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestFetchHBaseRow.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestFetchHBaseRow.java
index b44c6b094f..5878bca9bd 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestFetchHBaseRow.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestFetchHBaseRow.java
@@ -39,6 +39,7 @@ public class TestFetchHBaseRow {
public void setup() throws InitializationException {
proc = new FetchHBaseRow();
runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
hBaseClientService = new MockHBaseClientService();
runner.addControllerService("hbaseClient", hBaseClientService);
@@ -48,6 +49,7 @@ public class TestFetchHBaseRow {
@Test
public void testColumnsValidation() {
+ runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.assertValid();
@@ -78,6 +80,7 @@ public class TestFetchHBaseRow {
public void testNoIncomingFlowFile() {
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
+ runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
runner.run();
runner.assertTransferCount(FetchHBaseRow.REL_FAILURE, 0);
@@ -91,6 +94,7 @@ public class TestFetchHBaseRow {
public void testInvalidTableName() {
runner.setProperty(FetchHBaseRow.TABLE_NAME, "${hbase.table}");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
+ runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
runner.enqueue("trigger flow file");
runner.run();
@@ -106,6 +110,7 @@ public class TestFetchHBaseRow {
public void testInvalidRowId() {
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "${hbase.row}");
+ runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
runner.enqueue("trigger flow file");
runner.run();
@@ -125,7 +130,7 @@ public class TestFetchHBaseRow {
final long ts1 = 123456789;
hBaseClientService.addResult("row1", cells, ts1);
-
+ runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.setProperty(FetchHBaseRow.DESTINATION, FetchHBaseRow.DESTINATION_ATTRIBUTES);
@@ -155,6 +160,7 @@ public class TestFetchHBaseRow {
final long ts1 = 123456789;
hBaseClientService.addResult("row1", cells, ts1);
+ runner.setProperty(FetchHBaseRow.AUTHORIZATIONS, "");
runner.setProperty(FetchHBaseRow.TABLE_NAME, "table1");
runner.setProperty(FetchHBaseRow.ROW_ID, "row1");
runner.setProperty(FetchHBaseRow.COLUMNS, "nifi:cq2");
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
index 24f83e9ae9..484d714ab8 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
@@ -76,6 +76,9 @@ public class TestGetHBase {
runner.setProperty(GetHBase.TABLE_NAME, "nifi");
runner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, "cacheClient");
runner.setProperty(GetHBase.HBASE_CLIENT_SERVICE, "hbaseClient");
+ runner.setProperty(GetHBase.AUTHORIZATIONS, "");
+
+ runner.setValidateExpressionUsage(true);
}
@After
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
index ee799e4dd0..14a9bde871 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
@@ -505,6 +505,7 @@ public class TestPutHBaseJSON {
runner.setProperty(PutHBaseJSON.TABLE_NAME, table);
runner.setProperty(PutHBaseJSON.COLUMN_FAMILY, columnFamily);
runner.setProperty(PutHBaseJSON.BATCH_SIZE, batchSize);
+
return runner;
}
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java
index af0da956e1..e817a8643e 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseRecord.java
@@ -64,6 +64,8 @@ public class TestPutHBaseRecord {
}
runner.enableControllerService(parser);
runner.setProperty(PutHBaseRecord.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(PutHBaseRecord.DEFAULT_VISIBILITY_STRING, "");
+ runner.setProperty(PutHBaseRecord.VISIBILITY_RECORD_PATH, "");
parser.addSchemaField("id", RecordFieldType.INT);
parser.addSchemaField("name", RecordFieldType.STRING);
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestVisibilityUtil.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestVisibilityUtil.java
new file mode 100644
index 0000000000..261efae2fd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestVisibilityUtil.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.hbase.util.VisibilityUtil;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class TestVisibilityUtil {
+ private TestRunner runner;
+
+ @Before
+ public void setup() throws Exception {
+ runner = TestRunners.newTestRunner(PutHBaseCell.class);
+ final MockHBaseClientService hBaseClient = new MockHBaseClientService();
+ runner.addControllerService("hbaseClient", hBaseClient);
+ runner.enableControllerService(hBaseClient);
+ runner.setProperty(PutHBaseCell.HBASE_CLIENT_SERVICE, "hbaseClient");
+ runner.setProperty(PutHBaseCell.TABLE_NAME, "test");
+ runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, "test");
+ runner.setProperty(PutHBaseCell.COLUMN_FAMILY, "test");
+ runner.assertValid();
+ }
+
+ @Test
+ public void testAllPresentOnFlowfile() {
+ runner.setProperty("visibility.test.test", "U&PII");
+
+ MockFlowFile ff = new MockFlowFile(System.currentTimeMillis());
+ ff.putAttributes(new HashMap(){{
+ put("visibility.test.test", "U&PII&PHI");
+ }});
+ ProcessContext context = runner.getProcessContext();
+
+ String label = VisibilityUtil.pickVisibilityString("test", "test", ff, context);
+
+ Assert.assertNotNull(label);
+ Assert.assertEquals("U&PII&PHI", label);
+ }
+
+ @Test
+ public void testOnlyColumnFamilyOnFlowfile() {
+ runner.setProperty("visibility.test", "U&PII");
+
+ MockFlowFile ff = new MockFlowFile(System.currentTimeMillis());
+ ff.putAttributes(new HashMap(){{
+ put("visibility.test", "U&PII&PHI");
+ }});
+ ProcessContext context = runner.getProcessContext();
+
+ String label = VisibilityUtil.pickVisibilityString("test", "test", ff, context);
+
+ Assert.assertNotNull(label);
+ Assert.assertEquals("U&PII&PHI", label);
+ }
+
+ @Test
+ public void testInvalidAttributes() {
+ runner.setProperty("visibility.test", "U&PII");
+
+ MockFlowFile ff = new MockFlowFile(System.currentTimeMillis());
+ ff.putAttributes(new HashMap(){{
+ put("visibility..test", "U&PII&PHI");
+ }});
+ ProcessContext context = runner.getProcessContext();
+
+ String label = VisibilityUtil.pickVisibilityString("test", "test", ff, context);
+
+ Assert.assertNotNull(label);
+ Assert.assertEquals("U&PII", label);
+ }
+
+ @Test
+ public void testColumnFamilyAttributeOnly() {
+ MockFlowFile ff = new MockFlowFile(System.currentTimeMillis());
+ ff.putAttributes(new HashMap(){{
+ put("visibility.test", "U&PII");
+ }});
+ ProcessContext context = runner.getProcessContext();
+
+ String label = VisibilityUtil.pickVisibilityString("test", "test", ff, context);
+
+ Assert.assertNotNull(label);
+ Assert.assertEquals("U&PII", label);
+ }
+
+ @Test
+ public void testNoAttributes() {
+ runner.setProperty("visibility.test", "U&PII");
+
+ MockFlowFile ff = new MockFlowFile(System.currentTimeMillis());
+ ProcessContext context = runner.getProcessContext();
+
+ String label = VisibilityUtil.pickVisibilityString("test", "test", ff, context);
+
+ Assert.assertNotNull(label);
+ Assert.assertEquals("U&PII", label);
+
+ runner.setProperty("visibility.test.test", "U&PII&PHI");
+ label = VisibilityUtil.pickVisibilityString("test", "test", ff, context);
+
+ Assert.assertNotNull(label);
+ Assert.assertEquals("U&PII&PHI", label);
+
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/DeleteRequest.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/DeleteRequest.java
new file mode 100644
index 0000000000..da3fd5c302
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/DeleteRequest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+/**
+ * Encapsulates the information for a delete operation.
+ */
+public class DeleteRequest {
+ private byte[] rowId;
+ private byte[] columnFamily;
+ private byte[] columnQualifier;
+ private String visibilityLabel;
+
+ public DeleteRequest(byte[] rowId, byte[] columnFamily, byte[] columnQualifier, String visibilityLabel) {
+ this.rowId = rowId;
+ this.columnFamily = columnFamily;
+ this.columnQualifier = columnQualifier;
+ this.visibilityLabel = visibilityLabel;
+ }
+
+ public byte[] getRowId() {
+ return rowId;
+ }
+
+ public byte[] getColumnFamily() {
+ return columnFamily;
+ }
+
+ public byte[] getColumnQualifier() {
+ return columnQualifier;
+ }
+
+ public String getVisibilityLabel() {
+ return visibilityLabel;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder()
+ .append(String.format("Row ID: %s\n", new String(rowId)))
+ .append(String.format("Column Family: %s\n", new String(columnFamily)))
+ .append(String.format("Column Qualifier: %s\n", new String(columnQualifier)))
+ .append(visibilityLabel != null ? String.format("Visibility Label: %s", visibilityLabel) : "")
+ .toString();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
index cff3fb61d4..cd3d85157c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
@@ -118,6 +118,18 @@ public interface HBaseClientService extends ControllerService {
*/
void delete(String tableName, byte[] rowId) throws IOException;
+ /**
+ * Deletes the given row on HBase. Uses the supplied visibility label for all cells in the delete.
+ * It will fail if HBase cannot delete a cell because the visibility label on the cell does not match the specified
+ * label.
+ *
+ * @param tableName the name of an HBase table
+ * @param rowId the id of the row to delete
+ * @param visibilityLabel a visibility label to apply to the delete
+ * @throws IOException thrown when there are communication errors with HBase
+ */
+ void delete(String tableName, byte[] rowId, String visibilityLabel) throws IOException;
+
/**
* Deletes a list of rows in HBase. All cells are deleted.
*
@@ -127,6 +139,25 @@ public interface HBaseClientService extends ControllerService {
void delete(String tableName, List rowIds) throws IOException;
+ /**
+ * Deletes a list of cells from HBase. This is intended to be used with granular delete operations.
+ *
+ * @param tableName the name of an HBase table.
+ * @param deletes a list of DeleteRequest objects.
+ * @throws IOException thrown when there are communication errors with HBase
+ */
+ void deleteCells(String tableName, List deletes) throws IOException;
+
+ /**
+ * Deletes a list of rows in HBase. All cells that match the visibility label are deleted.
+ *
+ * @param tableName the name of an HBase table
+ * @param rowIds a list of rowIds to send in a batch delete
+ * @param visibilityLabel a visibility label expression
+ */
+
+ void delete(String tableName, List rowIds, String visibilityLabel) throws IOException;
+
/**
* Scans the given table using the optional filter criteria and passing each result to the provided handler.
*
@@ -139,6 +170,19 @@ public interface HBaseClientService extends ControllerService {
*/
void scan(String tableName, Collection columns, String filterExpression, long minTime, ResultHandler handler) throws IOException;
+ /**
+ * Scans the given table using the optional filter criteria and passing each result to the provided handler.
+ *
+ * @param tableName the name of an HBase table to scan
+ * @param columns optional columns to return, if not specified all columns are returned
+ * @param filterExpression optional filter expression, if not specified no filtering is performed
+ * @param minTime the minimum timestamp of cells to return, passed to the HBase scanner timeRange
+ * @param authorizations the visibility labels to apply to the scanner.
+ * @param handler a handler to process rows of the result set
+ * @throws IOException thrown when there are communication errors with HBase
+ */
+ void scan(String tableName, Collection columns, String filterExpression, long minTime, List authorizations, ResultHandler handler) throws IOException;
+
/**
* Scans the given table for the given rowId and passes the result to the handler.
*
@@ -149,7 +193,7 @@ public interface HBaseClientService extends ControllerService {
* @param handler a handler to process rows of the result
* @throws IOException thrown when there are communication errors with HBase
*/
- void scan(String tableName, byte[] startRow, byte[] endRow, Collection columns, ResultHandler handler) throws IOException;
+ void scan(String tableName, byte[] startRow, byte[] endRow, Collection columns, List authorizations, ResultHandler handler) throws IOException;
/**
* Scans the given table for the given range of row keys or time rage and passes the result to a handler.
@@ -163,10 +207,11 @@ public interface HBaseClientService extends ControllerService {
* @param limitRows the maximum number of rows to be returned by scanner
* @param isReversed whether this scan is a reversed one.
* @param columns optional columns to return, if not specified all columns are returned
+ * @param authorizations optional list of visibility labels that the user should be able to see when communicating with HBase
* @param handler a handler to process rows of the result
*/
void scan(String tableName, String startRow, String endRow, String filterExpression, Long timerangeMin, Long timerangeMax, Integer limitRows,
- Boolean isReversed, Collection columns, ResultHandler handler) throws IOException;
+ Boolean isReversed, Collection columns, List authorizations, ResultHandler handler) throws IOException;
/**
* Converts the given boolean to it's byte representation.
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
index b29e032712..77691659b1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
@@ -24,18 +24,24 @@ public class PutColumn {
private final byte[] columnFamily;
private final byte[] columnQualifier;
private final byte[] buffer;
+ private final String visibility;
private final Long timestamp;
public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer) {
- this(columnFamily, columnQualifier, buffer, null);
+ this(columnFamily, columnQualifier, buffer, null, null);
}
public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer, final Long timestamp) {
+ this(columnFamily, columnQualifier, buffer, timestamp, null);
+ }
+
+ public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer, final Long timestamp, final String visibility) {
this.columnFamily = columnFamily;
this.columnQualifier = columnQualifier;
this.buffer = buffer;
this.timestamp = timestamp;
+ this.visibility = visibility;
}
public byte[] getColumnFamily() {
@@ -50,6 +56,10 @@ public class PutColumn {
return buffer;
}
+ public String getVisibility() {
+ return visibility;
+ }
+
public Long getTimestamp() {
return timestamp;
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
index 7c10800a64..634ebd93c2 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
@@ -51,7 +51,7 @@ import org.apache.nifi.processor.util.StandardValidators;
@CapabilityDescription("Provides the ability to use an HBase table as a cache, in place of a DistributedMapCache."
+ " Uses a HBase_1_1_2_ClientService controller to communicate with HBase.")
-public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService implements DistributedMapCacheClient {
+public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService implements DistributedMapCacheClient, VisibilityLabelService {
static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Client Service")
@@ -90,6 +90,7 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
protected List getSupportedPropertyDescriptors() {
final List descriptors = new ArrayList<>();
descriptors.add(HBASE_CACHE_TABLE_NAME);
+ descriptors.add(AUTHORIZATIONS);
descriptors.add(HBASE_CLIENT_SERVICE);
descriptors.add(HBASE_COLUMN_FAMILY);
descriptors.add(HBASE_COLUMN_QUALIFIER);
@@ -106,6 +107,8 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
private volatile String hBaseColumnQualifier;
private volatile byte[] hBaseColumnQualifierBytes;
+ private List authorizations;
+
@OnEnabled
public void onConfigured(final ConfigurationContext context) throws InitializationException{
hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
@@ -116,6 +119,8 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
hBaseColumnFamilyBytes = hBaseColumnFamily.getBytes(StandardCharsets.UTF_8);
hBaseColumnQualifierBytes = hBaseColumnQualifier.getBytes(StandardCharsets.UTF_8);
+
+ authorizations = getAuthorizations(context);
}
private byte[] serialize(final T value, final Serializer serializer) throws IOException {
@@ -158,7 +163,7 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
final List columnsList = new ArrayList(0);
- hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, handler);
+ hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, authorizations, handler);
return (handler.numRows() > 0);
}
@@ -190,7 +195,7 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
final List columnsList = new ArrayList(0);
- hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, handler);
+ hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, authorizations, handler);
if (handler.numRows() > 1) {
throw new IOException("Found multiple rows in HBase for key");
} else if(handler.numRows() == 1) {
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index ccbed607f1..01022e3dc8 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -25,14 +25,17 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.ParseFilter;
+import org.apache.hadoop.hbase.security.visibility.Authorizations;
+import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.DynamicProperty;
@@ -62,6 +65,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@@ -115,6 +119,10 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
return connection;
}
+ protected void setConnection(Connection connection) {
+ this.connection = connection;
+ }
+
@Override
protected void init(ControllerServiceInitializationContext config) throws InitializationException {
kerberosConfigFile = config.getKerberosConfigurationFile();
@@ -315,6 +323,8 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
+ private String principal = null;
+
protected Configuration getConfigurationFromFiles(final String configFiles) {
final Configuration hbaseConfig = HBaseConfiguration.create();
if (StringUtils.isNotBlank(configFiles)) {
@@ -336,51 +346,85 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
}
+ private static final byte[] EMPTY_VIS_STRING;
+
+ static {
+ try {
+ EMPTY_VIS_STRING = "".getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private List buildPuts(byte[] rowKey, List columns) {
+ List retVal = new ArrayList<>();
+
+ try {
+ Put put = null;
+
+ for (final PutColumn column : columns) {
+ if (put == null || (put.getCellVisibility() == null && column.getVisibility() != null) || ( put.getCellVisibility() != null
+ && !put.getCellVisibility().getExpression().equals(column.getVisibility())
+ )) {
+ put = new Put(rowKey);
+
+ if (column.getVisibility() != null) {
+ put.setCellVisibility(new CellVisibility(column.getVisibility()));
+ }
+ retVal.add(put);
+ }
+
+ if (column.getTimestamp() != null) {
+ put.addColumn(
+ column.getColumnFamily(),
+ column.getColumnQualifier(),
+ column.getTimestamp(),
+ column.getBuffer());
+ } else {
+ put.addColumn(
+ column.getColumnFamily(),
+ column.getColumnQualifier(),
+ column.getBuffer());
+ }
+ }
+ } catch (DeserializationException de) {
+ getLogger().error("Error writing cell visibility statement.", de);
+ throw new RuntimeException(de);
+ }
+
+ return retVal;
+ }
+
@Override
public void put(final String tableName, final Collection puts) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
// Create one Put per row....
- final Map rowPuts = new HashMap<>();
+ final Map> sorted = new HashMap<>();
+ final List newPuts = new ArrayList<>();
+
for (final PutFlowFile putFlowFile : puts) {
- //this is used for the map key as a byte[] does not work as a key.
final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8);
- Put put = rowPuts.get(rowKeyString);
- if (put == null) {
- put = new Put(putFlowFile.getRow());
- rowPuts.put(rowKeyString, put);
+ List columns = sorted.get(rowKeyString);
+ if (columns == null) {
+ columns = new ArrayList<>();
+ sorted.put(rowKeyString, columns);
}
- for (final PutColumn column : putFlowFile.getColumns()) {
- if (column.getTimestamp() != null) {
- put.addColumn(
- column.getColumnFamily(),
- column.getColumnQualifier(),
- column.getTimestamp(),
- column.getBuffer());
- } else {
- put.addColumn(
- column.getColumnFamily(),
- column.getColumnQualifier(),
- column.getBuffer());
- }
- }
+ columns.addAll(putFlowFile.getColumns());
}
- table.put(new ArrayList<>(rowPuts.values()));
+ for (final Map.Entry> entry : sorted.entrySet()) {
+ newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue()));
+ }
+
+ table.put(newPuts);
}
}
@Override
public void put(final String tableName, final byte[] rowId, final Collection columns) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
- Put put = new Put(rowId);
- for (final PutColumn column : columns) {
- put.addColumn(
- column.getColumnFamily(),
- column.getColumnQualifier(),
- column.getBuffer());
- }
- table.put(put);
+ table.put(buildPuts(rowId, new ArrayList(columns)));
}
}
@@ -398,18 +442,54 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
@Override
public void delete(final String tableName, final byte[] rowId) throws IOException {
+ delete(tableName, rowId, null);
+ }
+
+ @Override
+ public void delete(String tableName, byte[] rowId, String visibilityLabel) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
Delete delete = new Delete(rowId);
+ if (!StringUtils.isEmpty(visibilityLabel)) {
+ delete.setCellVisibility(new CellVisibility(visibilityLabel));
+ }
table.delete(delete);
}
}
@Override
public void delete(String tableName, List rowIds) throws IOException {
+ delete(tableName, rowIds);
+ }
+
+ @Override
+ public void deleteCells(String tableName, List deletes) throws IOException {
+ List deleteRequests = new ArrayList<>();
+ for (int index = 0; index < deletes.size(); index++) {
+ DeleteRequest req = deletes.get(index);
+ Delete delete = new Delete(req.getRowId())
+ .addColumn(req.getColumnFamily(), req.getColumnQualifier());
+ if (!StringUtils.isEmpty(req.getVisibilityLabel())) {
+ delete.setCellVisibility(new CellVisibility(req.getVisibilityLabel()));
+ }
+ deleteRequests.add(delete);
+ }
+ batchDelete(tableName, deleteRequests);
+ }
+
+ @Override
+ public void delete(String tableName, List rowIds, String visibilityLabel) throws IOException {
List deletes = new ArrayList<>();
for (int index = 0; index < rowIds.size(); index++) {
- deletes.add(new Delete(rowIds.get(index)));
+ Delete delete = new Delete(rowIds.get(index));
+ if (!StringUtils.isBlank(visibilityLabel)) {
+ delete.setCellVisibility(new CellVisibility(visibilityLabel));
+ }
+ deletes.add(delete);
}
+ batchDelete(tableName, deletes);
+ }
+
+ private void batchDelete(String tableName, List deletes) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
table.delete(deletes);
}
@@ -418,7 +498,11 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
@Override
public void scan(final String tableName, final Collection columns, final String filterExpression, final long minTime, final ResultHandler handler)
throws IOException {
+ scan(tableName, columns, filterExpression, minTime, null, handler);
+ }
+ @Override
+ public void scan(String tableName, Collection columns, String filterExpression, long minTime, List visibilityLabels, ResultHandler handler) throws IOException {
Filter filter = null;
if (!StringUtils.isBlank(filterExpression)) {
ParseFilter parseFilter = new ParseFilter();
@@ -426,7 +510,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
try (final Table table = connection.getTable(TableName.valueOf(tableName));
- final ResultScanner scanner = getResults(table, columns, filter, minTime)) {
+ final ResultScanner scanner = getResults(table, columns, filter, minTime, visibilityLabels)) {
for (final Result result : scanner) {
final byte[] rowKey = result.getRow();
@@ -451,11 +535,11 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
@Override
- public void scan(final String tableName, final byte[] startRow, final byte[] endRow, final Collection columns, final ResultHandler handler)
+ public void scan(final String tableName, final byte[] startRow, final byte[] endRow, final Collection columns, List authorizations, final ResultHandler handler)
throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName));
- final ResultScanner scanner = getResults(table, startRow, endRow, columns)) {
+ final ResultScanner scanner = getResults(table, startRow, endRow, columns, authorizations)) {
for (final Result result : scanner) {
final byte[] rowKey = result.getRow();
@@ -482,11 +566,11 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
@Override
public void scan(final String tableName, final String startRow, final String endRow, String filterExpression,
final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed,
- final Collection columns, final ResultHandler handler) throws IOException {
+ final Collection columns, List visibilityLabels, final ResultHandler handler) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName));
final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin,
- timerangeMax, limitRows, isReversed, columns)) {
+ timerangeMax, limitRows, isReversed, columns, visibilityLabels)) {
int cnt = 0;
final int lim = limitRows != null ? limitRows : 0;
@@ -515,12 +599,11 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
handler.handle(rowKey, resultCells);
}
}
-
}
//
protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax,
- final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException {
+ final Integer limitRows, final Boolean isReversed, final Collection columns, List authorizations) throws IOException {
final Scan scan = new Scan();
if (!StringUtils.isBlank(startRow)){
scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
@@ -529,6 +612,9 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
scan.setStopRow( endRow.getBytes(StandardCharsets.UTF_8));
}
+ if (authorizations != null && authorizations.size() > 0) {
+ scan.setAuthorizations(new Authorizations(authorizations));
+ }
Filter filter = null;
if (columns != null) {
@@ -565,12 +651,16 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
// protected and extracted into separate method for testing
- protected ResultScanner getResults(final Table table, final byte[] startRow, final byte[] endRow, final Collection columns) throws IOException {
+ protected ResultScanner getResults(final Table table, final byte[] startRow, final byte[] endRow, final Collection columns, List authorizations) throws IOException {
final Scan scan = new Scan();
scan.setStartRow(startRow);
scan.setStopRow(endRow);
- if (columns != null) {
+ if (authorizations != null && authorizations.size() > 0) {
+ scan.setAuthorizations(new Authorizations(authorizations));
+ }
+
+ if (columns != null && columns.size() > 0) {
for (Column col : columns) {
if (col.getQualifier() == null) {
scan.addFamily(col.getFamily());
@@ -584,7 +674,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
// protected and extracted into separate method for testing
- protected ResultScanner getResults(final Table table, final Collection columns, final Filter filter, final long minTime) throws IOException {
+ protected ResultScanner getResults(final Table table, final Collection columns, final Filter filter, final long minTime, List authorizations) throws IOException {
// Create a new scan. We will set the min timerange as the latest timestamp that
// we have seen so far. The minimum timestamp is inclusive, so we will get duplicates.
// We will record any cells that have the latest timestamp, so that when we scan again,
@@ -592,6 +682,10 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
final Scan scan = new Scan();
scan.setTimeRange(minTime, Long.MAX_VALUE);
+ if (authorizations != null && authorizations.size() > 0) {
+ scan.setAuthorizations(new Authorizations(authorizations));
+ }
+
if (filter != null) {
scan.setFilter(filter);
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
index ddaa76561d..12fb6cf556 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
@@ -54,7 +54,7 @@ import java.util.Set;
@Tags({"hbase", "record", "lookup", "service"})
@CapabilityDescription("A lookup service that retrieves one or more columns from HBase and returns them as a record. The lookup coordinates " +
"must contain 'rowKey' which will be the HBase row id.")
-public class HBase_1_1_2_RecordLookupService extends AbstractControllerService implements LookupService {
+public class HBase_1_1_2_RecordLookupService extends AbstractControllerService implements LookupService, VisibilityLabelService {
static final String ROW_KEY_KEY = "rowKey";
private static final Set REQUIRED_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
@@ -98,6 +98,7 @@ public class HBase_1_1_2_RecordLookupService extends AbstractControllerService i
final List props = new ArrayList<>();
props.add(HBASE_CLIENT_SERVICE);
props.add(TABLE_NAME);
+ props.add(AUTHORIZATIONS);
props.add(RETURN_COLUMNS);
props.add(CHARSET);
PROPERTIES = Collections.unmodifiableList(props);
@@ -107,6 +108,7 @@ public class HBase_1_1_2_RecordLookupService extends AbstractControllerService i
private List columns;
private Charset charset;
private HBaseClientService hBaseClientService;
+ private List authorizations;
@Override
protected List getSupportedPropertyDescriptors() {
@@ -127,7 +129,8 @@ public class HBase_1_1_2_RecordLookupService extends AbstractControllerService i
final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
try {
final Map values = new HashMap<>();
- hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, columns, (byte[] row, ResultCell[] resultCells) -> {
+
+ hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, columns, authorizations, (byte[] row, ResultCell[] resultCells) -> {
for (final ResultCell cell : resultCells) {
final byte[] qualifier = Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierOffset() + cell.getQualifierLength());
final byte[] value = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
@@ -167,6 +170,7 @@ public class HBase_1_1_2_RecordLookupService extends AbstractControllerService i
this.tableName = context.getProperty(TABLE_NAME).getValue();
this.columns = getColumns(context.getProperty(RETURN_COLUMNS).getValue());
this.charset = Charset.forName(context.getProperty(CHARSET).getValue());
+ this.authorizations = getAuthorizations(context);
}
@OnDisabled
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/IntegrationTestClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/IntegrationTestClientService.java
new file mode 100644
index 0000000000..452f6c2bdd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/IntegrationTestClientService.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.reporting.InitializationException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+public class IntegrationTestClientService extends HBase_1_1_2_ClientService {
+ IntegrationTestClientService(Connection hbaseConnection) {
+ super();
+ setConnection(hbaseConnection);
+ }
+
+
+ @Override
+ protected Collection customValidate(ValidationContext validationContext) {
+ return new ArrayList<>();
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
+
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/VisibilityLabelService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/VisibilityLabelService.java
new file mode 100644
index 0000000000..9c9ab72be2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/VisibilityLabelService.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public interface VisibilityLabelService {
+ PropertyDescriptor AUTHORIZATIONS = new PropertyDescriptor.Builder()
+ .name("hb-lu-authorizations")
+ .displayName("Authorizations")
+ .description("The list of authorization tokens to be used with cell visibility if it is enabled. These will be used to " +
+ "override the default authorization list for the user accessing HBase.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ default List getAuthorizations(ConfigurationContext context) {
+ List tokens = new ArrayList<>();
+ String authorizationString = context.getProperty(AUTHORIZATIONS).isSet()
+ ? context.getProperty(AUTHORIZATIONS).getValue()
+ : "";
+ if (!StringUtils.isEmpty(authorizationString)) {
+ tokens = Arrays.asList(authorizationString.split(",[\\s]*"));
+ }
+
+ return tokens;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index e4b9280f64..917cb3bed8 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -142,21 +142,19 @@ public class MockHBaseClientService extends HBase_1_1_2_ClientService {
return true;
}
- @Override
- protected ResultScanner getResults(Table table, byte[] startRow, byte[] endRow, Collection columns) throws IOException {
+ protected ResultScanner getResults(Table table, byte[] startRow, byte[] endRow, Collection columns, List labels) throws IOException {
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
return scanner;
}
@Override
- protected ResultScanner getResults(Table table, Collection columns, Filter filter, long minTime) throws IOException {
+ protected ResultScanner getResults(Table table, Collection columns, Filter filter, long minTime, List labels) throws IOException {
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
return scanner;
}
- @Override
protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax,
final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException {
final ResultScanner scanner = Mockito.mock(ResultScanner.class);