NIFI-7987: Support ignore operations in the PutKudu processor

This commit is contained in:
Grant Henke 2020-11-07 10:11:50 -06:00 committed by markap14
parent d72bfc76b5
commit 29a1d6badc
6 changed files with 90 additions and 61 deletions

View File

@ -26,7 +26,7 @@
<properties>
<exclude.tests>None</exclude.tests>
<kudu.version>1.13.0</kudu.version>
<kudu.version>1.14.0</kudu.version>
</properties>
<build>
<extensions>

View File

@ -23,18 +23,13 @@ import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.AlterTableOptions;
import org.apache.kudu.client.AsyncKuduClient;
import org.apache.kudu.client.Delete;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Update;
import org.apache.kudu.client.Upsert;
import org.apache.kudu.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
@ -139,6 +134,14 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
return this.kerberosUser;
}
protected boolean supportsIgnoreOperations() {
try {
return kuduClient.supportsIgnoreOperations();
} catch (KuduException e) {
throw new RuntimeException(e);
}
}
protected void createKerberosUserAndOrKuduClient(ProcessContext context) throws LoginException {
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
@ -428,29 +431,4 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
return alterTable;
}
protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
Upsert upsert = kuduTable.newUpsert();
buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
return upsert;
}
protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
Insert insert = kuduTable.newInsert();
buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
return insert;
}
protected Delete deleteRecordFromKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
Delete delete = kuduTable.newDelete();
buildPartialRow(kuduTable.getSchema(), delete.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
return delete;
}
protected Update updateRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
Update update = kuduTable.newUpdate();
buildPartialRow(kuduTable.getSchema(), update.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
return update;
}
}

View File

@ -22,5 +22,7 @@ public enum OperationType {
INSERT_IGNORE,
UPSERT,
UPDATE,
DELETE;
DELETE,
UPDATE_IGNORE,
DELETE_IGNORE;
}

View File

