From ae3db823037ef01f8dc123e494f1d9e6522f29fe Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Wed, 24 May 2017 13:37:07 -0400 Subject: [PATCH] NIFI-3644 Fixing the result handler in HBase_1_1_2_ClientMapCacheService to use the offsets for the value bytes This closes #1645. Signed-off-by: Bryan Bende --- .../HBase_1_1_2_ClientMapCacheService.java | 3 +- .../nifi/hbase/MockHBaseClientService.java | 166 +++++++++++++++++ ...TestHBase_1_1_2_ClientMapCacheService.java | 176 +----------------- .../hbase/TestHBase_1_1_2_ClientService.java | 104 ++--------- 4 files changed, 189 insertions(+), 260 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java index 665c161159..a5bdd0e185 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java @@ -19,6 +19,7 @@ package org.apache.nifi.hbase; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -229,7 +230,7 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService public void handle(byte[] row, ResultCell[] resultCells) { numRows += 1; for( final ResultCell resultCell : resultCells ){ - lastResultBytes = resultCell.getValueArray(); + lastResultBytes = Arrays.copyOfRange(resultCell.getValueArray(), resultCell.getValueOffset(), resultCell.getValueLength() + resultCell.getValueOffset()); } } public int numRows() { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java new file mode 100644 index 0000000000..8a04e5164f --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hbase; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.hadoop.KerberosProperties; +import org.apache.nifi.hbase.put.PutColumn; +import org.apache.nifi.hbase.scan.Column; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.when; + +/** + * Override methods to create a mock service that can return staged data + */ +public class MockHBaseClientService extends HBase_1_1_2_ClientService { + + private Table table; + private String family; + private List results = new ArrayList<>(); + private KerberosProperties kerberosProperties; + + public MockHBaseClientService(final Table table, final String family, final KerberosProperties kerberosProperties) { + this.table = table; + this.family = family; + this.kerberosProperties = kerberosProperties; + } + + @Override + protected KerberosProperties getKerberosProperties(File kerberosConfigFile) { + return kerberosProperties; + } + + protected void setKerberosProperties(KerberosProperties properties) { + this.kerberosProperties = properties; + + } + + public void addResult(final String rowKey, final Map cells, final long timestamp) { + final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8); + final Cell[] cellArray = new Cell[cells.size()]; + int i = 0; + for (final Map.Entry cellEntry : cells.entrySet()) { + final Cell cell = Mockito.mock(Cell.class); + when(cell.getRowArray()).thenReturn(rowArray); + when(cell.getRowOffset()).thenReturn(0); + when(cell.getRowLength()).thenReturn((short) rowArray.length); + + final String cellValue = cellEntry.getValue(); + final byte[] valueArray = cellValue.getBytes(StandardCharsets.UTF_8); + when(cell.getValueArray()).thenReturn(valueArray); + when(cell.getValueOffset()).thenReturn(0); + when(cell.getValueLength()).thenReturn(valueArray.length); + + final byte[] familyArray = family.getBytes(StandardCharsets.UTF_8); + when(cell.getFamilyArray()).thenReturn(familyArray); + when(cell.getFamilyOffset()).thenReturn(0); + when(cell.getFamilyLength()).thenReturn((byte) familyArray.length); + + final String qualifier = cellEntry.getKey(); + final byte[] qualifierArray = qualifier.getBytes(StandardCharsets.UTF_8); + when(cell.getQualifierArray()).thenReturn(qualifierArray); + when(cell.getQualifierOffset()).thenReturn(0); + when(cell.getQualifierLength()).thenReturn(qualifierArray.length); + + when(cell.getTimestamp()).thenReturn(timestamp); + + cellArray[i++] = cell; + } + + final Result result = Mockito.mock(Result.class); + when(result.getRow()).thenReturn(rowArray); + when(result.rawCells()).thenReturn(cellArray); + results.add(result); + } + + @Override + public void put(final String tableName, final byte[] rowId, final Collection columns) throws IOException { + Put put = new Put(rowId); + Map map = new HashMap(); + for (final PutColumn column : columns) { + put.addColumn( + column.getColumnFamily(), + column.getColumnQualifier(), + column.getBuffer()); + map.put(new String(column.getColumnQualifier()), new String(column.getBuffer())); + } + + table.put(put); + addResult(new String(rowId), map, 1); + } + + @Override + public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException { + for (Result result : results) { + if (Arrays.equals(result.getRow(), rowId)) { + Cell[] cellArray = result.rawCells(); + for (Cell cell : cellArray) { + if (Arrays.equals(cell.getFamilyArray(), family) && Arrays.equals(cell.getQualifierArray(), qualifier)) { + if (value == null || Arrays.equals(cell.getValueArray(), value)) { + return false; + } + } + } + } + } + + final List putColumns = new ArrayList(); + putColumns.add(column); + put(tableName, rowId, putColumns); + return true; + } + + @Override + protected ResultScanner getResults(Table table, byte[] startRow, byte[] endRow, Collection columns) throws IOException { + final ResultScanner scanner = Mockito.mock(ResultScanner.class); + Mockito.when(scanner.iterator()).thenReturn(results.iterator()); + return scanner; + } + + @Override + protected ResultScanner getResults(Table table, Collection columns, Filter filter, long minTime) throws IOException { + final ResultScanner scanner = Mockito.mock(ResultScanner.class); + Mockito.when(scanner.iterator()).thenReturn(results.iterator()); + return scanner; + } + + @Override + protected Connection createConnection(ConfigurationContext context) throws IOException { + Connection connection = Mockito.mock(Connection.class); + Mockito.when(connection.getTable(table.getName())).thenReturn(table); + return connection; + } + +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java index 6b1fbc71d2..06848f9241 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java @@ -18,18 +18,15 @@ package org.apache.nifi.hbase; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; import org.apache.nifi.hadoop.KerberosProperties; -import org.apache.nifi.hbase.put.PutColumn; -import org.apache.nifi.hbase.scan.Column; import org.apache.nifi.hbase.scan.ResultCell; -import org.apache.nifi.hbase.scan.ResultHandler; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -38,31 +35,18 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; -import org.apache.nifi.distributed.cache.client.Serializer; -import org.apache.nifi.distributed.cache.client.exception.SerializationException; -import org.apache.nifi.distributed.cache.client.Deserializer; -import org.apache.nifi.distributed.cache.client.exception.DeserializationException; - import java.io.File; import java.io.IOException; +import java.io.OutputStream; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.io.OutputStream; - import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; - import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -267,7 +251,7 @@ public class TestHBase_1_1_2_ClientMapCacheService { private MockHBaseClientService configureHBaseClientService(final TestRunner runner, final Table table) throws InitializationException { - final MockHBaseClientService service = new MockHBaseClientService(table, kerberosPropsWithFile); + final MockHBaseClientService service = new MockHBaseClientService(table, "family1", kerberosPropsWithFile); runner.addControllerService("hbaseClient", service); runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml"); runner.enableControllerService(service); @@ -313,150 +297,6 @@ public class TestHBase_1_1_2_ClientMapCacheService { assertEquals(content, new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } - // Override methods to create a mock service that can return staged data - private class MockHBaseClientService extends HBase_1_1_2_ClientService { - - private Table table; - private List results = new ArrayList<>(); - private KerberosProperties kerberosProperties; - - public MockHBaseClientService(final Table table, final KerberosProperties kerberosProperties) { - this.table = table; - this.kerberosProperties = kerberosProperties; - } - - @Override - protected KerberosProperties getKerberosProperties(File kerberosConfigFile) { - return kerberosProperties; - } - - protected void setKerberosProperties(KerberosProperties properties) { - this.kerberosProperties = properties; - - } - - public void addResult(final String rowKey, final Map cells, final long timestamp) { - final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8); - final Cell[] cellArray = new Cell[cells.size()]; - int i = 0; - for (final Map.Entry cellEntry : cells.entrySet()) { - final Cell cell = Mockito.mock(Cell.class); - when(cell.getRowArray()).thenReturn(rowArray); - when(cell.getRowOffset()).thenReturn(0); - when(cell.getRowLength()).thenReturn((short) rowArray.length); - - final String cellValue = cellEntry.getValue(); - final byte[] valueArray = cellValue.getBytes(StandardCharsets.UTF_8); - when(cell.getValueArray()).thenReturn(valueArray); - when(cell.getValueOffset()).thenReturn(0); - when(cell.getValueLength()).thenReturn(valueArray.length); - - final byte[] familyArray = "family1".getBytes(StandardCharsets.UTF_8); - when(cell.getFamilyArray()).thenReturn(familyArray); - when(cell.getFamilyOffset()).thenReturn(0); - when(cell.getFamilyLength()).thenReturn((byte) familyArray.length); - - final String qualifier = cellEntry.getKey(); - final byte[] qualifierArray = qualifier.getBytes(StandardCharsets.UTF_8); - when(cell.getQualifierArray()).thenReturn(qualifierArray); - when(cell.getQualifierOffset()).thenReturn(0); - when(cell.getQualifierLength()).thenReturn(qualifierArray.length); - - when(cell.getTimestamp()).thenReturn(timestamp); - - cellArray[i++] = cell; - } - - final Result result = Mockito.mock(Result.class); - when(result.getRow()).thenReturn(rowArray); - when(result.rawCells()).thenReturn(cellArray); - results.add(result); - } - - @Override - public void put(final String tableName, final byte[] rowId, final Collection columns) throws IOException { - Put put = new Put(rowId); - Map map = new HashMap(); - for (final PutColumn column : columns) { - put.addColumn( - column.getColumnFamily(), - column.getColumnQualifier(), - column.getBuffer()); - map.put( new String( column.getColumnQualifier() ), new String(column.getBuffer()) ); - } - table.put(put); - - addResult( new String(rowId) , map,1); - } - - @Override - public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException { - - for (Result result: results){ - if ( Arrays.equals(result.getRow(), rowId)){ - Cell[] cellArray = result.rawCells(); - for (Cell cell : cellArray){ - if( Arrays.equals(cell.getFamilyArray(), family) && Arrays.equals(cell.getQualifierArray(), qualifier)){ - //throw new RuntimeException( new String(cell.getValueArray()) ); - if( value == null || Arrays.equals(cell.getValueArray(), value)) return false; - } - } - } - } - final List putColumns = new ArrayList(); - putColumns.add(column); - put(tableName, rowId, putColumns ); - return true; - } - - @Override - public void scan(final String tableName, final byte[] startRow, final byte[] endRow, final Collection columns, final ResultHandler handler) throws IOException { - if (startRow != endRow) throw new RuntimeException("Start and end must be equal"); - for(Result result: results){ - if (Arrays.equals( result.getRow() , startRow)) { - final Cell[] cellArray = result.rawCells(); - final ResultCell[] resultCells = new ResultCell[cellArray.length ]; - int i=0; - for (Cell cell : cellArray){ - ResultCell resultCell = new ResultCell(); - resultCell.setRowArray( result.getRow()); - resultCell.setFamilyArray(cell.getFamilyArray()); - resultCell.setQualifierArray(cell.getQualifierArray()); - resultCell.setValueArray(cell.getValueArray()); - resultCells[i++]=resultCell; - } - handler.handle(result.getRow(), resultCells ); - } - } - } - - @Override - protected ResultScanner getResults(Table table, Collection columns, Filter filter, long minTime) throws IOException { - final ResultScanner scanner = Mockito.mock(ResultScanner.class); - Mockito.when(scanner.iterator()).thenReturn(results.iterator()); - return scanner; - } - - @Override - protected Connection createConnection(ConfigurationContext context) throws IOException { - Connection connection = Mockito.mock(Connection.class); - Mockito.when(connection.getTable(table.getName())).thenReturn(table); - return connection; - } - } - - // handler that saves results for verification - private static final class CollectingResultHandler implements ResultHandler { - - Map results = new LinkedHashMap<>(); - - @Override - public void handle(byte[] row, ResultCell[] resultCells) { - final String rowStr = new String(row, StandardCharsets.UTF_8); - results.put(rowStr, resultCells); - } - } - private static class StringSerializer implements Serializer { @Override public void serialize(final String value, final OutputStream out) throws SerializationException, IOException { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java index 6e7230710a..90c408fa38 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java @@ -18,13 +18,8 @@ package org.apache.nifi.hbase; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hbase.put.PutColumn; import org.apache.nifi.hbase.put.PutFlowFile; @@ -60,6 +55,8 @@ import static org.mockito.Mockito.when; public class TestHBase_1_1_2_ClientService { + static final String COL_FAM = "nifi1"; + private KerberosProperties kerberosPropsWithFile; private KerberosProperties kerberosPropsWithoutFile; @@ -84,7 +81,7 @@ public class TestHBase_1_1_2_ClientService { when(table.getName()).thenReturn(TableName.valueOf(tableName)); // no conf file or zk properties so should be invalid - MockHBaseClientService service = new MockHBaseClientService(table, kerberosPropsWithFile); + MockHBaseClientService service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); runner.enableControllerService(service); @@ -92,7 +89,7 @@ public class TestHBase_1_1_2_ClientService { runner.removeControllerService(service); // conf file with no zk properties should be valid - service = new MockHBaseClientService(table, kerberosPropsWithFile); + service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml"); runner.enableControllerService(service); @@ -101,7 +98,7 @@ public class TestHBase_1_1_2_ClientService { runner.removeControllerService(service); // only quorum and no conf file should be invalid - service = new MockHBaseClientService(table, kerberosPropsWithFile); + service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost"); runner.enableControllerService(service); @@ -110,7 +107,7 @@ public class TestHBase_1_1_2_ClientService { runner.removeControllerService(service); // quorum and port, no znode, no conf file, should be invalid - service = new MockHBaseClientService(table, kerberosPropsWithFile); + service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost"); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181"); @@ -120,7 +117,7 @@ public class TestHBase_1_1_2_ClientService { runner.removeControllerService(service); // quorum, port, and znode, no conf file, should be valid - service = new MockHBaseClientService(table, kerberosPropsWithFile); + service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost"); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181"); @@ -131,7 +128,7 @@ public class TestHBase_1_1_2_ClientService { runner.removeControllerService(service); // quorum and port with conf file should be valid - service = new MockHBaseClientService(table, kerberosPropsWithFile); + service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml"); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost"); @@ -142,7 +139,7 @@ public class TestHBase_1_1_2_ClientService { runner.removeControllerService(service); // Kerberos - principal with non-set keytab and only hbase-site-security - valid because we need core-site-security to turn on security - service = new MockHBaseClientService(table, kerberosPropsWithFile); + service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site-security.xml"); runner.setProperty(service, kerberosPropsWithFile.getKerberosPrincipal(), "test@REALM"); @@ -177,7 +174,7 @@ public class TestHBase_1_1_2_ClientService { runner.assertNotValid(service); // Kerberos - valid props but the KerberosProperties has a null Kerberos config file so be invalid - service = new MockHBaseClientService(table, kerberosPropsWithoutFile); + service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithoutFile); runner.addControllerService("hbaseClientService", service); runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site-security.xml, src/test/resources/core-site-security.xml"); @@ -357,8 +354,8 @@ public class TestHBase_1_1_2_ClientService { assertNotNull(results); assertEquals(2, results.length); - verifyResultCell(results[0], "nifi", "greeting", "hello"); - verifyResultCell(results[1], "nifi", "name", "nifi"); + verifyResultCell(results[0], COL_FAM, "greeting", "hello"); + verifyResultCell(results[1], COL_FAM, "name", "nifi"); } @Test @@ -408,7 +405,7 @@ public class TestHBase_1_1_2_ClientService { } private MockHBaseClientService configureHBaseClientService(final TestRunner runner, final Table table) throws InitializationException { - final MockHBaseClientService service = new MockHBaseClientService(table, kerberosPropsWithFile); + final MockHBaseClientService service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile); runner.addControllerService("hbaseClient", service); runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml"); runner.enableControllerService(service); @@ -442,81 +439,6 @@ public class TestHBase_1_1_2_ClientService { assertEquals(content, new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } - // Override methods to create a mock service that can return staged data - private class MockHBaseClientService extends HBase_1_1_2_ClientService { - - private Table table; - private List results = new ArrayList<>(); - private KerberosProperties kerberosProperties; - - public MockHBaseClientService(final Table table, final KerberosProperties kerberosProperties) { - this.table = table; - this.kerberosProperties = kerberosProperties; - } - - @Override - protected KerberosProperties getKerberosProperties(File kerberosConfigFile) { - return kerberosProperties; - } - - protected void setKerberosProperties(KerberosProperties properties) { - this.kerberosProperties = properties; - } - - public void addResult(final String rowKey, final Map cells, final long timestamp) { - final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8); - - final Cell[] cellArray = new Cell[cells.size()]; - int i = 0; - for (final Map.Entry cellEntry : cells.entrySet()) { - final Cell cell = Mockito.mock(Cell.class); - when(cell.getRowArray()).thenReturn(rowArray); - when(cell.getRowOffset()).thenReturn(0); - when(cell.getRowLength()).thenReturn((short) rowArray.length); - - final String cellValue = cellEntry.getValue(); - final byte[] valueArray = cellValue.getBytes(StandardCharsets.UTF_8); - when(cell.getValueArray()).thenReturn(valueArray); - when(cell.getValueOffset()).thenReturn(0); - when(cell.getValueLength()).thenReturn(valueArray.length); - - final byte[] familyArray = "nifi".getBytes(StandardCharsets.UTF_8); - when(cell.getFamilyArray()).thenReturn(familyArray); - when(cell.getFamilyOffset()).thenReturn(0); - when(cell.getFamilyLength()).thenReturn((byte) familyArray.length); - - final String qualifier = cellEntry.getKey(); - final byte[] qualifierArray = qualifier.getBytes(StandardCharsets.UTF_8); - when(cell.getQualifierArray()).thenReturn(qualifierArray); - when(cell.getQualifierOffset()).thenReturn(0); - when(cell.getQualifierLength()).thenReturn(qualifierArray.length); - - when(cell.getTimestamp()).thenReturn(timestamp); - - cellArray[i++] = cell; - } - - final Result result = Mockito.mock(Result.class); - when(result.getRow()).thenReturn(rowArray); - when(result.rawCells()).thenReturn(cellArray); - results.add(result); - } - - @Override - protected ResultScanner getResults(Table table, Collection columns, Filter filter, long minTime) throws IOException { - final ResultScanner scanner = Mockito.mock(ResultScanner.class); - Mockito.when(scanner.iterator()).thenReturn(results.iterator()); - return scanner; - } - - @Override - protected Connection createConnection(ConfigurationContext context) throws IOException { - Connection connection = Mockito.mock(Connection.class); - Mockito.when(connection.getTable(table.getName())).thenReturn(table); - return connection; - } - } - // handler that saves results for verification private static final class CollectingResultHandler implements ResultHandler {