NIFI-5980 Added HBase_1_1_2_ListLookupService.

NIFI-5980 Added HBase_2_ListLookupService.

This closes #3278.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mike Thomsen 2019-01-29 17:20:13 -05:00 committed by Bryan Bende
parent 4b45e85327
commit b0a93b473b
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
9 changed files with 785 additions and 248 deletions

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
/**
 * Shared base for HBase-backed lookup services.
 *
 * <p>Holds the property descriptors common to every HBase lookup service (client service,
 * table name, authorizations, returned columns, character set), caches their configured
 * values when the service is enabled, and offers {@link #scan(byte[])} to read one row into
 * a qualifier-to-value map.</p>
 */
public abstract class AbstractHBaseLookupService extends AbstractControllerService {

    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
            .name("hbase-client-service")
            .displayName("HBase Client Service")
            .description("Specifies the HBase Client Controller Service to use for accessing HBase.")
            .required(true)
            .identifiesControllerService(HBaseClientService.class)
            .build();

    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
            .name("hb-lu-table-name")
            .displayName("Table Name")
            .description("The name of the table where look ups will be run.")
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .build();

    // The quotes are escaped once only, so that the rendered description shows plain
    // double quotes rather than backslash-quote sequences.
    static final PropertyDescriptor RETURN_COLUMNS = new PropertyDescriptor.Builder()
            .name("hb-lu-return-cols")
            .displayName("Columns")
            .description("A comma-separated list of \"<colFamily>:<colQualifier>\" pairs to return when scanning. " +
                    "To return all columns for a given family, leave off the qualifier such as \"<colFamily1>,<colFamily2>\".")
            .required(false)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .build();

    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
            .name("hb-lu-charset")
            .displayName("Character Set")
            .description("Specifies the character set used to decode bytes retrieved from HBase.")
            .required(true)
            .defaultValue("UTF-8")
            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
            .build();

    /** Name of the lookup coordinate that carries the HBase row key. */
    static final String ROW_KEY_KEY = "rowKey";

