NIFI-4346 Created a LookupService that uses HBase as its back end.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mike Thomsen 2017-09-01 16:21:51 -04:00 committed by Bryan Bende
parent 39c5c5ab42
commit eb97a68110
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
6 changed files with 373 additions and 1 deletion

View File

@ -32,6 +32,11 @@
<version>1.5.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lookup-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
@ -49,8 +54,13 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>

View File

@ -99,6 +99,10 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
// Holder of cached Configuration information so validation does not reload the same config over and over
private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
// Accessor for the HBase Connection held by this service. Protected so that
// subclasses (e.g. the HBase lookup service added in this commit) can issue
// their own table operations against the already-established connection.
protected Connection getConnection() {
return connection;
}
@Override
protected void init(ControllerServiceInitializationContext config) throws InitializationException {
kerberosConfigFile = config.getKerberosConfigurationFile();
@ -113,9 +117,14 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
props.add(ZOOKEEPER_ZNODE_PARENT);
props.add(HBASE_CLIENT_RETRIES);
props.add(PHOENIX_CLIENT_JAR_LOCATION);
props.addAll(getAdditionalProperties());
this.properties = Collections.unmodifiableList(props);
}
// Extension hook: subclasses override this to contribute extra property
// descriptors, which init() appends to the base client-service property list.
// The base implementation contributes none.
protected List<PropertyDescriptor> getAdditionalProperties() {
return new ArrayList<>();
}
// Builds the KerberosProperties helper from the given config file.
// NOTE(review): protected visibility suggests this is overridable (e.g. for
// testing with a stubbed Kerberos setup) — confirm against callers.
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
return new KerberosProperties(kerberosConfigFile);
}

View File