@ -272,6 +272,7 @@ public class PutKudu extends AbstractKuduProcessor {
private volatile Function<Record, OperationType> recordPathOperationType;
private volatile RecordPath dataRecordPath;
private volatile String failureStrategy;
private volatile boolean supportsInsertIgnoreOp;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -312,6 +313,7 @@ public class PutKudu extends AbstractKuduProcessor {
ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue().toUpperCase());
createKerberosUserAndOrKuduClient(context);
supportsInsertIgnoreOp = supportsIgnoreOperations();
final String operationRecordPathValue = context.getProperty(OPERATION_RECORD_PATH).getValue();
if (operationRecordPathValue == null) {
@ -446,11 +448,14 @@ public class PutKudu extends AbstractKuduProcessor {
}
for (final Record dataRecord : dataRecords) {
// In the case of INSERT_IGNORE the Kudu session is modified to ignore row errors.
// If supportsIgnoreOps is false, in the case of INSERT_IGNORE the Kudu session
// is modified to ignore row errors.
// Because the session is shared across flow files, for batching efficiency, we
// need to flush when changing to and from INSERT_IGNORE operation types.
// This should be updated and simplified when KUDU-1563 is completed.
if (prevOperationType != operationType && (prevOperationType == OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) {
// This should be removed when the lowest supported version of Kudu supports
// ignore operations.
if (!supportsInsertIgnoreOp && prevOperationType != operationType
&& (prevOperationType == OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) {
flushKuduSession(kuduSession, false, pendingRowErrors);
kuduSession.setIgnoreAllDuplicateRows(operationType == OperationType.INSERT_IGNORE);
}
@ -580,22 +585,43 @@ public class PutKudu extends AbstractKuduProcessor {
return kuduSession;
}
private Operation createKuduOperation(OperationType operationType, Record record,
protected Operation createKuduOperation(OperationType operationType, Record record,
List<String> fieldNames, Boolean ignoreNull,
Boolean lowercaseFields, KuduTable kuduTable) {
Operation operation;
switch (operationType) {
case DELETE:
return deleteRecordFromKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields);
case INSERT:
operation = kuduTable.newInsert();
break;
case INSERT_IGNORE:
return insertRecordToKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields);
// If the target Kudu cluster does not support ignore operations use an insert.
// The legacy session based insert ignore will be used instead.
if (!supportsInsertIgnoreOp) {
operation = kuduTable.newInsert();
} else {
operation = kuduTable.newInsertIgnore();
}
break;
case UPSERT:
return upsertRecordToKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields);
operation = kuduTable.newUpsert();
break;
case UPDATE:
return updateRecordToKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields);
operation = kuduTable.newUpdate();
break;
case UPDATE_IGNORE:
operation = kuduTable.newUpdateIgnore();
break;
case DELETE:
operation = kuduTable.newDelete();
break;
case DELETE_IGNORE:
operation = kuduTable.newDeleteIgnore();
break;
default:
throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType));
}
buildPartialRow(kuduTable.getSchema(), operation.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
return operation;
}
private static class RecordPathOperationType implements Function<Record, OperationType> {

View File

@ -98,7 +98,7 @@ public class ITPutKudu {
testRunner.setProperty(PutKudu.IGNORE_NULL, "true");
testRunner.setProperty(PutKudu.LOWERCASE_FIELD_NAMES, "false");
testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory");
testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.INSERT.toString());
testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.INSERT_IGNORE.toString());
}
private void createKuduTable() throws KuduException {

View File

@ -18,11 +18,15 @@
package org.apache.nifi.processors.kudu;
import org.apache.kudu.Schema;
import org.apache.kudu.client.DeleteIgnore;
import org.apache.kudu.client.InsertIgnore;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Delete;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.UpdateIgnore;
import org.apache.kudu.client.Upsert;
import org.apache.kudu.client.Update;
import org.apache.nifi.processor.ProcessContext;
@ -45,7 +49,7 @@ import static org.mockito.Mockito.when;
public class MockPutKudu extends PutKudu {
private KuduSession session;
private LinkedList<Insert> insertQueue;
private LinkedList<Operation> opQueue;
// Atomic reference is used as the set and use of the schema are in different thread
private AtomicReference<Schema> tableSchema = new AtomicReference<>();
@ -59,32 +63,51 @@ public class MockPutKudu extends PutKudu {
public MockPutKudu(KuduSession session) {
this.session = session;
this.insertQueue = new LinkedList<>();
this.opQueue = new LinkedList<>();
}
public void queue(Insert... operations) {
insertQueue.addAll(Arrays.asList(operations));
public void queue(Operation... operations) {
opQueue.addAll(Arrays.asList(operations));
}
@Override
protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
Insert insert = insertQueue.poll();
return insert != null ? insert : mock(Insert.class);
protected Operation createKuduOperation(OperationType operationType, Record record,
List<String> fieldNames, Boolean ignoreNull,
Boolean lowercaseFields, KuduTable kuduTable) {
Operation operation = opQueue.poll();
if (operation == null) {
switch (operationType) {
case INSERT:
operation = mock(Insert.class);
break;
case INSERT_IGNORE:
operation = mock(InsertIgnore.class);
break;
case UPSERT:
operation = mock(Upsert.class);
break;
case UPDATE:
operation = mock(Update.class);
break;
case UPDATE_IGNORE:
operation = mock(UpdateIgnore.class);
break;
case DELETE:
operation = mock(Delete.class);
break;
case DELETE_IGNORE:
operation = mock(DeleteIgnore.class);
break;
default:
throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType));
}
}
return operation;
}
@Override
protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
return mock(Upsert.class);
}
@Override
protected Delete deleteRecordFromKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
return mock(Delete.class);
}
@Override
protected Update updateRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
return mock(Update.class);
protected boolean supportsIgnoreOperations() {
return true;
}
@Override