    /** Coordinates every lookup must supply; contains only {@link #ROW_KEY_KEY}. */
    protected static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));

    /** Unmodifiable list of the property descriptors shared by all subclasses. */
    static final List<PropertyDescriptor> PROPERTIES;

    static {
        final List<PropertyDescriptor> props = new ArrayList<>();
        props.add(HBASE_CLIENT_SERVICE);
        props.add(TABLE_NAME);
        props.add(AUTHORIZATIONS);
        props.add(RETURN_COLUMNS);
        props.add(CHARSET);
        PROPERTIES = Collections.unmodifiableList(props);
    }

    // Configuration snapshot: populated in onEnabled, released in onDisabled.
    protected String tableName;
    protected List<Column> columns;
    protected Charset charset;
    protected HBaseClientService hBaseClientService;
    protected List<String> authorizations;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Caches the configured property values for use by {@link #scan(byte[])}.
     *
     * @param context the configuration of this controller service
     * @throws InitializationException declared for subclasses; not thrown by this implementation directly
     * @throws IOException declared for subclasses; not thrown by this implementation directly
     * @throws InterruptedException declared for subclasses; not thrown by this implementation directly
     */
    @OnEnabled
    public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
        this.hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
        this.tableName = context.getProperty(TABLE_NAME).getValue();
        this.columns = getColumns(context.getProperty(RETURN_COLUMNS).getValue());
        this.charset = Charset.forName(context.getProperty(CHARSET).getValue());
        // NOTE(review): presumably parses the AUTHORIZATIONS property into a list of
        // visibility labels - confirm against VisibilityLabelUtils.
        this.authorizations = VisibilityLabelUtils.getAuthorizations(context);
    }

    /**
     * Drops every value cached by {@link #onEnabled(ConfigurationContext)}, including the
     * authorizations, so nothing from a previous configuration survives a disable/enable cycle.
     */
    @OnDisabled
    public void onDisabled() {
        this.hBaseClientService = null;
        this.tableName = null;
        this.columns = null;
        this.charset = null;
        this.authorizations = null;
    }

    /**
     * Parses the "Columns" property value into {@link Column} objects.
     *
     * <p>Entries are separated by commas and trimmed. An entry of the form
     * {@code family:qualifier} selects one column; an entry without a qualifier
     * ({@code family} or {@code family:}) selects the whole family and is represented with a
     * {@code null} qualifier. Blank entries and entries made only of colons are skipped.
     * Anything after a second colon in an entry is ignored.</p>
     *
     * @param columnsValue the raw property value; may be {@code null} or empty
     * @return a new, mutable list of the parsed columns; empty when nothing was configured
     */
    protected List<Column> getColumns(final String columnsValue) {
        final List<Column> columnsList = new ArrayList<>();
        if (columnsValue == null || columnsValue.isEmpty()) {
            return columnsList;
        }

        for (final String rawColumn : columnsValue.split(",")) {
            final String column = rawColumn.trim();
            if (column.isEmpty()) {
                // Tolerate inputs such as "cf1,,cf2" or "cf1, ,cf2".
                continue;
            }

            // String.split discards trailing empty strings, so "cf:" yields a single part
            // and ":" yields none; the array length must be checked before indexing.
            final String[] parts = column.split(":");
            if (parts.length == 0) {
                continue;
            }

            final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8);
            final byte[] cq = parts.length > 1 ? parts[1].getBytes(StandardCharsets.UTF_8) : null;
            columnsList.add(new Column(cf, cq));
        }

        return columnsList;
    }

    /**
     * Reads the row identified by {@code rowKeyBytes} from the configured table.
     *
     * <p>The row key is passed as both the start and the end row of the scan. Each returned
     * cell is decoded with the configured character set and stored under its column
     * qualifier; the column family is not part of the key, so cells from different families
     * that share a qualifier overwrite one another.</p>
     *
     * @param rowKeyBytes the row key to read
     * @return a new map from decoded qualifier to decoded value; empty if no cells were returned
     * @throws IOException if the underlying client service fails
     */
    protected Map<String, Object> scan(byte[] rowKeyBytes) throws IOException {
        final Map<String, Object> values = new HashMap<>();

        hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, columns, authorizations, (byte[] row, ResultCell[] resultCells) -> {
            for (final ResultCell cell : resultCells) {
                // The cell exposes backing arrays plus offset/length, so copy out just the relevant slice.
                final byte[] qualifier = Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierOffset() + cell.getQualifierLength());
                final byte[] value = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
                values.put(new String(qualifier, charset), new String(value, charset));
            }
        });

        return values;
    }
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
public class HBase_1_1_2_ListLookupService extends AbstractHBaseLookupService implements LookupService<List> {
public static final AllowableValue KEY_LIST = new AllowableValue("key_list", "List of keys",
"Return the row as a list of the column qualifiers (keys)");
public static final AllowableValue VALUE_LIST = new AllowableValue("value_list", "List of values",
"Return the row as a list of the values associated with each column qualifier.");
public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor.Builder()
.name("hb-lu-list-return-type")
.displayName("Return Type")
.description("Choose whether to return a list of the keys or a list of the values for the supplied row key.")
.allowableValues(KEY_LIST, VALUE_LIST)
.defaultValue(KEY_LIST.getValue())
.required(true)
.addValidator(Validator.VALID)
.build();
public static final List<PropertyDescriptor> _PROPERTIES;
static {
List<PropertyDescriptor> _temp = new ArrayList<>();
_temp.addAll(PROPERTIES);
_temp.add(RETURN_TYPE);
_PROPERTIES = Collections.unmodifiableList(_temp);
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return _PROPERTIES;
}
@Override
public Optional<List> lookup(Map<String, Object> coordinates) throws LookupFailureException {
if (coordinates.get(ROW_KEY_KEY) == null) {
return Optional.empty();
}
final String rowKey = coordinates.get(ROW_KEY_KEY).toString();
if (StringUtils.isBlank(rowKey)) {
return Optional.empty();
}
final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
try {
final Map<String, Object> values = scan(rowKeyBytes);
if (values.size() > 0) {
List<String> retVal = returnType.equals(KEY_LIST.getValue())
? new ArrayList<>(values.keySet())
: values.values().stream().map( obj -> obj.toString() ).collect(Collectors.toList());
return Optional.ofNullable(retVal);
} else {
return Optional.empty();
}
} catch (IOException e) {
getLogger().error("Error occurred loading {}", new Object[] { coordinates.get("rowKey") }, e);
throw new LookupFailureException(e);
}
}
private String returnType;
@OnEnabled
public void onEnabled(ConfigurationContext context) throws InterruptedException, IOException, InitializationException {
super.onEnabled(context);
returnType = context.getProperty(RETURN_TYPE).getValue();
}
@Override
public Class<?> getValueType() {
return List.class;
}
@Override
public Set<String> getRequiredKeys() {
return REQUIRED_KEYS;
}
}

