mirror of https://github.com/apache/nifi.git
NIFI-4346 Modifying HBase_1_1_2_LookupService to use HBase_1_1_2_ClientService, instead of extend it
This closes #2125. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
eb97a68110
commit
5930c0c212
|
@ -1,172 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.nifi.hbase;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
|
||||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
|
||||||
import org.apache.nifi.components.Validator;
|
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
|
||||||
import org.apache.nifi.lookup.LookupFailureException;
|
|
||||||
import org.apache.nifi.lookup.LookupService;
|
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
|
||||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
|
||||||
import org.apache.nifi.serialization.record.MapRecord;
|
|
||||||
import org.apache.nifi.serialization.record.Record;
|
|
||||||
import org.apache.nifi.serialization.record.RecordField;
|
|
||||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
|
||||||
import org.apache.nifi.serialization.record.RecordSchema;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.charset.Charset;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.NavigableMap;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
@Tags({"hbase", "record", "lookup", "service"})
|
|
||||||
@CapabilityDescription(
|
|
||||||
"A lookup service that retrieves one or more columns from HBase based on a supplied rowKey."
|
|
||||||
)
|
|
||||||
public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService implements LookupService<Record> {
|
|
||||||
private static final Set<String> REQUIRED_KEYS = Collections.singleton("rowKey");
|
|
||||||
|
|
||||||
public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
|
|
||||||
.name("hb-lu-table-name")
|
|
||||||
.displayName("Table Name")
|
|
||||||
.description("The name of the table where look ups will be run.")
|
|
||||||
.required(true)
|
|
||||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
public static final PropertyDescriptor RETURN_CFS = new PropertyDescriptor.Builder()
|
|
||||||
.name("hb-lu-return-cfs")
|
|
||||||
.displayName("Column Families")
|
|
||||||
.description("The column families that will be returned.")
|
|
||||||
.required(true)
|
|
||||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
public static final PropertyDescriptor RETURN_QFS = new PropertyDescriptor.Builder()
|
|
||||||
.name("hb-lu-return-qfs")
|
|
||||||
.displayName("Column Qualifiers")
|
|
||||||
.description("The column qualifies that will be returned.")
|
|
||||||
.required(false)
|
|
||||||
.addValidator(Validator.VALID)
|
|
||||||
.build();
|
|
||||||
protected static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
|
|
||||||
.name("hb-lu-charset")
|
|
||||||
.displayName("Character Set")
|
|
||||||
.description("Specifies the character set of the document data.")
|
|
||||||
.required(true)
|
|
||||||
.defaultValue("UTF-8")
|
|
||||||
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
private String tableName;
|
|
||||||
private List<byte[]> families;
|
|
||||||
private List<byte[]> qualifiers;
|
|
||||||
private Charset charset;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected List<PropertyDescriptor> getAdditionalProperties() {
|
|
||||||
List<PropertyDescriptor> retVal = new ArrayList<>();
|
|
||||||
retVal.add(TABLE_NAME);
|
|
||||||
retVal.add(RETURN_CFS);
|
|
||||||
retVal.add(RETURN_QFS);
|
|
||||||
retVal.add(CHARSET);
|
|
||||||
return retVal;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Optional<Record> lookup(Map<String, String> coordinates) throws LookupFailureException {
|
|
||||||
byte[] rowKey = coordinates.get("rowKey").getBytes();
|
|
||||||
try {
|
|
||||||
Map<String, Object> values = new HashMap<>();
|
|
||||||
try (Table table = getConnection().getTable(TableName.valueOf(tableName))) {
|
|
||||||
Get get = new Get(rowKey);
|
|
||||||
Result result = table.get(get);
|
|
||||||
|
|
||||||
for (byte[] fam : families) {
|
|
||||||
NavigableMap<byte[], byte[]> map = result.getFamilyMap(fam);
|
|
||||||
for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
|
|
||||||
if (qualifiers.contains(entry.getKey()) || qualifiers.size() == 0) {
|
|
||||||
values.put(new String(entry.getKey(), charset), new String(entry.getValue(), charset));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (values.size() > 0) {
|
|
||||||
final List<RecordField> fields = new ArrayList<>();
|
|
||||||
for (String key : values.keySet()) {
|
|
||||||
fields.add(new RecordField(key, RecordFieldType.STRING.getDataType()));
|
|
||||||
}
|
|
||||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
|
||||||
return Optional.ofNullable(new MapRecord(schema, values));
|
|
||||||
} else {
|
|
||||||
return Optional.empty();
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
getLogger().error("Error occurred loading {}", new Object[] { coordinates.get("rowKey") }, e);
|
|
||||||
throw new LookupFailureException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Class<?> getValueType() {
|
|
||||||
return Record.class;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Set<String> getRequiredKeys() {
|
|
||||||
return REQUIRED_KEYS;
|
|
||||||
}
|
|
||||||
|
|
||||||
@OnEnabled
|
|
||||||
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
|
|
||||||
super.onEnabled(context);
|
|
||||||
|
|
||||||
this.tableName = context.getProperty(TABLE_NAME).getValue();
|
|
||||||
this.charset = Charset.forName(context.getProperty(CHARSET).getValue());
|
|
||||||
|
|
||||||
String families = context.getProperty(RETURN_CFS).getValue();
|
|
||||||
String[] familiesSplit = families.split(",");
|
|
||||||
this.families = new ArrayList<>();
|
|
||||||
for (String fs : familiesSplit) {
|
|
||||||
this.families.add(fs.trim().getBytes());
|
|
||||||
}
|
|
||||||
this.qualifiers = new ArrayList<>();
|
|
||||||
String quals = context.getProperty(RETURN_QFS).getValue();
|
|
||||||
|
|
||||||
if (quals != null && quals.length() > 0) {
|
|
||||||
String[] qualsSplit = quals.split(",");
|
|
||||||
for (String q : qualsSplit) {
|
|
||||||
this.qualifiers.add(q.trim().getBytes());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,197 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.hbase;
|
||||||
|
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
|
import org.apache.nifi.hbase.scan.Column;
|
||||||
|
import org.apache.nifi.hbase.scan.ResultCell;
|
||||||
|
import org.apache.nifi.lookup.LookupFailureException;
|
||||||
|
import org.apache.nifi.lookup.LookupService;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
|
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||||
|
import org.apache.nifi.serialization.record.MapRecord;
|
||||||
|
import org.apache.nifi.serialization.record.Record;
|
||||||
|
import org.apache.nifi.serialization.record.RecordField;
|
||||||
|
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||||
|
import org.apache.nifi.serialization.record.RecordSchema;
|
||||||
|
import org.apache.nifi.util.StringUtils;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
@Tags({"hbase", "record", "lookup", "service"})
|
||||||
|
@CapabilityDescription("A lookup service that retrieves one or more columns from HBase and returns them as a record. The lookup coordinates " +
|
||||||
|
"must contain 'rowKey' which will be the HBase row id.")
|
||||||
|
public class HBase_1_1_2_RecordLookupService extends AbstractControllerService implements LookupService<Record> {
|
||||||
|
|
||||||
|
static final String ROW_KEY_KEY = "rowKey";
|
||||||
|
private static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
|
||||||
|
|
||||||
|
static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
|
||||||
|
.name("hbase-client-service")
|
||||||
|
.displayName("HBase Client Service")
|
||||||
|
.description("Specifies the HBase Client Controller Service to use for accessing HBase.")
|
||||||
|
.required(true)
|
||||||
|
.identifiesControllerService(HBaseClientService.class)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
|
||||||
|
.name("hb-lu-table-name")
|
||||||
|
.displayName("Table Name")
|
||||||
|
.description("The name of the table where look ups will be run.")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
static final PropertyDescriptor RETURN_COLUMNS = new PropertyDescriptor.Builder()
|
||||||
|
.name("hb-lu-return-cols")
|
||||||
|
.displayName("Columns")
|
||||||
|
.description("A comma-separated list of \\\"<colFamily>:<colQualifier>\\\" pairs to return when scanning. " +
|
||||||
|
"To return all columns for a given family, leave off the qualifier such as \\\"<colFamily1>,<colFamily2>\\\".")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
|
||||||
|
.name("hb-lu-charset")
|
||||||
|
.displayName("Character Set")
|
||||||
|
.description("Specifies the character set used to decode bytes retrieved from HBase.")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("UTF-8")
|
||||||
|
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
static final List<PropertyDescriptor> PROPERTIES;
|
||||||
|
static {
|
||||||
|
final List<PropertyDescriptor> props = new ArrayList<>();
|
||||||
|
props.add(HBASE_CLIENT_SERVICE);
|
||||||
|
props.add(TABLE_NAME);
|
||||||
|
props.add(RETURN_COLUMNS);
|
||||||
|
props.add(CHARSET);
|
||||||
|
PROPERTIES = Collections.unmodifiableList(props);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String tableName;
|
||||||
|
private List<Column> columns;
|
||||||
|
private Charset charset;
|
||||||
|
private HBaseClientService hBaseClientService;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return PROPERTIES;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<Record> lookup(Map<String, String> coordinates) throws LookupFailureException {
|
||||||
|
final String rowKey = coordinates.get(ROW_KEY_KEY);
|
||||||
|
if (StringUtils.isBlank(rowKey)) {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
|
||||||
|
try {
|
||||||
|
final Map<String, Object> values = new HashMap<>();
|
||||||
|
hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, columns, (byte[] row, ResultCell[] resultCells) -> {
|
||||||
|
for (final ResultCell cell : resultCells) {
|
||||||
|
final byte[] qualifier = Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierOffset() + cell.getQualifierLength());
|
||||||
|
final byte[] value = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
|
||||||
|
values.put(new String(qualifier, charset), new String(value, charset));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (values.size() > 0) {
|
||||||
|
final List<RecordField> fields = new ArrayList<>();
|
||||||
|
for (String key : values.keySet()) {
|
||||||
|
fields.add(new RecordField(key, RecordFieldType.STRING.getDataType()));
|
||||||
|
}
|
||||||
|
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||||
|
return Optional.ofNullable(new MapRecord(schema, values));
|
||||||
|
} else {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
getLogger().error("Error occurred loading {}", new Object[] { coordinates.get("rowKey") }, e);
|
||||||
|
throw new LookupFailureException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Class<?> getValueType() {
|
||||||
|
return Record.class;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<String> getRequiredKeys() {
|
||||||
|
return REQUIRED_KEYS;
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnEnabled
|
||||||
|
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
|
||||||
|
this.hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
|
||||||
|
this.tableName = context.getProperty(TABLE_NAME).getValue();
|
||||||
|
this.columns = getColumns(context.getProperty(RETURN_COLUMNS).getValue());
|
||||||
|
this.charset = Charset.forName(context.getProperty(CHARSET).getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnDisabled
|
||||||
|
public void onDisabled() {
|
||||||
|
this.hBaseClientService = null;
|
||||||
|
this.tableName = null;
|
||||||
|
this.columns = null;
|
||||||
|
this.charset = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<Column> getColumns(final String columnsValue) {
|
||||||
|
final String[] columns = (columnsValue == null || columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
|
||||||
|
|
||||||
|
final List<Column> columnsList = new ArrayList<>();
|
||||||
|
|
||||||
|
for (final String column : columns) {
|
||||||
|
if (column.contains(":")) {
|
||||||
|
final String[] parts = column.trim().split(":");
|
||||||
|
final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8);
|
||||||
|
final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8);
|
||||||
|
columnsList.add(new Column(cf, cq));
|
||||||
|
} else {
|
||||||
|
final byte[] cf = column.trim().getBytes(StandardCharsets.UTF_8);
|
||||||
|
columnsList.add(new Column(cf, null));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return columnsList;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -14,4 +14,4 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
org.apache.nifi.hbase.HBase_1_1_2_ClientService
|
org.apache.nifi.hbase.HBase_1_1_2_ClientService
|
||||||
org.apache.nifi.hbase.HBase_1_1_2_ClientMapCacheService
|
org.apache.nifi.hbase.HBase_1_1_2_ClientMapCacheService
|
||||||
org.apache.nifi.hbase.HBase_1_1_2_LookupService
|
org.apache.nifi.hbase.HBase_1_1_2_RecordLookupService
|
||||||
|
|
|
@ -1,133 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.nifi.hbase;
|
|
||||||
|
|
||||||
import org.apache.nifi.hbase.put.PutColumn;
|
|
||||||
import org.apache.nifi.serialization.record.MapRecord;
|
|
||||||
import org.apache.nifi.util.TestRunner;
|
|
||||||
import org.apache.nifi.util.TestRunners;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Calendar;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
@Ignore("This is an integration test. It requires a table with the name guids and a column family named property. Example" +
|
|
||||||
"Docker Compose configuration:" +
|
|
||||||
" hbase-docker:\n" +
|
|
||||||
" container_name: hbase-docker\n" +
|
|
||||||
" image: \"dajobe/hbase\"\n" +
|
|
||||||
" ports:\n" +
|
|
||||||
" - 16010:16010\n" +
|
|
||||||
" - 2181:2181\n" +
|
|
||||||
" - 60000:60000\n" +
|
|
||||||
" - 60010:60010\n" +
|
|
||||||
" - 60020:60020\n" +
|
|
||||||
" - 60030:60030\n" +
|
|
||||||
" - 9090:9090\n" +
|
|
||||||
" - 9095:9095\n" +
|
|
||||||
" hostname: hbase-docker")
|
|
||||||
public class TestHBase_1_1_2_LookupService {
|
|
||||||
|
|
||||||
TestRunner runner;
|
|
||||||
HBase_1_1_2_LookupService service;
|
|
||||||
|
|
||||||
static final byte[] FAM = "property".getBytes();
|
|
||||||
static final byte[] QUAL1 = "uuid".getBytes();
|
|
||||||
static final byte[] QUAL2 = "uuid2".getBytes();
|
|
||||||
|
|
||||||
static final String TABLE_NAME = "guids";
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void before() throws Exception {
|
|
||||||
runner = TestRunners.newTestRunner(TestLookupProcessor.class);
|
|
||||||
service = new HBase_1_1_2_LookupService();
|
|
||||||
runner.addControllerService("lookupService", service);
|
|
||||||
runner.setProperty(service, HBaseClientService.ZOOKEEPER_QUORUM, "hbase-docker");
|
|
||||||
runner.setProperty(service, HBaseClientService.ZOOKEEPER_CLIENT_PORT, "2181");
|
|
||||||
runner.setProperty(service, HBaseClientService.ZOOKEEPER_ZNODE_PARENT, "/hbase");
|
|
||||||
runner.setProperty(service, HBaseClientService.HBASE_CLIENT_RETRIES, "3");
|
|
||||||
runner.setProperty(service, HBase_1_1_2_LookupService.TABLE_NAME, TABLE_NAME);
|
|
||||||
runner.setProperty(service, HBase_1_1_2_LookupService.RETURN_CFS, "property");
|
|
||||||
runner.setProperty(service, HBase_1_1_2_LookupService.CHARSET, "UTF-8");
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void after() throws Exception {
|
|
||||||
service.shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testSingleLookup() throws Exception {
|
|
||||||
runner.enableControllerService(service);
|
|
||||||
runner.assertValid(service);
|
|
||||||
|
|
||||||
String uuid = UUID.randomUUID().toString();
|
|
||||||
String rowKey = String.format("x-y-z-%d", Calendar.getInstance().getTimeInMillis());
|
|
||||||
|
|
||||||
PutColumn column = new PutColumn(FAM, QUAL1, uuid.getBytes());
|
|
||||||
|
|
||||||
service.put(TABLE_NAME, rowKey.getBytes(), Arrays.asList(column));
|
|
||||||
|
|
||||||
Map<String, String> lookup = new HashMap<>();
|
|
||||||
lookup.put("rowKey", rowKey);
|
|
||||||
Optional result = service.lookup(lookup);
|
|
||||||
|
|
||||||
Assert.assertNotNull("Result was null", result);
|
|
||||||
Assert.assertNotNull("The value was null", result.get());
|
|
||||||
MapRecord record = (MapRecord)result.get();
|
|
||||||
Assert.assertEquals("The value didn't match.", uuid, record.getAsString("uuid"));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testMultipleLookup() throws Exception {
|
|
||||||
runner.enableControllerService(service);
|
|
||||||
runner.assertValid(service);
|
|
||||||
|
|
||||||
String uuid = UUID.randomUUID().toString();
|
|
||||||
String uuid2 = UUID.randomUUID().toString();
|
|
||||||
String rowKey = String.format("x-y-z-%d", Calendar.getInstance().getTimeInMillis());
|
|
||||||
|
|
||||||
List<PutColumn> columns = new ArrayList<>();
|
|
||||||
columns.add(new PutColumn(FAM, QUAL1, uuid.getBytes()));
|
|
||||||
columns.add(new PutColumn(FAM, QUAL2, uuid2.getBytes()));
|
|
||||||
|
|
||||||
service.put(TABLE_NAME, rowKey.getBytes(), columns);
|
|
||||||
|
|
||||||
Map<String, String> lookup = new HashMap<>();
|
|
||||||
lookup.put("rowKey", rowKey);
|
|
||||||
Optional result = service.lookup(lookup);
|
|
||||||
|
|
||||||
Assert.assertNotNull("Result was null", result);
|
|
||||||
Assert.assertNotNull("The value was null", result.get());
|
|
||||||
Assert.assertTrue("Wrong type.", result.get() instanceof MapRecord);
|
|
||||||
MapRecord record = (MapRecord)result.get();
|
|
||||||
Assert.assertEquals("Qual 1 was wrong", uuid, record.getAsString("uuid"));
|
|
||||||
Assert.assertEquals("Qual 2 was wrong", uuid2, record.getAsString("uuid2"));
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,123 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.hbase;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.nifi.hadoop.KerberosProperties;
|
||||||
|
import org.apache.nifi.serialization.record.Record;
|
||||||
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
public class TestHBase_1_1_2_RecordLookupService {
|
||||||
|
|
||||||
|
static final String TABLE_NAME = "guids";
|
||||||
|
static final String ROW = "row1";
|
||||||
|
static final String COLS = "cf1:cq1,cf2:cq2";
|
||||||
|
|
||||||
|
private TestRunner runner;
|
||||||
|
private HBase_1_1_2_RecordLookupService lookupService;
|
||||||
|
private MockHBaseClientService clientService;
|
||||||
|
private TestRecordLookupProcessor testLookupProcessor;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void before() throws Exception {
|
||||||
|
testLookupProcessor = new TestRecordLookupProcessor();
|
||||||
|
runner = TestRunners.newTestRunner(testLookupProcessor);
|
||||||
|
|
||||||
|
// setup mock HBaseClientService
|
||||||
|
final Table table = Mockito.mock(Table.class);
|
||||||
|
when(table.getName()).thenReturn(TableName.valueOf(TABLE_NAME));
|
||||||
|
|
||||||
|
final KerberosProperties kerberosProperties = new KerberosProperties(new File("src/test/resources/krb5.conf"));
|
||||||
|
clientService = new MockHBaseClientService(table, "family", kerberosProperties);
|
||||||
|
runner.addControllerService("clientService", clientService);
|
||||||
|
runner.setProperty(clientService, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
|
||||||
|
runner.enableControllerService(clientService);
|
||||||
|
|
||||||
|
// setup HBase LookupService
|
||||||
|
lookupService = new HBase_1_1_2_RecordLookupService();
|
||||||
|
runner.addControllerService("lookupService", lookupService);
|
||||||
|
runner.setProperty(lookupService, HBase_1_1_2_RecordLookupService.HBASE_CLIENT_SERVICE, "clientService");
|
||||||
|
runner.setProperty(lookupService, HBase_1_1_2_RecordLookupService.TABLE_NAME, TABLE_NAME);
|
||||||
|
runner.enableControllerService(lookupService);
|
||||||
|
|
||||||
|
// setup test processor
|
||||||
|
runner.setProperty(TestRecordLookupProcessor.HBASE_LOOKUP_SERVICE, "lookupService");
|
||||||
|
runner.setProperty(TestRecordLookupProcessor.HBASE_ROW, ROW);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSuccessfulLookupAllColumns() {
|
||||||
|
// setup some staged data in the mock client service
|
||||||
|
final Map<String,String> cells = new HashMap<>();
|
||||||
|
cells.put("cq1", "v1");
|
||||||
|
cells.put("cq2", "v2");
|
||||||
|
clientService.addResult("row1", cells, System.currentTimeMillis());
|
||||||
|
|
||||||
|
// run the processor
|
||||||
|
runner.enqueue("trigger flow file");
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(TestRecordLookupProcessor.REL_SUCCESS);
|
||||||
|
|
||||||
|
final List<Record> records = testLookupProcessor.getLookedupRecords();
|
||||||
|
Assert.assertNotNull(records);
|
||||||
|
Assert.assertEquals(1, records.size());
|
||||||
|
|
||||||
|
final Record record = records.get(0);
|
||||||
|
Assert.assertEquals("v1", record.getAsString("cq1"));
|
||||||
|
Assert.assertEquals("v2", record.getAsString("cq2"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLookupWithNoResults() {
|
||||||
|
// run the processor
|
||||||
|
runner.enqueue("trigger flow file");
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(TestRecordLookupProcessor.REL_FAILURE);
|
||||||
|
|
||||||
|
final List<Record> records = testLookupProcessor.getLookedupRecords();
|
||||||
|
Assert.assertNotNull(records);
|
||||||
|
Assert.assertEquals(0, records.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLookupWhenMissingRowKeyCoordinate() {
|
||||||
|
runner.removeProperty(TestRecordLookupProcessor.HBASE_ROW);
|
||||||
|
|
||||||
|
// run the processor
|
||||||
|
runner.enqueue("trigger flow file");
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(TestRecordLookupProcessor.REL_FAILURE);
|
||||||
|
|
||||||
|
final List<Record> records = testLookupProcessor.getLookedupRecords();
|
||||||
|
Assert.assertNotNull(records);
|
||||||
|
Assert.assertEquals(0, records.size());
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,47 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.nifi.hbase;
|
|
||||||
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
|
||||||
import org.apache.nifi.processor.AbstractProcessor;
|
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
public class TestLookupProcessor extends AbstractProcessor {
|
|
||||||
static final PropertyDescriptor HBASE_LOOKUP_SERVICE = new PropertyDescriptor.Builder()
|
|
||||||
.name("HBase Lookup Service")
|
|
||||||
.description("HBaseLookupService")
|
|
||||||
.identifiesControllerService(HBase_1_1_2_LookupService.class)
|
|
||||||
.required(true)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
|
||||||
List<PropertyDescriptor> propDescs = new ArrayList<>();
|
|
||||||
propDescs.add(HBASE_LOOKUP_SERVICE);
|
|
||||||
return propDescs;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,118 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.hbase;
|
||||||
|
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.lookup.LookupFailureException;
|
||||||
|
import org.apache.nifi.lookup.LookupService;
|
||||||
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.serialization.record.Record;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
public class TestRecordLookupProcessor extends AbstractProcessor {
|
||||||
|
|
||||||
|
static final PropertyDescriptor HBASE_LOOKUP_SERVICE = new PropertyDescriptor.Builder()
|
||||||
|
.name("HBase Lookup Service")
|
||||||
|
.description("HBaseLookupService")
|
||||||
|
.identifiesControllerService(LookupService.class)
|
||||||
|
.required(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
static final PropertyDescriptor HBASE_ROW = new PropertyDescriptor.Builder()
|
||||||
|
.name("HBase Row Id")
|
||||||
|
.description("The Row Id to Lookup.")
|
||||||
|
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||||
|
.name("success")
|
||||||
|
.description("All success FlowFiles are routed to this relationship")
|
||||||
|
.build();
|
||||||
|
static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||||
|
.name("failure")
|
||||||
|
.description("All failed FlowFiles are routed to this relationship")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
private List<Record> lookedupRecords = new ArrayList<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
List<PropertyDescriptor> propDescs = new ArrayList<>();
|
||||||
|
propDescs.add(HBASE_LOOKUP_SERVICE);
|
||||||
|
propDescs.add(HBASE_ROW);
|
||||||
|
return propDescs;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<Relationship> getRelationships() {
|
||||||
|
Set<Relationship> relationships = new HashSet<>();
|
||||||
|
relationships.add(REL_SUCCESS);
|
||||||
|
relationships.add(REL_FAILURE);
|
||||||
|
return relationships;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||||
|
FlowFile flowFile = session.get();
|
||||||
|
if (flowFile == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final String rowKey = context.getProperty(HBASE_ROW).getValue();
|
||||||
|
|
||||||
|
final Map<String,String> coordinates = new HashMap<>();
|
||||||
|
coordinates.put(HBase_1_1_2_RecordLookupService.ROW_KEY_KEY, rowKey);
|
||||||
|
|
||||||
|
final LookupService<Record> lookupService = context.getProperty(HBASE_LOOKUP_SERVICE).asControllerService(LookupService.class);
|
||||||
|
try {
|
||||||
|
final Optional<Record> record = lookupService.lookup(coordinates);
|
||||||
|
if (record.isPresent()) {
|
||||||
|
lookedupRecords.add(record.get());
|
||||||
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
|
} else {
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (LookupFailureException e) {
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Record> getLookedupRecords() {
|
||||||
|
return new ArrayList<>(lookedupRecords);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void clearLookedupRecords() {
|
||||||
|
this.lookedupRecords.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue