NIFI-12344 PutKudu Operation should accept Debezium Types

This closes #8004

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Pierre Villard 2023-11-10 13:19:47 +01:00 committed by exceptionfactory
parent 279084ddfe
commit eb7d49cdff
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
1 changed files with 14 additions and 3 deletions

View File

@ -165,8 +165,8 @@ public class PutKudu extends AbstractKuduProcessor {
.name("Operation RecordPath")
.displayName("Operation RecordPath")
.description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record in order to determine the Kudu Operation Type. When evaluated, the " +
"RecordPath must evaluate to one of hte valid Kudu Operation Types, or the incoming FlowFile will be routed to failure. If this property is specified, the <Kudu Operation Type> property" +
" will be ignored.")
"RecordPath must evaluate to one of the valid Kudu Operation Types (Debezium style operation types are also supported: \"r\" and \"c\" for INSERT, \"u\" for UPDATE, and \"d\" for "
+ "DELETE), or the incoming FlowFile will be routed to failure. If this property is specified, the <Kudu Operation Type> property will be ignored.")
.required(false)
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(NONE)
@ -688,7 +688,18 @@ public class PutKudu extends AbstractKuduProcessor {
final String resultValue = String.valueOf(resultList.get(0).getValue());
try {
// Support Operation Type character values from Debezium
switch (resultValue) {
case "c":
case "r":
return OperationType.INSERT;
case "u":
return OperationType.UPDATE;
case "d":
return OperationType.DELETE;
default:
return OperationType.valueOf(resultValue.toUpperCase());
}
} catch (final IllegalArgumentException iae) {
throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record to determine Kudu Operation Type but found invalid value: " + resultValue);
}