NIFI-7373

Added new methods to DistributedMapCacheClient for bulk get and put. Updated HBase 1.1.2 clients.
Added HBase 2 support.
Added Redis support.
This commit is contained in:
Mike Thomsen 2020-04-18 14:10:27 -04:00 committed by Bryan Bende
parent 7f0b188be4
commit 0e61dbc9a0
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
11 changed files with 285 additions and 6 deletions

View File

@ -46,7 +46,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Tags({ "redis", "distributed", "cache", "map" })
@ -194,6 +196,26 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer
});
}
@Override
public <K, V> void putAll(Map<K, V> keysAndValues, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
withConnection(redisConnection -> {
Map<byte[], byte[]> values = new HashMap<>();
for (Map.Entry<K, V> entry : keysAndValues.entrySet()) {
final Tuple<byte[],byte[]> kv = serialize(entry.getKey(), entry.getValue(), keySerializer, valueSerializer);
values.put(kv.getKey(), kv.getValue());
}
if (getLogger().isDebugEnabled()) {
getLogger().debug(String.format("Queued up %d tuples to mset on Redis connection.", values.size()));
}
if (!values.isEmpty()) {
redisConnection.mSet(values);
}
return null;
});
}
@Override
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
return withConnection(redisConnection -> {

View File

@ -49,8 +49,10 @@ import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
@ -243,6 +245,20 @@ public class ITRedisDistributedMapCacheClientService {
Assert.assertTrue(cacheClient.removeByPattern("test-redis-processor-*") >= numToDelete);
Assert.assertFalse(cacheClient.containsKey(key, stringSerializer));
Map<String, String> bulk = new HashMap<>();
bulk.put("bulk-1", "testing1");
bulk.put("bulk-2", "testing2");
bulk.put("bulk-3", "testing3");
bulk.put("bulk-4", "testing4");
bulk.put("bulk-5", "testing5");
cacheClient.putAll(bulk, stringSerializer, stringSerializer);
Assert.assertTrue(cacheClient.containsKey("bulk-1", stringSerializer));
Assert.assertTrue(cacheClient.containsKey("bulk-2", stringSerializer));
Assert.assertTrue(cacheClient.containsKey("bulk-3", stringSerializer));
Assert.assertTrue(cacheClient.containsKey("bulk-4", stringSerializer));
Assert.assertTrue(cacheClient.containsKey("bulk-5", stringSerializer));
session.transfer(flowFile, REL_SUCCESS);
} catch (final Exception e) {
getLogger().error("Routing to failure due to: " + e.getMessage(), e);

View File

@ -16,15 +16,15 @@
*/
package org.apache.nifi.distributed.cache.client;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.controller.ControllerService;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.controller.ControllerService;
/**
* This interface defines an API that can be used for interacting with a
* Distributed Cache that functions similarly to a {@link java.util.Map Map}.
@ -105,6 +105,23 @@ public interface DistributedMapCacheClient extends ControllerService {
*/
<K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException;
/**
* Performs a bulk put operation. This should be used when needed to send a large batch of updates to a cache
* in a single update operation.
*
* @param keysAndValues A java.util.Map that contains an association between keys and values to be bulk inserted into the cache.
* @param keySerializer The Serializer that will be used to serialize the key into bytes
* @param valueSerializer The Serializer that will be used to serialize the value into bytes
* @param <K> The key type
* @param <V> The value type
* @throws IOException if unable to communicate with the remote instance
*/
default <K, V> void putAll(Map<K, V> keysAndValues, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
for (Map.Entry<K, V> entry : keysAndValues.entrySet()) {
put(entry.getKey(), entry.getValue(), keySerializer, valueSerializer);
}
}
/**
* Returns the value in the cache for the given key, if one exists;
* otherwise returns <code>null</code>

View File

@ -30,6 +30,7 @@ import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
@ -43,6 +44,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
@ -172,6 +174,21 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
hBaseClientService.put(hBaseCacheTableName, rowIdBytes, putColumns);
}
@Override
public <K, V> void putAll(Map<K, V> keysAndValues, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
List<PutFlowFile> puts = new ArrayList<>();
for (Map.Entry<K, V> entry : keysAndValues.entrySet()) {
List<PutColumn> putColumns = new ArrayList<PutColumn>(1);
final byte[] rowIdBytes = serialize(entry.getKey(), keySerializer);
final byte[] valueBytes = serialize(entry.getValue(), valueSerializer);
final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes, defaultVisibilityExpression);
putColumns.add(putColumn);
puts.add(new PutFlowFile(hBaseCacheTableName, rowIdBytes, putColumns, null));
}
hBaseClientService.put(hBaseCacheTableName, puts);
}
@Override
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
final byte[] rowIdBytes = serialize(key, keySerializer);

View File

@ -420,7 +420,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
}
private List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) {
protected List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) {
List<Put> retVal = new ArrayList<>();
try {

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.filter.Filter;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.mockito.Mockito;
@ -127,6 +128,35 @@ public class MockHBaseClientService extends HBase_1_1_2_ClientService {
addResult(new String(rowId), map, 1);
}
@Override
public void put(final String tableName, final Collection<PutFlowFile> puts) throws IOException {
final Map<String, List<PutColumn>> sorted = new HashMap<>();
final List<Put> newPuts = new ArrayList<>();
for (final PutFlowFile putFlowFile : puts) {
Map<String, String> map = new HashMap<String, String>();
final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8);
List<PutColumn> columns = sorted.get(rowKeyString);
if (columns == null) {
columns = new ArrayList<>();
sorted.put(rowKeyString, columns);
}
columns.addAll(putFlowFile.getColumns());
for (PutColumn column : putFlowFile.getColumns()) {
map.put(new String(column.getColumnQualifier()), new String(column.getBuffer()));
}
addResult(new String(putFlowFile.getRow()), map, 1);
}
for (final Map.Entry<String, List<PutColumn>> entry : sorted.entrySet()) {
newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue()));
}
table.put(newPuts);
}
@Override
public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException {
for (Result result : results.values()) {

View File

@ -41,6 +41,8 @@ import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@ -113,6 +115,69 @@ public class TestHBase_1_1_2_ClientMapCacheService {
verifyPut(row, columnFamily, columnQualifier, content, capture.getValue());
}
@Test
public void testPutAll() throws InitializationException, IOException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final MockHBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService);
runner.assertValid(cacheService);
// try to put a single cell
final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
.asControllerService(DistributedMapCacheClient.class);
Map<String, String> putz = new HashMap<>();
List<String> content = new ArrayList<>();
List<String> rows = new ArrayList<>();
for (int x = 1; x <= 5; x++) {
putz.put(String.format("row-%d", x), String.format("content-%d", x));
content.add(String.format("content-%d", x));
rows.add(String.format("row-%d", x));
}
hBaseCacheService.putAll( putz, stringSerializer, stringSerializer);
// verify only one call to put was made
ArgumentCaptor<List> capture = ArgumentCaptor.forClass(List.class);
verify(table, times(1)).put(capture.capture());
List<Put> captured = capture.getValue();
for (int x = 0; x < 5; x++) {
Put put = captured.get(x);
String row = new String(put.getRow());
assertTrue(rows.contains(row));
NavigableMap<byte [], List<Cell>> familyCells = put.getFamilyCellMap();
assertEquals(1, familyCells.size());
Map.Entry<byte[], List<Cell>> entry = familyCells.firstEntry();
assertEquals(columnFamily, new String(entry.getKey()));
assertEquals(1, entry.getValue().size());
Cell cell = entry.getValue().get(0);
String contentString = new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
assertEquals(columnQualifier, new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
assertTrue(content.contains(contentString));
content.remove(contentString);
rows.remove(row);
}
}
@Test
public void testGet() throws InitializationException, IOException {
final String row = "row1";

View File

@ -30,6 +30,7 @@ import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
@ -43,6 +44,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
@ -173,6 +175,21 @@ public class HBase_2_ClientMapCacheService extends AbstractControllerService imp
hBaseClientService.put(hBaseCacheTableName, rowIdBytes, putColumns);
}
@Override
public <K, V> void putAll(Map<K, V> keysAndValues, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
List<PutFlowFile> puts = new ArrayList<>();
for (Map.Entry<K, V> entry : keysAndValues.entrySet()) {
List<PutColumn> putColumns = new ArrayList<PutColumn>(1);
final byte[] rowIdBytes = serialize(entry.getKey(), keySerializer);
final byte[] valueBytes = serialize(entry.getValue(), valueSerializer);
final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes, defaultVisibilityExpression);
putColumns.add(putColumn);
puts.add(new PutFlowFile(hBaseCacheTableName, rowIdBytes, putColumns, null));
}
hBaseClientService.put(hBaseCacheTableName, puts);
}
@Override
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
final byte[] rowIdBytes = serialize(key, keySerializer);

View File

@ -419,7 +419,7 @@ public class HBase_2_ClientService extends AbstractControllerService implements
}
}
private List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) {
protected List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) {
List<Put> retVal = new ArrayList<>();
try {

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.filter.Filter;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.mockito.Mockito;
@ -127,6 +128,35 @@ public class MockHBaseClientService extends HBase_2_ClientService {
addResult(new String(rowId), map, 1);
}
@Override
public void put(final String tableName, final Collection<PutFlowFile> puts) throws IOException {
final Map<String, List<PutColumn>> sorted = new HashMap<>();
final List<Put> newPuts = new ArrayList<>();
for (final PutFlowFile putFlowFile : puts) {
Map<String, String> map = new HashMap<String, String>();
final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8);
List<PutColumn> columns = sorted.get(rowKeyString);
if (columns == null) {
columns = new ArrayList<>();
sorted.put(rowKeyString, columns);
}
columns.addAll(putFlowFile.getColumns());
for (PutColumn column : putFlowFile.getColumns()) {
map.put(new String(column.getColumnQualifier()), new String(column.getBuffer()));
}
addResult(new String(putFlowFile.getRow()), map, 1);
}
for (final Map.Entry<String, List<PutColumn>> entry : sorted.entrySet()) {
newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue()));
}
table.put(newPuts);
}
@Override
public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException {
for (Result result : results.values()) {

View File

@ -41,6 +41,8 @@ import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@ -113,6 +115,69 @@ public class TestHBase_2_ClientMapCacheService {
verifyPut(row, columnFamily, columnQualifier, content, capture.getValue());
}
@Test
public void testPutAll() throws InitializationException, IOException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final MockHBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService);
runner.assertValid(cacheService);
// try to put a single cell
final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
.asControllerService(DistributedMapCacheClient.class);
Map<String, String> putz = new HashMap<>();
List<String> content = new ArrayList<>();
List<String> rows = new ArrayList<>();
for (int x = 1; x <= 5; x++) {
putz.put(String.format("row-%d", x), String.format("content-%d", x));
content.add(String.format("content-%d", x));
rows.add(String.format("row-%d", x));
}
hBaseCacheService.putAll( putz, stringSerializer, stringSerializer);
// verify only one call to put was made
ArgumentCaptor<List> capture = ArgumentCaptor.forClass(List.class);
verify(table, times(1)).put(capture.capture());
List<Put> captured = capture.getValue();
for (int x = 0; x < 5; x++) {
Put put = captured.get(x);
String row = new String(put.getRow());
assertTrue(rows.contains(row));
NavigableMap<byte [], List<Cell>> familyCells = put.getFamilyCellMap();
assertEquals(1, familyCells.size());
Map.Entry<byte[], List<Cell>> entry = familyCells.firstEntry();
assertEquals(columnFamily, new String(entry.getKey()));
assertEquals(1, entry.getValue().size());
Cell cell = entry.getValue().get(0);
String contentString = new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
assertEquals(columnQualifier, new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
assertTrue(content.contains(contentString));
content.remove(contentString);
rows.remove(row);
}
}
@Test
public void testGet() throws InitializationException, IOException {
final String row = "row1";