View File

@ -19,17 +19,8 @@ package org.apache.nifi.hbase;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
@ -39,84 +30,17 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
@Tags({"hbase", "record", "lookup", "service"})
@CapabilityDescription("A lookup service that retrieves one or more columns from HBase and returns them as a record. The lookup coordinates " +
"must contain 'rowKey' which will be the HBase row id.")
public class HBase_1_1_2_RecordLookupService extends AbstractControllerService implements LookupService<Record> {
static final String ROW_KEY_KEY = "rowKey";
private static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("hbase-client-service")
.displayName("HBase Client Service")
.description("Specifies the HBase Client Controller Service to use for accessing HBase.")
.required(true)
.identifiesControllerService(HBaseClientService.class)
.build();
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("hb-lu-table-name")
.displayName("Table Name")
.description("The name of the table where look ups will be run.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final PropertyDescriptor RETURN_COLUMNS = new PropertyDescriptor.Builder()
.name("hb-lu-return-cols")
.displayName("Columns")
.description("A comma-separated list of \\\"<colFamily>:<colQualifier>\\\" pairs to return when scanning. " +
"To return all columns for a given family, leave off the qualifier such as \\\"<colFamily1>,<colFamily2>\\\".")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("hb-lu-charset")
.displayName("Character Set")
.description("Specifies the character set used to decode bytes retrieved from HBase.")
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
static final List<PropertyDescriptor> PROPERTIES;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(HBASE_CLIENT_SERVICE);
props.add(TABLE_NAME);
props.add(AUTHORIZATIONS);
props.add(RETURN_COLUMNS);
props.add(CHARSET);
PROPERTIES = Collections.unmodifiableList(props);
}
private String tableName;
private List<Column> columns;
private Charset charset;
private HBaseClientService hBaseClientService;
private List<String> authorizations;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
public class HBase_1_1_2_RecordLookupService extends AbstractHBaseLookupService implements LookupService<Record> {
@Override
public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
if (coordinates.get(ROW_KEY_KEY) == null) {
@ -130,15 +54,7 @@ public class HBase_1_1_2_RecordLookupService extends AbstractControllerService i
final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
try {
final Map<String, Object> values = new HashMap<>();
hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, columns, authorizations, (byte[] row, ResultCell[] resultCells) -> {
for (final ResultCell cell : resultCells) {
final byte[] qualifier = Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierOffset() + cell.getQualifierLength());
final byte[] value = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
values.put(new String(qualifier, charset), new String(value, charset));
}
});
final Map<String, Object> values = scan(rowKeyBytes);
if (values.size() > 0) {
final List<RecordField> fields = new ArrayList<>();
@ -165,43 +81,5 @@ public class HBase_1_1_2_RecordLookupService extends AbstractControllerService i
public Set<String> getRequiredKeys() {
return REQUIRED_KEYS;
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
this.hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
this.tableName = context.getProperty(TABLE_NAME).getValue();
this.columns = getColumns(context.getProperty(RETURN_COLUMNS).getValue());
this.charset = Charset.forName(context.getProperty(CHARSET).getValue());
this.authorizations = VisibilityLabelUtils.getAuthorizations(context);
}
@OnDisabled
public void onDisabled() {
this.hBaseClientService = null;
this.tableName = null;
this.columns = null;
this.charset = null;
}
private List<Column> getColumns(final String columnsValue) {
final String[] columns = (columnsValue == null || columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
final List<Column> columnsList = new ArrayList<>();
for (final String column : columns) {
if (column.contains(":")) {
final String[] parts = column.trim().split(":");
final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8);
final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8);
columnsList.add(new Column(cf, cq));
} else {
final byte[] cf = column.trim().getBytes(StandardCharsets.UTF_8);
columnsList.add(new Column(cf, null));
}
}
return columnsList;
}
}

View File

@ -14,4 +14,5 @@
# limitations under the License.
org.apache.nifi.hbase.HBase_1_1_2_ClientService
org.apache.nifi.hbase.HBase_1_1_2_ClientMapCacheService
org.apache.nifi.hbase.HBase_1_1_2_ListLookupService
org.apache.nifi.hbase.HBase_1_1_2_RecordLookupService

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
/**
 * Exercises {@link HBase_1_1_2_ListLookupService} against a mock HBase client service,
 * once for each supported return type.
 */
public class TestHBase_1_1_2_ListLookupService {

    static final String TABLE_NAME = "guids";
    static final String ROW = "row1";
    static final String COLS = "cf1:cq1,cf2:cq2";

