From 116c8463428c1fb51bfb7a8adfcf23c32fded964 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Mon, 30 Oct 2017 11:03:18 +0900 Subject: [PATCH] NIFI-4543: Improve HBase processors provenance transit URL Signed-off-by: Pierre Villard This closes #2237. --- .../apache/nifi/hbase/AbstractPutHBase.java | 2 +- .../org/apache/nifi/hbase/FetchHBaseRow.java | 8 ++++---- .../java/org/apache/nifi/hbase/GetHBase.java | 2 +- .../org/apache/nifi/hbase/PutHBaseRecord.java | 6 ------ .../apache/nifi/hbase/HBaseClientService.java | 11 +++++++++++ .../nifi/hbase/HBase_1_1_2_ClientService.java | 18 ++++++++++++++++++ 6 files changed, 35 insertions(+), 12 deletions(-) diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java index 237bc03d35..ac24baa7db 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java @@ -201,7 +201,7 @@ public abstract class AbstractPutHBase extends AbstractProcessor { } protected String getTransitUri(PutFlowFile putFlowFile) { - return "hbase://" + putFlowFile.getTableName() + "/" + new String(putFlowFile.getRow(), StandardCharsets.UTF_8); + return clientService.toTransitUri(putFlowFile.getTableName(), new String(putFlowFile.getRow(), StandardCharsets.UTF_8)); } protected byte[] getRow(final String row, final String encoding) { diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/FetchHBaseRow.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/FetchHBaseRow.java index 54dcec68c7..8d06da9b37 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/FetchHBaseRow.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/FetchHBaseRow.java @@ -285,10 +285,10 @@ public class FetchHBaseRow extends AbstractProcessor { handlerFlowFile = session.putAllAttributes(handlerFlowFile, attributes); - final String transitUri = "hbase://" + tableName + "/" + rowId; - if (destination.equals(DESTINATION_CONTENT.getValue())) { - session.getProvenanceReporter().fetch(handlerFlowFile, transitUri); - } else { + final String transitUri = hBaseClientService.toTransitUri(tableName, rowId); + // Regardless to where the result is written to, emit a fetch event. + session.getProvenanceReporter().fetch(handlerFlowFile, transitUri); + if (!destination.equals(DESTINATION_CONTENT.getValue())) { session.getProvenanceReporter().modifyAttributes(handlerFlowFile, "Added attributes to FlowFile from " + transitUri); } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java index 6002c3cc39..dee37c6f9f 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java @@ -365,7 +365,7 @@ public class GetHBase extends AbstractProcessor { attributes.put("mime.type", "application/json"); flowFile = session.putAllAttributes(flowFile, attributes); - session.getProvenanceReporter().receive(flowFile, "hbase://" + tableName + "/" + rowKeyString); + session.getProvenanceReporter().receive(flowFile, hBaseClientService.toTransitUri(tableName, rowKeyString)); session.transfer(flowFile, REL_SUCCESS); getLogger().debug("Received {} from HBase with row key {}", new Object[]{flowFile, rowKeyString}); diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java index 90b2fdbeea..3e6d62448a 100755 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java @@ -243,12 +243,6 @@ public class PutHBaseRecord extends AbstractPutHBase { session.getProvenanceReporter().send(flowFile, getTransitUri(pff), details, time); } - @Override - protected String getTransitUri(PutFlowFile putFlowFile) { - return "hbase://" + putFlowFile.getTableName(); - } - - @Override protected PutFlowFile createPut(ProcessSession session, ProcessContext context, FlowFile flowFile) { return null; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java index c67b0cafc9..0c2a131fd0 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java @@ -196,4 +196,15 @@ public interface HBaseClientService extends ControllerService { */ byte[] toBytesBinary(String s); + /** + * Create a transit URI from the current configuration and the specified table name. + * The default implementation just prepend "hbase://" to the table name and row key, i.e. "hbase://tableName/rowKey". + * @param tableName The name of a HBase table + * @param rowKey The target HBase row key, this can be null or empty string if the operation is not targeted to a specific row + * @return a qualified transit URI which can identify a HBase table row in a HBase cluster + */ + default String toTransitUri(String tableName, String rowKey) { + return "hbase://" + tableName + (rowKey != null && !rowKey.isEmpty() ? "/" + rowKey : ""); + } + } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index b2e59c15fc..bc097e4b99 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -57,6 +57,8 @@ import org.apache.nifi.hbase.scan.ResultCell; import org.apache.nifi.hbase.scan.ResultHandler; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -81,6 +83,8 @@ import java.util.concurrent.atomic.AtomicReference; description="These properties will be set on the HBase configuration after loading any provided configuration files.") public class HBase_1_1_2_ClientService extends AbstractControllerService implements HBaseClientService { + private static final Logger logger = LoggerFactory.getLogger(HBase_1_1_2_ClientService.class); + static final String HBASE_CONF_ZK_QUORUM = "hbase.zookeeper.quorum"; static final String HBASE_CONF_ZK_PORT = "hbase.zookeeper.property.clientPort"; static final String HBASE_CONF_ZNODE_PARENT = "zookeeper.znode.parent"; @@ -535,4 +539,18 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme public byte[] toBytesBinary(String s) { return Bytes.toBytesBinary(s); } + + @Override + public String toTransitUri(String tableName, String rowKey) { + if (connection == null) { + logger.warn("Connection has not been established, could not create a transit URI. Returning null."); + return null; + } + try { + final String masterAddress = connection.getAdmin().getClusterStatus().getMaster().getHostAndPort(); + return "hbase://" + masterAddress + "/" + tableName + (rowKey != null && !rowKey.isEmpty() ? "/" + rowKey : ""); + } catch (IOException e) { + throw new RuntimeException("Failed to get HBase Admin interface, due to " + e, e); + } + } }