From 5930c0c21246c4365698cfb1c90ebe2c5a446d07 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Fri, 6 Oct 2017 14:09:36 -0400 Subject: [PATCH] NIFI-4346 Modifying HBase_1_1_2_LookupService to use HBase_1_1_2_ClientService, instead of extend it This closes #2125. Signed-off-by: Bryan Bende --- .../nifi/hbase/HBase_1_1_2_LookupService.java | 172 --------------- .../HBase_1_1_2_RecordLookupService.java | 197 ++++++++++++++++++ ...g.apache.nifi.controller.ControllerService | 2 +- .../hbase/TestHBase_1_1_2_LookupService.java | 133 ------------ .../TestHBase_1_1_2_RecordLookupService.java | 123 +++++++++++ .../nifi/hbase/TestLookupProcessor.java | 47 ----- .../nifi/hbase/TestRecordLookupProcessor.java | 118 +++++++++++ 7 files changed, 439 insertions(+), 353 deletions(-) delete mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java delete mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_LookupService.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_RecordLookupService.java delete mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestLookupProcessor.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestRecordLookupProcessor.java diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java deleted file mode 100644 index 5f47c3139d..0000000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.hbase; - -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Table; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnEnabled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.Validator; -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.lookup.LookupFailureException; -import org.apache.nifi.lookup.LookupService; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.serialization.SimpleRecordSchema; -import org.apache.nifi.serialization.record.MapRecord; -import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldType; -import org.apache.nifi.serialization.record.RecordSchema; - -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Optional; -import java.util.Set; - -@Tags({"hbase", "record", "lookup", "service"}) -@CapabilityDescription( - "A lookup service that retrieves one or more columns from HBase based on a supplied rowKey." -) -public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService implements LookupService { - private static final Set REQUIRED_KEYS = Collections.singleton("rowKey"); - - public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() - .name("hb-lu-table-name") - .displayName("Table Name") - .description("The name of the table where look ups will be run.") - .required(true) - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .build(); - public static final PropertyDescriptor RETURN_CFS = new PropertyDescriptor.Builder() - .name("hb-lu-return-cfs") - .displayName("Column Families") - .description("The column families that will be returned.") - .required(true) - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .build(); - public static final PropertyDescriptor RETURN_QFS = new PropertyDescriptor.Builder() - .name("hb-lu-return-qfs") - .displayName("Column Qualifiers") - .description("The column qualifies that will be returned.") - .required(false) - .addValidator(Validator.VALID) - .build(); - protected static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() - .name("hb-lu-charset") - .displayName("Character Set") - .description("Specifies the character set of the document data.") - .required(true) - .defaultValue("UTF-8") - .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) - .build(); - - private String tableName; - private List families; - private List qualifiers; - private Charset charset; - - @Override - protected List getAdditionalProperties() { - List retVal = new ArrayList<>(); - retVal.add(TABLE_NAME); - retVal.add(RETURN_CFS); - retVal.add(RETURN_QFS); - retVal.add(CHARSET); - return retVal; - } - - @Override - public Optional lookup(Map coordinates) throws LookupFailureException { - byte[] rowKey = coordinates.get("rowKey").getBytes(); - try { - Map values = new HashMap<>(); - try (Table table = getConnection().getTable(TableName.valueOf(tableName))) { - Get get = new Get(rowKey); - Result result = table.get(get); - - for (byte[] fam : families) { - NavigableMap map = result.getFamilyMap(fam); - for (Map.Entry entry : map.entrySet()) { - if (qualifiers.contains(entry.getKey()) || qualifiers.size() == 0) { - values.put(new String(entry.getKey(), charset), new String(entry.getValue(), charset)); - } - } - } - } - - if (values.size() > 0) { - final List fields = new ArrayList<>(); - for (String key : values.keySet()) { - fields.add(new RecordField(key, RecordFieldType.STRING.getDataType())); - } - final RecordSchema schema = new SimpleRecordSchema(fields); - return Optional.ofNullable(new MapRecord(schema, values)); - } else { - return Optional.empty(); - } - } catch (IOException e) { - getLogger().error("Error occurred loading {}", new Object[] { coordinates.get("rowKey") }, e); - throw new LookupFailureException(e); - } - } - - @Override - public Class getValueType() { - return Record.class; - } - - @Override - public Set getRequiredKeys() { - return REQUIRED_KEYS; - } - - @OnEnabled - public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException { - super.onEnabled(context); - - this.tableName = context.getProperty(TABLE_NAME).getValue(); - this.charset = Charset.forName(context.getProperty(CHARSET).getValue()); - - String families = context.getProperty(RETURN_CFS).getValue(); - String[] familiesSplit = families.split(","); - this.families = new ArrayList<>(); - for (String fs : familiesSplit) { - this.families.add(fs.trim().getBytes()); - } - this.qualifiers = new ArrayList<>(); - String quals = context.getProperty(RETURN_QFS).getValue(); - - if (quals != null && quals.length() > 0) { - String[] qualsSplit = quals.split(","); - for (String q : qualsSplit) { - this.qualifiers.add(q.trim().getBytes()); - } - } - } -} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java new file mode 100644 index 0000000000..20dc0d498c --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.hbase; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.lookup.LookupService; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +@Tags({"hbase", "record", "lookup", "service"}) +@CapabilityDescription("A lookup service that retrieves one or more columns from HBase and returns them as a record. The lookup coordinates " + + "must contain 'rowKey' which will be the HBase row id.") +public class HBase_1_1_2_RecordLookupService extends AbstractControllerService implements LookupService { + + static final String ROW_KEY_KEY = "rowKey"; + private static final Set REQUIRED_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY))); + + static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("hbase-client-service") + .displayName("HBase Client Service") + .description("Specifies the HBase Client Controller Service to use for accessing HBase.") + .required(true) + .identifiesControllerService(HBaseClientService.class) + .build(); + + static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .name("hb-lu-table-name") + .displayName("Table Name") + .description("The name of the table where look ups will be run.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor RETURN_COLUMNS = new PropertyDescriptor.Builder() + .name("hb-lu-return-cols") + .displayName("Columns") + .description("A comma-separated list of \\\":\\\" pairs to return when scanning. " + + "To return all columns for a given family, leave off the qualifier such as \\\",\\\".") + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("hb-lu-charset") + .displayName("Character Set") + .description("Specifies the character set used to decode bytes retrieved from HBase.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + static final List PROPERTIES; + static { + final List props = new ArrayList<>(); + props.add(HBASE_CLIENT_SERVICE); + props.add(TABLE_NAME); + props.add(RETURN_COLUMNS); + props.add(CHARSET); + PROPERTIES = Collections.unmodifiableList(props); + } + + private String tableName; + private List columns; + private Charset charset; + private HBaseClientService hBaseClientService; + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Optional lookup(Map coordinates) throws LookupFailureException { + final String rowKey = coordinates.get(ROW_KEY_KEY); + if (StringUtils.isBlank(rowKey)) { + return Optional.empty(); + } + + final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8); + try { + final Map values = new HashMap<>(); + hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, columns, (byte[] row, ResultCell[] resultCells) -> { + for (final ResultCell cell : resultCells) { + final byte[] qualifier = Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierOffset() + cell.getQualifierLength()); + final byte[] value = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength()); + values.put(new String(qualifier, charset), new String(value, charset)); + } + }); + + if (values.size() > 0) { + final List fields = new ArrayList<>(); + for (String key : values.keySet()) { + fields.add(new RecordField(key, RecordFieldType.STRING.getDataType())); + } + final RecordSchema schema = new SimpleRecordSchema(fields); + return Optional.ofNullable(new MapRecord(schema, values)); + } else { + return Optional.empty(); + } + } catch (IOException e) { + getLogger().error("Error occurred loading {}", new Object[] { coordinates.get("rowKey") }, e); + throw new LookupFailureException(e); + } + } + + @Override + public Class getValueType() { + return Record.class; + } + + @Override + public Set getRequiredKeys() { + return REQUIRED_KEYS; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException { + this.hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); + this.tableName = context.getProperty(TABLE_NAME).getValue(); + this.columns = getColumns(context.getProperty(RETURN_COLUMNS).getValue()); + this.charset = Charset.forName(context.getProperty(CHARSET).getValue()); + } + + @OnDisabled + public void onDisabled() { + this.hBaseClientService = null; + this.tableName = null; + this.columns = null; + this.charset = null; + } + + private List getColumns(final String columnsValue) { + final String[] columns = (columnsValue == null || columnsValue.isEmpty() ? new String[0] : columnsValue.split(",")); + + final List columnsList = new ArrayList<>(); + + for (final String column : columns) { + if (column.contains(":")) { + final String[] parts = column.trim().split(":"); + final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8); + final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8); + columnsList.add(new Column(cf, cq)); + } else { + final byte[] cf = column.trim().getBytes(StandardCharsets.UTF_8); + columnsList.add(new Column(cf, null)); + } + } + + return columnsList; + } + +} + diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 1fcd3d9667..5087688daa 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -14,4 +14,4 @@ # limitations under the License. org.apache.nifi.hbase.HBase_1_1_2_ClientService org.apache.nifi.hbase.HBase_1_1_2_ClientMapCacheService -org.apache.nifi.hbase.HBase_1_1_2_LookupService +org.apache.nifi.hbase.HBase_1_1_2_RecordLookupService diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_LookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_LookupService.java deleted file mode 100644 index 9ca7940a95..0000000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_LookupService.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.hbase; - -import org.apache.nifi.hbase.put.PutColumn; -import org.apache.nifi.serialization.record.MapRecord; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Calendar; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; - -@Ignore("This is an integration test. It requires a table with the name guids and a column family named property. Example" + - "Docker Compose configuration:" + - " hbase-docker:\n" + - " container_name: hbase-docker\n" + - " image: \"dajobe/hbase\"\n" + - " ports:\n" + - " - 16010:16010\n" + - " - 2181:2181\n" + - " - 60000:60000\n" + - " - 60010:60010\n" + - " - 60020:60020\n" + - " - 60030:60030\n" + - " - 9090:9090\n" + - " - 9095:9095\n" + - " hostname: hbase-docker") -public class TestHBase_1_1_2_LookupService { - - TestRunner runner; - HBase_1_1_2_LookupService service; - - static final byte[] FAM = "property".getBytes(); - static final byte[] QUAL1 = "uuid".getBytes(); - static final byte[] QUAL2 = "uuid2".getBytes(); - - static final String TABLE_NAME = "guids"; - - @Before - public void before() throws Exception { - runner = TestRunners.newTestRunner(TestLookupProcessor.class); - service = new HBase_1_1_2_LookupService(); - runner.addControllerService("lookupService", service); - runner.setProperty(service, HBaseClientService.ZOOKEEPER_QUORUM, "hbase-docker"); - runner.setProperty(service, HBaseClientService.ZOOKEEPER_CLIENT_PORT, "2181"); - runner.setProperty(service, HBaseClientService.ZOOKEEPER_ZNODE_PARENT, "/hbase"); - runner.setProperty(service, HBaseClientService.HBASE_CLIENT_RETRIES, "3"); - runner.setProperty(service, HBase_1_1_2_LookupService.TABLE_NAME, TABLE_NAME); - runner.setProperty(service, HBase_1_1_2_LookupService.RETURN_CFS, "property"); - runner.setProperty(service, HBase_1_1_2_LookupService.CHARSET, "UTF-8"); - } - - @After - public void after() throws Exception { - service.shutdown(); - } - - @Test - public void testSingleLookup() throws Exception { - runner.enableControllerService(service); - runner.assertValid(service); - - String uuid = UUID.randomUUID().toString(); - String rowKey = String.format("x-y-z-%d", Calendar.getInstance().getTimeInMillis()); - - PutColumn column = new PutColumn(FAM, QUAL1, uuid.getBytes()); - - service.put(TABLE_NAME, rowKey.getBytes(), Arrays.asList(column)); - - Map lookup = new HashMap<>(); - lookup.put("rowKey", rowKey); - Optional result = service.lookup(lookup); - - Assert.assertNotNull("Result was null", result); - Assert.assertNotNull("The value was null", result.get()); - MapRecord record = (MapRecord)result.get(); - Assert.assertEquals("The value didn't match.", uuid, record.getAsString("uuid")); - } - - - @Test - public void testMultipleLookup() throws Exception { - runner.enableControllerService(service); - runner.assertValid(service); - - String uuid = UUID.randomUUID().toString(); - String uuid2 = UUID.randomUUID().toString(); - String rowKey = String.format("x-y-z-%d", Calendar.getInstance().getTimeInMillis()); - - List columns = new ArrayList<>(); - columns.add(new PutColumn(FAM, QUAL1, uuid.getBytes())); - columns.add(new PutColumn(FAM, QUAL2, uuid2.getBytes())); - - service.put(TABLE_NAME, rowKey.getBytes(), columns); - - Map lookup = new HashMap<>(); - lookup.put("rowKey", rowKey); - Optional result = service.lookup(lookup); - - Assert.assertNotNull("Result was null", result); - Assert.assertNotNull("The value was null", result.get()); - Assert.assertTrue("Wrong type.", result.get() instanceof MapRecord); - MapRecord record = (MapRecord)result.get(); - Assert.assertEquals("Qual 1 was wrong", uuid, record.getAsString("uuid")); - Assert.assertEquals("Qual 2 was wrong", uuid2, record.getAsString("uuid2")); - } -} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_RecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_RecordLookupService.java new file mode 100644 index 0000000000..ab8a37c140 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_RecordLookupService.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.hbase; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.nifi.hadoop.KerberosProperties; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.when; + +public class TestHBase_1_1_2_RecordLookupService { + + static final String TABLE_NAME = "guids"; + static final String ROW = "row1"; + static final String COLS = "cf1:cq1,cf2:cq2"; + + private TestRunner runner; + private HBase_1_1_2_RecordLookupService lookupService; + private MockHBaseClientService clientService; + private TestRecordLookupProcessor testLookupProcessor; + + @Before + public void before() throws Exception { + testLookupProcessor = new TestRecordLookupProcessor(); + runner = TestRunners.newTestRunner(testLookupProcessor); + + // setup mock HBaseClientService + final Table table = Mockito.mock(Table.class); + when(table.getName()).thenReturn(TableName.valueOf(TABLE_NAME)); + + final KerberosProperties kerberosProperties = new KerberosProperties(new File("src/test/resources/krb5.conf")); + clientService = new MockHBaseClientService(table, "family", kerberosProperties); + runner.addControllerService("clientService", clientService); + runner.setProperty(clientService, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml"); + runner.enableControllerService(clientService); + + // setup HBase LookupService + lookupService = new HBase_1_1_2_RecordLookupService(); + runner.addControllerService("lookupService", lookupService); + runner.setProperty(lookupService, HBase_1_1_2_RecordLookupService.HBASE_CLIENT_SERVICE, "clientService"); + runner.setProperty(lookupService, HBase_1_1_2_RecordLookupService.TABLE_NAME, TABLE_NAME); + runner.enableControllerService(lookupService); + + // setup test processor + runner.setProperty(TestRecordLookupProcessor.HBASE_LOOKUP_SERVICE, "lookupService"); + runner.setProperty(TestRecordLookupProcessor.HBASE_ROW, ROW); + } + + @Test + public void testSuccessfulLookupAllColumns() { + // setup some staged data in the mock client service + final Map cells = new HashMap<>(); + cells.put("cq1", "v1"); + cells.put("cq2", "v2"); + clientService.addResult("row1", cells, System.currentTimeMillis()); + + // run the processor + runner.enqueue("trigger flow file"); + runner.run(); + runner.assertAllFlowFilesTransferred(TestRecordLookupProcessor.REL_SUCCESS); + + final List records = testLookupProcessor.getLookedupRecords(); + Assert.assertNotNull(records); + Assert.assertEquals(1, records.size()); + + final Record record = records.get(0); + Assert.assertEquals("v1", record.getAsString("cq1")); + Assert.assertEquals("v2", record.getAsString("cq2")); + } + + @Test + public void testLookupWithNoResults() { + // run the processor + runner.enqueue("trigger flow file"); + runner.run(); + runner.assertAllFlowFilesTransferred(TestRecordLookupProcessor.REL_FAILURE); + + final List records = testLookupProcessor.getLookedupRecords(); + Assert.assertNotNull(records); + Assert.assertEquals(0, records.size()); + } + + @Test + public void testLookupWhenMissingRowKeyCoordinate() { + runner.removeProperty(TestRecordLookupProcessor.HBASE_ROW); + + // run the processor + runner.enqueue("trigger flow file"); + runner.run(); + runner.assertAllFlowFilesTransferred(TestRecordLookupProcessor.REL_FAILURE); + + final List records = testLookupProcessor.getLookedupRecords(); + Assert.assertNotNull(records); + Assert.assertEquals(0, records.size()); + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestLookupProcessor.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestLookupProcessor.java deleted file mode 100644 index 729c6f9594..0000000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestLookupProcessor.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.hbase; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; - -import java.util.ArrayList; -import java.util.List; - -public class TestLookupProcessor extends AbstractProcessor { - static final PropertyDescriptor HBASE_LOOKUP_SERVICE = new PropertyDescriptor.Builder() - .name("HBase Lookup Service") - .description("HBaseLookupService") - .identifiesControllerService(HBase_1_1_2_LookupService.class) - .required(true) - .build(); - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - } - - @Override - protected List getSupportedPropertyDescriptors() { - List propDescs = new ArrayList<>(); - propDescs.add(HBASE_LOOKUP_SERVICE); - return propDescs; - } -} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestRecordLookupProcessor.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestRecordLookupProcessor.java new file mode 100644 index 0000000000..d3df016fd0 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestRecordLookupProcessor.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.hbase; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.lookup.LookupService; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.record.Record; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class TestRecordLookupProcessor extends AbstractProcessor { + + static final PropertyDescriptor HBASE_LOOKUP_SERVICE = new PropertyDescriptor.Builder() + .name("HBase Lookup Service") + .description("HBaseLookupService") + .identifiesControllerService(LookupService.class) + .required(true) + .build(); + + static final PropertyDescriptor HBASE_ROW = new PropertyDescriptor.Builder() + .name("HBase Row Id") + .description("The Row Id to Lookup.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All success FlowFiles are routed to this relationship") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("All failed FlowFiles are routed to this relationship") + .build(); + + private List lookedupRecords = new ArrayList<>(); + + @Override + protected List getSupportedPropertyDescriptors() { + List propDescs = new ArrayList<>(); + propDescs.add(HBASE_LOOKUP_SERVICE); + propDescs.add(HBASE_ROW); + return propDescs; + } + + @Override + public Set getRelationships() { + Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + return relationships; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final String rowKey = context.getProperty(HBASE_ROW).getValue(); + + final Map coordinates = new HashMap<>(); + coordinates.put(HBase_1_1_2_RecordLookupService.ROW_KEY_KEY, rowKey); + + final LookupService lookupService = context.getProperty(HBASE_LOOKUP_SERVICE).asControllerService(LookupService.class); + try { + final Optional record = lookupService.lookup(coordinates); + if (record.isPresent()) { + lookedupRecords.add(record.get()); + session.transfer(flowFile, REL_SUCCESS); + } else { + session.transfer(flowFile, REL_FAILURE); + } + + } catch (LookupFailureException e) { + session.transfer(flowFile, REL_FAILURE); + } + + } + + public List getLookedupRecords() { + return new ArrayList<>(lookedupRecords); + } + + public void clearLookedupRecords() { + this.lookedupRecords.clear(); + } + +}