    private TestRunner runner;
    private HBase_1_1_2_ListLookupService lookupService;
    private MockHBaseClientService clientService;
    private TestRecordLookupProcessor testLookupProcessor;

    /**
     * Wires a test processor to a list lookup service that is itself backed by a mock client service.
     */
    @Before
    public void before() throws Exception {
        testLookupProcessor = new TestRecordLookupProcessor();
        runner = TestRunners.newTestRunner(testLookupProcessor);

        // Mock client service backed by a mocked table.
        final Table mockTable = Mockito.mock(Table.class);
        when(mockTable.getName()).thenReturn(TableName.valueOf(TABLE_NAME));

        final File krb5Conf = new File("src/test/resources/krb5.conf");
        final KerberosProperties kerberosProperties = new KerberosProperties(krb5Conf);

        clientService = new MockHBaseClientService(mockTable, "family", kerberosProperties);
        runner.addControllerService("clientService", clientService);
        runner.setProperty(clientService, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
        runner.enableControllerService(clientService);

        // Lookup service under test, left on its default return type.
        lookupService = new HBase_1_1_2_ListLookupService();
        runner.addControllerService("lookupService", lookupService);
        runner.setProperty(lookupService, HBase_1_1_2_ListLookupService.HBASE_CLIENT_SERVICE, "clientService");
        runner.setProperty(lookupService, HBase_1_1_2_ListLookupService.TABLE_NAME, TABLE_NAME);
        runner.enableControllerService(lookupService);

        // Processor that references the lookup service.
        runner.setProperty(TestRecordLookupProcessor.HBASE_LOOKUP_SERVICE, "lookupService");
        runner.setProperty(TestRecordLookupProcessor.HBASE_ROW, ROW);
    }

    /**
     * Stages a two-cell row in the mock client, runs the processor once, and then queries the
     * lookup service directly for that row.
     *
     * @return whatever the lookup service returns for row {@code row1}
     */
    private Optional<List> setupAndRun() throws Exception {
        final Map<String, String> stagedCells = new HashMap<>();
        stagedCells.put("cq1", "v1");
        stagedCells.put("cq2", "v2");
        clientService.addResult("row1", stagedCells, System.currentTimeMillis());

        runner.enqueue("trigger flow file");
        runner.run();
        runner.assertAllFlowFilesTransferred(TestRecordLookupProcessor.REL_SUCCESS);

        final Map<String, Object> coordinates = new HashMap<>();
        coordinates.put("rowKey", "row1");
        return lookupService.lookup(coordinates);
    }

    /**
     * Asserts that the lookup produced a list of exactly two entries containing both expected items.
     */
    private static void assertTwoEntries(final Optional<List> lookupResult, final String first, final String second) {
        assertTrue(lookupResult.isPresent());
        final List entries = lookupResult.get();
        assertTrue(entries.size() == 2);
        assertTrue(entries.contains(first));
        assertTrue(entries.contains(second));
    }

    @Test
    public void testLookupKeyList() throws Exception {
        assertTwoEntries(setupAndRun(), "cq1", "cq2");
    }

    @Test
    public void testLookupValueList() throws Exception {
        // The return type can only be changed while the service is disabled.
        runner.disableControllerService(lookupService);
        runner.setProperty(lookupService, HBase_1_1_2_ListLookupService.RETURN_TYPE, HBase_1_1_2_ListLookupService.VALUE_LIST);
        runner.enableControllerService(lookupService);

        assertTwoEntries(setupAndRun(), "v1", "v2");
    }
}

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
/**
 * Shared base for HBase-backed lookup services.
 *
 * <p>Holds the property descriptors common to every HBase lookup service (client service,
 * table name, authorizations, returned columns, character set), caches their configured
 * values when the service is enabled, and offers {@link #scan(byte[])} to read one row into
 * a qualifier-to-value map.</p>
 */
public abstract class AbstractHBaseLookupService extends AbstractControllerService {

    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
            .name("hbase-client-service")
            .displayName("HBase Client Service")
            .description("Specifies the HBase Client Controller Service to use for accessing HBase.")
            .required(true)
            .identifiesControllerService(HBaseClientService.class)
            .build();

    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
            .name("hb-lu-table-name")
            .displayName("Table Name")
            .description("The name of the table where look ups will be run.")
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .build();

