NIFI-4543: Improve HBase processors provenance transit URL

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2237.
This commit is contained in:
Koji Kawamura 2017-10-30 11:03:18 +09:00 committed by Pierre Villard
parent 77a51e1a9e
commit 116c846342
6 changed files with 35 additions and 12 deletions

View File

@ -201,7 +201,7 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
}
protected String getTransitUri(PutFlowFile putFlowFile) {
return "hbase://" + putFlowFile.getTableName() + "/" + new String(putFlowFile.getRow(), StandardCharsets.UTF_8);
return clientService.toTransitUri(putFlowFile.getTableName(), new String(putFlowFile.getRow(), StandardCharsets.UTF_8));
}
protected byte[] getRow(final String row, final String encoding) {

View File

@ -285,10 +285,10 @@ public class FetchHBaseRow extends AbstractProcessor {
handlerFlowFile = session.putAllAttributes(handlerFlowFile, attributes);
final String transitUri = "hbase://" + tableName + "/" + rowId;
if (destination.equals(DESTINATION_CONTENT.getValue())) {
final String transitUri = hBaseClientService.toTransitUri(tableName, rowId);
// Regardless to where the result is written to, emit a fetch event.
session.getProvenanceReporter().fetch(handlerFlowFile, transitUri);
} else {
if (!destination.equals(DESTINATION_CONTENT.getValue())) {
session.getProvenanceReporter().modifyAttributes(handlerFlowFile, "Added attributes to FlowFile from " + transitUri);
}

View File

@ -365,7 +365,7 @@ public class GetHBase extends AbstractProcessor {
attributes.put("mime.type", "application/json");
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().receive(flowFile, "hbase://" + tableName + "/" + rowKeyString);
session.getProvenanceReporter().receive(flowFile, hBaseClientService.toTransitUri(tableName, rowKeyString));
session.transfer(flowFile, REL_SUCCESS);
getLogger().debug("Received {} from HBase with row key {}", new Object[]{flowFile, rowKeyString});

View File

@ -243,12 +243,6 @@ public class PutHBaseRecord extends AbstractPutHBase {
session.getProvenanceReporter().send(flowFile, getTransitUri(pff), details, time);
}
@Override
protected String getTransitUri(PutFlowFile putFlowFile) {
return "hbase://" + putFlowFile.getTableName();
}
@Override
protected PutFlowFile createPut(ProcessSession session, ProcessContext context, FlowFile flowFile) {
return null;

View File

@ -196,4 +196,15 @@ public interface HBaseClientService extends ControllerService {
*/
byte[] toBytesBinary(String s);
/**
* Create a transit URI from the current configuration and the specified table name.
* The default implementation just prepend "hbase://" to the table name and row key, i.e. "hbase://tableName/rowKey".
* @param tableName The name of a HBase table
* @param rowKey The target HBase row key, this can be null or empty string if the operation is not targeted to a specific row
* @return a qualified transit URI which can identify a HBase table row in a HBase cluster
*/
default String toTransitUri(String tableName, String rowKey) {
return "hbase://" + tableName + (rowKey != null && !rowKey.isEmpty() ? "/" + rowKey : "");
}
}

View File

@ -57,6 +57,8 @@ import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@ -81,6 +83,8 @@ import java.util.concurrent.atomic.AtomicReference;
description="These properties will be set on the HBase configuration after loading any provided configuration files.")
public class HBase_1_1_2_ClientService extends AbstractControllerService implements HBaseClientService {
private static final Logger logger = LoggerFactory.getLogger(HBase_1_1_2_ClientService.class);
static final String HBASE_CONF_ZK_QUORUM = "hbase.zookeeper.quorum";
static final String HBASE_CONF_ZK_PORT = "hbase.zookeeper.property.clientPort";
static final String HBASE_CONF_ZNODE_PARENT = "zookeeper.znode.parent";
@ -535,4 +539,18 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
public byte[] toBytesBinary(String s) {
return Bytes.toBytesBinary(s);
}
@Override
public String toTransitUri(String tableName, String rowKey) {
if (connection == null) {
logger.warn("Connection has not been established, could not create a transit URI. Returning null.");
return null;
}
try {
final String masterAddress = connection.getAdmin().getClusterStatus().getMaster().getHostAndPort();
return "hbase://" + masterAddress + "/" + tableName + (rowKey != null && !rowKey.isEmpty() ? "/" + rowKey : "");
} catch (IOException e) {
throw new RuntimeException("Failed to get HBase Admin interface, due to " + e, e);
}
}
}