mirror of https://github.com/apache/nifi.git
NIFI-3644 Fixing the result handler in HBase_1_1_2_ClientMapCacheService to use the offsets for the value bytes
This closes #1645. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
152f002abf
commit
ae3db82303
|
@ -19,6 +19,7 @@ package org.apache.nifi.hbase;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
@ -229,7 +230,7 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
|
||||||
public void handle(byte[] row, ResultCell[] resultCells) {
|
public void handle(byte[] row, ResultCell[] resultCells) {
|
||||||
numRows += 1;
|
numRows += 1;
|
||||||
for( final ResultCell resultCell : resultCells ){
|
for( final ResultCell resultCell : resultCells ){
|
||||||
lastResultBytes = resultCell.getValueArray();
|
lastResultBytes = Arrays.copyOfRange(resultCell.getValueArray(), resultCell.getValueOffset(), resultCell.getValueLength() + resultCell.getValueOffset());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
public int numRows() {
|
public int numRows() {
|
||||||
|
|
|
@ -0,0 +1,166 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.hbase;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.filter.Filter;
|
||||||
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
|
import org.apache.nifi.hadoop.KerberosProperties;
|
||||||
|
import org.apache.nifi.hbase.put.PutColumn;
|
||||||
|
import org.apache.nifi.hbase.scan.Column;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Override methods to create a mock service that can return staged data
|
||||||
|
*/
|
||||||
|
public class MockHBaseClientService extends HBase_1_1_2_ClientService {
|
||||||
|
|
||||||
|
private Table table;
|
||||||
|
private String family;
|
||||||
|
private List<Result> results = new ArrayList<>();
|
||||||
|
private KerberosProperties kerberosProperties;
|
||||||
|
|
||||||
|
public MockHBaseClientService(final Table table, final String family, final KerberosProperties kerberosProperties) {
|
||||||
|
this.table = table;
|
||||||
|
this.family = family;
|
||||||
|
this.kerberosProperties = kerberosProperties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
|
||||||
|
return kerberosProperties;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setKerberosProperties(KerberosProperties properties) {
|
||||||
|
this.kerberosProperties = properties;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addResult(final String rowKey, final Map<String, String> cells, final long timestamp) {
|
||||||
|
final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8);
|
||||||
|
final Cell[] cellArray = new Cell[cells.size()];
|
||||||
|
int i = 0;
|
||||||
|
for (final Map.Entry<String, String> cellEntry : cells.entrySet()) {
|
||||||
|
final Cell cell = Mockito.mock(Cell.class);
|
||||||
|
when(cell.getRowArray()).thenReturn(rowArray);
|
||||||
|
when(cell.getRowOffset()).thenReturn(0);
|
||||||
|
when(cell.getRowLength()).thenReturn((short) rowArray.length);
|
||||||
|
|
||||||
|
final String cellValue = cellEntry.getValue();
|
||||||
|
final byte[] valueArray = cellValue.getBytes(StandardCharsets.UTF_8);
|
||||||
|
when(cell.getValueArray()).thenReturn(valueArray);
|
||||||
|
when(cell.getValueOffset()).thenReturn(0);
|
||||||
|
when(cell.getValueLength()).thenReturn(valueArray.length);
|
||||||
|
|
||||||
|
final byte[] familyArray = family.getBytes(StandardCharsets.UTF_8);
|
||||||
|
when(cell.getFamilyArray()).thenReturn(familyArray);
|
||||||
|
when(cell.getFamilyOffset()).thenReturn(0);
|
||||||
|
when(cell.getFamilyLength()).thenReturn((byte) familyArray.length);
|
||||||
|
|
||||||
|
final String qualifier = cellEntry.getKey();
|
||||||
|
final byte[] qualifierArray = qualifier.getBytes(StandardCharsets.UTF_8);
|
||||||
|
when(cell.getQualifierArray()).thenReturn(qualifierArray);
|
||||||
|
when(cell.getQualifierOffset()).thenReturn(0);
|
||||||
|
when(cell.getQualifierLength()).thenReturn(qualifierArray.length);
|
||||||
|
|
||||||
|
when(cell.getTimestamp()).thenReturn(timestamp);
|
||||||
|
|
||||||
|
cellArray[i++] = cell;
|
||||||
|
}
|
||||||
|
|
||||||
|
final Result result = Mockito.mock(Result.class);
|
||||||
|
when(result.getRow()).thenReturn(rowArray);
|
||||||
|
when(result.rawCells()).thenReturn(cellArray);
|
||||||
|
results.add(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void put(final String tableName, final byte[] rowId, final Collection<PutColumn> columns) throws IOException {
|
||||||
|
Put put = new Put(rowId);
|
||||||
|
Map<String, String> map = new HashMap<String, String>();
|
||||||
|
for (final PutColumn column : columns) {
|
||||||
|
put.addColumn(
|
||||||
|
column.getColumnFamily(),
|
||||||
|
column.getColumnQualifier(),
|
||||||
|
column.getBuffer());
|
||||||
|
map.put(new String(column.getColumnQualifier()), new String(column.getBuffer()));
|
||||||
|
}
|
||||||
|
|
||||||
|
table.put(put);
|
||||||
|
addResult(new String(rowId), map, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException {
|
||||||
|
for (Result result : results) {
|
||||||
|
if (Arrays.equals(result.getRow(), rowId)) {
|
||||||
|
Cell[] cellArray = result.rawCells();
|
||||||
|
for (Cell cell : cellArray) {
|
||||||
|
if (Arrays.equals(cell.getFamilyArray(), family) && Arrays.equals(cell.getQualifierArray(), qualifier)) {
|
||||||
|
if (value == null || Arrays.equals(cell.getValueArray(), value)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final List<PutColumn> putColumns = new ArrayList<PutColumn>();
|
||||||
|
putColumns.add(column);
|
||||||
|
put(tableName, rowId, putColumns);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ResultScanner getResults(Table table, byte[] startRow, byte[] endRow, Collection<Column> columns) throws IOException {
|
||||||
|
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
|
||||||
|
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
|
||||||
|
return scanner;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ResultScanner getResults(Table table, Collection<Column> columns, Filter filter, long minTime) throws IOException {
|
||||||
|
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
|
||||||
|
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
|
||||||
|
return scanner;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Connection createConnection(ConfigurationContext context) throws IOException {
|
||||||
|
Connection connection = Mockito.mock(Connection.class);
|
||||||
|
Mockito.when(connection.getTable(table.getName())).thenReturn(table);
|
||||||
|
return connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -18,18 +18,15 @@ package org.apache.nifi.hbase;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.filter.Filter;
|
import org.apache.nifi.distributed.cache.client.Deserializer;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
|
||||||
|
import org.apache.nifi.distributed.cache.client.Serializer;
|
||||||
|
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
|
||||||
|
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
|
||||||
import org.apache.nifi.hadoop.KerberosProperties;
|
import org.apache.nifi.hadoop.KerberosProperties;
|
||||||
import org.apache.nifi.hbase.put.PutColumn;
|
|
||||||
import org.apache.nifi.hbase.scan.Column;
|
|
||||||
import org.apache.nifi.hbase.scan.ResultCell;
|
import org.apache.nifi.hbase.scan.ResultCell;
|
||||||
import org.apache.nifi.hbase.scan.ResultHandler;
|
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
@ -38,31 +35,18 @@ import org.junit.Test;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
|
|
||||||
import org.apache.nifi.distributed.cache.client.Serializer;
|
|
||||||
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
|
|
||||||
import org.apache.nifi.distributed.cache.client.Deserializer;
|
|
||||||
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
|
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.LinkedHashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.NavigableMap;
|
import java.util.NavigableMap;
|
||||||
|
|
||||||
import java.io.OutputStream;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
@ -267,7 +251,7 @@ public class TestHBase_1_1_2_ClientMapCacheService {
|
||||||
|
|
||||||
|
|
||||||
private MockHBaseClientService configureHBaseClientService(final TestRunner runner, final Table table) throws InitializationException {
|
private MockHBaseClientService configureHBaseClientService(final TestRunner runner, final Table table) throws InitializationException {
|
||||||
final MockHBaseClientService service = new MockHBaseClientService(table, kerberosPropsWithFile);
|
final MockHBaseClientService service = new MockHBaseClientService(table, "family1", kerberosPropsWithFile);
|
||||||
runner.addControllerService("hbaseClient", service);
|
runner.addControllerService("hbaseClient", service);
|
||||||
runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
|
runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
|
||||||
runner.enableControllerService(service);
|
runner.enableControllerService(service);
|
||||||
|
@ -313,150 +297,6 @@ public class TestHBase_1_1_2_ClientMapCacheService {
|
||||||
assertEquals(content, new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
|
assertEquals(content, new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Override methods to create a mock service that can return staged data
|
|
||||||
private class MockHBaseClientService extends HBase_1_1_2_ClientService {
|
|
||||||
|
|
||||||
private Table table;
|
|
||||||
private List<Result> results = new ArrayList<>();
|
|
||||||
private KerberosProperties kerberosProperties;
|
|
||||||
|
|
||||||
public MockHBaseClientService(final Table table, final KerberosProperties kerberosProperties) {
|
|
||||||
this.table = table;
|
|
||||||
this.kerberosProperties = kerberosProperties;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
|
|
||||||
return kerberosProperties;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void setKerberosProperties(KerberosProperties properties) {
|
|
||||||
this.kerberosProperties = properties;
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public void addResult(final String rowKey, final Map<String, String> cells, final long timestamp) {
|
|
||||||
final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8);
|
|
||||||
final Cell[] cellArray = new Cell[cells.size()];
|
|
||||||
int i = 0;
|
|
||||||
for (final Map.Entry<String, String> cellEntry : cells.entrySet()) {
|
|
||||||
final Cell cell = Mockito.mock(Cell.class);
|
|
||||||
when(cell.getRowArray()).thenReturn(rowArray);
|
|
||||||
when(cell.getRowOffset()).thenReturn(0);
|
|
||||||
when(cell.getRowLength()).thenReturn((short) rowArray.length);
|
|
||||||
|
|
||||||
final String cellValue = cellEntry.getValue();
|
|
||||||
final byte[] valueArray = cellValue.getBytes(StandardCharsets.UTF_8);
|
|
||||||
when(cell.getValueArray()).thenReturn(valueArray);
|
|
||||||
when(cell.getValueOffset()).thenReturn(0);
|
|
||||||
when(cell.getValueLength()).thenReturn(valueArray.length);
|
|
||||||
|
|
||||||
final byte[] familyArray = "family1".getBytes(StandardCharsets.UTF_8);
|
|
||||||
when(cell.getFamilyArray()).thenReturn(familyArray);
|
|
||||||
when(cell.getFamilyOffset()).thenReturn(0);
|
|
||||||
when(cell.getFamilyLength()).thenReturn((byte) familyArray.length);
|
|
||||||
|
|
||||||
final String qualifier = cellEntry.getKey();
|
|
||||||
final byte[] qualifierArray = qualifier.getBytes(StandardCharsets.UTF_8);
|
|
||||||
when(cell.getQualifierArray()).thenReturn(qualifierArray);
|
|
||||||
when(cell.getQualifierOffset()).thenReturn(0);
|
|
||||||
when(cell.getQualifierLength()).thenReturn(qualifierArray.length);
|
|
||||||
|
|
||||||
when(cell.getTimestamp()).thenReturn(timestamp);
|
|
||||||
|
|
||||||
cellArray[i++] = cell;
|
|
||||||
}
|
|
||||||
|
|
||||||
final Result result = Mockito.mock(Result.class);
|
|
||||||
when(result.getRow()).thenReturn(rowArray);
|
|
||||||
when(result.rawCells()).thenReturn(cellArray);
|
|
||||||
results.add(result);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void put(final String tableName, final byte[] rowId, final Collection<PutColumn> columns) throws IOException {
|
|
||||||
Put put = new Put(rowId);
|
|
||||||
Map<String,String> map = new HashMap<String,String>();
|
|
||||||
for (final PutColumn column : columns) {
|
|
||||||
put.addColumn(
|
|
||||||
column.getColumnFamily(),
|
|
||||||
column.getColumnQualifier(),
|
|
||||||
column.getBuffer());
|
|
||||||
map.put( new String( column.getColumnQualifier() ), new String(column.getBuffer()) );
|
|
||||||
}
|
|
||||||
table.put(put);
|
|
||||||
|
|
||||||
addResult( new String(rowId) , map,1);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException {
|
|
||||||
|
|
||||||
for (Result result: results){
|
|
||||||
if ( Arrays.equals(result.getRow(), rowId)){
|
|
||||||
Cell[] cellArray = result.rawCells();
|
|
||||||
for (Cell cell : cellArray){
|
|
||||||
if( Arrays.equals(cell.getFamilyArray(), family) && Arrays.equals(cell.getQualifierArray(), qualifier)){
|
|
||||||
//throw new RuntimeException( new String(cell.getValueArray()) );
|
|
||||||
if( value == null || Arrays.equals(cell.getValueArray(), value)) return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
final List<PutColumn> putColumns = new ArrayList<PutColumn>();
|
|
||||||
putColumns.add(column);
|
|
||||||
put(tableName, rowId, putColumns );
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void scan(final String tableName, final byte[] startRow, final byte[] endRow, final Collection<Column> columns, final ResultHandler handler) throws IOException {
|
|
||||||
if (startRow != endRow) throw new RuntimeException("Start and end must be equal");
|
|
||||||
for(Result result: results){
|
|
||||||
if (Arrays.equals( result.getRow() , startRow)) {
|
|
||||||
final Cell[] cellArray = result.rawCells();
|
|
||||||
final ResultCell[] resultCells = new ResultCell[cellArray.length ];
|
|
||||||
int i=0;
|
|
||||||
for (Cell cell : cellArray){
|
|
||||||
ResultCell resultCell = new ResultCell();
|
|
||||||
resultCell.setRowArray( result.getRow());
|
|
||||||
resultCell.setFamilyArray(cell.getFamilyArray());
|
|
||||||
resultCell.setQualifierArray(cell.getQualifierArray());
|
|
||||||
resultCell.setValueArray(cell.getValueArray());
|
|
||||||
resultCells[i++]=resultCell;
|
|
||||||
}
|
|
||||||
handler.handle(result.getRow(), resultCells );
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ResultScanner getResults(Table table, Collection<Column> columns, Filter filter, long minTime) throws IOException {
|
|
||||||
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
|
|
||||||
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
|
|
||||||
return scanner;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Connection createConnection(ConfigurationContext context) throws IOException {
|
|
||||||
Connection connection = Mockito.mock(Connection.class);
|
|
||||||
Mockito.when(connection.getTable(table.getName())).thenReturn(table);
|
|
||||||
return connection;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handler that saves results for verification
|
|
||||||
private static final class CollectingResultHandler implements ResultHandler {
|
|
||||||
|
|
||||||
Map<String,ResultCell[]> results = new LinkedHashMap<>();
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void handle(byte[] row, ResultCell[] resultCells) {
|
|
||||||
final String rowStr = new String(row, StandardCharsets.UTF_8);
|
|
||||||
results.put(rowStr, resultCells);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class StringSerializer implements Serializer<String> {
|
private static class StringSerializer implements Serializer<String> {
|
||||||
@Override
|
@Override
|
||||||
public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
|
public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
|
||||||
|
|
|
@ -18,13 +18,8 @@ package org.apache.nifi.hbase;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.filter.Filter;
|
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
|
||||||
import org.apache.nifi.hadoop.KerberosProperties;
|
import org.apache.nifi.hadoop.KerberosProperties;
|
||||||
import org.apache.nifi.hbase.put.PutColumn;
|
import org.apache.nifi.hbase.put.PutColumn;
|
||||||
import org.apache.nifi.hbase.put.PutFlowFile;
|
import org.apache.nifi.hbase.put.PutFlowFile;
|
||||||
|
@ -60,6 +55,8 @@ import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class TestHBase_1_1_2_ClientService {
|
public class TestHBase_1_1_2_ClientService {
|
||||||
|
|
||||||
|
static final String COL_FAM = "nifi1";
|
||||||
|
|
||||||
private KerberosProperties kerberosPropsWithFile;
|
private KerberosProperties kerberosPropsWithFile;
|
||||||
private KerberosProperties kerberosPropsWithoutFile;
|
private KerberosProperties kerberosPropsWithoutFile;
|
||||||
|
|
||||||
|
@ -84,7 +81,7 @@ public class TestHBase_1_1_2_ClientService {
|
||||||
when(table.getName()).thenReturn(TableName.valueOf(tableName));
|
when(table.getName()).thenReturn(TableName.valueOf(tableName));
|
||||||
|
|
||||||
// no conf file or zk properties so should be invalid
|
// no conf file or zk properties so should be invalid
|
||||||
MockHBaseClientService service = new MockHBaseClientService(table, kerberosPropsWithFile);
|
MockHBaseClientService service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
|
||||||
runner.addControllerService("hbaseClientService", service);
|
runner.addControllerService("hbaseClientService", service);
|
||||||
runner.enableControllerService(service);
|
runner.enableControllerService(service);
|
||||||
|
|
||||||
|
@ -92,7 +89,7 @@ public class TestHBase_1_1_2_ClientService {
|
||||||
runner.removeControllerService(service);
|
runner.removeControllerService(service);
|
||||||
|
|
||||||
// conf file with no zk properties should be valid
|
// conf file with no zk properties should be valid
|
||||||
service = new MockHBaseClientService(table, kerberosPropsWithFile);
|
service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
|
||||||
runner.addControllerService("hbaseClientService", service);
|
runner.addControllerService("hbaseClientService", service);
|
||||||
runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
|
runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
|
||||||
runner.enableControllerService(service);
|
runner.enableControllerService(service);
|
||||||
|
@ -101,7 +98,7 @@ public class TestHBase_1_1_2_ClientService {
|
||||||
runner.removeControllerService(service);
|
runner.removeControllerService(service);
|
||||||
|
|
||||||
// only quorum and no conf file should be invalid
|
// only quorum and no conf file should be invalid
|
||||||
service = new MockHBaseClientService(table, kerberosPropsWithFile);
|
service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
|
||||||
runner.addControllerService("hbaseClientService", service);
|
runner.addControllerService("hbaseClientService", service);
|
||||||
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
|
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
|
||||||
runner.enableControllerService(service);
|
runner.enableControllerService(service);
|
||||||
|
@ -110,7 +107,7 @@ public class TestHBase_1_1_2_ClientService {
|
||||||
runner.removeControllerService(service);
|
runner.removeControllerService(service);
|
||||||
|
|
||||||
// quorum and port, no znode, no conf file, should be invalid
|
// quorum and port, no znode, no conf file, should be invalid
|
||||||
service = new MockHBaseClientService(table, kerberosPropsWithFile);
|
service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
|
||||||
runner.addControllerService("hbaseClientService", service);
|
runner.addControllerService("hbaseClientService", service);
|
||||||
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
|
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
|
||||||
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181");
|
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181");
|
||||||
|
@ -120,7 +117,7 @@ public class TestHBase_1_1_2_ClientService {
|
||||||
runner.removeControllerService(service);
|
runner.removeControllerService(service);
|
||||||
|
|
||||||
// quorum, port, and znode, no conf file, should be valid
|
// quorum, port, and znode, no conf file, should be valid
|
||||||
service = new MockHBaseClientService(table, kerberosPropsWithFile);
|
service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
|
||||||
runner.addControllerService("hbaseClientService", service);
|
runner.addControllerService("hbaseClientService", service);
|
||||||
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
|
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
|
||||||
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181");
|
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181");
|
||||||
|
@ -131,7 +128,7 @@ public class TestHBase_1_1_2_ClientService {
|
||||||
runner.removeControllerService(service);
|
runner.removeControllerService(service);
|
||||||
|
|
||||||
// quorum and port with conf file should be valid
|
// quorum and port with conf file should be valid
|
||||||
service = new MockHBaseClientService(table, kerberosPropsWithFile);
|
service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
|
||||||
runner.addControllerService("hbaseClientService", service);
|
runner.addControllerService("hbaseClientService", service);
|
||||||
runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
|
runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
|
||||||
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
|
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
|
||||||
|
@ -142,7 +139,7 @@ public class TestHBase_1_1_2_ClientService {
|
||||||
runner.removeControllerService(service);
|
runner.removeControllerService(service);
|
||||||
|
|
||||||
// Kerberos - principal with non-set keytab and only hbase-site-security - valid because we need core-site-security to turn on security
|
// Kerberos - principal with non-set keytab and only hbase-site-security - valid because we need core-site-security to turn on security
|
||||||
service = new MockHBaseClientService(table, kerberosPropsWithFile);
|
service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
|
||||||
runner.addControllerService("hbaseClientService", service);
|
runner.addControllerService("hbaseClientService", service);
|
||||||
runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site-security.xml");
|
runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site-security.xml");
|
||||||
runner.setProperty(service, kerberosPropsWithFile.getKerberosPrincipal(), "test@REALM");
|
runner.setProperty(service, kerberosPropsWithFile.getKerberosPrincipal(), "test@REALM");
|
||||||
|
@ -177,7 +174,7 @@ public class TestHBase_1_1_2_ClientService {
|
||||||
runner.assertNotValid(service);
|
runner.assertNotValid(service);
|
||||||
|
|
||||||
// Kerberos - valid props but the KerberosProperties has a null Kerberos config file so be invalid
|
// Kerberos - valid props but the KerberosProperties has a null Kerberos config file so be invalid
|
||||||
service = new MockHBaseClientService(table, kerberosPropsWithoutFile);
|
service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithoutFile);
|
||||||
runner.addControllerService("hbaseClientService", service);
|
runner.addControllerService("hbaseClientService", service);
|
||||||
runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES,
|
runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES,
|
||||||
"src/test/resources/hbase-site-security.xml, src/test/resources/core-site-security.xml");
|
"src/test/resources/hbase-site-security.xml, src/test/resources/core-site-security.xml");
|
||||||
|
@ -357,8 +354,8 @@ public class TestHBase_1_1_2_ClientService {
|
||||||
assertNotNull(results);
|
assertNotNull(results);
|
||||||
assertEquals(2, results.length);
|
assertEquals(2, results.length);
|
||||||
|
|
||||||
verifyResultCell(results[0], "nifi", "greeting", "hello");
|
verifyResultCell(results[0], COL_FAM, "greeting", "hello");
|
||||||
verifyResultCell(results[1], "nifi", "name", "nifi");
|
verifyResultCell(results[1], COL_FAM, "name", "nifi");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -408,7 +405,7 @@ public class TestHBase_1_1_2_ClientService {
|
||||||
}
|
}
|
||||||
|
|
||||||
private MockHBaseClientService configureHBaseClientService(final TestRunner runner, final Table table) throws InitializationException {
|
private MockHBaseClientService configureHBaseClientService(final TestRunner runner, final Table table) throws InitializationException {
|
||||||
final MockHBaseClientService service = new MockHBaseClientService(table, kerberosPropsWithFile);
|
final MockHBaseClientService service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
|
||||||
runner.addControllerService("hbaseClient", service);
|
runner.addControllerService("hbaseClient", service);
|
||||||
runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
|
runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
|
||||||
runner.enableControllerService(service);
|
runner.enableControllerService(service);
|
||||||
|
@ -442,81 +439,6 @@ public class TestHBase_1_1_2_ClientService {
|
||||||
assertEquals(content, new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
|
assertEquals(content, new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Override methods to create a mock service that can return staged data
|
|
||||||
private class MockHBaseClientService extends HBase_1_1_2_ClientService {
|
|
||||||
|
|
||||||
private Table table;
|
|
||||||
private List<Result> results = new ArrayList<>();
|
|
||||||
private KerberosProperties kerberosProperties;
|
|
||||||
|
|
||||||
public MockHBaseClientService(final Table table, final KerberosProperties kerberosProperties) {
|
|
||||||
this.table = table;
|
|
||||||
this.kerberosProperties = kerberosProperties;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
|
|
||||||
return kerberosProperties;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void setKerberosProperties(KerberosProperties properties) {
|
|
||||||
this.kerberosProperties = properties;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void addResult(final String rowKey, final Map<String, String> cells, final long timestamp) {
|
|
||||||
final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8);
|
|
||||||
|
|
||||||
final Cell[] cellArray = new Cell[cells.size()];
|
|
||||||
int i = 0;
|
|
||||||
for (final Map.Entry<String, String> cellEntry : cells.entrySet()) {
|
|
||||||
final Cell cell = Mockito.mock(Cell.class);
|
|
||||||
when(cell.getRowArray()).thenReturn(rowArray);
|
|
||||||
when(cell.getRowOffset()).thenReturn(0);
|
|
||||||
when(cell.getRowLength()).thenReturn((short) rowArray.length);
|
|
||||||
|
|
||||||
final String cellValue = cellEntry.getValue();
|
|
||||||
final byte[] valueArray = cellValue.getBytes(StandardCharsets.UTF_8);
|
|
||||||
when(cell.getValueArray()).thenReturn(valueArray);
|
|
||||||
when(cell.getValueOffset()).thenReturn(0);
|
|
||||||
when(cell.getValueLength()).thenReturn(valueArray.length);
|
|
||||||
|
|
||||||
final byte[] familyArray = "nifi".getBytes(StandardCharsets.UTF_8);
|
|
||||||
when(cell.getFamilyArray()).thenReturn(familyArray);
|
|
||||||
when(cell.getFamilyOffset()).thenReturn(0);
|
|
||||||
when(cell.getFamilyLength()).thenReturn((byte) familyArray.length);
|
|
||||||
|
|
||||||
final String qualifier = cellEntry.getKey();
|
|
||||||
final byte[] qualifierArray = qualifier.getBytes(StandardCharsets.UTF_8);
|
|
||||||
when(cell.getQualifierArray()).thenReturn(qualifierArray);
|
|
||||||
when(cell.getQualifierOffset()).thenReturn(0);
|
|
||||||
when(cell.getQualifierLength()).thenReturn(qualifierArray.length);
|
|
||||||
|
|
||||||
when(cell.getTimestamp()).thenReturn(timestamp);
|
|
||||||
|
|
||||||
cellArray[i++] = cell;
|
|
||||||
}
|
|
||||||
|
|
||||||
final Result result = Mockito.mock(Result.class);
|
|
||||||
when(result.getRow()).thenReturn(rowArray);
|
|
||||||
when(result.rawCells()).thenReturn(cellArray);
|
|
||||||
results.add(result);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ResultScanner getResults(Table table, Collection<Column> columns, Filter filter, long minTime) throws IOException {
|
|
||||||
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
|
|
||||||
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
|
|
||||||
return scanner;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Connection createConnection(ConfigurationContext context) throws IOException {
|
|
||||||
Connection connection = Mockito.mock(Connection.class);
|
|
||||||
Mockito.when(connection.getTable(table.getName())).thenReturn(table);
|
|
||||||
return connection;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handler that saves results for verification
|
// handler that saves results for verification
|
||||||
private static final class CollectingResultHandler implements ResultHandler {
|
private static final class CollectingResultHandler implements ResultHandler {
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue