diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java index 3f75639cdb..62d024ac89 100644 --- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java +++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java @@ -273,7 +273,7 @@ public class ScanAccumulo extends BaseAccumuloProcessor { try{ final RecordSchema writeSchema = determineRecordSchema(writerFactory, flowAttributes, valueIncluded); - try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) { + try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, Collections.emptyMap())) { int i = 0; writer.beginRecordSet(); diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/PutRecordIT.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/PutRecordIT.java index eaba0b8b47..452071ef46 100644 --- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/PutRecordIT.java +++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/PutRecordIT.java @@ -22,6 +22,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; @@ -133,7 +134,7 @@ public class PutRecordIT { void verifyKey(String tableName, Set expectedKeys, Authorizations auths) throws AccumuloSecurityException, AccumuloException, TableNotFoundException { if (null == auths) auths = new Authorizations(); - try(BatchScanner scanner = accumulo.getConnector("root","password").createBatchScanner(tableName,auths,1)) { + try(BatchScanner scanner = accumulo.createAccumuloClient("root", new PasswordToken("password")).createBatchScanner(tableName,auths,1)) { List ranges = new ArrayList<>(); ranges.add(new Range()); scanner.setRanges(ranges); @@ -157,7 +158,8 @@ public class PutRecordIT { String tableName = UUID.randomUUID().toString(); tableName=tableName.replace("-","a"); if (null != defaultVis) - accumulo.getConnector("root","password").securityOperations().changeUserAuthorizations("root",defaultVis); + accumulo.createAccumuloClient("root", new PasswordToken("password")).securityOperations().changeUserAuthorizations("root",defaultVis); + TestRunner runner = getTestRunner(tableName, DEFAULT_COLUMN_FAMILY); runner.setProperty(PutAccumuloRecord.CREATE_TABLE, "True"); runner.setProperty(PutAccumuloRecord.ROW_FIELD_NAME, "id"); diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/ScanAccumuloIT.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/ScanAccumuloIT.java index 2eaefef158..5e9958b7c8 100644 --- a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/ScanAccumuloIT.java +++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/ScanAccumuloIT.java @@ -16,7 +16,7 @@ * limitations under the License. */ package org.apache.nifi.accumulo.processors; - +import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchScanner; @@ -24,6 +24,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; @@ -89,7 +90,7 @@ public class ScanAccumuloIT { BatchWriterConfig writerConfig = new BatchWriterConfig(); writerConfig.setMaxWriteThreads(2); writerConfig.setMaxMemory(1024*1024); - MultiTableBatchWriter writer = accumulo.getConnector("root","password").createMultiTableBatchWriter(writerConfig); + MultiTableBatchWriter writer = accumulo.createAccumuloClient("root", new PasswordToken("password")).createMultiTableBatchWriter(writerConfig); long ts = System.currentTimeMillis(); @@ -139,7 +140,7 @@ public class ScanAccumuloIT { void verifyKey(String tableName, Set expectedKeys, Authorizations auths) throws AccumuloSecurityException, AccumuloException, TableNotFoundException { if (null == auths) auths = new Authorizations(); - try(BatchScanner scanner = accumulo.getConnector("root","password").createBatchScanner(tableName,auths,1)) { + try(BatchScanner scanner = accumulo.createAccumuloClient("root",new PasswordToken("password")).createBatchScanner(tableName,auths,1)) { List ranges = new ArrayList<>(); ranges.add(new Range()); scanner.setRanges(ranges); @@ -163,9 +164,12 @@ public class ScanAccumuloIT { String auths, Authorizations defaultVis, boolean deletes, int expected) throws Exception { String tableName = UUID.randomUUID().toString(); tableName=tableName.replace("-","a"); - accumulo.getConnector("root","password").tableOperations().create(tableName); + AccumuloClient client = accumulo.createAccumuloClient("root",new PasswordToken("password")); + client.tableOperations().create(tableName); + if (null != defaultVis) - accumulo.getConnector("root","password").securityOperations().changeUserAuthorizations("root",defaultVis); + client.securityOperations().changeUserAuthorizations("root",defaultVis); + TestRunner runner = getTestRunner(tableName, DEFAULT_COLUMN_FAMILY); runner.setProperty(ScanAccumulo.START_KEY, row); if (!cf.isEmpty()) @@ -175,7 +179,7 @@ public class ScanAccumuloIT { runner.setProperty(ScanAccumulo.AUTHORIZATIONS, auths); runner.setProperty(ScanAccumulo.END_KEY, endrow); - AccumuloService client = MockAccumuloService.getService(runner,accumulo.getZooKeepers(),accumulo.getInstanceName(),"root","password"); + AccumuloService service = MockAccumuloService.getService(runner,accumulo.getZooKeepers(),accumulo.getInstanceName(),"root","password"); Set expectedKeys = generateTestData(runner,row,tableName,valueincq,delim, auths); if (sendFlowFile) { runner.enqueue("Test".getBytes("UTF-8")); // This is to coax the processor into reading the data in the reader.l