mirror of https://github.com/apache/nifi.git
There was some minor changes required to stop using deprecated APIs byu accumulo, inspecting the deprecated makes the fixes really simple
* writerFactory.createWriter receives an extra parameteres with a map, in the implementation it sends an empty one if not passed. * tests were relying on getConnector, that is replaced by createAccumuloClient Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #4942.
This commit is contained in:
parent
64b12176b2
commit
fa3da2aa1c
|
@ -273,7 +273,7 @@ public class ScanAccumulo extends BaseAccumuloProcessor {
|
|||
try{
|
||||
final RecordSchema writeSchema = determineRecordSchema(writerFactory, flowAttributes, valueIncluded);
|
||||
|
||||
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) {
|
||||
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, Collections.emptyMap())) {
|
||||
|
||||
int i = 0;
|
||||
writer.beginRecordSet();
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
|
|||
import org.apache.accumulo.core.client.BatchScanner;
|
||||
import org.apache.accumulo.core.client.TableExistsException;
|
||||
import org.apache.accumulo.core.client.TableNotFoundException;
|
||||
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
|
||||
import org.apache.accumulo.core.data.Key;
|
||||
import org.apache.accumulo.core.data.Range;
|
||||
import org.apache.accumulo.core.data.Value;
|
||||
|
@ -133,7 +134,7 @@ public class PutRecordIT {
|
|||
void verifyKey(String tableName, Set<Key> expectedKeys, Authorizations auths) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
|
||||
if (null == auths)
|
||||
auths = new Authorizations();
|
||||
try(BatchScanner scanner = accumulo.getConnector("root","password").createBatchScanner(tableName,auths,1)) {
|
||||
try(BatchScanner scanner = accumulo.createAccumuloClient("root", new PasswordToken("password")).createBatchScanner(tableName,auths,1)) {
|
||||
List<Range> ranges = new ArrayList<>();
|
||||
ranges.add(new Range());
|
||||
scanner.setRanges(ranges);
|
||||
|
@ -157,7 +158,8 @@ public class PutRecordIT {
|
|||
String tableName = UUID.randomUUID().toString();
|
||||
tableName=tableName.replace("-","a");
|
||||
if (null != defaultVis)
|
||||
accumulo.getConnector("root","password").securityOperations().changeUserAuthorizations("root",defaultVis);
|
||||
accumulo.createAccumuloClient("root", new PasswordToken("password")).securityOperations().changeUserAuthorizations("root",defaultVis);
|
||||
|
||||
TestRunner runner = getTestRunner(tableName, DEFAULT_COLUMN_FAMILY);
|
||||
runner.setProperty(PutAccumuloRecord.CREATE_TABLE, "True");
|
||||
runner.setProperty(PutAccumuloRecord.ROW_FIELD_NAME, "id");
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.accumulo.processors;
|
||||
|
||||
import org.apache.accumulo.core.client.AccumuloClient;
|
||||
import org.apache.accumulo.core.client.AccumuloException;
|
||||
import org.apache.accumulo.core.client.AccumuloSecurityException;
|
||||
import org.apache.accumulo.core.client.BatchScanner;
|
||||
|
@ -24,6 +24,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
|
|||
import org.apache.accumulo.core.client.MultiTableBatchWriter;
|
||||
import org.apache.accumulo.core.client.TableExistsException;
|
||||
import org.apache.accumulo.core.client.TableNotFoundException;
|
||||
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
|
||||
import org.apache.accumulo.core.data.Key;
|
||||
import org.apache.accumulo.core.data.Mutation;
|
||||
import org.apache.accumulo.core.data.Range;
|
||||
|
@ -89,7 +90,7 @@ public class ScanAccumuloIT {
|
|||
BatchWriterConfig writerConfig = new BatchWriterConfig();
|
||||
writerConfig.setMaxWriteThreads(2);
|
||||
writerConfig.setMaxMemory(1024*1024);
|
||||
MultiTableBatchWriter writer = accumulo.getConnector("root","password").createMultiTableBatchWriter(writerConfig);
|
||||
MultiTableBatchWriter writer = accumulo.createAccumuloClient("root", new PasswordToken("password")).createMultiTableBatchWriter(writerConfig);
|
||||
|
||||
long ts = System.currentTimeMillis();
|
||||
|
||||
|
@ -139,7 +140,7 @@ public class ScanAccumuloIT {
|
|||
void verifyKey(String tableName, Set<Key> expectedKeys, Authorizations auths) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
|
||||
if (null == auths)
|
||||
auths = new Authorizations();
|
||||
try(BatchScanner scanner = accumulo.getConnector("root","password").createBatchScanner(tableName,auths,1)) {
|
||||
try(BatchScanner scanner = accumulo.createAccumuloClient("root",new PasswordToken("password")).createBatchScanner(tableName,auths,1)) {
|
||||
List<Range> ranges = new ArrayList<>();
|
||||
ranges.add(new Range());
|
||||
scanner.setRanges(ranges);
|
||||
|
@ -163,9 +164,12 @@ public class ScanAccumuloIT {
|
|||
String auths, Authorizations defaultVis, boolean deletes, int expected) throws Exception {
|
||||
String tableName = UUID.randomUUID().toString();
|
||||
tableName=tableName.replace("-","a");
|
||||
accumulo.getConnector("root","password").tableOperations().create(tableName);
|
||||
AccumuloClient client = accumulo.createAccumuloClient("root",new PasswordToken("password"));
|
||||
client.tableOperations().create(tableName);
|
||||
|
||||
if (null != defaultVis)
|
||||
accumulo.getConnector("root","password").securityOperations().changeUserAuthorizations("root",defaultVis);
|
||||
client.securityOperations().changeUserAuthorizations("root",defaultVis);
|
||||
|
||||
TestRunner runner = getTestRunner(tableName, DEFAULT_COLUMN_FAMILY);
|
||||
runner.setProperty(ScanAccumulo.START_KEY, row);
|
||||
if (!cf.isEmpty())
|
||||
|
@ -175,7 +179,7 @@ public class ScanAccumuloIT {
|
|||
runner.setProperty(ScanAccumulo.AUTHORIZATIONS, auths);
|
||||
runner.setProperty(ScanAccumulo.END_KEY, endrow);
|
||||
|
||||
AccumuloService client = MockAccumuloService.getService(runner,accumulo.getZooKeepers(),accumulo.getInstanceName(),"root","password");
|
||||
AccumuloService service = MockAccumuloService.getService(runner,accumulo.getZooKeepers(),accumulo.getInstanceName(),"root","password");
|
||||
Set<Key> expectedKeys = generateTestData(runner,row,tableName,valueincq,delim, auths);
|
||||
if (sendFlowFile) {
|
||||
runner.enqueue("Test".getBytes("UTF-8")); // This is to coax the processor into reading the data in the reader.l
|
||||
|
|
Loading…
Reference in New Issue