@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
@Tags({"hbase", "record", "lookup", "service"})
@CapabilityDescription(
    "A lookup service that retrieves one or more columns from HBase based on a supplied rowKey."
)
public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService implements LookupService<Record> {

    // The single coordinate key that callers of lookup() must supply.
    private static final String ROW_KEY = "rowKey";
    private static final Set<String> REQUIRED_KEYS = Collections.singleton(ROW_KEY);

    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
        .name("hb-lu-table-name")
        .displayName("Table Name")
        .description("The name of the table where look ups will be run.")
        .required(true)
        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
        .build();

    public static final PropertyDescriptor RETURN_CFS = new PropertyDescriptor.Builder()
        .name("hb-lu-return-cfs")
        .displayName("Column Families")
        .description("The column families that will be returned.")
        .required(true)
        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
        .build();

    public static final PropertyDescriptor RETURN_QFS = new PropertyDescriptor.Builder()
        .name("hb-lu-return-qfs")
        .displayName("Column Qualifiers")
        // Fixed typo in the user-facing description: "qualifies" -> "qualifiers".
        .description("The column qualifiers that will be returned.")
        .required(false)
        .addValidator(Validator.VALID)
        .build();

    protected static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
        .name("hb-lu-charset")
        .displayName("Character Set")
        .description("Specifies the character set of the document data.")
        .required(true)
        .defaultValue("UTF-8")
        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
        .build();

    // Cached from the ConfigurationContext in onEnabled().
    private String tableName;
    private List<byte[]> families;
    private List<byte[]> qualifiers;
    private Charset charset;

    @Override
    protected List<PropertyDescriptor> getAdditionalProperties() {
        // Appended to the base client-service property list by the parent's init().
        final List<PropertyDescriptor> retVal = new ArrayList<>();
        retVal.add(TABLE_NAME);
        retVal.add(RETURN_CFS);
        retVal.add(RETURN_QFS);
        retVal.add(CHARSET);
        return retVal;
    }

    /**
     * Looks up a single HBase row by the "rowKey" coordinate and returns its
     * matching cells as a flat Record of qualifier name to String value.
     *
     * @param coordinates must contain a non-null "rowKey" entry
     * @return a Record of the matching cells, or Optional.empty() when the row
     *         has no cells in the configured families/qualifiers
     * @throws LookupFailureException if the rowKey coordinate is missing or the
     *         HBase read fails
     */
    @Override
    public Optional<Record> lookup(Map<String, String> coordinates) throws LookupFailureException {
        final String rowKeyString = coordinates.get(ROW_KEY);
        if (rowKeyString == null) {
            // Previously a missing coordinate caused a NullPointerException;
            // fail with the declared exception and a clear message instead.
            throw new LookupFailureException("The coordinates must contain a non-null \"rowKey\" entry.");
        }
        // Encode with the configured charset rather than the platform default.
        final byte[] rowKey = rowKeyString.getBytes(charset);
        try {
            final Map<String, Object> values = new HashMap<>();
            try (Table table = getConnection().getTable(TableName.valueOf(tableName))) {
                final Result result = table.get(new Get(rowKey));
                for (final byte[] fam : families) {
                    final NavigableMap<byte[], byte[]> map = result.getFamilyMap(fam);
                    if (map == null) {
                        // getFamilyMap returns null when the row has no cells
                        // in this family; the original code NPE'd here.
                        continue;
                    }
                    for (final Map.Entry<byte[], byte[]> entry : map.entrySet()) {
                        if (qualifiers.isEmpty() || isRequestedQualifier(entry.getKey())) {
                            values.put(new String(entry.getKey(), charset), new String(entry.getValue(), charset));
                        }
                    }
                }
            }
            if (values.isEmpty()) {
                return Optional.empty();
            }
            final List<RecordField> fields = new ArrayList<>();
            for (final String key : values.keySet()) {
                fields.add(new RecordField(key, RecordFieldType.STRING.getDataType()));
            }
            final RecordSchema schema = new SimpleRecordSchema(fields);
            return Optional.of(new MapRecord(schema, values));
        } catch (IOException e) {
            getLogger().error("Error occurred loading {}", new Object[] { rowKeyString }, e);
            throw new LookupFailureException(e);
        }
    }

    // List.contains(byte[]) compares array references, so the original
    // qualifier filter could never match a qualifier read back from HBase;
    // compare array contents instead.
    private boolean isRequestedQualifier(final byte[] qualifier) {
        for (final byte[] candidate : qualifiers) {
            if (Arrays.equals(candidate, qualifier)) {
                return true;
            }
        }
        return false;
    }

    @Override
    public Class<?> getValueType() {
        return Record.class;
    }

    @Override
    public Set<String> getRequiredKeys() {
        return REQUIRED_KEYS;
    }

    /**
     * Caches the configured table name, charset, column families, and column
     * qualifiers, then delegates to the parent to establish the connection.
     */
    @OnEnabled
    public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
        super.onEnabled(context);
        this.tableName = context.getProperty(TABLE_NAME).getValue();
        this.charset = Charset.forName(context.getProperty(CHARSET).getValue());
        // Use the configured charset (not the platform default) for the
        // family/qualifier byte representations, matching lookup().
        this.families = new ArrayList<>();
        for (final String fs : context.getProperty(RETURN_CFS).getValue().split(",")) {
            this.families.add(fs.trim().getBytes(charset));
        }
        this.qualifiers = new ArrayList<>();
        final String quals = context.getProperty(RETURN_QFS).getValue();
        if (quals != null && !quals.trim().isEmpty()) {
            for (final String q : quals.split(",")) {
                this.qualifiers.add(q.trim().getBytes(charset));
            }
        }
    }
}

View File

@ -14,3 +14,4 @@
# limitations under the License.
org.apache.nifi.hbase.HBase_1_1_2_ClientService
org.apache.nifi.hbase.HBase_1_1_2_ClientMapCacheService
org.apache.nifi.hbase.HBase_1_1_2_LookupService

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
// Fixed the @Ignore message (missing space/newline made "ExampleDocker" run together),
// replaced the raw Optional with Optional<?>, and asserted presence before calling
// Optional.get() so an empty result fails with a clear message instead of
// NoSuchElementException.
@Ignore("This is an integration test. It requires a table with the name guids and a column family named property. Example " +
    "Docker Compose configuration:\n" +
    " hbase-docker:\n" +
    " container_name: hbase-docker\n" +
    " image: \"dajobe/hbase\"\n" +
    " ports:\n" +
    " - 16010:16010\n" +
    " - 2181:2181\n" +
    " - 60000:60000\n" +
    " - 60010:60010\n" +
    " - 60020:60020\n" +
    " - 60030:60030\n" +
    " - 9090:9090\n" +
    " - 9095:9095\n" +
    " hostname: hbase-docker")
public class TestHBase_1_1_2_LookupService {