    // The quotes are escaped once only, so that the rendered description shows plain
    // double quotes rather than backslash-quote sequences.
    static final PropertyDescriptor RETURN_COLUMNS = new PropertyDescriptor.Builder()
            .name("hb-lu-return-cols")
            .displayName("Columns")
            .description("A comma-separated list of \"<colFamily>:<colQualifier>\" pairs to return when scanning. " +
                    "To return all columns for a given family, leave off the qualifier such as \"<colFamily1>,<colFamily2>\".")
            .required(false)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .build();

    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
            .name("hb-lu-charset")
            .displayName("Character Set")
            .description("Specifies the character set used to decode bytes retrieved from HBase.")
            .required(true)
            .defaultValue("UTF-8")
            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
            .build();

    /** Name of the lookup coordinate that carries the HBase row key. */
    static final String ROW_KEY_KEY = "rowKey";

    /** Coordinates every lookup must supply; contains only {@link #ROW_KEY_KEY}. */
    protected static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));

    /** Unmodifiable list of the property descriptors shared by all subclasses. */
    static final List<PropertyDescriptor> PROPERTIES;

    static {
        final List<PropertyDescriptor> props = new ArrayList<>();
        props.add(HBASE_CLIENT_SERVICE);
        props.add(TABLE_NAME);
        props.add(AUTHORIZATIONS);
        props.add(RETURN_COLUMNS);
        props.add(CHARSET);
        PROPERTIES = Collections.unmodifiableList(props);
    }

    // Configuration snapshot: populated in onEnabled, released in onDisabled.
    protected String tableName;
    protected List<Column> columns;
    protected Charset charset;
    protected HBaseClientService hBaseClientService;
    protected List<String> authorizations;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Caches the configured property values for use by {@link #scan(byte[])}.
     *
     * @param context the configuration of this controller service
     * @throws InitializationException declared for subclasses; not thrown by this implementation directly
     * @throws IOException declared for subclasses; not thrown by this implementation directly
     * @throws InterruptedException declared for subclasses; not thrown by this implementation directly
     */
    @OnEnabled
    public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
        this.hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
        this.tableName = context.getProperty(TABLE_NAME).getValue();
        this.columns = getColumns(context.getProperty(RETURN_COLUMNS).getValue());
        this.charset = Charset.forName(context.getProperty(CHARSET).getValue());
        // NOTE(review): presumably parses the AUTHORIZATIONS property into a list of
        // visibility labels - confirm against VisibilityLabelUtils.
        this.authorizations = VisibilityLabelUtils.getAuthorizations(context);
    }

    /**
     * Drops every value cached by {@link #onEnabled(ConfigurationContext)}, including the
     * authorizations, so nothing from a previous configuration survives a disable/enable cycle.
     */
    @OnDisabled
    public void onDisabled() {
        this.hBaseClientService = null;
        this.tableName = null;
        this.columns = null;
        this.charset = null;
        this.authorizations = null;
    }

    /**
     * Parses the "Columns" property value into {@link Column} objects.
     *
     * <p>Entries are separated by commas and trimmed. An entry of the form
     * {@code family:qualifier} selects one column; an entry without a qualifier
     * ({@code family} or {@code family:}) selects the whole family and is represented with a
     * {@code null} qualifier. Blank entries and entries made only of colons are skipped.
     * Anything after a second colon in an entry is ignored.</p>
     *
     * @param columnsValue the raw property value; may be {@code null} or empty
     * @return a new, mutable list of the parsed columns; empty when nothing was configured
     */
    protected List<Column> getColumns(final String columnsValue) {
        final List<Column> columnsList = new ArrayList<>();
        if (columnsValue == null || columnsValue.isEmpty()) {
            return columnsList;
        }

        for (final String rawColumn : columnsValue.split(",")) {
            final String column = rawColumn.trim();
            if (column.isEmpty()) {
                // Tolerate inputs such as "cf1,,cf2" or "cf1, ,cf2".
                continue;
            }

            // String.split discards trailing empty strings, so "cf:" yields a single part
            // and ":" yields none; the array length must be checked before indexing.
            final String[] parts = column.split(":");
            if (parts.length == 0) {
                continue;
            }

            final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8);
            final byte[] cq = parts.length > 1 ? parts[1].getBytes(StandardCharsets.UTF_8) : null;
            columnsList.add(new Column(cf, cq));
        }

