From eb7d49cdff93b638ee7e0ea67a4525a85fbf0ec0 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Fri, 10 Nov 2023 13:19:47 +0100 Subject: [PATCH] NIFI-12344 PutKudu Operation should accept Debezium Types This closes #8004 Signed-off-by: David Handermann --- .../apache/nifi/processors/kudu/PutKudu.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java index 3a3273559f..eaa07617c2 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java @@ -165,8 +165,8 @@ public class PutKudu extends AbstractKuduProcessor { .name("Operation RecordPath") .displayName("Operation RecordPath") .description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record in order to determine the Kudu Operation Type. When evaluated, the " + - "RecordPath must evaluate to one of hte valid Kudu Operation Types, or the incoming FlowFile will be routed to failure. If this property is specified, the property" + - " will be ignored.") + "RecordPath must evaluate to one of the valid Kudu Operation Types (Debezium style operation types are also supported: \"r\" and \"c\" for INSERT, \"u\" for UPDATE, and \"d\" for " + + "DELETE), or the incoming FlowFile will be routed to failure. If this property is specified, the property will be ignored.") .required(false) .addValidator(new RecordPathValidator()) .expressionLanguageSupported(NONE) @@ -688,7 +688,18 @@ public class PutKudu extends AbstractKuduProcessor { final String resultValue = String.valueOf(resultList.get(0).getValue()); try { - return OperationType.valueOf(resultValue.toUpperCase()); + // Support Operation Type character values from Debezium + switch (resultValue) { + case "c": + case "r": + return OperationType.INSERT; + case "u": + return OperationType.UPDATE; + case "d": + return OperationType.DELETE; + default: + return OperationType.valueOf(resultValue.toUpperCase()); + } } catch (final IllegalArgumentException iae) { throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record to determine Kudu Operation Type but found invalid value: " + resultValue); }