    TestRunner runner;
    HBase_1_1_2_LookupService service;

    static final byte[] FAM = "property".getBytes();
    static final byte[] QUAL1 = "uuid".getBytes();
    static final byte[] QUAL2 = "uuid2".getBytes();
    static final String TABLE_NAME = "guids";

    @Before
    public void before() throws Exception {
        runner = TestRunners.newTestRunner(TestLookupProcessor.class);
        service = new HBase_1_1_2_LookupService();
        runner.addControllerService("lookupService", service);
        runner.setProperty(service, HBaseClientService.ZOOKEEPER_QUORUM, "hbase-docker");
        runner.setProperty(service, HBaseClientService.ZOOKEEPER_CLIENT_PORT, "2181");
        runner.setProperty(service, HBaseClientService.ZOOKEEPER_ZNODE_PARENT, "/hbase");
        runner.setProperty(service, HBaseClientService.HBASE_CLIENT_RETRIES, "3");
        runner.setProperty(service, HBase_1_1_2_LookupService.TABLE_NAME, TABLE_NAME);
        runner.setProperty(service, HBase_1_1_2_LookupService.RETURN_CFS, "property");
        runner.setProperty(service, HBase_1_1_2_LookupService.CHARSET, "UTF-8");
    }

    @After
    public void after() throws Exception {
        service.shutdown();
    }

    @Test
    public void testSingleLookup() throws Exception {
        runner.enableControllerService(service);
        runner.assertValid(service);

        final String uuid = UUID.randomUUID().toString();
        final String rowKey = String.format("x-y-z-%d", Calendar.getInstance().getTimeInMillis());

        final PutColumn column = new PutColumn(FAM, QUAL1, uuid.getBytes());
        service.put(TABLE_NAME, rowKey.getBytes(), Arrays.asList(column));

        final Map<String, String> lookup = new HashMap<>();
        lookup.put("rowKey", rowKey);

        final Optional<?> result = service.lookup(lookup);
        Assert.assertNotNull("Result was null", result);
        Assert.assertTrue("The value was absent", result.isPresent());
        Assert.assertTrue("Wrong type.", result.get() instanceof MapRecord);
        final MapRecord record = (MapRecord) result.get();
        Assert.assertEquals("The value didn't match.", uuid, record.getAsString("uuid"));
    }

    @Test
    public void testMultipleLookup() throws Exception {
        runner.enableControllerService(service);
        runner.assertValid(service);

        final String uuid = UUID.randomUUID().toString();
        final String uuid2 = UUID.randomUUID().toString();
        final String rowKey = String.format("x-y-z-%d", Calendar.getInstance().getTimeInMillis());

        final List<PutColumn> columns = new ArrayList<>();
        columns.add(new PutColumn(FAM, QUAL1, uuid.getBytes()));
        columns.add(new PutColumn(FAM, QUAL2, uuid2.getBytes()));
        service.put(TABLE_NAME, rowKey.getBytes(), columns);

        final Map<String, String> lookup = new HashMap<>();
        lookup.put("rowKey", rowKey);

        final Optional<?> result = service.lookup(lookup);
        Assert.assertNotNull("Result was null", result);
        Assert.assertTrue("The value was absent", result.isPresent());
        Assert.assertTrue("Wrong type.", result.get() instanceof MapRecord);
        final MapRecord record = (MapRecord) result.get();
        Assert.assertEquals("Qual 1 was wrong", uuid, record.getAsString("uuid"));
        Assert.assertEquals("Qual 2 was wrong", uuid2, record.getAsString("uuid2"));
    }
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import java.util.ArrayList;
import java.util.List;
// Minimal processor used by the integration test to host the lookup service
// in a TestRunner (see TestHBase_1_1_2_LookupService). It performs no work.
public class TestLookupProcessor extends AbstractProcessor {

    static final PropertyDescriptor HBASE_LOOKUP_SERVICE = new PropertyDescriptor.Builder()
        .name("HBase Lookup Service")
        .description("HBaseLookupService")
        .identifiesControllerService(HBase_1_1_2_LookupService.class)
        .required(true)
        .build();

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        // Intentionally a no-op: the processor exists only to satisfy the TestRunner.
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> supported = new ArrayList<>(1);
        supported.add(HBASE_LOOKUP_SERVICE);
        return supported;
    }
}