        return columnsList;
    }

    /**
     * Reads the row identified by {@code rowKeyBytes} from the configured table.
     *
     * <p>The row key is passed as both the start and the end row of the scan. Each returned
     * cell is decoded with the configured character set and stored under its column
     * qualifier; the column family is not part of the key, so cells from different families
     * that share a qualifier overwrite one another.</p>
     *
     * @param rowKeyBytes the row key to read
     * @return a new map from decoded qualifier to decoded value; empty if no cells were returned
     * @throws IOException if the underlying client service fails
     */
    protected Map<String, Object> scan(byte[] rowKeyBytes) throws IOException {
        final Map<String, Object> values = new HashMap<>();

        hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, columns, authorizations, (byte[] row, ResultCell[] resultCells) -> {
            for (final ResultCell cell : resultCells) {
                // The cell exposes backing arrays plus offset/length, so copy out just the relevant slice.
                final byte[] qualifier = Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierOffset() + cell.getQualifierLength());
                final byte[] value = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
                values.put(new String(qualifier, charset), new String(value, charset));
            }
        });

        return values;
    }
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
public class HBase_2_ListLookupService extends AbstractHBaseLookupService implements LookupService<List> {
public static final AllowableValue KEY_LIST = new AllowableValue("key_list", "List of keys",
"Return the row as a list of the column qualifiers (keys)");
public static final AllowableValue VALUE_LIST = new AllowableValue("value_list", "List of values",
"Return the row as a list of the values associated with each column qualifier.");
public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor.Builder()
.name("hb-lu-list-return-type")
.displayName("Return Type")
.description("Choose whether to return a list of the keys or a list of the values for the supplied row key.")
.allowableValues(KEY_LIST, VALUE_LIST)
.defaultValue(KEY_LIST.getValue())
.required(true)
.addValidator(Validator.VALID)
.build();
public static final List<PropertyDescriptor> _PROPERTIES;
static {
List<PropertyDescriptor> _temp = new ArrayList<>();
_temp.addAll(PROPERTIES);
_temp.add(RETURN_TYPE);
_PROPERTIES = Collections.unmodifiableList(_temp);
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return _PROPERTIES;
}
@Override
public Optional<List> lookup(Map<String, Object> coordinates) throws LookupFailureException {
if (coordinates.get(ROW_KEY_KEY) == null) {
return Optional.empty();
}
final String rowKey = coordinates.get(ROW_KEY_KEY).toString();
if (StringUtils.isBlank(rowKey)) {
return Optional.empty();
}
final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
try {
final Map<String, Object> values = scan(rowKeyBytes);
if (values.size() > 0) {
List<String> retVal = returnType.equals(KEY_LIST.getValue())
? new ArrayList<>(values.keySet())
: values.values().stream().map( obj -> obj.toString() ).collect(Collectors.toList());
return Optional.ofNullable(retVal);
} else {
return Optional.empty();
}
} catch (IOException e) {
getLogger().error("Error occurred loading {}", new Object[] { coordinates.get("rowKey") }, e);
throw new LookupFailureException(e);
}
}
private String returnType;
@OnEnabled
public void onEnabled(ConfigurationContext context) throws InterruptedException, IOException, InitializationException {
super.onEnabled(context);
returnType = context.getProperty(RETURN_TYPE).getValue();
}
@Override
public Class<?> getValueType() {
return List.class;
}
@Override
public Set<String> getRequiredKeys() {
return REQUIRED_KEYS;
}
}

View File

