From 29a1d6badc08e648a50d8077905dacbf65345952 Mon Sep 17 00:00:00 2001 From: Grant Henke Date: Sat, 7 Nov 2020 10:11:50 -0600 Subject: [PATCH] NIFI-7987: Support ignore operations in the PutKudu processor --- .../nifi-kudu-processors/pom.xml | 2 +- .../kudu/AbstractKuduProcessor.java | 38 +++--------- .../nifi/processors/kudu/OperationType.java | 4 +- .../apache/nifi/processors/kudu/PutKudu.java | 44 ++++++++++--- .../nifi/processors/kudu/ITPutKudu.java | 2 +- .../nifi/processors/kudu/MockPutKudu.java | 61 +++++++++++++------ 6 files changed, 90 insertions(+), 61 deletions(-) diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml index bf38d47317..a2fe67dd6b 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml @@ -26,7 +26,7 @@ None - 1.13.0 + 1.14.0 diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java index 0ae8e40568..a3ef9da549 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java @@ -23,18 +23,13 @@ import org.apache.kudu.Schema; import org.apache.kudu.Type; import org.apache.kudu.client.AlterTableOptions; import org.apache.kudu.client.AsyncKuduClient; -import org.apache.kudu.client.Delete; -import org.apache.kudu.client.Insert; import org.apache.kudu.client.KuduClient; import org.apache.kudu.client.KuduException; import org.apache.kudu.client.KuduSession; -import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.OperationResponse; import org.apache.kudu.client.PartialRow; import org.apache.kudu.client.RowError; import org.apache.kudu.client.SessionConfiguration; -import org.apache.kudu.client.Update; -import org.apache.kudu.client.Upsert; import org.apache.kudu.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; @@ -139,6 +134,14 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { return this.kerberosUser; } + protected boolean supportsIgnoreOperations() { + try { + return kuduClient.supportsIgnoreOperations(); + } catch (KuduException e) { + throw new RuntimeException(e); + } + } + protected void createKerberosUserAndOrKuduClient(ProcessContext context) throws LoginException { final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue(); @@ -428,29 +431,4 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { return alterTable; } - - protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List fieldNames, Boolean ignoreNull, Boolean lowercaseFields) { - Upsert upsert = kuduTable.newUpsert(); - buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames, ignoreNull, lowercaseFields); - return upsert; - } - - protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List fieldNames, Boolean ignoreNull, Boolean lowercaseFields) { - Insert insert = kuduTable.newInsert(); - buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, fieldNames, ignoreNull, lowercaseFields); - return insert; - } - - protected Delete deleteRecordFromKudu(KuduTable kuduTable, Record record, List fieldNames, Boolean ignoreNull, Boolean lowercaseFields) { - Delete delete = kuduTable.newDelete(); - buildPartialRow(kuduTable.getSchema(), delete.getRow(), record, fieldNames, ignoreNull, lowercaseFields); - return delete; - } - - protected Update updateRecordToKudu(KuduTable kuduTable, Record record, List fieldNames, Boolean ignoreNull, Boolean lowercaseFields) { - Update update = kuduTable.newUpdate(); - buildPartialRow(kuduTable.getSchema(), update.getRow(), record, fieldNames, ignoreNull, lowercaseFields); - return update; - } - } diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java index 08bcd77cb5..c335684938 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java @@ -22,5 +22,7 @@ public enum OperationType { INSERT_IGNORE, UPSERT, UPDATE, - DELETE; + DELETE, + UPDATE_IGNORE, + DELETE_IGNORE; } diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java index 1a649b0433..1fb9199b4d 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java @@ -272,6 +272,7 @@ public class PutKudu extends AbstractKuduProcessor { private volatile Function recordPathOperationType; private volatile RecordPath dataRecordPath; private volatile String failureStrategy; + private volatile boolean supportsInsertIgnoreOp; @Override protected List getSupportedPropertyDescriptors() { @@ -312,6 +313,7 @@ public class PutKudu extends AbstractKuduProcessor { ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger(); flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue().toUpperCase()); createKerberosUserAndOrKuduClient(context); + supportsInsertIgnoreOp = supportsIgnoreOperations(); final String operationRecordPathValue = context.getProperty(OPERATION_RECORD_PATH).getValue(); if (operationRecordPathValue == null) { @@ -446,11 +448,14 @@ public class PutKudu extends AbstractKuduProcessor { } for (final Record dataRecord : dataRecords) { - // In the case of INSERT_IGNORE the Kudu session is modified to ignore row errors. + // If supportsIgnoreOps is false, in the case of INSERT_IGNORE the Kudu session + // is modified to ignore row errors. // Because the session is shared across flow files, for batching efficiency, we // need to flush when changing to and from INSERT_IGNORE operation types. - // This should be updated and simplified when KUDU-1563 is completed. - if (prevOperationType != operationType && (prevOperationType == OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) { + // This should be removed when the lowest supported version of Kudu supports + // ignore operations. + if (!supportsInsertIgnoreOp && prevOperationType != operationType + && (prevOperationType == OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) { flushKuduSession(kuduSession, false, pendingRowErrors); kuduSession.setIgnoreAllDuplicateRows(operationType == OperationType.INSERT_IGNORE); } @@ -580,22 +585,43 @@ public class PutKudu extends AbstractKuduProcessor { return kuduSession; } - private Operation createKuduOperation(OperationType operationType, Record record, + protected Operation createKuduOperation(OperationType operationType, Record record, List fieldNames, Boolean ignoreNull, Boolean lowercaseFields, KuduTable kuduTable) { + Operation operation; switch (operationType) { - case DELETE: - return deleteRecordFromKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields); case INSERT: + operation = kuduTable.newInsert(); + break; case INSERT_IGNORE: - return insertRecordToKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields); + // If the target Kudu cluster does not support ignore operations use an insert. + // The legacy session based insert ignore will be used instead. + if (!supportsInsertIgnoreOp) { + operation = kuduTable.newInsert(); + } else { + operation = kuduTable.newInsertIgnore(); + } + break; case UPSERT: - return upsertRecordToKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields); + operation = kuduTable.newUpsert(); + break; case UPDATE: - return updateRecordToKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields); + operation = kuduTable.newUpdate(); + break; + case UPDATE_IGNORE: + operation = kuduTable.newUpdateIgnore(); + break; + case DELETE: + operation = kuduTable.newDelete(); + break; + case DELETE_IGNORE: + operation = kuduTable.newDeleteIgnore(); + break; default: throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType)); } + buildPartialRow(kuduTable.getSchema(), operation.getRow(), record, fieldNames, ignoreNull, lowercaseFields); + return operation; } private static class RecordPathOperationType implements Function { diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java index b6fa409467..7c2c84bd03 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java @@ -98,7 +98,7 @@ public class ITPutKudu { testRunner.setProperty(PutKudu.IGNORE_NULL, "true"); testRunner.setProperty(PutKudu.LOWERCASE_FIELD_NAMES, "false"); testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory"); - testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.INSERT.toString()); + testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.INSERT_IGNORE.toString()); } private void createKuduTable() throws KuduException { diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java index 31b9ac7530..1aff599592 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java @@ -18,11 +18,15 @@ package org.apache.nifi.processors.kudu; import org.apache.kudu.Schema; +import org.apache.kudu.client.DeleteIgnore; +import org.apache.kudu.client.InsertIgnore; import org.apache.kudu.client.KuduClient; import org.apache.kudu.client.KuduSession; import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.Delete; import org.apache.kudu.client.Insert; +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.UpdateIgnore; import org.apache.kudu.client.Upsert; import org.apache.kudu.client.Update; import org.apache.nifi.processor.ProcessContext; @@ -45,7 +49,7 @@ import static org.mockito.Mockito.when; public class MockPutKudu extends PutKudu { private KuduSession session; - private LinkedList insertQueue; + private LinkedList opQueue; // Atomic reference is used as the set and use of the schema are in different thread private AtomicReference tableSchema = new AtomicReference<>(); @@ -59,32 +63,51 @@ public class MockPutKudu extends PutKudu { public MockPutKudu(KuduSession session) { this.session = session; - this.insertQueue = new LinkedList<>(); + this.opQueue = new LinkedList<>(); } - public void queue(Insert... operations) { - insertQueue.addAll(Arrays.asList(operations)); + public void queue(Operation... operations) { + opQueue.addAll(Arrays.asList(operations)); } @Override - protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List fieldNames, Boolean ignoreNull, Boolean lowercaseFields) { - Insert insert = insertQueue.poll(); - return insert != null ? insert : mock(Insert.class); + protected Operation createKuduOperation(OperationType operationType, Record record, + List fieldNames, Boolean ignoreNull, + Boolean lowercaseFields, KuduTable kuduTable) { + Operation operation = opQueue.poll(); + if (operation == null) { + switch (operationType) { + case INSERT: + operation = mock(Insert.class); + break; + case INSERT_IGNORE: + operation = mock(InsertIgnore.class); + break; + case UPSERT: + operation = mock(Upsert.class); + break; + case UPDATE: + operation = mock(Update.class); + break; + case UPDATE_IGNORE: + operation = mock(UpdateIgnore.class); + break; + case DELETE: + operation = mock(Delete.class); + break; + case DELETE_IGNORE: + operation = mock(DeleteIgnore.class); + break; + default: + throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType)); + } + } + return operation; } @Override - protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List fieldNames, Boolean ignoreNull, Boolean lowercaseFields) { - return mock(Upsert.class); - } - - @Override - protected Delete deleteRecordFromKudu(KuduTable kuduTable, Record record, List fieldNames, Boolean ignoreNull, Boolean lowercaseFields) { - return mock(Delete.class); - } - - @Override - protected Update updateRecordToKudu(KuduTable kuduTable, Record record, List fieldNames, Boolean ignoreNull, Boolean lowercaseFields) { - return mock(Update.class); + protected boolean supportsIgnoreOperations() { + return true; } @Override