NIFI-5956 Option BlockCache HBaseScanProcessor

This closes #3295.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Sandish Kumar 2019-02-07 11:38:34 -06:00 committed by Bryan Bende
parent 4db5446c87
commit 82e2c97782
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
5 changed files with 29 additions and 8 deletions

View File

@ -209,6 +209,17 @@ public class ScanHBase extends AbstractProcessor implements VisibilityFetchSuppo
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
static final PropertyDescriptor BLOCK_CACHE = new PropertyDescriptor.Builder()
.displayName("Block Cache")
.name("block-cache")
.description("The Block Cache to enable/disable block cache on HBase scan.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues("true", "false")
.required(true)
.defaultValue("true")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("The original input file will be routed to this destination, even if no rows are retrieved based on provided conditions.")
@ -244,6 +255,7 @@ public class ScanHBase extends AbstractProcessor implements VisibilityFetchSuppo
props.add(JSON_FORMAT);
props.add(ENCODE_CHARSET);
props.add(DECODE_CHARSET);
props.add(BLOCK_CACHE);
properties = Collections.unmodifiableList(props);
}
@ -368,6 +380,8 @@ public class ScanHBase extends AbstractProcessor implements VisibilityFetchSuppo
final Boolean isReversed = context.getProperty(REVERSED_SCAN).asBoolean();
final Boolean blockCache = context.getProperty(BLOCK_CACHE).asBoolean();
final Integer bulkSize = context.getProperty(BULK_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
final List<Column> columns = getColumns(context.getProperty(COLUMNS).evaluateAttributeExpressions(flowFile).getValue());
@ -376,6 +390,7 @@ public class ScanHBase extends AbstractProcessor implements VisibilityFetchSuppo
final AtomicReference<Long> rowsPulledHolder = new AtomicReference<>(0L);
final AtomicReference<Long> ffCountHolder = new AtomicReference<>(0L);
ScanHBaseResultHandler handler = new ScanHBaseResultHandler(context, session, flowFile, rowsPulledHolder, ffCountHolder, hBaseClientService, tableName, bulkSize);
try {
hBaseClientService.scan(tableName,
startRow, endRow,
@ -383,6 +398,7 @@ public class ScanHBase extends AbstractProcessor implements VisibilityFetchSuppo
timerangeMin, timerangeMax,
limitRows,
isReversed,
blockCache,
columns,
authorizations,
handler);

View File

@ -176,7 +176,7 @@ public class MockHBaseClientService extends AbstractControllerService implements
@Override
public void scan(String tableName, String startRow, String endRow, String filterExpression, Long timerangeMin,
Long timerangeMax, Integer limitRows, Boolean isReversed, Collection<Column> columns, List<String> visibilityLabels, ResultHandler handler)
Long timerangeMax, Integer limitRows, Boolean isReversed, Boolean blockCache, Collection<Column> columns, List<String> visibilityLabels, ResultHandler handler)
throws IOException {
if (throwException) {
throw new IOException("exception");

View File

@ -161,12 +161,13 @@ public interface HBaseClientService extends ControllerService {
* @param timerangeMax the maximum timestamp of cells to return, passed to the HBase scanner timeRange
* @param limitRows the maximum number of rows to be returned by scanner
* @param isReversed whether this scan is a reversed one.
* @param blockCache set to use the block cache option of hbase scan.
* @param columns optional columns to return, if not specified all columns are returned
* @param authorizations optional list of visibility labels that the user should be able to see when communicating with HBase
* @param handler a handler to process rows of the result
*/
void scan(String tableName, String startRow, String endRow, String filterExpression, Long timerangeMin, Long timerangeMax, Integer limitRows,
Boolean isReversed, Collection<Column> columns, List<String> authorizations, ResultHandler handler) throws IOException;
Boolean isReversed, Boolean blockCache, Collection<Column> columns, List<String> authorizations, ResultHandler handler) throws IOException;
/**
* Converts the given boolean to it's byte representation.

View File

@ -612,11 +612,11 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
@Override
public void scan(final String tableName, final String startRow, final String endRow, String filterExpression,
final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed,
final Collection<Column> columns, List<String> visibilityLabels, final ResultHandler handler) throws IOException {
final Boolean blockCache, final Collection<Column> columns, List<String> visibilityLabels, final ResultHandler handler) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName));
final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin,
timerangeMax, limitRows, isReversed, columns, visibilityLabels)) {
timerangeMax, limitRows, isReversed, blockCache, columns, visibilityLabels)) {
int cnt = 0;
final int lim = limitRows != null ? limitRows : 0;
@ -649,7 +649,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
//
protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax,
final Integer limitRows, final Boolean isReversed, final Collection<Column> columns, List<String> authorizations) throws IOException {
final Integer limitRows, final Boolean isReversed, final Boolean blockCache, final Collection<Column> columns, List<String> authorizations) throws IOException {
final Scan scan = new Scan();
if (!StringUtils.isBlank(startRow)){
scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
@ -693,6 +693,8 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
scan.setReversed(isReversed);
}
scan.setCacheBlocks(blockCache);
return table.getScanner(scan);
}

View File

@ -612,11 +612,11 @@ public class HBase_2_ClientService extends AbstractControllerService implements
@Override
public void scan(final String tableName, final String startRow, final String endRow, String filterExpression,
final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed,
final Collection<Column> columns, List<String> visibilityLabels, final ResultHandler handler) throws IOException {
final Boolean blockCache, final Collection<Column> columns, List<String> visibilityLabels, final ResultHandler handler) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName));
final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin,
timerangeMax, limitRows, isReversed, columns, visibilityLabels)) {
timerangeMax, limitRows, isReversed, blockCache, columns, visibilityLabels)) {
int cnt = 0;
final int lim = limitRows != null ? limitRows : 0;
@ -649,7 +649,7 @@ public class HBase_2_ClientService extends AbstractControllerService implements
//
protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax,
final Integer limitRows, final Boolean isReversed, final Collection<Column> columns, List<String> authorizations) throws IOException {
final Integer limitRows, final Boolean isReversed, final Boolean blockCache, final Collection<Column> columns, List<String> authorizations) throws IOException {
final Scan scan = new Scan();
if (!StringUtils.isBlank(startRow)){
scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
@ -693,6 +693,8 @@ public class HBase_2_ClientService extends AbstractControllerService implements
scan.setReversed(isReversed);
}
scan.setCacheBlocks(blockCache);
return table.getScanner(scan);
}