Completed initial development of HBase_1_1_2_ClientMapCacheService.java which is compatible with DetectDuplicate (and other processors)

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
baolsen 2017-03-23 14:35:43 +02:00 committed by Bryan Bende
parent 232380dbfd
commit 152f002abf
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
8 changed files with 786 additions and 1 deletions

View File

@ -55,6 +55,16 @@ public class MockHBaseClientService extends AbstractControllerService implements
throw new UnsupportedOperationException();
}
@Override
public boolean checkAndPut(String tableName, byte[] rowId, byte[] family, byte[] qualifier, byte[]value, PutColumn column) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void delete(String tableName, byte[] rowId) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void scan(String tableName, byte[] startRow, byte[] endRow, Collection<Column> columns, ResultHandler handler) throws IOException {
if (throwException) {

View File

@ -94,6 +94,28 @@ public interface HBaseClientService extends ControllerService {
*/
void put(String tableName, byte[] rowId, Collection<PutColumn> columns) throws IOException;
/**
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, then the Put is added to HBase.
*
* @param tableName the name of an HBase table
* @param rowId the id of the row to check
* @param family the family of the row to check
* @param qualifier the qualifier of the row to check
* @param value the value of the row to check. If null, the check is for the lack of column (ie: non-existence)
* @return True if the Put was executed, false otherwise
* @throws IOException thrown when there are communication errors with HBase$
*/
boolean checkAndPut(String tableName, byte[] rowId, byte[] family, byte[] qualifier, byte[] value, PutColumn column) throws IOException;
/**
* Deletes the given row on HBase. All cells are deleted.
*
* @param tableName the name of an HBase table
* @param rowId the id of the row to delete
* @throws IOException thrown when there are communication errors with HBase
*/
void delete(String tableName, byte[] rowId) throws IOException;
/**
* Scans the given table using the optional filter criteria and passing each result to the provided handler.
*

View File

@ -45,6 +45,12 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>

View File

@ -0,0 +1,243 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.Deserializer;
import java.io.ByteArrayOutputStream;
import org.apache.nifi.reporting.InitializationException;
import java.nio.charset.StandardCharsets;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.processor.util.StandardValidators;
@Tags({"distributed", "cache", "state", "map", "cluster","hbase"})
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheClient", "org.apache.nifi.hbase.HBase_1_1_2_ClientService"})
@CapabilityDescription("Provides the ability to use an HBase table as a cache, in place of a DistributedMapCache."
+ " Uses a HBase_1_1_2_ClientService controller to communicate with HBase.")
public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService implements DistributedMapCacheClient {
static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Client Service")
.description("Specifies the HBase Client Controller Service to use for accessing HBase.")
.required(true)
.identifiesControllerService(HBaseClientService.class)
.build();
public static final PropertyDescriptor HBASE_CACHE_TABLE_NAME = new PropertyDescriptor.Builder()
.name("HBase Cache Table Name")
.description("Name of the table on HBase to use for the cache.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor HBASE_COLUMN_FAMILY = new PropertyDescriptor.Builder()
.name("HBase Column Family")
.description("Name of the column family on HBase to use for the cache.")
.required(true)
.expressionLanguageSupported(true)
.defaultValue("f")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor HBASE_COLUMN_QUALIFIER = new PropertyDescriptor.Builder()
.name("HBase Column Qualifier")
.description("Name of the column qualifier on HBase to use for the cache")
.defaultValue("q")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(HBASE_CACHE_TABLE_NAME);
descriptors.add(HBASE_CLIENT_SERVICE);
descriptors.add(HBASE_COLUMN_FAMILY);
descriptors.add(HBASE_COLUMN_QUALIFIER);
return descriptors;
}
// Other threads may call @OnEnabled so these are marked volatile to ensure other class methods read the updated value
private volatile String hBaseCacheTableName;
private volatile HBaseClientService hBaseClientService;
private volatile String hBaseColumnFamily;
private volatile byte[] hBaseColumnFamilyBytes;
private volatile String hBaseColumnQualifier;
private volatile byte[] hBaseColumnQualifierBytes;
@OnEnabled
public void onConfigured(final ConfigurationContext context) throws InitializationException{
hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
hBaseCacheTableName = context.getProperty(HBASE_CACHE_TABLE_NAME).evaluateAttributeExpressions().getValue();
hBaseColumnFamily = context.getProperty(HBASE_COLUMN_FAMILY).evaluateAttributeExpressions().getValue();
hBaseColumnQualifier = context.getProperty(HBASE_COLUMN_QUALIFIER).evaluateAttributeExpressions().getValue();
hBaseColumnFamilyBytes = hBaseColumnFamily.getBytes(StandardCharsets.UTF_8);
hBaseColumnQualifierBytes = hBaseColumnQualifier.getBytes(StandardCharsets.UTF_8);
}
private <T> byte[] serialize(final T value, final Serializer<T> serializer) throws IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
serializer.serialize(value, baos);
return baos.toByteArray();
}
private <T> T deserialize(final byte[] value, final Deserializer<T> deserializer) throws IOException {
return deserializer.deserialize(value);
}
@Override
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
final byte[] rowIdBytes = serialize(key, keySerializer);
final byte[] valueBytes = serialize(value, valueSerializer);
final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes);
return hBaseClientService.checkAndPut(hBaseCacheTableName, rowIdBytes, hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, null, putColumn);
}
@Override
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
List<PutColumn> putColumns = new ArrayList<PutColumn>(1);
final byte[] rowIdBytes = serialize(key, keySerializer);
final byte[] valueBytes = serialize(value, valueSerializer);
final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes);
putColumns.add(putColumn);
hBaseClientService.put(hBaseCacheTableName, rowIdBytes, putColumns);
}
@Override
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
final byte[] rowIdBytes = serialize(key, keySerializer);
final HBaseRowHandler handler = new HBaseRowHandler();
final List<Column> columnsList = new ArrayList<Column>(0);
hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, handler);
return (handler.numRows() > 0);
}
/**
* Note that the implementation of getAndPutIfAbsent is not atomic.
* The putIfAbsent is atomic, but a getAndPutIfAbsent does a get and then a putIfAbsent.
* If there is an existing value and it is updated in betweern the two steps, then the existing (unmodified) value will be returned.
* If the existing value was deleted between the two steps, getAndPutIfAbsent will correctly return null.
* This should not generally be an issue with cache processors such as DetectDuplicate.
*
*/
@Override
public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer) throws IOException {
// Between the get and the putIfAbsent, the value could be deleted or updated.
// Logic below takes care of the deleted case but not the updated case.
// This is probably fine since DistributedMapCache and DetectDuplicate expect to receive the original cache value
// Could possibly be fixed by implementing AtomicDistributedMapCache (Map Cache protocol version 2)
final V got = get(key, keySerializer, valueDeserializer);
final boolean wasAbsent = putIfAbsent(key, value, keySerializer, valueSerializer);
if (! wasAbsent) return got;
else return null;
}
@Override
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
final byte[] rowIdBytes = serialize(key, keySerializer);
final HBaseRowHandler handler = new HBaseRowHandler();
final List<Column> columnsList = new ArrayList<Column>(0);
hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, handler);
if (handler.numRows() > 1) {
throw new IOException("Found multiple rows in HBase for key");
} else if(handler.numRows() == 1) {
return deserialize( handler.getLastResultBytes(), valueDeserializer);
} else {
return null;
}
}
@Override
public <K> boolean remove(final K key, final Serializer<K> keySerializer) throws IOException {
final boolean contains = containsKey(key, keySerializer);
if (contains) {
final byte[] rowIdBytes = serialize(key, keySerializer);
hBaseClientService.delete(hBaseCacheTableName, rowIdBytes);
}
return contains;
}
@Override
public long removeByPattern(String regex) throws IOException {
throw new IOException("HBase removeByPattern is not implemented");
}
@Override
public void close() throws IOException {
}
@Override
protected void finalize() throws Throwable {
}
private class HBaseRowHandler implements ResultHandler {
private int numRows = 0;
private byte[] lastResultBytes;
@Override
public void handle(byte[] row, ResultCell[] resultCells) {
numRows += 1;
for( final ResultCell resultCell : resultCells ){
lastResultBytes = resultCell.getValueArray();
}
}
public int numRows() {
return numRows;
}
public byte[] getLastResultBytes() {
return lastResultBytes;
}
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@ -308,6 +309,26 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
}
}
@Override
public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
Put put = new Put(rowId);
put.addColumn(
column.getColumnFamily(),
column.getColumnQualifier(),
column.getBuffer());
return table.checkAndPut(rowId, family, qualifier, value, put);
}
}
@Override
public void delete(final String tableName, final byte[] rowId) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
Delete delete = new Delete(rowId);
table.delete(delete);
}
}
@Override
public void scan(final String tableName, final Collection<Column> columns, final String filterExpression, final long minTime, final ResultHandler handler)
throws IOException {

View File

@ -13,3 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.hbase.HBase_1_1_2_ClientService
org.apache.nifi.hbase.HBase_1_1_2_ClientMapCacheService

View File

@ -0,0 +1,473 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.io.OutputStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestHBase_1_1_2_ClientMapCacheService {
private KerberosProperties kerberosPropsWithFile;
private KerberosProperties kerberosPropsWithoutFile;
private Serializer<String> stringSerializer = new StringSerializer();
private Deserializer<String> stringDeserializer = new StringDeserializer();
@Before
public void setup() {
// needed for calls to UserGroupInformation.setConfiguration() to work when passing in
// config with Kerberos authentication enabled
System.setProperty("java.security.krb5.realm", "nifi.com");
System.setProperty("java.security.krb5.kdc", "nifi.kdc");
kerberosPropsWithFile = new KerberosProperties(new File("src/test/resources/krb5.conf"));
kerberosPropsWithoutFile = new KerberosProperties(null);
}
private final String tableName = "nifi";
private final String columnFamily = "family1";
private final String columnQualifier = "qualifier1";
@Test
public void testPut() throws InitializationException, IOException {
final String row = "row1";
final String content = "content1";
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final MockHBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService);
runner.assertValid(cacheService);
// try to put a single cell
final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
.asControllerService(DistributedMapCacheClient.class);
hBaseCacheService.put( row, content, stringSerializer, stringSerializer);
// verify only one call to put was made
ArgumentCaptor<Put> capture = ArgumentCaptor.forClass(Put.class);
verify(table, times(1)).put(capture.capture());
verifyPut(row, columnFamily, columnQualifier, content, capture.getValue());
}
@Test
public void testGet() throws InitializationException, IOException {
final String row = "row1";
final String content = "content1";
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final MockHBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService);
runner.assertValid(cacheService);
final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
.asControllerService(DistributedMapCacheClient.class);
hBaseCacheService.put( row, content, stringSerializer, stringSerializer);
final String result = hBaseCacheService.get(row, stringSerializer, stringDeserializer);
assertEquals( content, result);
}
@Test
public void testContainsKey() throws InitializationException, IOException {
final String row = "row1";
final String content = "content1";
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final MockHBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService);
runner.assertValid(cacheService);
final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
.asControllerService(DistributedMapCacheClient.class);
assertFalse( hBaseCacheService.containsKey(row , stringSerializer ) );
hBaseCacheService.put( row, content, stringSerializer, stringSerializer);
assertTrue( hBaseCacheService.containsKey(row, stringSerializer) );
}
@Test
public void testPutIfAbsent() throws InitializationException, IOException {
final String row = "row1";
final String content = "content1";
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final MockHBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService);
runner.assertValid(cacheService);
final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
.asControllerService(DistributedMapCacheClient.class);
assertTrue( hBaseCacheService.putIfAbsent( row, content, stringSerializer, stringSerializer));
// verify only one call to put was made
ArgumentCaptor<Put> capture = ArgumentCaptor.forClass(Put.class);
verify(table, times(1)).put(capture.capture());
verifyPut(row, columnFamily, columnQualifier, content, capture.getValue());
assertFalse( hBaseCacheService.putIfAbsent( row, content, stringSerializer, stringSerializer));
verify(table, times(1)).put(capture.capture());
}
@Test
public void testGetAndPutIfAbsent() throws InitializationException, IOException {
final String row = "row1";
final String content = "content1";
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final MockHBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService);
runner.assertValid(cacheService);
final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
.asControllerService(DistributedMapCacheClient.class);
assertNull( hBaseCacheService.getAndPutIfAbsent( row, content, stringSerializer, stringSerializer, stringDeserializer));
// verify only one call to put was made
ArgumentCaptor<Put> capture = ArgumentCaptor.forClass(Put.class);
verify(table, times(1)).put(capture.capture());
verifyPut(row, columnFamily, columnQualifier, content, capture.getValue());
final String result = hBaseCacheService.getAndPutIfAbsent( row, content, stringSerializer, stringSerializer, stringDeserializer);
verify(table, times(1)).put(capture.capture());
assertEquals( result, content);
}
private MockHBaseClientService configureHBaseClientService(final TestRunner runner, final Table table) throws InitializationException {
final MockHBaseClientService service = new MockHBaseClientService(table, kerberosPropsWithFile);
runner.addControllerService("hbaseClient", service);
runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
runner.enableControllerService(service);
runner.setProperty(TestProcessor.HBASE_CLIENT_SERVICE, "hbaseClient");
return service;
}
private DistributedMapCacheClient configureHBaseCacheService(final TestRunner runner, final HBaseClientService service) throws InitializationException {
final HBase_1_1_2_ClientMapCacheService cacheService = new HBase_1_1_2_ClientMapCacheService();
runner.addControllerService("hbaseCache", cacheService);
runner.setProperty(cacheService, HBase_1_1_2_ClientMapCacheService.HBASE_CLIENT_SERVICE, "hbaseClient");
runner.setProperty(cacheService, HBase_1_1_2_ClientMapCacheService.HBASE_CACHE_TABLE_NAME, tableName);
runner.setProperty(cacheService, HBase_1_1_2_ClientMapCacheService.HBASE_COLUMN_FAMILY, columnFamily);
runner.setProperty(cacheService, HBase_1_1_2_ClientMapCacheService.HBASE_COLUMN_QUALIFIER, columnQualifier);
runner.enableControllerService(cacheService);
runner.setProperty(TestProcessor.HBASE_CACHE_SERVICE,"hbaseCache");
return cacheService;
}
private void verifyResultCell(final ResultCell result, final String cf, final String cq, final String val) {
final String colFamily = new String(result.getFamilyArray(), result.getFamilyOffset(), result.getFamilyLength());
assertEquals(cf, colFamily);
final String colQualifier = new String(result.getQualifierArray(), result.getQualifierOffset(), result.getQualifierLength());
assertEquals(cq, colQualifier);
final String value = new String(result.getValueArray(), result.getValueOffset(), result.getValueLength());
assertEquals(val, value);
}
private void verifyPut(String row, String columnFamily, String columnQualifier, String content, Put put) {
assertEquals(row, new String(put.getRow()));
NavigableMap<byte [], List<Cell>> familyCells = put.getFamilyCellMap();
assertEquals(1, familyCells.size());
Map.Entry<byte[], List<Cell>> entry = familyCells.firstEntry();
assertEquals(columnFamily, new String(entry.getKey()));
assertEquals(1, entry.getValue().size());
Cell cell = entry.getValue().get(0);
assertEquals(columnQualifier, new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
assertEquals(content, new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
}
// Override methods to create a mock service that can return staged data
private class MockHBaseClientService extends HBase_1_1_2_ClientService {
private Table table;
private List<Result> results = new ArrayList<>();
private KerberosProperties kerberosProperties;
public MockHBaseClientService(final Table table, final KerberosProperties kerberosProperties) {
this.table = table;
this.kerberosProperties = kerberosProperties;
}
@Override
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
return kerberosProperties;
}
protected void setKerberosProperties(KerberosProperties properties) {
this.kerberosProperties = properties;
}
public void addResult(final String rowKey, final Map<String, String> cells, final long timestamp) {
final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8);
final Cell[] cellArray = new Cell[cells.size()];
int i = 0;
for (final Map.Entry<String, String> cellEntry : cells.entrySet()) {
final Cell cell = Mockito.mock(Cell.class);
when(cell.getRowArray()).thenReturn(rowArray);
when(cell.getRowOffset()).thenReturn(0);
when(cell.getRowLength()).thenReturn((short) rowArray.length);
final String cellValue = cellEntry.getValue();
final byte[] valueArray = cellValue.getBytes(StandardCharsets.UTF_8);
when(cell.getValueArray()).thenReturn(valueArray);
when(cell.getValueOffset()).thenReturn(0);
when(cell.getValueLength()).thenReturn(valueArray.length);
final byte[] familyArray = "family1".getBytes(StandardCharsets.UTF_8);
when(cell.getFamilyArray()).thenReturn(familyArray);
when(cell.getFamilyOffset()).thenReturn(0);
when(cell.getFamilyLength()).thenReturn((byte) familyArray.length);
final String qualifier = cellEntry.getKey();
final byte[] qualifierArray = qualifier.getBytes(StandardCharsets.UTF_8);
when(cell.getQualifierArray()).thenReturn(qualifierArray);
when(cell.getQualifierOffset()).thenReturn(0);
when(cell.getQualifierLength()).thenReturn(qualifierArray.length);
when(cell.getTimestamp()).thenReturn(timestamp);
cellArray[i++] = cell;
}
final Result result = Mockito.mock(Result.class);
when(result.getRow()).thenReturn(rowArray);
when(result.rawCells()).thenReturn(cellArray);
results.add(result);
}
@Override
public void put(final String tableName, final byte[] rowId, final Collection<PutColumn> columns) throws IOException {
Put put = new Put(rowId);
Map<String,String> map = new HashMap<String,String>();
for (final PutColumn column : columns) {
put.addColumn(
column.getColumnFamily(),
column.getColumnQualifier(),
column.getBuffer());
map.put( new String( column.getColumnQualifier() ), new String(column.getBuffer()) );
}
table.put(put);
addResult( new String(rowId) , map,1);
}
@Override
public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException {
for (Result result: results){
if ( Arrays.equals(result.getRow(), rowId)){
Cell[] cellArray = result.rawCells();
for (Cell cell : cellArray){
if( Arrays.equals(cell.getFamilyArray(), family) && Arrays.equals(cell.getQualifierArray(), qualifier)){
//throw new RuntimeException( new String(cell.getValueArray()) );
if( value == null || Arrays.equals(cell.getValueArray(), value)) return false;
}
}
}
}
final List<PutColumn> putColumns = new ArrayList<PutColumn>();
putColumns.add(column);
put(tableName, rowId, putColumns );
return true;
}
@Override
public void scan(final String tableName, final byte[] startRow, final byte[] endRow, final Collection<Column> columns, final ResultHandler handler) throws IOException {
if (startRow != endRow) throw new RuntimeException("Start and end must be equal");
for(Result result: results){
if (Arrays.equals( result.getRow() , startRow)) {
final Cell[] cellArray = result.rawCells();
final ResultCell[] resultCells = new ResultCell[cellArray.length ];
int i=0;
for (Cell cell : cellArray){
ResultCell resultCell = new ResultCell();
resultCell.setRowArray( result.getRow());
resultCell.setFamilyArray(cell.getFamilyArray());
resultCell.setQualifierArray(cell.getQualifierArray());
resultCell.setValueArray(cell.getValueArray());
resultCells[i++]=resultCell;
}
handler.handle(result.getRow(), resultCells );
}
}
}
@Override
protected ResultScanner getResults(Table table, Collection<Column> columns, Filter filter, long minTime) throws IOException {
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
return scanner;
}
@Override
protected Connection createConnection(ConfigurationContext context) throws IOException {
Connection connection = Mockito.mock(Connection.class);
Mockito.when(connection.getTable(table.getName())).thenReturn(table);
return connection;
}
}
// handler that saves results for verification
private static final class CollectingResultHandler implements ResultHandler {
Map<String,ResultCell[]> results = new LinkedHashMap<>();
@Override
public void handle(byte[] row, ResultCell[] resultCells) {
final String rowStr = new String(row, StandardCharsets.UTF_8);
results.put(rowStr, resultCells);
}
}
private static class StringSerializer implements Serializer<String> {
@Override
public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
out.write(value.getBytes(StandardCharsets.UTF_8));
}
}
private static class StringDeserializer implements Deserializer<String> {
@Override
public String deserialize(byte[] input) throws DeserializationException, IOException{
return new String(input);
}
}
}

View File

@ -21,6 +21,7 @@ import java.util.List;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.hbase.HBaseClientService;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@ -35,6 +36,13 @@ public class TestProcessor extends AbstractProcessor {
.required(true)
.build();
static final PropertyDescriptor HBASE_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Cache Service")
.description("HBaseCacheService")
.identifiesControllerService(DistributedMapCacheClient.class)
.required(true)
.build();
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
@ -43,6 +51,7 @@ public class TestProcessor extends AbstractProcessor {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> propDescs = new ArrayList<>();
propDescs.add(HBASE_CLIENT_SERVICE);
propDescs.add(HBASE_CACHE_SERVICE);
return propDescs;
}
}