diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java index 1056f580b1..f23e956959 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -55,6 +55,16 @@ public class MockHBaseClientService extends AbstractControllerService implements throw new UnsupportedOperationException(); } + @Override + public boolean checkAndPut(String tableName, byte[] rowId, byte[] family, byte[] qualifier, byte[]value, PutColumn column) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void delete(String tableName, byte[] rowId) throws IOException { + throw new UnsupportedOperationException(); + } + @Override public void scan(String tableName, byte[] startRow, byte[] endRow, Collection columns, ResultHandler handler) throws IOException { if (throwException) { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java index f7718f6ef0..80b8961434 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java @@ -94,6 +94,28 @@ public interface HBaseClientService extends ControllerService { */ void put(String tableName, byte[] rowId, Collection columns) throws IOException; + /** + * Atomically checks if a row/family/qualifier value matches the expected value. If it does, then the Put is added to HBase. + * + * @param tableName the name of an HBase table + * @param rowId the id of the row to check + * @param family the family of the row to check + * @param qualifier the qualifier of the row to check + * @param value the value of the row to check. If null, the check is for the lack of column (ie: non-existence) + * @return True if the Put was executed, false otherwise + * @throws IOException thrown when there are communication errors with HBase$ + */ + boolean checkAndPut(String tableName, byte[] rowId, byte[] family, byte[] qualifier, byte[] value, PutColumn column) throws IOException; + + /** + * Deletes the given row on HBase. All cells are deleted. + * + * @param tableName the name of an HBase table + * @param rowId the id of the row to delete + * @throws IOException thrown when there are communication errors with HBase + */ + void delete(String tableName, byte[] rowId) throws IOException; + /** * Scans the given table using the optional filter criteria and passing each result to the provided handler. * diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml index c7fa3db783..a65727b4c5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml @@ -45,6 +45,12 @@ org.apache.nifi nifi-hadoop-utils + + org.apache.nifi + nifi-distributed-cache-client-service-api + provided + + org.apache.hbase hbase-client diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java new file mode 100644 index 0000000000..665c161159 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hbase; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; + +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.Deserializer; +import java.io.ByteArrayOutputStream; +import org.apache.nifi.reporting.InitializationException; + +import java.nio.charset.StandardCharsets; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.put.PutColumn; + + +import org.apache.nifi.processor.util.StandardValidators; + +@Tags({"distributed", "cache", "state", "map", "cluster","hbase"}) +@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheClient", "org.apache.nifi.hbase.HBase_1_1_2_ClientService"}) +@CapabilityDescription("Provides the ability to use an HBase table as a cache, in place of a DistributedMapCache." + + " Uses a HBase_1_1_2_ClientService controller to communicate with HBase.") + +public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService implements DistributedMapCacheClient { + + static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("HBase Client Service") + .description("Specifies the HBase Client Controller Service to use for accessing HBase.") + .required(true) + .identifiesControllerService(HBaseClientService.class) + .build(); + + public static final PropertyDescriptor HBASE_CACHE_TABLE_NAME = new PropertyDescriptor.Builder() + .name("HBase Cache Table Name") + .description("Name of the table on HBase to use for the cache.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor HBASE_COLUMN_FAMILY = new PropertyDescriptor.Builder() + .name("HBase Column Family") + .description("Name of the column family on HBase to use for the cache.") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("f") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor HBASE_COLUMN_QUALIFIER = new PropertyDescriptor.Builder() + .name("HBase Column Qualifier") + .description("Name of the column qualifier on HBase to use for the cache") + .defaultValue("q") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + @Override + protected List getSupportedPropertyDescriptors() { + final List descriptors = new ArrayList<>(); + descriptors.add(HBASE_CACHE_TABLE_NAME); + descriptors.add(HBASE_CLIENT_SERVICE); + descriptors.add(HBASE_COLUMN_FAMILY); + descriptors.add(HBASE_COLUMN_QUALIFIER); + return descriptors; + } + + // Other threads may call @OnEnabled so these are marked volatile to ensure other class methods read the updated value + private volatile String hBaseCacheTableName; + private volatile HBaseClientService hBaseClientService; + + private volatile String hBaseColumnFamily; + private volatile byte[] hBaseColumnFamilyBytes; + + private volatile String hBaseColumnQualifier; + private volatile byte[] hBaseColumnQualifierBytes; + + @OnEnabled + public void onConfigured(final ConfigurationContext context) throws InitializationException{ + hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); + + hBaseCacheTableName = context.getProperty(HBASE_CACHE_TABLE_NAME).evaluateAttributeExpressions().getValue(); + hBaseColumnFamily = context.getProperty(HBASE_COLUMN_FAMILY).evaluateAttributeExpressions().getValue(); + hBaseColumnQualifier = context.getProperty(HBASE_COLUMN_QUALIFIER).evaluateAttributeExpressions().getValue(); + + hBaseColumnFamilyBytes = hBaseColumnFamily.getBytes(StandardCharsets.UTF_8); + hBaseColumnQualifierBytes = hBaseColumnQualifier.getBytes(StandardCharsets.UTF_8); + } + + private byte[] serialize(final T value, final Serializer serializer) throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + serializer.serialize(value, baos); + return baos.toByteArray(); + } + private T deserialize(final byte[] value, final Deserializer deserializer) throws IOException { + return deserializer.deserialize(value); + } + + + @Override + public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + + final byte[] rowIdBytes = serialize(key, keySerializer); + final byte[] valueBytes = serialize(value, valueSerializer); + final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes); + + return hBaseClientService.checkAndPut(hBaseCacheTableName, rowIdBytes, hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, null, putColumn); + } + + @Override + public void put(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + + List putColumns = new ArrayList(1); + final byte[] rowIdBytes = serialize(key, keySerializer); + final byte[] valueBytes = serialize(value, valueSerializer); + + final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes); + putColumns.add(putColumn); + + hBaseClientService.put(hBaseCacheTableName, rowIdBytes, putColumns); + } + + @Override + public boolean containsKey(final K key, final Serializer keySerializer) throws IOException { + final byte[] rowIdBytes = serialize(key, keySerializer); + final HBaseRowHandler handler = new HBaseRowHandler(); + + final List columnsList = new ArrayList(0); + + hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, handler); + return (handler.numRows() > 0); + } + + /** + * Note that the implementation of getAndPutIfAbsent is not atomic. + * The putIfAbsent is atomic, but a getAndPutIfAbsent does a get and then a putIfAbsent. + * If there is an existing value and it is updated in betweern the two steps, then the existing (unmodified) value will be returned. + * If the existing value was deleted between the two steps, getAndPutIfAbsent will correctly return null. + * This should not generally be an issue with cache processors such as DetectDuplicate. + * + */ + @Override + public V getAndPutIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, final Deserializer valueDeserializer) throws IOException { + // Between the get and the putIfAbsent, the value could be deleted or updated. + // Logic below takes care of the deleted case but not the updated case. + // This is probably fine since DistributedMapCache and DetectDuplicate expect to receive the original cache value + // Could possibly be fixed by implementing AtomicDistributedMapCache (Map Cache protocol version 2) + final V got = get(key, keySerializer, valueDeserializer); + final boolean wasAbsent = putIfAbsent(key, value, keySerializer, valueSerializer); + + if (! wasAbsent) return got; + else return null; + } + + @Override + public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { + final byte[] rowIdBytes = serialize(key, keySerializer); + final HBaseRowHandler handler = new HBaseRowHandler(); + + final List columnsList = new ArrayList(0); + + hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, handler); + if (handler.numRows() > 1) { + throw new IOException("Found multiple rows in HBase for key"); + } else if(handler.numRows() == 1) { + return deserialize( handler.getLastResultBytes(), valueDeserializer); + } else { + return null; + } + } + + @Override + public boolean remove(final K key, final Serializer keySerializer) throws IOException { + final boolean contains = containsKey(key, keySerializer); + if (contains) { + final byte[] rowIdBytes = serialize(key, keySerializer); + hBaseClientService.delete(hBaseCacheTableName, rowIdBytes); + } + return contains; + } + + @Override + public long removeByPattern(String regex) throws IOException { + throw new IOException("HBase removeByPattern is not implemented"); + } + + @Override + public void close() throws IOException { + } + + @Override + protected void finalize() throws Throwable { + } + + private class HBaseRowHandler implements ResultHandler { + private int numRows = 0; + private byte[] lastResultBytes; + + @Override + public void handle(byte[] row, ResultCell[] resultCells) { + numRows += 1; + for( final ResultCell resultCell : resultCells ){ + lastResultBytes = resultCell.getValueArray(); + } + } + public int numRows() { + return numRows; + } + public byte[] getLastResultBytes() { + return lastResultBytes; + } + } + +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index af3776fa50..fa71d0634e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -308,6 +309,26 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme } } + @Override + public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException { + try (final Table table = connection.getTable(TableName.valueOf(tableName))) { + Put put = new Put(rowId); + put.addColumn( + column.getColumnFamily(), + column.getColumnQualifier(), + column.getBuffer()); + return table.checkAndPut(rowId, family, qualifier, value, put); + } + } + + @Override + public void delete(final String tableName, final byte[] rowId) throws IOException { + try (final Table table = connection.getTable(TableName.valueOf(tableName))) { + Delete delete = new Delete(rowId); + table.delete(delete); + } + } + @Override public void scan(final String tableName, final Collection columns, final String filterExpression, final long minTime, final ResultHandler handler) throws IOException { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 258d50f786..f97d88c510 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -12,4 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.hbase.HBase_1_1_2_ClientService \ No newline at end of file +org.apache.nifi.hbase.HBase_1_1_2_ClientService +org.apache.nifi.hbase.HBase_1_1_2_ClientMapCacheService diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java new file mode 100644 index 0000000000..6b1fbc71d2 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java @@ -0,0 +1,473 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hbase; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.hadoop.KerberosProperties; +import org.apache.nifi.hbase.put.PutColumn; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; + +import java.io.OutputStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestHBase_1_1_2_ClientMapCacheService { + + private KerberosProperties kerberosPropsWithFile; + private KerberosProperties kerberosPropsWithoutFile; + + private Serializer stringSerializer = new StringSerializer(); + private Deserializer stringDeserializer = new StringDeserializer(); + + @Before + public void setup() { + // needed for calls to UserGroupInformation.setConfiguration() to work when passing in + // config with Kerberos authentication enabled + System.setProperty("java.security.krb5.realm", "nifi.com"); + System.setProperty("java.security.krb5.kdc", "nifi.kdc"); + + kerberosPropsWithFile = new KerberosProperties(new File("src/test/resources/krb5.conf")); + + kerberosPropsWithoutFile = new KerberosProperties(null); + } + + private final String tableName = "nifi"; + private final String columnFamily = "family1"; + private final String columnQualifier = "qualifier1"; + + + @Test + public void testPut() throws InitializationException, IOException { + final String row = "row1"; + final String content = "content1"; + + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + + // Mock an HBase Table so we can verify the put operations later + final Table table = Mockito.mock(Table.class); + when(table.getName()).thenReturn(TableName.valueOf(tableName)); + + // create the controller service and link it to the test processor + final MockHBaseClientService service = configureHBaseClientService(runner, table); + runner.assertValid(service); + + final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE) + .asControllerService(HBaseClientService.class); + + final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService); + runner.assertValid(cacheService); + + // try to put a single cell + final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE) + .asControllerService(DistributedMapCacheClient.class); + + hBaseCacheService.put( row, content, stringSerializer, stringSerializer); + + // verify only one call to put was made + ArgumentCaptor capture = ArgumentCaptor.forClass(Put.class); + verify(table, times(1)).put(capture.capture()); + + verifyPut(row, columnFamily, columnQualifier, content, capture.getValue()); + } + + @Test + public void testGet() throws InitializationException, IOException { + final String row = "row1"; + final String content = "content1"; + + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + + // Mock an HBase Table so we can verify the put operations later + final Table table = Mockito.mock(Table.class); + when(table.getName()).thenReturn(TableName.valueOf(tableName)); + + // create the controller service and link it to the test processor + final MockHBaseClientService service = configureHBaseClientService(runner, table); + runner.assertValid(service); + + final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE) + .asControllerService(HBaseClientService.class); + + final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService); + runner.assertValid(cacheService); + + final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE) + .asControllerService(DistributedMapCacheClient.class); + + hBaseCacheService.put( row, content, stringSerializer, stringSerializer); + + final String result = hBaseCacheService.get(row, stringSerializer, stringDeserializer); + + assertEquals( content, result); + + } + + @Test + public void testContainsKey() throws InitializationException, IOException { + final String row = "row1"; + final String content = "content1"; + + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + + // Mock an HBase Table so we can verify the put operations later + final Table table = Mockito.mock(Table.class); + when(table.getName()).thenReturn(TableName.valueOf(tableName)); + + // create the controller service and link it to the test processor + final MockHBaseClientService service = configureHBaseClientService(runner, table); + runner.assertValid(service); + + final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE) + .asControllerService(HBaseClientService.class); + + final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService); + runner.assertValid(cacheService); + + final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE) + .asControllerService(DistributedMapCacheClient.class); + + assertFalse( hBaseCacheService.containsKey(row , stringSerializer ) ); + + hBaseCacheService.put( row, content, stringSerializer, stringSerializer); + + assertTrue( hBaseCacheService.containsKey(row, stringSerializer) ); + } + + @Test + public void testPutIfAbsent() throws InitializationException, IOException { + final String row = "row1"; + final String content = "content1"; + + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + + // Mock an HBase Table so we can verify the put operations later + final Table table = Mockito.mock(Table.class); + when(table.getName()).thenReturn(TableName.valueOf(tableName)); + + // create the controller service and link it to the test processor + final MockHBaseClientService service = configureHBaseClientService(runner, table); + runner.assertValid(service); + + final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE) + .asControllerService(HBaseClientService.class); + + final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService); + runner.assertValid(cacheService); + + final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE) + .asControllerService(DistributedMapCacheClient.class); + + assertTrue( hBaseCacheService.putIfAbsent( row, content, stringSerializer, stringSerializer)); + + // verify only one call to put was made + ArgumentCaptor capture = ArgumentCaptor.forClass(Put.class); + verify(table, times(1)).put(capture.capture()); + + verifyPut(row, columnFamily, columnQualifier, content, capture.getValue()); + + assertFalse( hBaseCacheService.putIfAbsent( row, content, stringSerializer, stringSerializer)); + + verify(table, times(1)).put(capture.capture()); + } + + @Test + public void testGetAndPutIfAbsent() throws InitializationException, IOException { + final String row = "row1"; + final String content = "content1"; + + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + + // Mock an HBase Table so we can verify the put operations later + final Table table = Mockito.mock(Table.class); + when(table.getName()).thenReturn(TableName.valueOf(tableName)); + + // create the controller service and link it to the test processor + final MockHBaseClientService service = configureHBaseClientService(runner, table); + runner.assertValid(service); + + final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE) + .asControllerService(HBaseClientService.class); + + final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService); + runner.assertValid(cacheService); + + final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE) + .asControllerService(DistributedMapCacheClient.class); + + assertNull( hBaseCacheService.getAndPutIfAbsent( row, content, stringSerializer, stringSerializer, stringDeserializer)); + + // verify only one call to put was made + ArgumentCaptor capture = ArgumentCaptor.forClass(Put.class); + verify(table, times(1)).put(capture.capture()); + + verifyPut(row, columnFamily, columnQualifier, content, capture.getValue()); + + final String result = hBaseCacheService.getAndPutIfAbsent( row, content, stringSerializer, stringSerializer, stringDeserializer); + + verify(table, times(1)).put(capture.capture()); + + assertEquals( result, content); + } + + + private MockHBaseClientService configureHBaseClientService(final TestRunner runner, final Table table) throws InitializationException { + final MockHBaseClientService service = new MockHBaseClientService(table, kerberosPropsWithFile); + runner.addControllerService("hbaseClient", service); + runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml"); + runner.enableControllerService(service); + runner.setProperty(TestProcessor.HBASE_CLIENT_SERVICE, "hbaseClient"); + return service; + } + + private DistributedMapCacheClient configureHBaseCacheService(final TestRunner runner, final HBaseClientService service) throws InitializationException { + final HBase_1_1_2_ClientMapCacheService cacheService = new HBase_1_1_2_ClientMapCacheService(); + runner.addControllerService("hbaseCache", cacheService); + runner.setProperty(cacheService, HBase_1_1_2_ClientMapCacheService.HBASE_CLIENT_SERVICE, "hbaseClient"); + runner.setProperty(cacheService, HBase_1_1_2_ClientMapCacheService.HBASE_CACHE_TABLE_NAME, tableName); + runner.setProperty(cacheService, HBase_1_1_2_ClientMapCacheService.HBASE_COLUMN_FAMILY, columnFamily); + runner.setProperty(cacheService, HBase_1_1_2_ClientMapCacheService.HBASE_COLUMN_QUALIFIER, columnQualifier); + runner.enableControllerService(cacheService); + runner.setProperty(TestProcessor.HBASE_CACHE_SERVICE,"hbaseCache"); + return cacheService; + } + + private void verifyResultCell(final ResultCell result, final String cf, final String cq, final String val) { + final String colFamily = new String(result.getFamilyArray(), result.getFamilyOffset(), result.getFamilyLength()); + assertEquals(cf, colFamily); + + final String colQualifier = new String(result.getQualifierArray(), result.getQualifierOffset(), result.getQualifierLength()); + assertEquals(cq, colQualifier); + + final String value = new String(result.getValueArray(), result.getValueOffset(), result.getValueLength()); + assertEquals(val, value); + } + + private void verifyPut(String row, String columnFamily, String columnQualifier, String content, Put put) { + assertEquals(row, new String(put.getRow())); + + NavigableMap> familyCells = put.getFamilyCellMap(); + assertEquals(1, familyCells.size()); + + Map.Entry> entry = familyCells.firstEntry(); + assertEquals(columnFamily, new String(entry.getKey())); + assertEquals(1, entry.getValue().size()); + + Cell cell = entry.getValue().get(0); + assertEquals(columnQualifier, new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())); + assertEquals(content, new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); + } + + // Override methods to create a mock service that can return staged data + private class MockHBaseClientService extends HBase_1_1_2_ClientService { + + private Table table; + private List results = new ArrayList<>(); + private KerberosProperties kerberosProperties; + + public MockHBaseClientService(final Table table, final KerberosProperties kerberosProperties) { + this.table = table; + this.kerberosProperties = kerberosProperties; + } + + @Override + protected KerberosProperties getKerberosProperties(File kerberosConfigFile) { + return kerberosProperties; + } + + protected void setKerberosProperties(KerberosProperties properties) { + this.kerberosProperties = properties; + + } + + public void addResult(final String rowKey, final Map cells, final long timestamp) { + final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8); + final Cell[] cellArray = new Cell[cells.size()]; + int i = 0; + for (final Map.Entry cellEntry : cells.entrySet()) { + final Cell cell = Mockito.mock(Cell.class); + when(cell.getRowArray()).thenReturn(rowArray); + when(cell.getRowOffset()).thenReturn(0); + when(cell.getRowLength()).thenReturn((short) rowArray.length); + + final String cellValue = cellEntry.getValue(); + final byte[] valueArray = cellValue.getBytes(StandardCharsets.UTF_8); + when(cell.getValueArray()).thenReturn(valueArray); + when(cell.getValueOffset()).thenReturn(0); + when(cell.getValueLength()).thenReturn(valueArray.length); + + final byte[] familyArray = "family1".getBytes(StandardCharsets.UTF_8); + when(cell.getFamilyArray()).thenReturn(familyArray); + when(cell.getFamilyOffset()).thenReturn(0); + when(cell.getFamilyLength()).thenReturn((byte) familyArray.length); + + final String qualifier = cellEntry.getKey(); + final byte[] qualifierArray = qualifier.getBytes(StandardCharsets.UTF_8); + when(cell.getQualifierArray()).thenReturn(qualifierArray); + when(cell.getQualifierOffset()).thenReturn(0); + when(cell.getQualifierLength()).thenReturn(qualifierArray.length); + + when(cell.getTimestamp()).thenReturn(timestamp); + + cellArray[i++] = cell; + } + + final Result result = Mockito.mock(Result.class); + when(result.getRow()).thenReturn(rowArray); + when(result.rawCells()).thenReturn(cellArray); + results.add(result); + } + + @Override + public void put(final String tableName, final byte[] rowId, final Collection columns) throws IOException { + Put put = new Put(rowId); + Map map = new HashMap(); + for (final PutColumn column : columns) { + put.addColumn( + column.getColumnFamily(), + column.getColumnQualifier(), + column.getBuffer()); + map.put( new String( column.getColumnQualifier() ), new String(column.getBuffer()) ); + } + table.put(put); + + addResult( new String(rowId) , map,1); + } + + @Override + public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException { + + for (Result result: results){ + if ( Arrays.equals(result.getRow(), rowId)){ + Cell[] cellArray = result.rawCells(); + for (Cell cell : cellArray){ + if( Arrays.equals(cell.getFamilyArray(), family) && Arrays.equals(cell.getQualifierArray(), qualifier)){ + //throw new RuntimeException( new String(cell.getValueArray()) ); + if( value == null || Arrays.equals(cell.getValueArray(), value)) return false; + } + } + } + } + final List putColumns = new ArrayList(); + putColumns.add(column); + put(tableName, rowId, putColumns ); + return true; + } + + @Override + public void scan(final String tableName, final byte[] startRow, final byte[] endRow, final Collection columns, final ResultHandler handler) throws IOException { + if (startRow != endRow) throw new RuntimeException("Start and end must be equal"); + for(Result result: results){ + if (Arrays.equals( result.getRow() , startRow)) { + final Cell[] cellArray = result.rawCells(); + final ResultCell[] resultCells = new ResultCell[cellArray.length ]; + int i=0; + for (Cell cell : cellArray){ + ResultCell resultCell = new ResultCell(); + resultCell.setRowArray( result.getRow()); + resultCell.setFamilyArray(cell.getFamilyArray()); + resultCell.setQualifierArray(cell.getQualifierArray()); + resultCell.setValueArray(cell.getValueArray()); + resultCells[i++]=resultCell; + } + handler.handle(result.getRow(), resultCells ); + } + } + } + + @Override + protected ResultScanner getResults(Table table, Collection columns, Filter filter, long minTime) throws IOException { + final ResultScanner scanner = Mockito.mock(ResultScanner.class); + Mockito.when(scanner.iterator()).thenReturn(results.iterator()); + return scanner; + } + + @Override + protected Connection createConnection(ConfigurationContext context) throws IOException { + Connection connection = Mockito.mock(Connection.class); + Mockito.when(connection.getTable(table.getName())).thenReturn(table); + return connection; + } + } + + // handler that saves results for verification + private static final class CollectingResultHandler implements ResultHandler { + + Map results = new LinkedHashMap<>(); + + @Override + public void handle(byte[] row, ResultCell[] resultCells) { + final String rowStr = new String(row, StandardCharsets.UTF_8); + results.put(rowStr, resultCells); + } + } + + private static class StringSerializer implements Serializer { + @Override + public void serialize(final String value, final OutputStream out) throws SerializationException, IOException { + out.write(value.getBytes(StandardCharsets.UTF_8)); + } + } + + private static class StringDeserializer implements Deserializer { + @Override + public String deserialize(byte[] input) throws DeserializationException, IOException{ + return new String(input); + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestProcessor.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestProcessor.java index 44b7e8b4b2..cc70d062ab 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestProcessor.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestProcessor.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.hbase.HBaseClientService; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -35,6 +36,13 @@ public class TestProcessor extends AbstractProcessor { .required(true) .build(); + static final PropertyDescriptor HBASE_CACHE_SERVICE = new PropertyDescriptor.Builder() + .name("HBase Cache Service") + .description("HBaseCacheService") + .identifiesControllerService(DistributedMapCacheClient.class) + .required(true) + .build(); + @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { } @@ -43,6 +51,7 @@ public class TestProcessor extends AbstractProcessor { protected List getSupportedPropertyDescriptors() { List propDescs = new ArrayList<>(); propDescs.add(HBASE_CLIENT_SERVICE); + propDescs.add(HBASE_CACHE_SERVICE); return propDescs; } }