From 78c03795f62bee5d5249688ff699c5911da11bcf Mon Sep 17 00:00:00 2001 From: Timea Barna Date: Tue, 13 Apr 2021 14:17:45 +0200 Subject: [PATCH] NIFI-8372 Refactoring ScanAccumulo unit test and add test for value in result This closes #4998. Signed-off-by: Peter Turcsanyi --- .../accumulo/processors/ScanAccumuloIT.java | 308 ++++++++++-------- 1 file changed, 164 insertions(+), 144 deletions(-) diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/ScanAccumuloIT.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/ScanAccumuloIT.java index 5e9958b7c8..08659f3d9c 100644 --- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/ScanAccumuloIT.java +++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/ScanAccumuloIT.java @@ -16,25 +16,20 @@ * limitations under the License. */ package org.apache.nifi.accumulo.processors; -import org.apache.accumulo.core.client.AccumuloClient; + import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.commons.lang3.SystemUtils; import org.apache.hadoop.io.Text; -import org.apache.nifi.accumulo.controllerservices.AccumuloService; import org.apache.nifi.accumulo.controllerservices.MockAccumuloService; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.record.MockRecordWriter; @@ -47,197 +42,222 @@ import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; import java.util.UUID; public class ScanAccumuloIT { public static final String DEFAULT_COLUMN_FAMILY = "family1"; + private static final MockRecordWriter PARSER = new MockRecordWriter(); /** * Though deprecated in 2.0 it still functions very well */ private static MiniAccumuloCluster accumulo; - private TestRunner getTestRunner(String table, String columnFamily) { - final TestRunner runner = TestRunners.newTestRunner(ScanAccumulo.class); - runner.enforceReadStreamsClosed(false); - runner.setProperty(ScanAccumulo.TABLE_NAME, table); - return runner; - } - - - - @BeforeClass - public static void setupInstance() throws IOException, InterruptedException, AccumuloSecurityException, AccumuloException, TableExistsException { + public static void setupInstance() throws IOException, InterruptedException { Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS); Path tempDirectory = Files.createTempDirectory("acc"); // JUnit and Guava supply mechanisms for creating temp directories accumulo = new MiniAccumuloCluster(tempDirectory.toFile(), "password"); accumulo.start(); } - private Set generateTestData(TestRunner runner, String definedRow, String table, boolean valueincq, String delim, String cv) - throws IOException, AccumuloSecurityException, AccumuloException, TableNotFoundException { - - BatchWriterConfig writerConfig = new BatchWriterConfig(); - writerConfig.setMaxWriteThreads(2); - writerConfig.setMaxMemory(1024*1024); - MultiTableBatchWriter writer = accumulo.createAccumuloClient("root", new PasswordToken("password")).createMultiTableBatchWriter(writerConfig); - - long ts = System.currentTimeMillis(); - - - final MockRecordWriter parser = new MockRecordWriter(); - try { - runner.addControllerService("parser", parser); - } catch (InitializationException e) { - throw new IOException(e); - } - runner.enableControllerService(parser); - runner.setProperty(ScanAccumulo.RECORD_WRITER,"parser"); - - - Set expectedKeys = new HashSet<>(); - ColumnVisibility colViz = new ColumnVisibility(); - if (null != cv) - colViz = new ColumnVisibility(cv); - Random random = new Random(); - for (int x = 0; x < 5; x++) { - //final int row = random.nextInt(10000000); - final String row = definedRow.isEmpty() ? UUID.randomUUID().toString() : definedRow; - final String cf = UUID.randomUUID().toString(); - final String cq = UUID.randomUUID().toString(); - Text keyCq = new Text("name"); - if (valueincq){ - if (null != delim && !delim.isEmpty()) - keyCq.append(delim.getBytes(),0,delim.length()); - keyCq.append(cf.getBytes(),0,cf.length()); - } - expectedKeys.add(new Key(new Text(row), new Text(DEFAULT_COLUMN_FAMILY), keyCq, colViz,ts)); - keyCq = new Text("code"); - if (valueincq){ - if (null != delim && !delim.isEmpty()) - keyCq.append(delim.getBytes(),0,delim.length()); - keyCq.append(cq.getBytes(),0,cq.length()); - } - expectedKeys.add(new Key(new Text(row), new Text(DEFAULT_COLUMN_FAMILY), keyCq, colViz, ts)); - Mutation m = new Mutation(row); - m.put(new Text(DEFAULT_COLUMN_FAMILY),new Text(keyCq),colViz,ts, new Value()); - writer.getBatchWriter(table).addMutation(m); - } - writer.flush(); - return expectedKeys; - } - - void verifyKey(String tableName, Set expectedKeys, Authorizations auths) throws AccumuloSecurityException, AccumuloException, TableNotFoundException { - if (null == auths) - auths = new Authorizations(); - try(BatchScanner scanner = accumulo.createAccumuloClient("root",new PasswordToken("password")).createBatchScanner(tableName,auths,1)) { - List ranges = new ArrayList<>(); - ranges.add(new Range()); - scanner.setRanges(ranges); - for (Map.Entry kv : scanner) { - Assert.assertTrue(kv.getKey() + " not in expected keys",expectedKeys.remove(kv.getKey())); - } - } - Assert.assertEquals(0, expectedKeys.size()); - - } - - private void basicPutSetup(boolean sendFlowFile, boolean valueincq) throws Exception { - basicPutSetup(sendFlowFile,"","","","",valueincq,null,"",null,false,5); - } - - private void basicPutSetup(boolean sendFlowFile, boolean valueincq, final String delim) throws Exception { - basicPutSetup(sendFlowFile,"","","","",valueincq,delim,"",null,false,5); - } - - private void basicPutSetup(boolean sendFlowFile,String row,String endrow, String cf,String endcf, boolean valueincq,String delim, - String auths, Authorizations defaultVis, boolean deletes, int expected) throws Exception { - String tableName = UUID.randomUUID().toString(); - tableName=tableName.replace("-","a"); - AccumuloClient client = accumulo.createAccumuloClient("root",new PasswordToken("password")); - client.tableOperations().create(tableName); - - if (null != defaultVis) - client.securityOperations().changeUserAuthorizations("root",defaultVis); - - TestRunner runner = getTestRunner(tableName, DEFAULT_COLUMN_FAMILY); - runner.setProperty(ScanAccumulo.START_KEY, row); - if (!cf.isEmpty()) - runner.setProperty(ScanAccumulo.COLUMNFAMILY, cf); - if (!endcf.isEmpty()) - runner.setProperty(ScanAccumulo.COLUMNFAMILY_END, endcf); - runner.setProperty(ScanAccumulo.AUTHORIZATIONS, auths); - runner.setProperty(ScanAccumulo.END_KEY, endrow); - - AccumuloService service = MockAccumuloService.getService(runner,accumulo.getZooKeepers(),accumulo.getInstanceName(),"root","password"); - Set expectedKeys = generateTestData(runner,row,tableName,valueincq,delim, auths); - if (sendFlowFile) { - runner.enqueue("Test".getBytes("UTF-8")); // This is to coax the processor into reading the data in the reader.l - } - runner.run(); - - - List results = runner.getFlowFilesForRelationship(ScanAccumulo.REL_SUCCESS); - for(MockFlowFile ff : results){ - String attr = ff.getAttribute("record.count"); - Assert.assertEquals(expected,Integer.valueOf(attr).intValue()); - } - Assert.assertTrue("Wrong count, received " + results.size(), results.size() == 1); - } - - - - @Test public void testPullDatWithFlowFile() throws Exception { - basicPutSetup(true,false); + TestRunner runner = createTestEnvironment("","","","",false,"",null); + // This is to coax the processor into reading the data in the reader. + runner.enqueue("Test".getBytes(StandardCharsets.UTF_8)); + + runner.run(); + List results = runner.getFlowFilesForRelationship(ScanAccumulo.REL_SUCCESS); + + Assert.assertEquals("Wrong count, received " + results.size(), 1, results.size()); + assertRecordCount(results, 5); } @Test public void testPullDatWithOutFlowFile() throws Exception { - basicPutSetup(false,false); + TestRunner runner = createTestEnvironment("","","","",false,"",null); + + runner.run(); + List results = runner.getFlowFilesForRelationship(ScanAccumulo.REL_SUCCESS); + + Assert.assertEquals("Wrong count, received " + results.size(), 1, results.size()); + assertRecordCount(results, 5); } @Test public void testSameRowCf() throws Exception { - basicPutSetup(false,"2019","2019","family1","family2",false,null,"",null,false,1); + TestRunner runner = createTestEnvironment("2019","2019","family1","family2",false,"",null); + + runner.run(); + List results = runner.getFlowFilesForRelationship(ScanAccumulo.REL_SUCCESS); + + Assert.assertEquals("Wrong count, received " + results.size(), 1, results.size()); + assertRecordCount(results, 1); } @Test public void testSameRowCfValueInCq() throws Exception { - basicPutSetup(false,"2019","2019","family1","family2",true,null,"",null,false,5); + TestRunner runner = createTestEnvironment("2019","2019","family1","family2",true,"",null); + + runner.run(); + List results = runner.getFlowFilesForRelationship(ScanAccumulo.REL_SUCCESS); + + Assert.assertEquals("Wrong count, received " + results.size(), 1, results.size()); + assertRecordCount(results, 5); } @Test public void testSameRowCfValueInCqWithAuths() throws Exception { - basicPutSetup(false,"2019","2019","family1","family2",true,null,"abcd",new Authorizations("abcd"),false,5); + TestRunner runner = createTestEnvironment("2019","2019","family1","family2",true,"abcd",new Authorizations("abcd")); + + runner.run(); + List results = runner.getFlowFilesForRelationship(ScanAccumulo.REL_SUCCESS); + + Assert.assertEquals("Wrong count, received " + results.size(), 1, results.size()); + assertRecordCount(results, 5); } @Test(expected = AssertionError.class) public void testSameRowCfValueInCqErrorCfEnd() throws Exception { - basicPutSetup(false,"2019","2019","family1","",true,null,"",null,false,5); + TestRunner runner = createTestEnvironment("2019","2019","family1","",true,"",null); + + runner.run(); + List results = runner.getFlowFilesForRelationship(ScanAccumulo.REL_SUCCESS); + + Assert.assertEquals("Wrong count, received " + results.size(), 1, results.size()); + assertRecordCount(results, 5); } @Test(expected = AssertionError.class) public void testSameRowCfValueInCqErrorCf() throws Exception { - basicPutSetup(false,"2019","2019","","family2",true,null,"",null,false,5); + TestRunner runner = createTestEnvironment("2019","2019","","family2",true,"",null); + + runner.run(); + List results = runner.getFlowFilesForRelationship(ScanAccumulo.REL_SUCCESS); + + Assert.assertEquals("Wrong count, received " + results.size(), 1, results.size()); + assertRecordCount(results, 5); } @Test(expected = AssertionError.class) public void testSameRowCfValueInCqErrorNotLess() throws Exception { - basicPutSetup(false,"2019","2019","family1","family1",true,null,"",null,false,5); + TestRunner runner = createTestEnvironment("2019","2019","family1","family1",true,"",null); + + runner.run(); + List results = runner.getFlowFilesForRelationship(ScanAccumulo.REL_SUCCESS); + + Assert.assertEquals("Wrong count, received " + results.size(), 1, results.size()); + assertRecordCount(results, 5); } + @Test + public void testValueIsPresentByDefault() throws Exception { + TestRunner runner = createTestEnvironment("2019","2019","family1","family2",false,"",null); + runner.run(); + List results = runner.getFlowFilesForRelationship(ScanAccumulo.REL_SUCCESS); + Assert.assertEquals("Wrong count, received " + results.size(), 1, results.size()); + assertValueInResult(results, "\"Test\"\n"); + } + + @Test + public void testValueIsNotPresentWhenDisabled() throws Exception { + TestRunner runner = createTestEnvironment("2019", "2019", "family1", "family2", false, "", null); + runner.setProperty(ScanAccumulo.VALUE_INCLUDED_IN_RESULT, "False"); + + runner.run(); + List results = runner.getFlowFilesForRelationship(ScanAccumulo.REL_SUCCESS); + + Assert.assertEquals("Wrong count, received " + results.size(), 1, results.size()); + assertValueInResult(results, "\n"); + } + + private TestRunner createTestEnvironment(String row, String endrow, String cf, String endcf, boolean valueincq, + String auths, Authorizations defaultVis) throws Exception { + String tableName = createTable(defaultVis); + TestRunner runner = configureTestRunner(row, endrow, cf, endcf, auths, tableName); + generateTestData(row,tableName,valueincq, auths); + return runner; + } + + private String createTable(Authorizations defaultVis) throws AccumuloException, AccumuloSecurityException, TableExistsException { + String tableName = UUID.randomUUID().toString(); + tableName=tableName.replace("-","a"); + if (null != defaultVis) + accumulo.getConnector("root","password").securityOperations().changeUserAuthorizations("root", defaultVis); + accumulo.getConnector("root","password").tableOperations().create(tableName); + return tableName; + } + + private TestRunner configureTestRunner(String row, String endrow, String cf, String endcf, String auths, String tableName) throws InitializationException { + TestRunner runner = getTestRunner(); + runner.setProperty(ScanAccumulo.TABLE_NAME, tableName); + runner.setProperty(ScanAccumulo.START_KEY, row); + if (!cf.isEmpty()) + runner.setProperty(ScanAccumulo.COLUMNFAMILY, cf); + if (!endcf.isEmpty()) + runner.setProperty(ScanAccumulo.COLUMNFAMILY_END, endcf); + runner.setProperty(ScanAccumulo.AUTHORIZATIONS, auths); + runner.setProperty(ScanAccumulo.END_KEY, endrow); + return runner; + } + + private TestRunner getTestRunner() throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(ScanAccumulo.class); + runner.enforceReadStreamsClosed(false); + + MockAccumuloService.getService(runner,accumulo.getZooKeepers(),accumulo.getInstanceName(),"root","password"); + + runner.addControllerService("parser", PARSER); + runner.enableControllerService(PARSER); + runner.setProperty(ScanAccumulo.RECORD_WRITER,"parser"); + + return runner; + } + + private void generateTestData(String definedRow, String table, boolean valueincq, String cv) + throws AccumuloSecurityException, AccumuloException, TableNotFoundException { + + BatchWriterConfig writerConfig = new BatchWriterConfig(); + writerConfig.setMaxWriteThreads(2); + writerConfig.setMaxMemory(1024*1024); + MultiTableBatchWriter writer = accumulo.getConnector("root","password").createMultiTableBatchWriter(writerConfig); + + long ts = System.currentTimeMillis(); + ColumnVisibility colViz = new ColumnVisibility(); + if (null != cv) + colViz = new ColumnVisibility(cv); + for (int x = 0; x < 5; x++) { + final String row = definedRow.isEmpty() ? UUID.randomUUID().toString() : definedRow; + final String cq = UUID.randomUUID().toString(); + Text keyCq = new Text("code"); + if (valueincq){ + keyCq.append(cq.getBytes(),0,cq.length()); + } + Mutation m = new Mutation(row); + m.put(new Text(DEFAULT_COLUMN_FAMILY),new Text(keyCq),colViz,ts, new Value("Test")); + writer.getBatchWriter(table).addMutation(m); + } + writer.flush(); + } + + private void assertRecordCount(List results, int expected) { + for (MockFlowFile ff : results){ + String attr = ff.getAttribute("record.count"); + Assert.assertEquals(expected, Integer.valueOf(attr).intValue()); + } + } + + private void assertValueInResult(List results, String expected) { + for (MockFlowFile ff : results) { + Assert.assertEquals(expected, ff.getContent()); + } + } }