diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml index bd4dd2ce19..112d088d8b 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml @@ -32,6 +32,11 @@ 1.5.0-SNAPSHOT provided + + org.apache.nifi + nifi-lookup-service-api + provided + org.apache.nifi nifi-api @@ -49,8 +54,13 @@ org.apache.nifi nifi-distributed-cache-client-service-api provided - + + org.apache.nifi + nifi-record + provided + + org.apache.hbase hbase-client diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index 53f5834d6f..b2e59c15fc 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -99,6 +99,10 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme // Holder of cached Configuration information so validation does not reload the same config over and over private final AtomicReference validationResourceHolder = new AtomicReference<>(); + protected Connection getConnection() { + return connection; + } + @Override protected void init(ControllerServiceInitializationContext config) throws InitializationException { kerberosConfigFile = config.getKerberosConfigurationFile(); @@ -113,9 +117,14 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme props.add(ZOOKEEPER_ZNODE_PARENT); props.add(HBASE_CLIENT_RETRIES); props.add(PHOENIX_CLIENT_JAR_LOCATION); + props.addAll(getAdditionalProperties()); this.properties = Collections.unmodifiableList(props); } + protected List getAdditionalProperties() { + return new ArrayList<>(); + } + protected KerberosProperties getKerberosProperties(File kerberosConfigFile) { return new KerberosProperties(kerberosConfigFile); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java new file mode 100644 index 0000000000..5f47c3139d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.hbase; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.lookup.LookupService; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.Set; + +@Tags({"hbase", "record", "lookup", "service"}) +@CapabilityDescription( + "A lookup service that retrieves one or more columns from HBase based on a supplied rowKey." +) +public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService implements LookupService { + private static final Set REQUIRED_KEYS = Collections.singleton("rowKey"); + + public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .name("hb-lu-table-name") + .displayName("Table Name") + .description("The name of the table where look ups will be run.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + public static final PropertyDescriptor RETURN_CFS = new PropertyDescriptor.Builder() + .name("hb-lu-return-cfs") + .displayName("Column Families") + .description("The column families that will be returned.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + public static final PropertyDescriptor RETURN_QFS = new PropertyDescriptor.Builder() + .name("hb-lu-return-qfs") + .displayName("Column Qualifiers") + .description("The column qualifies that will be returned.") + .required(false) + .addValidator(Validator.VALID) + .build(); + protected static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("hb-lu-charset") + .displayName("Character Set") + .description("Specifies the character set of the document data.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + private String tableName; + private List families; + private List qualifiers; + private Charset charset; + + @Override + protected List getAdditionalProperties() { + List retVal = new ArrayList<>(); + retVal.add(TABLE_NAME); + retVal.add(RETURN_CFS); + retVal.add(RETURN_QFS); + retVal.add(CHARSET); + return retVal; + } + + @Override + public Optional lookup(Map coordinates) throws LookupFailureException { + byte[] rowKey = coordinates.get("rowKey").getBytes(); + try { + Map values = new HashMap<>(); + try (Table table = getConnection().getTable(TableName.valueOf(tableName))) { + Get get = new Get(rowKey); + Result result = table.get(get); + + for (byte[] fam : families) { + NavigableMap map = result.getFamilyMap(fam); + for (Map.Entry entry : map.entrySet()) { + if (qualifiers.contains(entry.getKey()) || qualifiers.size() == 0) { + values.put(new String(entry.getKey(), charset), new String(entry.getValue(), charset)); + } + } + } + } + + if (values.size() > 0) { + final List fields = new ArrayList<>(); + for (String key : values.keySet()) { + fields.add(new RecordField(key, RecordFieldType.STRING.getDataType())); + } + final RecordSchema schema = new SimpleRecordSchema(fields); + return Optional.ofNullable(new MapRecord(schema, values)); + } else { + return Optional.empty(); + } + } catch (IOException e) { + getLogger().error("Error occurred loading {}", new Object[] { coordinates.get("rowKey") }, e); + throw new LookupFailureException(e); + } + } + + @Override + public Class getValueType() { + return Record.class; + } + + @Override + public Set getRequiredKeys() { + return REQUIRED_KEYS; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException { + super.onEnabled(context); + + this.tableName = context.getProperty(TABLE_NAME).getValue(); + this.charset = Charset.forName(context.getProperty(CHARSET).getValue()); + + String families = context.getProperty(RETURN_CFS).getValue(); + String[] familiesSplit = families.split(","); + this.families = new ArrayList<>(); + for (String fs : familiesSplit) { + this.families.add(fs.trim().getBytes()); + } + this.qualifiers = new ArrayList<>(); + String quals = context.getProperty(RETURN_QFS).getValue(); + + if (quals != null && quals.length() > 0) { + String[] qualsSplit = quals.split(","); + for (String q : qualsSplit) { + this.qualifiers.add(q.trim().getBytes()); + } + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index f97d88c510..1fcd3d9667 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -14,3 +14,4 @@ # limitations under the License. org.apache.nifi.hbase.HBase_1_1_2_ClientService org.apache.nifi.hbase.HBase_1_1_2_ClientMapCacheService +org.apache.nifi.hbase.HBase_1_1_2_LookupService diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_LookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_LookupService.java new file mode 100644 index 0000000000..9ca7940a95 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_LookupService.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.hbase; + +import org.apache.nifi.hbase.put.PutColumn; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +@Ignore("This is an integration test. It requires a table with the name guids and a column family named property. Example" + + "Docker Compose configuration:" + + " hbase-docker:\n" + + " container_name: hbase-docker\n" + + " image: \"dajobe/hbase\"\n" + + " ports:\n" + + " - 16010:16010\n" + + " - 2181:2181\n" + + " - 60000:60000\n" + + " - 60010:60010\n" + + " - 60020:60020\n" + + " - 60030:60030\n" + + " - 9090:9090\n" + + " - 9095:9095\n" + + " hostname: hbase-docker") +public class TestHBase_1_1_2_LookupService { + + TestRunner runner; + HBase_1_1_2_LookupService service; + + static final byte[] FAM = "property".getBytes(); + static final byte[] QUAL1 = "uuid".getBytes(); + static final byte[] QUAL2 = "uuid2".getBytes(); + + static final String TABLE_NAME = "guids"; + + @Before + public void before() throws Exception { + runner = TestRunners.newTestRunner(TestLookupProcessor.class); + service = new HBase_1_1_2_LookupService(); + runner.addControllerService("lookupService", service); + runner.setProperty(service, HBaseClientService.ZOOKEEPER_QUORUM, "hbase-docker"); + runner.setProperty(service, HBaseClientService.ZOOKEEPER_CLIENT_PORT, "2181"); + runner.setProperty(service, HBaseClientService.ZOOKEEPER_ZNODE_PARENT, "/hbase"); + runner.setProperty(service, HBaseClientService.HBASE_CLIENT_RETRIES, "3"); + runner.setProperty(service, HBase_1_1_2_LookupService.TABLE_NAME, TABLE_NAME); + runner.setProperty(service, HBase_1_1_2_LookupService.RETURN_CFS, "property"); + runner.setProperty(service, HBase_1_1_2_LookupService.CHARSET, "UTF-8"); + } + + @After + public void after() throws Exception { + service.shutdown(); + } + + @Test + public void testSingleLookup() throws Exception { + runner.enableControllerService(service); + runner.assertValid(service); + + String uuid = UUID.randomUUID().toString(); + String rowKey = String.format("x-y-z-%d", Calendar.getInstance().getTimeInMillis()); + + PutColumn column = new PutColumn(FAM, QUAL1, uuid.getBytes()); + + service.put(TABLE_NAME, rowKey.getBytes(), Arrays.asList(column)); + + Map lookup = new HashMap<>(); + lookup.put("rowKey", rowKey); + Optional result = service.lookup(lookup); + + Assert.assertNotNull("Result was null", result); + Assert.assertNotNull("The value was null", result.get()); + MapRecord record = (MapRecord)result.get(); + Assert.assertEquals("The value didn't match.", uuid, record.getAsString("uuid")); + } + + + @Test + public void testMultipleLookup() throws Exception { + runner.enableControllerService(service); + runner.assertValid(service); + + String uuid = UUID.randomUUID().toString(); + String uuid2 = UUID.randomUUID().toString(); + String rowKey = String.format("x-y-z-%d", Calendar.getInstance().getTimeInMillis()); + + List columns = new ArrayList<>(); + columns.add(new PutColumn(FAM, QUAL1, uuid.getBytes())); + columns.add(new PutColumn(FAM, QUAL2, uuid2.getBytes())); + + service.put(TABLE_NAME, rowKey.getBytes(), columns); + + Map lookup = new HashMap<>(); + lookup.put("rowKey", rowKey); + Optional result = service.lookup(lookup); + + Assert.assertNotNull("Result was null", result); + Assert.assertNotNull("The value was null", result.get()); + Assert.assertTrue("Wrong type.", result.get() instanceof MapRecord); + MapRecord record = (MapRecord)result.get(); + Assert.assertEquals("Qual 1 was wrong", uuid, record.getAsString("uuid")); + Assert.assertEquals("Qual 2 was wrong", uuid2, record.getAsString("uuid2")); + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestLookupProcessor.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestLookupProcessor.java new file mode 100644 index 0000000000..729c6f9594 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestLookupProcessor.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.hbase; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.ArrayList; +import java.util.List; + +public class TestLookupProcessor extends AbstractProcessor { + static final PropertyDescriptor HBASE_LOOKUP_SERVICE = new PropertyDescriptor.Builder() + .name("HBase Lookup Service") + .description("HBaseLookupService") + .identifiesControllerService(HBase_1_1_2_LookupService.class) + .required(true) + .build(); + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + } + + @Override + protected List getSupportedPropertyDescriptors() { + List propDescs = new ArrayList<>(); + propDescs.add(HBASE_LOOKUP_SERVICE); + return propDescs; + } +}