@ -19,17 +19,8 @@ package org.apache.nifi.hbase;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
@ -39,84 +30,17 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
@Tags({"hbase", "record", "lookup", "service"})
@CapabilityDescription("A lookup service that retrieves one or more columns from HBase and returns them as a record. The lookup coordinates " +
"must contain 'rowKey' which will be the HBase row id.")
public class HBase_2_RecordLookupService extends AbstractControllerService implements LookupService<Record> {
// Name of the lookup coordinate that carries the HBase row id.
static final String ROW_KEY_KEY = "rowKey";
// Unmodifiable set containing only ROW_KEY_KEY; handed out by getRequiredKeys().
private static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
// Required property that selects the HBaseClientService controller service used to reach HBase.
static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("hbase-client-service")
.displayName("HBase Client Service")
.description("Specifies the HBase Client Controller Service to use for accessing HBase.")
.required(true)
.identifiesControllerService(HBaseClientService.class)
.build();
// Required, non-blank property naming the HBase table that lookups scan.
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("hb-lu-table-name")
.displayName("Table Name")
.description("The name of the table where look ups will be run.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
/**
 * Optional property listing the columns a lookup should return, written as
 * comma-separated {@code family:qualifier} pairs or bare family names.
 * The quotes in the description are escaped once so the help text shows plain
 * double quotes rather than a backslash followed by a quote.
 */
static final PropertyDescriptor RETURN_COLUMNS = new PropertyDescriptor.Builder()
        .name("hb-lu-return-cols")
        .displayName("Columns")
        .description("A comma-separated list of \"<colFamily>:<colQualifier>\" pairs to return when scanning. " +
                "To return all columns for a given family, leave off the qualifier such as \"<colFamily1>,<colFamily2>\".")
        .required(false)
        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
        .build();
// Required property naming the character set used to decode bytes read from HBase; defaults to UTF-8.
static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("hb-lu-charset")
.displayName("Character Set")
.description("Specifies the character set used to decode bytes retrieved from HBase.")
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
static final List<PropertyDescriptor> PROPERTIES;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(HBASE_CLIENT_SERVICE);
props.add(TABLE_NAME);
props.add(AUTHORIZATIONS);
props.add(RETURN_COLUMNS);
props.add(CHARSET);
PROPERTIES = Collections.unmodifiableList(props);
}
// Name of the HBase table to scan; read from the TABLE_NAME property in onEnabled().
private String tableName;
// Columns to request from HBase; parsed from the RETURN_COLUMNS property by getColumns().
private List<Column> columns;
// Character set used to turn qualifier and value bytes into strings.
private Charset charset;
// Client service through which scans are issued.
private HBaseClientService hBaseClientService;
// Authorizations obtained from VisibilityLabelUtils.getAuthorizations() and passed along with each scan.
private List<String> authorizations;
/**
 * Lists the properties this service supports.
 *
 * @return the shared, unmodifiable {@code PROPERTIES} list
 */
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
public class HBase_2_RecordLookupService extends AbstractHBaseLookupService implements LookupService<Record> {
@Override
public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
if (coordinates.get(ROW_KEY_KEY) == null) {
@ -130,15 +54,7 @@ public class HBase_2_RecordLookupService extends AbstractControllerService imple
final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
try {
final Map<String, Object> values = new HashMap<>();
hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, columns, authorizations, (byte[] row, ResultCell[] resultCells) -> {
for (final ResultCell cell : resultCells) {
final byte[] qualifier = Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierOffset() + cell.getQualifierLength());
final byte[] value = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
values.put(new String(qualifier, charset), new String(value, charset));
}
});
final Map<String, Object> values = scan(rowKeyBytes);
if (values.size() > 0) {
final List<RecordField> fields = new ArrayList<>();
@ -165,43 +81,5 @@ public class HBase_2_RecordLookupService extends AbstractControllerService imple
// Reports the coordinate keys a caller must supply to lookup(); returns the shared REQUIRED_KEYS set.
public Set<String> getRequiredKeys() {
return REQUIRED_KEYS;
}
/**
 * Captures the service's configuration when it is enabled: the HBase client
 * service, table name, parsed column list, character set and authorizations.
 * Fields are assigned one by one in that order.
 *
 * @param context configuration from which the property values are read
 */
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
    hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
    tableName = context.getProperty(TABLE_NAME).getValue();

    final String returnColumns = context.getProperty(RETURN_COLUMNS).getValue();
    columns = getColumns(returnColumns);

    final String charsetName = context.getProperty(CHARSET).getValue();
    charset = Charset.forName(charsetName);

    authorizations = VisibilityLabelUtils.getAuthorizations(context);
}
/**
 * Drops every piece of configuration captured by
 * {@link #onEnabled(ConfigurationContext)} when the service is disabled.
 */
@OnDisabled
public void onDisabled() {
    this.hBaseClientService = null;
    this.tableName = null;
    this.columns = null;
    this.charset = null;
    // Cleared with the rest so that no field assigned on enable outlives the disable.
    this.authorizations = null;
}
/**
 * Parses a comma-separated column specification into {@link Column} objects.
 * Each entry is either {@code family:qualifier} or a bare {@code family}; a
 * bare family, or a family followed by an empty qualifier, yields a column
 * with a {@code null} qualifier. Blank entries are ignored.
 *
 * @param columnsValue the raw property value; may be {@code null} or empty
 * @return the parsed columns, empty when nothing was specified
 */
private List<Column> getColumns(final String columnsValue) {
    final List<Column> columnsList = new ArrayList<>();
    if (columnsValue == null || columnsValue.isEmpty()) {
        return columnsList;
    }

    for (final String rawColumn : columnsValue.split(",")) {
        final String column = rawColumn.trim();
        if (column.isEmpty()) {
            // A stray comma; there is no family to request.
            continue;
        }

        // Limit of 2: only the first ':' separates family from qualifier, so a
        // qualifier that itself contains ':' is kept whole, and "family:" yields
        // an empty second part instead of a missing array element.
        final String[] parts = column.split(":", 2);
        final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8);
        final boolean hasQualifier = parts.length > 1 && !parts[1].isEmpty();
        final byte[] cq = hasQualifier ? parts[1].getBytes(StandardCharsets.UTF_8) : null;
        columnsList.add(new Column(cf, cq));
    }

