From 0e61dbc9a02801f7bef21b43c3a7bdd644c5b281 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Sat, 18 Apr 2020 14:10:27 -0400 Subject: [PATCH] NIFI-7373 Added new methods to DistributedMapCacheClient for bulk get and put. Updated HBase 1.1.2 clients. Added HBase 2 support. Added Redis support. --- ...RedisDistributedMapCacheClientService.java | 22 +++++++ ...RedisDistributedMapCacheClientService.java | 16 +++++ .../client/DistributedMapCacheClient.java | 25 +++++-- .../HBase_1_1_2_ClientMapCacheService.java | 17 +++++ .../nifi/hbase/HBase_1_1_2_ClientService.java | 2 +- .../nifi/hbase/MockHBaseClientService.java | 30 +++++++++ ...TestHBase_1_1_2_ClientMapCacheService.java | 65 +++++++++++++++++++ .../hbase/HBase_2_ClientMapCacheService.java | 17 +++++ .../nifi/hbase/HBase_2_ClientService.java | 2 +- .../nifi/hbase/MockHBaseClientService.java | 30 +++++++++ .../TestHBase_2_ClientMapCacheService.java | 65 +++++++++++++++++++ 11 files changed, 285 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java index 5cca23b73b..eb76176991 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java @@ -46,7 +46,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; @Tags({ "redis", "distributed", "cache", "map" }) @@ -194,6 +196,26 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer }); } + @Override + public void putAll(Map keysAndValues, Serializer keySerializer, Serializer valueSerializer) throws IOException { + withConnection(redisConnection -> { + Map values = new HashMap<>(); + for (Map.Entry entry : keysAndValues.entrySet()) { + final Tuple kv = serialize(entry.getKey(), entry.getValue(), keySerializer, valueSerializer); + values.put(kv.getKey(), kv.getValue()); + } + + if (getLogger().isDebugEnabled()) { + getLogger().debug(String.format("Queued up %d tuples to mset on Redis connection.", values.size())); + } + + if (!values.isEmpty()) { + redisConnection.mSet(values); + } + return null; + }); + } + @Override public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { return withConnection(redisConnection -> { diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java index 5e0ffd5614..9efef5c3bc 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java @@ -49,8 +49,10 @@ import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -243,6 +245,20 @@ public class ITRedisDistributedMapCacheClientService { Assert.assertTrue(cacheClient.removeByPattern("test-redis-processor-*") >= numToDelete); Assert.assertFalse(cacheClient.containsKey(key, stringSerializer)); + Map bulk = new HashMap<>(); + bulk.put("bulk-1", "testing1"); + bulk.put("bulk-2", "testing2"); + bulk.put("bulk-3", "testing3"); + bulk.put("bulk-4", "testing4"); + bulk.put("bulk-5", "testing5"); + + cacheClient.putAll(bulk, stringSerializer, stringSerializer); + Assert.assertTrue(cacheClient.containsKey("bulk-1", stringSerializer)); + Assert.assertTrue(cacheClient.containsKey("bulk-2", stringSerializer)); + Assert.assertTrue(cacheClient.containsKey("bulk-3", stringSerializer)); + Assert.assertTrue(cacheClient.containsKey("bulk-4", stringSerializer)); + Assert.assertTrue(cacheClient.containsKey("bulk-5", stringSerializer)); + session.transfer(flowFile, REL_SUCCESS); } catch (final Exception e) { getLogger().error("Routing to failure due to: " + e.getMessage(), e); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java index 6ab2e3b90d..d018f651e2 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java @@ -16,15 +16,15 @@ */ package org.apache.nifi.distributed.cache.client; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; + import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Set; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.controller.ControllerService; - /** * This interface defines an API that can be used for interacting with a * Distributed Cache that functions similarly to a {@link java.util.Map Map}. @@ -105,6 +105,23 @@ public interface DistributedMapCacheClient extends ControllerService { */ void put(K key, V value, Serializer keySerializer, Serializer valueSerializer) throws IOException; + /** + * Performs a bulk put operation. This should be used when needed to send a large batch of updates to a cache + * in a single update operation. + * + * @param keysAndValues A java.util.Map that contains an association between keys and values to be bulk inserted into the cache. + * @param keySerializer The Serializer that will be used to serialize the key into bytes + * @param valueSerializer The Serializer that will be used to serialize the value into bytes + * @param The key type + * @param The value type + * @throws IOException if unable to communicate with the remote instance + */ + default void putAll(Map keysAndValues, Serializer keySerializer, Serializer valueSerializer) throws IOException { + for (Map.Entry entry : keysAndValues.entrySet()) { + put(entry.getKey(), entry.getValue(), keySerializer, valueSerializer); + } + } + /** * Returns the value in the cache for the given key, if one exists; * otherwise returns null diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java index 95f7c22e42..a194a631d7 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java @@ -30,6 +30,7 @@ import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.hbase.put.PutColumn; +import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.scan.Column; import org.apache.nifi.hbase.scan.ResultCell; import org.apache.nifi.hbase.scan.ResultHandler; @@ -43,6 +44,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS; @@ -172,6 +174,21 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService hBaseClientService.put(hBaseCacheTableName, rowIdBytes, putColumns); } + @Override + public void putAll(Map keysAndValues, Serializer keySerializer, Serializer valueSerializer) throws IOException { + List puts = new ArrayList<>(); + for (Map.Entry entry : keysAndValues.entrySet()) { + List putColumns = new ArrayList(1); + final byte[] rowIdBytes = serialize(entry.getKey(), keySerializer); + final byte[] valueBytes = serialize(entry.getValue(), valueSerializer); + + final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes, defaultVisibilityExpression); + putColumns.add(putColumn); + puts.add(new PutFlowFile(hBaseCacheTableName, rowIdBytes, putColumns, null)); + } + hBaseClientService.put(hBaseCacheTableName, puts); + } + @Override public boolean containsKey(final K key, final Serializer keySerializer) throws IOException { final byte[] rowIdBytes = serialize(key, keySerializer); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index b71b132508..f0c4d7eda7 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -420,7 +420,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme } } - private List buildPuts(byte[] rowKey, List columns) { + protected List buildPuts(byte[] rowKey, List columns) { List retVal = new ArrayList<>(); try { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java index c2cf265f4d..b8327c58d3 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hbase.put.PutColumn; +import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.scan.Column; import org.mockito.Mockito; @@ -127,6 +128,35 @@ public class MockHBaseClientService extends HBase_1_1_2_ClientService { addResult(new String(rowId), map, 1); } + @Override + public void put(final String tableName, final Collection puts) throws IOException { + final Map> sorted = new HashMap<>(); + final List newPuts = new ArrayList<>(); + + for (final PutFlowFile putFlowFile : puts) { + Map map = new HashMap(); + final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8); + List columns = sorted.get(rowKeyString); + if (columns == null) { + columns = new ArrayList<>(); + sorted.put(rowKeyString, columns); + } + + columns.addAll(putFlowFile.getColumns()); + for (PutColumn column : putFlowFile.getColumns()) { + map.put(new String(column.getColumnQualifier()), new String(column.getBuffer())); + } + + addResult(new String(putFlowFile.getRow()), map, 1); + } + + for (final Map.Entry> entry : sorted.entrySet()) { + newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue())); + } + + table.put(newPuts); + } + @Override public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException { for (Result result : results.values()) { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java index 41e4a1c087..61ac52ce1e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java @@ -41,6 +41,8 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -113,6 +115,69 @@ public class TestHBase_1_1_2_ClientMapCacheService { verifyPut(row, columnFamily, columnQualifier, content, capture.getValue()); } + @Test + public void testPutAll() throws InitializationException, IOException { + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + + // Mock an HBase Table so we can verify the put operations later + final Table table = Mockito.mock(Table.class); + when(table.getName()).thenReturn(TableName.valueOf(tableName)); + + // create the controller service and link it to the test processor + final MockHBaseClientService service = configureHBaseClientService(runner, table); + runner.assertValid(service); + + final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE) + .asControllerService(HBaseClientService.class); + + final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService); + runner.assertValid(cacheService); + + // try to put a single cell + final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE) + .asControllerService(DistributedMapCacheClient.class); + + Map putz = new HashMap<>(); + List content = new ArrayList<>(); + List rows = new ArrayList<>(); + for (int x = 1; x <= 5; x++) { + putz.put(String.format("row-%d", x), String.format("content-%d", x)); + content.add(String.format("content-%d", x)); + rows.add(String.format("row-%d", x)); + } + + hBaseCacheService.putAll( putz, stringSerializer, stringSerializer); + + // verify only one call to put was made + ArgumentCaptor capture = ArgumentCaptor.forClass(List.class); + verify(table, times(1)).put(capture.capture()); + + List captured = capture.getValue(); + + + for (int x = 0; x < 5; x++) { + Put put = captured.get(x); + + String row = new String(put.getRow()); + assertTrue(rows.contains(row)); + + NavigableMap> familyCells = put.getFamilyCellMap(); + assertEquals(1, familyCells.size()); + + Map.Entry> entry = familyCells.firstEntry(); + assertEquals(columnFamily, new String(entry.getKey())); + assertEquals(1, entry.getValue().size()); + + Cell cell = entry.getValue().get(0); + String contentString = new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + assertEquals(columnQualifier, new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())); + assertTrue(content.contains(contentString)); + + content.remove(contentString); + rows.remove(row); + } + } + @Test public void testGet() throws InitializationException, IOException { final String row = "row1"; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java index f8b2b89ed7..6672c3a0d5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java @@ -30,6 +30,7 @@ import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.hbase.put.PutColumn; +import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.scan.Column; import org.apache.nifi.hbase.scan.ResultCell; import org.apache.nifi.hbase.scan.ResultHandler; @@ -43,6 +44,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS; @@ -173,6 +175,21 @@ public class HBase_2_ClientMapCacheService extends AbstractControllerService imp hBaseClientService.put(hBaseCacheTableName, rowIdBytes, putColumns); } + @Override + public void putAll(Map keysAndValues, Serializer keySerializer, Serializer valueSerializer) throws IOException { + List puts = new ArrayList<>(); + for (Map.Entry entry : keysAndValues.entrySet()) { + List putColumns = new ArrayList(1); + final byte[] rowIdBytes = serialize(entry.getKey(), keySerializer); + final byte[] valueBytes = serialize(entry.getValue(), valueSerializer); + + final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes, defaultVisibilityExpression); + putColumns.add(putColumn); + puts.add(new PutFlowFile(hBaseCacheTableName, rowIdBytes, putColumns, null)); + } + hBaseClientService.put(hBaseCacheTableName, puts); + } + @Override public boolean containsKey(final K key, final Serializer keySerializer) throws IOException { final byte[] rowIdBytes = serialize(key, keySerializer); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java index 59e92aa91d..eaad0577ac 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientService.java @@ -419,7 +419,7 @@ public class HBase_2_ClientService extends AbstractControllerService implements } } - private List buildPuts(byte[] rowKey, List columns) { + protected List buildPuts(byte[] rowKey, List columns) { List retVal = new ArrayList<>(); try { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java index 8508818dea..b4d75fa167 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hbase.put.PutColumn; +import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.scan.Column; import org.mockito.Mockito; @@ -127,6 +128,35 @@ public class MockHBaseClientService extends HBase_2_ClientService { addResult(new String(rowId), map, 1); } + @Override + public void put(final String tableName, final Collection puts) throws IOException { + final Map> sorted = new HashMap<>(); + final List newPuts = new ArrayList<>(); + + for (final PutFlowFile putFlowFile : puts) { + Map map = new HashMap(); + final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8); + List columns = sorted.get(rowKeyString); + if (columns == null) { + columns = new ArrayList<>(); + sorted.put(rowKeyString, columns); + } + + columns.addAll(putFlowFile.getColumns()); + for (PutColumn column : putFlowFile.getColumns()) { + map.put(new String(column.getColumnQualifier()), new String(column.getBuffer())); + } + + addResult(new String(putFlowFile.getRow()), map, 1); + } + + for (final Map.Entry> entry : sorted.entrySet()) { + newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue())); + } + + table.put(newPuts); + } + @Override public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException { for (Result result : results.values()) { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java index cacefb6f2f..aafaf23654 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java @@ -41,6 +41,8 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -113,6 +115,69 @@ public class TestHBase_2_ClientMapCacheService { verifyPut(row, columnFamily, columnQualifier, content, capture.getValue()); } + @Test + public void testPutAll() throws InitializationException, IOException { + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + + // Mock an HBase Table so we can verify the put operations later + final Table table = Mockito.mock(Table.class); + when(table.getName()).thenReturn(TableName.valueOf(tableName)); + + // create the controller service and link it to the test processor + final MockHBaseClientService service = configureHBaseClientService(runner, table); + runner.assertValid(service); + + final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE) + .asControllerService(HBaseClientService.class); + + final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService); + runner.assertValid(cacheService); + + // try to put a single cell + final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE) + .asControllerService(DistributedMapCacheClient.class); + + Map putz = new HashMap<>(); + List content = new ArrayList<>(); + List rows = new ArrayList<>(); + for (int x = 1; x <= 5; x++) { + putz.put(String.format("row-%d", x), String.format("content-%d", x)); + content.add(String.format("content-%d", x)); + rows.add(String.format("row-%d", x)); + } + + hBaseCacheService.putAll( putz, stringSerializer, stringSerializer); + + // verify only one call to put was made + ArgumentCaptor capture = ArgumentCaptor.forClass(List.class); + verify(table, times(1)).put(capture.capture()); + + List captured = capture.getValue(); + + + for (int x = 0; x < 5; x++) { + Put put = captured.get(x); + + String row = new String(put.getRow()); + assertTrue(rows.contains(row)); + + NavigableMap> familyCells = put.getFamilyCellMap(); + assertEquals(1, familyCells.size()); + + Map.Entry> entry = familyCells.firstEntry(); + assertEquals(columnFamily, new String(entry.getKey())); + assertEquals(1, entry.getValue().size()); + + Cell cell = entry.getValue().get(0); + String contentString = new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + assertEquals(columnQualifier, new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())); + assertTrue(content.contains(contentString)); + + content.remove(contentString); + rows.remove(row); + } + } + @Test public void testGet() throws InitializationException, IOException { final String row = "row1";