    return columnsList;
}
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
/**
 * Tests for {@link HBase_2_ListLookupService} run against a mock HBase client
 * service, covering both the key-list and value-list return types.
 */
public class TestHBase_2_ListLookupService {

    static final String TABLE_NAME = "guids";
    static final String ROW = "row1";
    static final String COLS = "cf1:cq1,cf2:cq2";

    private TestRunner runner;
    private HBase_2_ListLookupService lookupService;
    private MockHBaseClientService clientService;
    private TestRecordLookupProcessor testLookupProcessor;

    /**
     * Wires a mock client service and the list lookup service under test into a
     * test runner built around {@link TestRecordLookupProcessor}.
     */
    @Before
    public void before() throws Exception {
        testLookupProcessor = new TestRecordLookupProcessor();
        runner = TestRunners.newTestRunner(testLookupProcessor);

        // setup mock HBaseClientService
        final Table table = Mockito.mock(Table.class);
        when(table.getName()).thenReturn(TableName.valueOf(TABLE_NAME));

        final KerberosProperties kerberosProperties = new KerberosProperties(new File("src/test/resources/krb5.conf"));
        clientService = new MockHBaseClientService(table, "family", kerberosProperties);
        runner.addControllerService("clientService", clientService);
        runner.setProperty(clientService, HBase_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
        runner.enableControllerService(clientService);

        // setup HBase LookupService; every property is referenced through the class under test
        lookupService = new HBase_2_ListLookupService();
        runner.addControllerService("lookupService", lookupService);
        runner.setProperty(lookupService, HBase_2_ListLookupService.HBASE_CLIENT_SERVICE, "clientService");
        runner.setProperty(lookupService, HBase_2_ListLookupService.TABLE_NAME, TABLE_NAME);
        runner.enableControllerService(lookupService);

        // setup test processor
        runner.setProperty(TestRecordLookupProcessor.HBASE_LOOKUP_SERVICE, "lookupService");
        runner.setProperty(TestRecordLookupProcessor.HBASE_ROW, ROW);
    }

    /**
     * Stages two cells for {@link #ROW} in the mock client, runs the processor
     * once, and then performs a direct lookup of that row.
     *
     * @return the result of looking up {@link #ROW} through the lookup service
     */
    private Optional<List> setupAndRun() throws Exception {
        // setup some staged data in the mock client service
        final Map<String,String> cells = new HashMap<>();
        cells.put("cq1", "v1");
        cells.put("cq2", "v2");
        clientService.addResult(ROW, cells, System.currentTimeMillis());

        // run the processor
        runner.enqueue("trigger flow file");
        runner.run();
        runner.assertAllFlowFilesTransferred(TestRecordLookupProcessor.REL_SUCCESS);

        final Map<String, Object> lookup = new HashMap<>();
        lookup.put("rowKey", ROW);

        return lookupService.lookup(lookup);
    }

    /** With the service's default configuration the lookup returns the staged keys. */
    @Test
    public void testLookupKeyList() throws Exception {
        Optional<List> results = setupAndRun();

        assertTrue(results.isPresent());
        List result = results.get();
        assertTrue(result.size() == 2);
        assertTrue(result.contains("cq1"));
        assertTrue(result.contains("cq2"));
    }

    /** After switching RETURN_TYPE to VALUE_LIST the lookup returns the staged cell values. */
    @Test
    public void testLookupValueList() throws Exception {
        // The service must be disabled before its properties can be changed.
        runner.disableControllerService(lookupService);
        runner.setProperty(lookupService, HBase_2_ListLookupService.RETURN_TYPE, HBase_2_ListLookupService.VALUE_LIST);
        runner.enableControllerService(lookupService);

        Optional<List> results = setupAndRun();

        assertTrue(results.isPresent());
        List result = results.get();
        assertTrue(result.size() == 2);
        assertTrue(result.contains("v1"));
        assertTrue(result.contains("v2"));
    }
}