NIFI-11044 Script/commands to migrate Kafka processors

This closes #6838.

Reviewed-by: Robert Kalmar <rfrostkalmar@gmail.com>
Reviewed-by: Zoltan Kornel Torok <taz19880922@gmail.com>

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Timea Barna 2023-01-12 09:31:38 +01:00 committed by Peter Turcsanyi
parent dd2fc4bb9d
commit 00985edd80
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
27 changed files with 2101 additions and 1 deletions

View File

@ -1583,4 +1583,71 @@ NOTE: As of NiFi 1.10.x, because of an upgrade to ZooKeeper 3.5.x, the migrator
* For a ZooKeeper using Kerberos for authentication:
** `zk-migrator.sh -s -z destinationHostname:destinationClientPort/destinationRootPath/components -k /path/to/jaasconfig/jaas-config.conf -f /path/to/export/zk-source-data.json`
6. Once the migration has completed successfully, start the processors in the NiFi flow. Processing should continue from the point at which it was stopped when the NiFi flow was stopped.
6. Once the migration has completed successfully, start the processors in the NiFi flow. Processing should continue from the point at which it was stopped when the NiFi flow was stopped.
[[kafka_migrator]]
== Kafka Processor Migrator
With NiFi version 1.15.3, Kafka processor versions 0.8, 0.9, 0.10 and 0.11 were removed.
In large flows having many numbers of components it is challenging to replace these processors manually.
This tool can be used to update a flow in an automated way.
=== Usage
Running the script requires 3 mandatory and 1 optional parameters:
* Input file, the full path of the flow.xml.gz in which the replacement is required.
* Output file, the full path of the file where the results should be saved.
* Transaction, whether the new processors should be configured with or without transaction usage.
* Optional: Kafka Brokers, a comma separated list of Kafka Brokers in <host>:<port> format.
Different input and output files must be used.
Kafka Broker argument can be omitted if flow does not contain GetKafka or PutKafka processors.
1. Run script, a possible example:
./bin/kafka-migrator.sh -i "/tmp/flow/flow.xml.gz" -o "/tmp/flow/flow_result.xml.gz" -t false -k "mykafkaserver1:1234,mykafkaserver2:1235"
2. Rename flow_result.xml.gz file to flow.xml.gz, do not overwrite your input file.
3. Copy flow.xml.gz file to all the NiFi nodes conf directory
4. Start NiFi
5. Verify the results.
=== Expected Behaviour
* Flow replacement:
* For all replaced processors:
** changing class and artifact
** configure transaction as true
*** 'Delivery Guarantee' property will be set to 'Replicated'
*** if 'Honor-Transactions' and 'Use-Transactions' properties are present in the file they will be set to true
*** if 'Honor-Transactions' and 'Use-Transactions' not present they will be translated as true in NiFi
** configure transaction as false
*** 'Delivery Guarantee' property will keep its original setting.
*** 'Honor-Transactions' and 'Use-Transactions' will be set to false
* For version 0.8 processors (when kafka broker list argument provided)
** remove all version 0.8 properties
** add version 2.0 properties with default value except for 'Topic Name', 'Group ID',
'Partition', 'Kafka Key', 'Delivery Guarantee' (if transaction false) and
'Compression Codec' values which will be copied over
* Template replacement:
* For all replaced processors:
** changing type and artifact
** configure transaction as true
*** 'Delivery Guarantee' property will be set to 'Replicated'
*** if 'Honor-Transactions' and 'Use-Transactions' properties are present in the file they will be set to true
*** if 'Honor-Transactions' and 'Use-Transactions' not present they will be translated as true in NiFi
** configure transaction as false
*** 'Delivery Guarantee' property will keep its original setting.
*** 'Honor-Transactions' and 'Use-Transactions' will be set to false
* For version 0.8 processors (when kafka broker list argument provided)
** remove all version 0.8 properties and descriptors
** add version 2.0 properties with default value except for 'Topic Name', 'Group ID',
'Partition', 'Kafka Key', 'Delivery Guarantee' (if transaction false) and
'Compression Codec' values which will be copied over
*** add version 2.0 descriptors
=== Limitations
* All deprecated Kafka processors will be replaced with version 2.0 processors.
* Script will not rename the processors, only their type will be changed.
* Transaction setting will be applied to all the replaced processors.
* The flow.xml.gz file needs to be fit in memory and process time depends on the file size.
* Processors in templates will be replaced as well, please download the original templates if desired.

View File

@ -93,6 +93,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-toolkit-flowanalyzer</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-toolkit-kafka-migrator</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-toolkit-cli</artifactId>

View File

@ -0,0 +1,41 @@
@echo off
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
rem Use JAVA_HOME if it's set; otherwise, just use java
if "%JAVA_HOME%" == "" goto noJavaHome
if not exist "%JAVA_HOME%\bin\java.exe" goto noJavaHome
set JAVA_EXE=%JAVA_HOME%\bin\java.exe
goto startConfig
:noJavaHome
echo The JAVA_HOME environment variable is not defined correctly.
echo Instead the PATH will be used to find the java executable.
echo.
set JAVA_EXE=java
goto startConfig
:startConfig
set LIB_DIR=%~dp0..\classpath;%~dp0..\lib
if "%JAVA_OPTS%" == "" set JAVA_OPTS=-Xms12m -Xmx24m
SET JAVA_PARAMS=-cp %LIB_DIR%\* %JAVA_OPTS% org.apache.nifi.toolkit.kafkamigrator.KafkaMigratorMain
cmd.exe /C ""%JAVA_EXE%" %JAVA_PARAMS% %* ""

View File

@ -0,0 +1,119 @@
#!/bin/sh
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Script structure inspired from Apache Karaf and other Apache projects with similar startup approaches
SCRIPT_DIR=$(dirname "$0")
SCRIPT_NAME=$(basename "$0")
NIFI_TOOLKIT_HOME=$(cd "${SCRIPT_DIR}" && cd .. && pwd)
PROGNAME=$(basename "$0")
warn() {
(>&2 echo "${PROGNAME}: $*")
}
die() {
warn "$*"
exit 1
}
detectOS() {
# OS specific support (must be 'true' or 'false').
cygwin=false;
aix=false;
os400=false;
darwin=false;
case "$(uname)" in
CYGWIN*)
cygwin=true
;;
AIX*)
aix=true
;;
OS400*)
os400=true
;;
Darwin)
darwin=true
;;
esac
# For AIX, set an environment variable
if ${aix}; then
export LDR_CNTRL=MAXDATA=0xB0000000@DSA
echo ${LDR_CNTRL}
fi
}
locateJava() {
# Setup the Java Virtual Machine
if $cygwin ; then
[ -n "${JAVA}" ] && JAVA=$(cygpath --unix "${JAVA}")
[ -n "${JAVA_HOME}" ] && JAVA_HOME=$(cygpath --unix "${JAVA_HOME}")
fi
if [ "x${JAVA}" = "x" ] && [ -r /etc/gentoo-release ] ; then
JAVA_HOME=$(java-config --jre-home)
fi
if [ "x${JAVA}" = "x" ]; then
if [ "x${JAVA_HOME}" != "x" ]; then
if [ ! -d "${JAVA_HOME}" ]; then
die "JAVA_HOME is not valid: ${JAVA_HOME}"
fi
JAVA="${JAVA_HOME}/bin/java"
else
warn "JAVA_HOME not set; results may vary"
JAVA=$(type java)
JAVA=$(expr "${JAVA}" : '.* \(/.*\)$')
if [ "x${JAVA}" = "x" ]; then
die "java command not found"
fi
fi
fi
}
init() {
# Determine if there is special OS handling we must perform
detectOS
# Locate the Java VM to execute
locateJava "$1"
}
run() {
LIBS="${NIFI_TOOLKIT_HOME}/lib/*"
sudo_cmd_prefix=""
if $cygwin; then
NIFI_TOOLKIT_HOME=$(cygpath --path --windows "${NIFI_TOOLKIT_HOME}")
CLASSPATH="$NIFI_TOOLKIT_HOME/classpath;$(cygpath --path --windows "${LIBS}")"
else
CLASSPATH="$NIFI_TOOLKIT_HOME/classpath:${LIBS}"
fi
export JAVA_HOME="$JAVA_HOME"
export NIFI_TOOLKIT_HOME="$NIFI_TOOLKIT_HOME"
umask 0077
exec "${JAVA}" -cp "${CLASSPATH}" ${JAVA_OPTS:--Xms12m -Xmx24m} org.apache.nifi.toolkit.kafkamigrator.KafkaMigratorMain "$@"
}
init "$1"
run "$@"

View File

@ -0,0 +1,51 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-toolkit</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-toolkit-kafka-migrator</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-xml-processing</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>src/test/resources/flow.xml</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator;
import org.apache.nifi.toolkit.kafkamigrator.service.KafkaFlowMigrationService;
import org.apache.nifi.toolkit.kafkamigrator.service.KafkaTemplateMigrationService;
import org.apache.nifi.xml.processing.parsers.DocumentProvider;
import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider;
import org.apache.nifi.xml.processing.transform.StandardTransformProvider;
import org.w3c.dom.Document;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
public class KafkaMigratorMain {
private static void printUsage() {
System.out.println("This application replaces Kafka processors from version 0.8, 0.9, 0.10 and 0.11 to version 2.0 processors" +
" in a flow.xml.gz file.");
System.out.println("\n");
System.out.println("Usage: kafka-migrator.sh -i <path to input flow.xml.gz> -o <path to output flow.xml.gz>" +
" -t <use transaction true or false>\noptional: -k <comma separated kafka brokers in <host>:<port> format. " +
"Required for version 0.8 processors only>");
}
public static void main(final String[] args) throws Exception {
if (showingUsageNeeded(args)) {
printUsage();
return;
}
String input = "";
if (args[0].equalsIgnoreCase("-i")) {
input = args[1];
}
String output = "";
if (args[2].equalsIgnoreCase("-o")) {
output = args[3];
}
if (input.equalsIgnoreCase(output)) {
System.out.println("Input and output files should be different.");
return;
}
String transaction = "";
if (args[4].equalsIgnoreCase("-t")) {
transaction = args[5];
}
if (!(transaction.equalsIgnoreCase("true") || transaction.equalsIgnoreCase("false"))) {
System.out.println("Transaction argument should be either true or false.");
return;
}
String kafkaBrokers = "";
if (args.length == 8) {
if (args[6].equalsIgnoreCase("-k") && args[7].matches(".+:\\d+")) {
kafkaBrokers = args[7];
} else {
System.out.println("Kafka Brokers must be in a <host>:<port> format, can be separated by comma. " +
"For example: hostname:1234, host:5678");
return;
}
}
final MigratorConfigurationBuilder configurationBuilder = new MigratorConfigurationBuilder();
configurationBuilder.setKafkaBrokers(kafkaBrokers)
.setTransaction(Boolean.parseBoolean(transaction));
final InputStream fileStream = Files.newInputStream(Paths.get(input));
final OutputStream outputStream = Files.newOutputStream(Paths.get(output));
final InputStream gzipStream = new GZIPInputStream(fileStream);
final OutputStream gzipOutStream = new GZIPOutputStream(outputStream);
System.out.println("Using flow=" + input);
try {
final DocumentProvider documentProvider = new StandardDocumentProvider();
final Document document = documentProvider.parse(gzipStream);
final KafkaFlowMigrationService flowMigrationService = new KafkaFlowMigrationService();
final KafkaTemplateMigrationService templateMigrationService = new KafkaTemplateMigrationService();
System.out.println("Replacing processors.");
flowMigrationService.replaceKafkaProcessors(document, configurationBuilder);
templateMigrationService.replaceKafkaProcessors(document, configurationBuilder);
final StreamResult streamResult = new StreamResult(gzipOutStream);
final StandardTransformProvider transformProvider = new StandardTransformProvider();
transformProvider.setIndent(true);
transformProvider.transform(new DOMSource(document), streamResult);
System.out.println("Replacing completed.");
} catch (Exception e) {
e.printStackTrace();
System.out.println("Exception occurred while attempting to parse flow.xml.gz. Cause: " + e.getCause());
} finally {
gzipOutStream.close();
outputStream.close();
gzipStream.close();
fileStream.close();
}
}
private static boolean showingUsageNeeded(String[] args) {
return args.length < 6 || args[0].equalsIgnoreCase("-h") || args[0].equalsIgnoreCase("--help");
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator;
import org.apache.nifi.toolkit.kafkamigrator.descriptor.ProcessorDescriptor;
import org.apache.nifi.toolkit.kafkamigrator.descriptor.PropertyXpathDescriptor;
public class MigratorConfiguration {
final private String kafkaBrokers;
final private boolean transaction;
final private boolean isVersion8Processor;
final private ProcessorDescriptor processorDescriptor;
final private PropertyXpathDescriptor propertyXpathDescriptor;
public MigratorConfiguration(final String kafkaBrokers, final boolean transaction, final boolean isVersion8Processor,
final ProcessorDescriptor processorDescriptor, final PropertyXpathDescriptor propertyXpathDescriptor) {
this.kafkaBrokers = kafkaBrokers;
this.transaction = transaction;
this.isVersion8Processor = isVersion8Processor;
this.processorDescriptor = processorDescriptor;
this.propertyXpathDescriptor = propertyXpathDescriptor;
}
public String getKafkaBrokers() {
return kafkaBrokers;
}
public boolean isTransaction() {
return transaction;
}
public boolean isVersion8Processor() {
return isVersion8Processor;
}
public ProcessorDescriptor getProcessorDescriptor() {
return processorDescriptor;
}
public PropertyXpathDescriptor getPropertyXpathDescriptor() {
return propertyXpathDescriptor;
}
public static class MigratorConfigurationBuilder {
private String kafkaBrokers;
private boolean transaction;
private boolean isVersion8Processor;
private ProcessorDescriptor processorDescriptor;
private PropertyXpathDescriptor propertyXpathDescriptor;
public MigratorConfigurationBuilder setKafkaBrokers(final String kafkaBrokers) {
this.kafkaBrokers = kafkaBrokers;
return this;
}
public MigratorConfigurationBuilder setTransaction(final boolean transaction) {
this.transaction = transaction;
return this;
}
public MigratorConfigurationBuilder setIsVersion8Processor(final boolean isVersion8Processor) {
this.isVersion8Processor = isVersion8Processor;
return this;
}
public MigratorConfigurationBuilder setProcessorDescriptor(final ProcessorDescriptor processorDescriptor) {
this.processorDescriptor = processorDescriptor;
return this;
}
public MigratorConfigurationBuilder setPropertyXpathDescriptor(final PropertyXpathDescriptor propertyXpathDescriptor) {
this.propertyXpathDescriptor = propertyXpathDescriptor;
return this;
}
public MigratorConfiguration build() {
return new MigratorConfiguration(kafkaBrokers, transaction, isVersion8Processor,
processorDescriptor, propertyXpathDescriptor);
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator.descriptor;
import java.util.HashMap;
import java.util.Map;
public class FlowPropertyXpathDescriptor implements PropertyXpathDescriptor {
private static final Map<String, String> CONSUME_TRANSACTION_PROPERTIES;
private static final Map<String, String> PUBLISH_TRANSACTION_PROPERTIES;
private static final Map<KafkaProcessorType, Map<String, String>> TRANSACTION_PROPERTIES;
static {
CONSUME_TRANSACTION_PROPERTIES = new HashMap<>();
CONSUME_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty", "property[name=\"honor-transactions\"]/value");
CONSUME_TRANSACTION_PROPERTIES.put("transactionTagName", "honor-transactions");
PUBLISH_TRANSACTION_PROPERTIES = new HashMap<>();
PUBLISH_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty", "property[name=\"use-transactions\"]/value");
PUBLISH_TRANSACTION_PROPERTIES.put("transactionTagName", "use-transactions");
TRANSACTION_PROPERTIES = new HashMap<>();
TRANSACTION_PROPERTIES.put(KafkaProcessorType.CONSUME, CONSUME_TRANSACTION_PROPERTIES);
TRANSACTION_PROPERTIES.put(KafkaProcessorType.PUBLISH, PUBLISH_TRANSACTION_PROPERTIES);
}
private final KafkaProcessorType processorType;
public FlowPropertyXpathDescriptor(final KafkaProcessorType processorType) {
this.processorType = processorType;
}
@Override
public String getXpathForProperties() {
return "property";
}
@Override
public String getPropertyKeyTagName() {
return "name";
}
@Override
public String getPropertyTagName() {
return "property";
}
@Override
public String getXpathForTransactionProperty() {
return TRANSACTION_PROPERTIES.get(processorType).get("xpathForTransactionProperty");
}
@Override
public String getTransactionTagName() {
return TRANSACTION_PROPERTIES.get(processorType).get("transactionTagName");
}
}

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator.descriptor;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class KafkaProcessorDescriptor implements ProcessorDescriptor {
private static final Map<String, String> CONSUME_KAFKA_PROCESSOR_PROPERTIES;
private static final Map<String, String> CONSUME_PROPERTIES_TO_BE_SAVED;
private static final Map<String, String> PUBLISH_KAFKA_PROCESSOR_PROPERTIES;
private static final Map<String, String> PUBLISH_PROPERTIES_TO_BE_SAVED;
private static final Map<String, String> CONTROLLER_SERVICES;
private static final Map<KafkaProcessorType, Map<String, String>> PROPERTIES;
private static final Map<KafkaProcessorType, Map<String, String>> PROPERTIES_TO_BE_SAVED;
static {
CONSUME_KAFKA_PROCESSOR_PROPERTIES = new HashMap<>();
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("security.protocol", "PLAINTEXT");
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.mechanism", "GSSAPI");
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.service.name", null);
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("kerberos-credentials-service", null);
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.principal", null);
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.keytab", null);
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.username", null);
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.password", null);
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.token.auth", "false");
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("ssl.context.service", null);
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("topic", null);
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("topic_type", "names");
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("group.id", null);
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("auto.offset.reset", "latest");
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("key-attribute-encoding", "utf-8");
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("message-demarcator", null);
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("separate-by-key", "false");
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("message-header-encoding", "UTF-8");
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("header-name-regex", null);
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("max.poll.records", "10000");
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("max-uncommit-offset-wait", "1 secs");
CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("Communications Timeout", "60 secs");
CONSUME_PROPERTIES_TO_BE_SAVED = new HashMap<>();
CONSUME_PROPERTIES_TO_BE_SAVED.put("Topic Name", "topic");
CONSUME_PROPERTIES_TO_BE_SAVED.put("Group ID", "group.id");
PUBLISH_KAFKA_PROCESSOR_PROPERTIES = new HashMap<>();
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("security.protocol", "PLAINTEXT");
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.mechanism", "GSSAPI");
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.service.name", null);
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("kerberos-credentials-service", null);
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.principal", null);
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.keytab", null);
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.username", null);
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.password", null);
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.token.auth", "false");
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("ssl.context.service", null);
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("topic", null);
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("acks", null);
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("Failure Strategy", "Route to Failure");
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("transactional-id-prefix", null);
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("attribute-name-regex", null);
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("message-header-encoding", "UTF-8");
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("kafka-key", null);
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("key-attribute-encoding", "utf-8");
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("message-demarcator", null);
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("max.request.size", "1 MB");
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("ack.wait.time", "5 secs");
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("max.block.ms", "5 sec");
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("partition", null);
PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("compression.type", null);
PUBLISH_PROPERTIES_TO_BE_SAVED = new HashMap<>();
PUBLISH_PROPERTIES_TO_BE_SAVED.put("Topic Name", "topic");
PUBLISH_PROPERTIES_TO_BE_SAVED.put("Partition", "partition");
PUBLISH_PROPERTIES_TO_BE_SAVED.put("Kafka Key", "kafka-key");
PUBLISH_PROPERTIES_TO_BE_SAVED.put("Delivery Guarantee", "acks");
PUBLISH_PROPERTIES_TO_BE_SAVED.put( "Compression Codec", "compression.type");
CONTROLLER_SERVICES = new HashMap<>();
CONTROLLER_SERVICES.put("kerberos-credentials-service", "org.apache.nifi.kerberos.KerberosCredentialsService");
CONTROLLER_SERVICES.put("ssl.context.service", "org.apache.nifi.ssl.SSLContextService");
PROPERTIES = new HashMap<>();
PROPERTIES.put(KafkaProcessorType.CONSUME, CONSUME_KAFKA_PROCESSOR_PROPERTIES);
PROPERTIES.put(KafkaProcessorType.PUBLISH, PUBLISH_KAFKA_PROCESSOR_PROPERTIES);
PROPERTIES_TO_BE_SAVED = new HashMap<>();
PROPERTIES_TO_BE_SAVED.put(KafkaProcessorType.CONSUME, CONSUME_PROPERTIES_TO_BE_SAVED);
PROPERTIES_TO_BE_SAVED.put(KafkaProcessorType.PUBLISH, PUBLISH_PROPERTIES_TO_BE_SAVED);
}
private final KafkaProcessorType processorType;
public KafkaProcessorDescriptor(final KafkaProcessorType processorType) {
this.processorType = processorType;
}
@Override
public Map<String, String> getProcessorProperties() {
return Collections.unmodifiableMap(PROPERTIES.get(processorType));
}
@Override
public Map<String, String> getPropertiesToBeSaved() {
return Collections.unmodifiableMap(PROPERTIES_TO_BE_SAVED.get(processorType));
}
@Override
public Map<String, String> getControllerServicesForTemplates() {
return Collections.unmodifiableMap(CONTROLLER_SERVICES);
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator.descriptor;
public enum KafkaProcessorType {
PUBLISH("Publish"),
CONSUME("Consume"),
PUT("Put");
private final String processorType;
KafkaProcessorType(String processorType) {
this.processorType = processorType;
}
public String getProcessorType() {
return processorType;
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator.descriptor;
import java.util.Map;
public interface ProcessorDescriptor {
Map<String, String> getProcessorProperties();
Map<String, String> getPropertiesToBeSaved();
Map<String, String> getControllerServicesForTemplates();
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator.descriptor;
public interface PropertyXpathDescriptor {
String getXpathForProperties();
String getPropertyKeyTagName();
String getPropertyTagName();
String getXpathForTransactionProperty();
String getTransactionTagName();
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator.descriptor;
import java.util.HashMap;
import java.util.Map;
public class TemplatePropertyXpathDescriptor implements PropertyXpathDescriptor {
private static final Map<String, String> CONSUME_TRANSACTION_PROPERTIES;
private static final Map<String, String> PUBLISH_TRANSACTION_PROPERTIES;
private static final Map<KafkaProcessorType, Map<String, String>> TRANSACTION_PROPERTIES;
static {
CONSUME_TRANSACTION_PROPERTIES = new HashMap<>();
CONSUME_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty", "entry[key=\"honor-transactions\"]/value");
CONSUME_TRANSACTION_PROPERTIES.put("transactionTagName", "honor-transactions");
PUBLISH_TRANSACTION_PROPERTIES = new HashMap<>();
PUBLISH_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty", "entry[key=\"use-transactions\"]/value");
PUBLISH_TRANSACTION_PROPERTIES.put("transactionTagName", "use-transactions");
TRANSACTION_PROPERTIES = new HashMap<>();
TRANSACTION_PROPERTIES.put(KafkaProcessorType.CONSUME, CONSUME_TRANSACTION_PROPERTIES);
TRANSACTION_PROPERTIES.put(KafkaProcessorType.PUBLISH, PUBLISH_TRANSACTION_PROPERTIES);
}
private final KafkaProcessorType processorType;
public TemplatePropertyXpathDescriptor(final KafkaProcessorType processorType) {
this.processorType = processorType;
}
@Override
public String getXpathForProperties() {
return "entry";
}
@Override
public String getPropertyKeyTagName() {
return "key";
}
@Override
public String getPropertyTagName() {
return "entry";
}
@Override
public String getXpathForTransactionProperty() {
return TRANSACTION_PROPERTIES.get(processorType).get("xpathForTransactionProperty");
}
@Override
public String getTransactionTagName() {
return TRANSACTION_PROPERTIES.get(processorType).get("transactionTagName");
}
}

View File

@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator.migrator;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import java.util.HashMap;
import java.util.Map;
public abstract class AbstractKafkaMigrator implements Migrator {
static final XPath XPATH = XPathFactory.newInstance().newXPath();
private final static String NEW_KAFKA_PROCESSOR_VERSION = "_2_0";
private final static String ARTIFACT = "nifi-kafka-2-0-nar";
private final static String PATH_FOR_ARTIFACT = "bundle/artifact";
final boolean isVersion8Processor;
final boolean isKafkaBrokersPresent;
final Map<String, String> kafkaProcessorProperties;
final Map<String, String> propertiesToBeSaved;
final Map<String, String> controllerServices;
final String xpathForProperties;
final String propertyKeyTagName;
final String propertyTagName;
final String xpathForTransactionProperty;
final String transactionTagName;
final boolean transaction;
public AbstractKafkaMigrator(final MigratorConfiguration configuration) {
final String kafkaBrokers = configuration.getKafkaBrokers();
this.isKafkaBrokersPresent = !kafkaBrokers.isEmpty();
this.isVersion8Processor = configuration.isVersion8Processor();
this.kafkaProcessorProperties = new HashMap<>(configuration.getProcessorDescriptor().getProcessorProperties());
this.propertiesToBeSaved = configuration.getProcessorDescriptor().getPropertiesToBeSaved();
this.controllerServices = configuration.getProcessorDescriptor().getControllerServicesForTemplates();
this.xpathForProperties = configuration.getPropertyXpathDescriptor().getXpathForProperties();
this.propertyKeyTagName = configuration.getPropertyXpathDescriptor().getPropertyKeyTagName();
this.propertyTagName = configuration.getPropertyXpathDescriptor().getPropertyTagName();
this.xpathForTransactionProperty = configuration.getPropertyXpathDescriptor().getXpathForTransactionProperty();
this.transactionTagName = configuration.getPropertyXpathDescriptor().getTransactionTagName();
this.transaction = configuration.isTransaction();
if (isKafkaBrokersPresent) {
kafkaProcessorProperties.put("bootstrap.servers", kafkaBrokers);
}
}
@Override
public void configureProperties(final Node node) throws XPathExpressionException {
if (isVersion8Processor && isKafkaBrokersPresent) {
final NodeList properties = (NodeList) XPATH.evaluate(xpathForProperties, node, XPathConstants.NODESET);
for (int i = 0; i < properties.getLength(); i++) {
final Node property = properties.item(i);
saveRequiredProperties(property);
removeElement(node, property);
}
addNewProperties(node);
}
}
@Override
public void configureDescriptors(final Node node) throws XPathExpressionException {
if(isVersion8Processor && isKafkaBrokersPresent) {
final Element descriptorElement = (Element) XPATH.evaluate("config/descriptors", node, XPathConstants.NODE);
final NodeList descriptors = (NodeList) XPATH.evaluate("entry", descriptorElement, XPathConstants.NODESET);
for (int i = 0; i < descriptors.getLength(); i++) {
final Node descriptor = descriptors.item(i);
removeElement(descriptorElement, descriptor);
}
addNewDescriptors(descriptorElement);
}
}
@Override
public void configureComponentSpecificSteps(final Node node) throws XPathExpressionException {
final String transactionString = Boolean.toString(transaction);
final Element transactionsElement = (Element) XPATH.evaluate(xpathForTransactionProperty, node, XPathConstants.NODE);
if (transactionsElement != null) {
transactionsElement.setTextContent(transactionString);
} else {
addNewProperty(node, transactionTagName, transactionString);
}
kafkaProcessorProperties.put(transactionTagName, transactionString);
}
public void replaceClassName(final Element className) {
final String processorName = StringUtils.substringAfterLast(className.getTextContent(), ".");
final String newClassName = replaceClassNameWithNewProcessorName(className.getTextContent(), processorName);
className.setTextContent(newClassName);
}
public void replaceArtifact(final Node processor) throws XPathExpressionException {
((Element) XPATH.evaluate(PATH_FOR_ARTIFACT, processor, XPathConstants.NODE)).setTextContent(ARTIFACT);
}
private static String replaceClassNameWithNewProcessorName(final String className, final String processorName) {
final String newProcessorName = StringUtils.replaceEach(processorName, new String[]{"Get", "Put"}, new String[]{"pubsub.Consume", "pubsub.Publish"});
final String processorNameWithNewVersion =
newProcessorName.replaceFirst("$|_0_1\\d?", NEW_KAFKA_PROCESSOR_VERSION);
return StringUtils.replace(className, processorName, processorNameWithNewVersion);
}
private void addNewDescriptors(final Node node) {
for (String key: kafkaProcessorProperties.keySet()) {
final Element descriptorElement = node.getOwnerDocument().createElement("entry");
node.appendChild(descriptorElement);
final Element descriptorKeyElement = descriptorElement.getOwnerDocument().createElement("key");
descriptorKeyElement.setTextContent(key);
descriptorElement.appendChild(descriptorKeyElement);
final Element descriptorValueElement = descriptorElement.getOwnerDocument().createElement("value");
descriptorElement.appendChild(descriptorValueElement);
final Element descriptorNameElement = descriptorValueElement.getOwnerDocument().createElement("name");
descriptorNameElement.setTextContent(key);
descriptorValueElement.appendChild(descriptorNameElement);
if (controllerServices.containsKey(key)) {
final Element controllerServiceElement = descriptorValueElement.getOwnerDocument().createElement("identifiesControllerService");
controllerServiceElement.setTextContent(controllerServices.get(key));
descriptorValueElement.appendChild(controllerServiceElement);
}
}
}
private void saveRequiredProperties(final Node property) throws XPathExpressionException {
final String propertyToBeSaved = propertiesToBeSaved.get(XPATH.evaluate(propertyKeyTagName, property));
if (propertyToBeSaved != null) {
String propertyValue = XPATH.evaluate("value", property);
kafkaProcessorProperties.put(propertyToBeSaved, convert(propertyValue));
}
}
private String convert(final String propertyValue) {
return propertyValue.isEmpty() ? null : propertyValue;
}
private void addNewProperties(final Node node) {
for (Map.Entry<String, String> entry : kafkaProcessorProperties.entrySet()) {
addNewProperty(node, entry.getKey(), entry.getValue());
}
}
private void addNewProperty(final Node node, final String key, final String value) {
final Element propertyElement = node.getOwnerDocument().createElement(propertyTagName);
node.appendChild(propertyElement);
final Element propertyKeyElement = propertyElement.getOwnerDocument().createElement(propertyKeyTagName);
propertyKeyElement.setTextContent(key);
propertyElement.appendChild(propertyKeyElement);
if (value != null) {
final Element propertyValueElement = propertyElement.getOwnerDocument().createElement("value");
propertyValueElement.setTextContent(value);
propertyElement.appendChild(propertyValueElement);
}
}
private void removeElement(final Node node, final Node element) {
node.removeChild(element);
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator.migrator;
import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import javax.xml.xpath.XPathExpressionException;
public class ConsumeKafkaFlowMigrator extends AbstractKafkaMigrator {
public ConsumeKafkaFlowMigrator(final MigratorConfiguration configuration) {
super(configuration);
}
@Override
public void migrate(final Element className, final Node processor) throws XPathExpressionException {
configureProperties(processor);
configureComponentSpecificSteps(processor);
replaceClassName(className);
replaceArtifact(processor);
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator.migrator;
import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
public class ConsumeKafkaTemplateMigrator extends AbstractKafkaMigrator {
public ConsumeKafkaTemplateMigrator(final MigratorConfiguration configuration) {
super(configuration);
}
@Override
public void configureProperties(final Node node) throws XPathExpressionException {
final Element propertyElement = (Element) XPATH.evaluate("config/properties", node, XPathConstants.NODE);
super.configureProperties(propertyElement);
}
@Override
public void configureComponentSpecificSteps(final Node node) throws XPathExpressionException {
final Element propertyElement = (Element) XPATH.evaluate("config/properties", node, XPathConstants.NODE);
super.configureComponentSpecificSteps(propertyElement);
}
@Override
public void migrate(final Element className, final Node processor) throws XPathExpressionException {
configureProperties(processor);
configureComponentSpecificSteps(processor);
configureDescriptors(processor);
replaceClassName(className);
replaceArtifact(processor);
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator.migrator;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import javax.xml.xpath.XPathExpressionException;
public interface Migrator {
void configureProperties(final Node node) throws XPathExpressionException;
void configureDescriptors(final Node node) throws XPathExpressionException;
void configureComponentSpecificSteps(final Node node) throws XPathExpressionException;
void migrate(final Element className, final Node node) throws XPathExpressionException;
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator.migrator;
import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
public class PublishKafkaFlowMigrator extends AbstractKafkaMigrator {
public PublishKafkaFlowMigrator(final MigratorConfiguration configuration) {
super(configuration);
}
@Override
public void configureComponentSpecificSteps(final Node node) throws XPathExpressionException {
final Element deliveryGuaranteeValue = (Element) XPATH.evaluate("property[name=\"acks\"]/value", node, XPathConstants.NODE);
if (this.transaction && deliveryGuaranteeValue != null) {
deliveryGuaranteeValue.setTextContent("all");
}
super.configureComponentSpecificSteps(node);
}
@Override
public void migrate(final Element className, final Node processor) throws XPathExpressionException {
configureProperties(processor);
configureComponentSpecificSteps(processor);
replaceClassName(className);
replaceArtifact(processor);
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator.migrator;
import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
public class PublishKafkaTemplateMigrator extends AbstractKafkaMigrator {
public PublishKafkaTemplateMigrator(final MigratorConfiguration configuration) {
super(configuration);
}
@Override
public void configureProperties(final Node node) throws XPathExpressionException {
final Element propertyElement = (Element) XPATH.evaluate("config/properties", node, XPathConstants.NODE);
super.configureProperties(propertyElement);
}
@Override
public void configureComponentSpecificSteps(final Node node) throws XPathExpressionException {
//add value if null
final Element propertyElement = (Element) XPATH.evaluate("config/properties", node, XPathConstants.NODE);
final Element deliveryGuaranteeValue = (Element) XPATH.evaluate("entry[key=\"acks\"]/value", propertyElement, XPathConstants.NODE);
if (this.transaction && deliveryGuaranteeValue != null) {
deliveryGuaranteeValue.setTextContent("all");
}
super.configureComponentSpecificSteps(propertyElement);
}
@Override
public void migrate(final Element className, final Node processor) throws XPathExpressionException {
configureProperties(processor);
configureComponentSpecificSteps(processor);
configureDescriptors(processor);
replaceClassName(className);
replaceArtifact(processor);
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator.service;
import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor;
import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType;
import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaFlowMigrator;
import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator;
import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaFlowMigrator;
import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
import org.apache.nifi.toolkit.kafkamigrator.descriptor.FlowPropertyXpathDescriptor;
public class KafkaFlowMigrationService implements KafkaMigrationService {
private static final String XPATH_FOR_PROCESSORS_IN_FLOW = ".//processor";
private static final String CLASS_TAG_NAME = "class";
public KafkaFlowMigrationService() {
}
@Override
public String getPathForProcessors() {
return XPATH_FOR_PROCESSORS_IN_FLOW;
}
@Override
public String getPathForClass() {
return CLASS_TAG_NAME;
}
@Override
public Migrator createPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) {
configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR)
.setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH))
.setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.PUBLISH));
return new PublishKafkaFlowMigrator(configurationBuilder.build());
}
@Override
public Migrator createConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) {
configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR)
.setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME))
.setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.CONSUME));
return new ConsumeKafkaFlowMigrator(configurationBuilder.build());
}
@Override
public Migrator createVersionEightPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) {
configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR)
.setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH))
.setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.PUBLISH));
return new PublishKafkaFlowMigrator(configurationBuilder.build());
}
@Override
public Migrator createVersionEightConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) {
configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR)
.setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME))
.setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.CONSUME));
return new ConsumeKafkaFlowMigrator(configurationBuilder.build());
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator.service;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType;
import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
public interface KafkaMigrationService {
String REGEX_FOR_REPLACEABLE_PROCESSOR_NAMES = "(Get|Put|Consume|Publish)Kafka(Record)?(_0_1\\d)?";
boolean IS_VERSION_EIGHT_PROCESSOR = Boolean.TRUE;
boolean IS_NOT_VERSION_EIGHT_PROCESSOR = Boolean.FALSE;
String getPathForProcessors();
String getPathForClass();
Migrator createPublishMigrator(final MigratorConfigurationBuilder configurationBuilder);
Migrator createConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder);
Migrator createVersionEightPublishMigrator(final MigratorConfigurationBuilder configurationBuilder);
Migrator createVersionEightConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder);
default void replaceKafkaProcessors(final Document document, final MigratorConfigurationBuilder configurationBuilder) throws XPathExpressionException {
Migrator migrator;
final XPath xPath = XPathFactory.newInstance().newXPath();
final NodeList processors = (NodeList) xPath.evaluate(getPathForProcessors(), document, XPathConstants.NODESET);
for (int i = 0; i < processors.getLength(); i++) {
final Node processor = processors.item(i);
final Element className = ((Element) xPath.evaluate(getPathForClass(), processor, XPathConstants.NODE));
final String processorName = StringUtils.substringAfterLast(className.getTextContent(), ".");
if (processorName.matches(REGEX_FOR_REPLACEABLE_PROCESSOR_NAMES)) {
if (processorName.contains(KafkaProcessorType.PUBLISH.getProcessorType())) {
migrator = createPublishMigrator(configurationBuilder);
} else if (processorName.contains(KafkaProcessorType.PUT.getProcessorType())) {
migrator = createVersionEightPublishMigrator(configurationBuilder);
} else if (processorName.contains(KafkaProcessorType.CONSUME.getProcessorType())) {
migrator = createConsumeMigrator(configurationBuilder);
} else {
migrator = createVersionEightConsumeMigrator(configurationBuilder);
}
migrator.migrate(className, processor);
}
}
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator.service;
import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor;
import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType;
import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaTemplateMigrator;
import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator;
import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaTemplateMigrator;
import org.apache.nifi.toolkit.kafkamigrator.descriptor.TemplatePropertyXpathDescriptor;
public class KafkaTemplateMigrationService implements KafkaMigrationService {
private static final String XPATH_FOR_PROCESSORS_IN_TEMPLATE = ".//processors";
private static final String TYPE_TAG_NAME = "type";
public KafkaTemplateMigrationService() {
}
@Override
public String getPathForProcessors() {
return XPATH_FOR_PROCESSORS_IN_TEMPLATE;
}
@Override
public String getPathForClass() {
return TYPE_TAG_NAME;
}
@Override
public Migrator createPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) {
configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR)
.setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH))
.setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.PUBLISH));
return new PublishKafkaTemplateMigrator(configurationBuilder.build());
}
@Override
public Migrator createConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) {
configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR)
.setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME))
.setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.CONSUME));
return new ConsumeKafkaTemplateMigrator(configurationBuilder.build());
}
@Override
public Migrator createVersionEightPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) {
configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR)
.setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH))
.setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.PUBLISH));
return new PublishKafkaTemplateMigrator(configurationBuilder.build());
}
@Override
public Migrator createVersionEightConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) {
configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR)
.setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME))
.setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.CONSUME));
return new ConsumeKafkaTemplateMigrator(configurationBuilder.build());
}
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator;
import org.apache.nifi.toolkit.kafkamigrator.service.KafkaFlowMigrationService;
import org.apache.nifi.toolkit.kafkamigrator.service.KafkaTemplateMigrationService;
import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
import org.junit.jupiter.api.Test;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
public class KafkaMigrationServiceTest {
private static final List<String> EXPECTED_CLASS_OR_TYPE_NAMES =
Arrays.asList("org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0",
"org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0",
"org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_0");
private static final List<String> EXPECTED_ARTIFACTS =
Arrays.asList("nifi-kafka-2-0-nar", "nifi-kafka-2-0-nar", "nifi-kafka-2-0-nar");
private static final MigratorConfigurationBuilder CONFIGURATION_BUILDER =
new MigratorConfigurationBuilder().setKafkaBrokers("kafkaBrokers, localhost:1234")
.setTransaction(Boolean.FALSE);
private static final XPath XPATH = XPathFactory.newInstance().newXPath();
private static final String PATH_FOR_PROCESSORS_IN_FLOWS = ".//processor";
private static final String PATH_FOR_PROCESSORS_IN_TEMPLATES = ".//processors";
private static final String PATH_FOR_CLASS_ELEMENT = "class";
private static final String PATH_FOR_TYPE_ELEMENT = "type";
private static final String PATH_FOR_ARTIFACT_ELEMENT = "bundle/artifact";
@Test
public void testClassReplacement() throws XPathExpressionException, IOException {
final KafkaFlowMigrationService kafkaMigrationService = new KafkaFlowMigrationService();
final Document document = KafkaMigrationUtil.parseDocument();
final List<String> originalClassNames = createClassResultList(document);
kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER);
final List<String> actualClassNames = createClassResultList(document);
assertSuccess(EXPECTED_CLASS_OR_TYPE_NAMES, actualClassNames, originalClassNames);
}
@Test
public void testTypeReplacement() throws XPathExpressionException, IOException {
final KafkaTemplateMigrationService kafkaMigrationService = new KafkaTemplateMigrationService();
final Document document = KafkaMigrationUtil.parseDocument();
final List<String> originalTypeNames = createTypeResultList(document);
kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER);
final List<String> actualTypeNames = createTypeResultList(document);
assertSuccess(EXPECTED_CLASS_OR_TYPE_NAMES, actualTypeNames, originalTypeNames);
}
@Test
public void testArtifactReplacementInTemplate() throws XPathExpressionException, IOException {
final KafkaTemplateMigrationService kafkaMigrationService = new KafkaTemplateMigrationService();
final Document document = KafkaMigrationUtil.parseDocument();
final List<String> originalArtifacts = createArtifactResultListForTemplate(document);
kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER);
final List<String> actualArtifacts = createArtifactResultListForTemplate(document);
assertSuccess(EXPECTED_ARTIFACTS, actualArtifacts, originalArtifacts);
}
@Test
public void testArtifactReplacementInFlow() throws XPathExpressionException, IOException {
final KafkaFlowMigrationService kafkaMigrationService = new KafkaFlowMigrationService();
final Document document = KafkaMigrationUtil.parseDocument();
final List<String> originalArtifacts = createArtifactResultListForFlow(document);
kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER);
final List<String> actualArtifacts = createArtifactResultListForFlow(document);
assertSuccess(EXPECTED_ARTIFACTS, actualArtifacts, originalArtifacts);
}
private List<String> createClassResultList(final Document document) throws XPathExpressionException {
return createProcessorResultListForFlow(document, PATH_FOR_CLASS_ELEMENT);
}
private List<String> createArtifactResultListForFlow(final Document document) throws XPathExpressionException {
return createProcessorResultListForFlow(document, PATH_FOR_ARTIFACT_ELEMENT);
}
private List<String> createTypeResultList(final Document document) throws XPathExpressionException {
return createProcessorResultListForTemplate(document, PATH_FOR_TYPE_ELEMENT);
}
private List<String> createArtifactResultListForTemplate(final Document document) throws XPathExpressionException {
return createProcessorResultListForTemplate(document, PATH_FOR_ARTIFACT_ELEMENT);
}
private List<String> createProcessorResultListForFlow(final Document document, final String elementPath) throws XPathExpressionException {
return createProcessorResultList(document, PATH_FOR_PROCESSORS_IN_FLOWS, elementPath);
}
private List<String> createProcessorResultListForTemplate(final Document document, final String elementPath) throws XPathExpressionException {
return createProcessorResultList(document, PATH_FOR_PROCESSORS_IN_TEMPLATES, elementPath);
}
private List<String> createProcessorResultList(final Document document, final String processorPath, final String elementPath) throws XPathExpressionException {
final List<String> resultList = new ArrayList<>();
final NodeList processors = (NodeList) XPATH.evaluate(processorPath, document, XPathConstants.NODESET);
for (int i = 0; i < processors.getLength(); i++) {
resultList.add(XPATH.evaluate(elementPath, processors.item(i)));
}
return resultList;
}
private void assertSuccess(final List<String> expectedArtifacts, final List<String> actualArtifacts, final List<String> originalArtifacts) {
assertArrayEquals(expectedArtifacts.toArray(), actualArtifacts.toArray());
assertNoReplacementHappened(originalArtifacts, actualArtifacts);
assertReplacementHappened(originalArtifacts, actualArtifacts);
}
private void assertNoReplacementHappened(final List<String> originalArtifacts, final List<String> actualArtifacts) {
assertEquals(originalArtifacts.get(0), actualArtifacts.get(0));
}
private void assertReplacementHappened(final List<String> originalArtifacts, final List<String> actualArtifacts) {
assertNotEquals(originalArtifacts.get(1), actualArtifacts.get(1));
assertNotEquals(originalArtifacts.get(2), actualArtifacts.get(2));
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator;
import org.apache.nifi.xml.processing.parsers.DocumentProvider;
import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider;
import org.w3c.dom.Document;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
public class KafkaMigrationUtil {
public static Document parseDocument() throws IOException {
final DocumentProvider documentProvider = new StandardDocumentProvider();
return documentProvider.parse(Files.newInputStream(Paths.get("src/test/resources/flow.xml")));
}
}

View File

@ -0,0 +1,278 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.kafkamigrator;
import org.apache.nifi.toolkit.kafkamigrator.descriptor.FlowPropertyXpathDescriptor;
import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor;
import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType;
import org.apache.nifi.toolkit.kafkamigrator.descriptor.PropertyXpathDescriptor;
import org.apache.nifi.toolkit.kafkamigrator.descriptor.TemplatePropertyXpathDescriptor;
import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaFlowMigrator;
import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaTemplateMigrator;
import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaFlowMigrator;
import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaTemplateMigrator;
import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
import org.junit.jupiter.api.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class KafkaMigratorTest {
private static final XPath XPATH = XPathFactory.newInstance().newXPath();
private static final String XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW = ".//processor[class='org.apache.nifi.processors.kafka.PutKafka']";
private static final String XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE = ".//processors[type='org.apache.nifi.processors.kafka.PutKafka']";
private static final String XPATH_FOR_CONSUME_PROCESSOR_IN_FLOW = ".//processor[class='org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10']";
private static final String XPATH_FOR_CONSUME_PROCESSOR_IN_TEMPLATE = ".//processors[type='org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10']";
private static final KafkaProcessorDescriptor PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR = new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH);
private static final boolean WITH_TRANSACTION = Boolean.TRUE;
private static final boolean WITHOUT_TRANSACTION = Boolean.FALSE;
private static final boolean IS_VERSION_EIGHT_PROCESSOR = Boolean.TRUE;
private static final boolean IS_NOT_VERSION_EIGHT_PROCESSOR = Boolean.FALSE;
private static final String FLOW = "Flow";
private static final String TEMPLATE = "Template";
@Test
public void testPropertiesRemoved() throws XPathExpressionException, IOException {
final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW);
final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration);
final Document document = KafkaMigrationUtil.parseDocument();
final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
flowMigrator.configureProperties(processor);
assertPropertyRemoveSuccess(processor);
}
@Test
public void testPropertiesAdded() throws XPathExpressionException, IOException {
final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR,KafkaProcessorType.PUBLISH, FLOW);
final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration);
final Document document = KafkaMigrationUtil.parseDocument();
final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
flowMigrator.configureProperties(processor);
assertPropertyAddSuccess(processor);
}
@Test
public void testPropertiesSaved() throws XPathExpressionException, IOException {
final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW);
final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration);
final Document document = KafkaMigrationUtil.parseDocument();
final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
final List<String> oldValues = getOldValues(processor);
flowMigrator.configureProperties(processor);
final List<String> newValues = getNewValues(processor);
assertEquals(oldValues, newValues);
}
@Test
public void testDescriptorsAdded() throws XPathExpressionException, IOException {
final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, TEMPLATE);
final PublishKafkaTemplateMigrator templateMigrator = new PublishKafkaTemplateMigrator(configuration);
final Document document = KafkaMigrationUtil.parseDocument();
final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
templateMigrator.configureDescriptors(processor);
assertDescriptorAddSuccess(processor);
}
@Test
public void testDescriptorsRemoved() throws XPathExpressionException, IOException {
final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, TEMPLATE);
final PublishKafkaTemplateMigrator templateMigrator = new PublishKafkaTemplateMigrator(configuration);
final Document document = KafkaMigrationUtil.parseDocument();
final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
templateMigrator.configureDescriptors(processor);
assertDescriptorRemoveSuccess(processor);
}
@Test
public void testTransactionFlowPropertyForConsumeProcessorWithTrue() throws XPathExpressionException, IOException {
final MigratorConfiguration configuration = getConfiguration(WITH_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, FLOW);
final ConsumeKafkaFlowMigrator flowMigrator = new ConsumeKafkaFlowMigrator(configuration);
final Document document = KafkaMigrationUtil.parseDocument();
final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
flowMigrator.configureComponentSpecificSteps(processor);
assertEquals("true", XPATH.evaluate("property[name='honor-transactions']/value", processor));
}
@Test
public void testTransactionTemplatePropertyForConsumeProcessorWithTrue() throws XPathExpressionException, IOException {
final MigratorConfiguration configuration = getConfiguration(WITH_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, TEMPLATE);
final ConsumeKafkaTemplateMigrator templateMigrator = new ConsumeKafkaTemplateMigrator(configuration);
final Document document = KafkaMigrationUtil.parseDocument();
final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
templateMigrator.configureComponentSpecificSteps(processor);
assertEquals("true", XPATH.evaluate("config/properties/entry[key='honor-transactions']/value", processor));
}
@Test
public void testTransactionFlowPropertyForConsumeProcessorWithFalse() throws XPathExpressionException, IOException {
final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, FLOW);
final ConsumeKafkaFlowMigrator flowMigrator = new ConsumeKafkaFlowMigrator(configuration);
final Document document = KafkaMigrationUtil.parseDocument();
final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
flowMigrator.configureComponentSpecificSteps(processor);
assertEquals("false", XPATH.evaluate("property[name='honor-transactions']/value", processor));
}
@Test
public void testTransactionTemplatePropertyForConsumeProcessorWithFalse() throws XPathExpressionException, IOException {
final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, TEMPLATE);
final ConsumeKafkaTemplateMigrator templateMigrator = new ConsumeKafkaTemplateMigrator(configuration);
final Document document = KafkaMigrationUtil.parseDocument();
final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
templateMigrator.configureComponentSpecificSteps(processor);
assertEquals("false", XPATH.evaluate("config/properties/entry[key='honor-transactions']/value", processor));
}
@Test
public void testTransactionFlowPropertyForPublishProcessorWithTrue() throws XPathExpressionException, IOException {
final MigratorConfiguration configuration = getConfiguration(WITH_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW);
final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration);
final Document document = KafkaMigrationUtil.parseDocument();
final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
flowMigrator.configureComponentSpecificSteps(processor);
assertEquals("true", XPATH.evaluate("property[name='use-transactions']/value", processor));
assertEquals("", XPATH.evaluate("property[name='acks']/value", processor));
}
@Test
public void testTransactionTemplatePropertyForPublishProcessorWithTrue() throws XPathExpressionException, IOException {
final MigratorConfiguration configuration = getConfiguration(WITH_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, TEMPLATE);
final PublishKafkaTemplateMigrator templateMigrator = new PublishKafkaTemplateMigrator(configuration);
final Document document = KafkaMigrationUtil.parseDocument();
final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
templateMigrator.configureComponentSpecificSteps(processor);
assertEquals("true", XPATH.evaluate("config/properties/entry[key='use-transactions']/value", processor));
assertEquals("", XPATH.evaluate("config/properties/entry[key='acks']/value", processor));
}
@Test
public void testTransactionFlowPropertyForPublishProcessorWithFalse() throws XPathExpressionException, IOException {
final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW);
final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration);
final Document document = KafkaMigrationUtil.parseDocument();
final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
flowMigrator.configureComponentSpecificSteps(processor);
assertEquals("false", XPATH.evaluate("property[name='use-transactions']/value", processor));
}
@Test
public void testTransactionTemplatePropertyForPublishProcessorWithFalse() throws XPathExpressionException, IOException {
final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, TEMPLATE);
final PublishKafkaTemplateMigrator templateMigrator = new PublishKafkaTemplateMigrator(configuration);
final Document document = KafkaMigrationUtil.parseDocument();
final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
templateMigrator.configureComponentSpecificSteps(processor);
assertEquals("false", XPATH.evaluate("config/properties/entry[key='use-transactions']/value", processor));
}
private List<String> getValues(final Collection<String> properties, final Node node) throws XPathExpressionException {
final List<String> result = new ArrayList<>();
for (String propertyName : properties) {
result.add(XPATH.evaluate(String.format("property[name='%s']/value", propertyName), node));
}
return result;
}
private List<String> getOldValues(final Node node) throws XPathExpressionException {
return getValues(PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR.getPropertiesToBeSaved().keySet(), node);
}
private List<String> getNewValues(final Node node) throws XPathExpressionException {
return getValues(PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR.getPropertiesToBeSaved().values(), node);
}
private void assertPropertyRemoveSuccess(final Node node) throws XPathExpressionException {
assertTrue(XPATH.evaluate("property[name='Known Brokers']", node).isEmpty());
}
private void assertDescriptorRemoveSuccess(final Node node) throws XPathExpressionException {
assertTrue(XPATH.evaluate("config/descriptors/entry[key='Known Brokers']", node).isEmpty());
}
private void assertAddSuccess(final String xpath, final Node node) throws XPathExpressionException {
for (String propertyName: PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR.getProcessorProperties().keySet()) {
assertFalse(XPATH.evaluate(String.format(xpath, propertyName), node).isEmpty());
}
}
private void assertPropertyAddSuccess(final Node node) throws XPathExpressionException {
assertAddSuccess("property[name='%s']/name", node);
}
private void assertDescriptorAddSuccess(final Node node) throws XPathExpressionException {
assertAddSuccess("config/descriptors/entry[key='%s']/key", node);
}
private MigratorConfiguration getConfiguration(final boolean transaction, final boolean isVersion8Processor,
final KafkaProcessorType processorType, final String migrationType) {
final MigratorConfigurationBuilder configurationBuilder = new MigratorConfigurationBuilder();
final PropertyXpathDescriptor propertyXpathDescriptor;
if (migrationType.equalsIgnoreCase("Flow")) {
propertyXpathDescriptor = new FlowPropertyXpathDescriptor(processorType);
} else {
propertyXpathDescriptor= new TemplatePropertyXpathDescriptor(processorType);
}
return configurationBuilder.setKafkaBrokers("kafkaBrokers, localhost:1234")
.setTransaction(transaction)
.setIsVersion8Processor(isVersion8Processor)
.setProcessorDescriptor(new KafkaProcessorDescriptor(processorType))
.setPropertyXpathDescriptor(propertyXpathDescriptor)
.build();
}
}

View File

@ -0,0 +1,136 @@
<rootGroup>
<processGroup>
<processor>
<class>org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0</class>
<bundle>
<group>org.apache.nifi</group>
<artifact>nifi-kafka-2-0-nar</artifact>
<version>1.13.2.2.1.2.0-283</version>
</bundle>
<property>
<name>bootstrap.servers</name>
<value>localhost:9092</value>
</property>
</processor>
<processor>
<class>org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10</class>
<bundle>
<group>org.apache.nifi</group>
<artifact>nifi-kafka-0-10-nar</artifact>
<version>1.13.2.2.1.2.0-283</version>
</bundle>
<property>
<name>bootstrap.servers</name>
<value>localhost:9092</value>
</property>
</processor>
<processor>
<class>org.apache.nifi.processors.kafka.PutKafka</class>
<bundle>
<group>org.apache.nifi</group>
<artifact>nifi-kafka-0-8-nar</artifact>
<version>1.13.2.2.1.2.0-283</version>
</bundle>
<property>
<name>Known Brokers</name>
</property>
<property>
<name>Topic Name</name>
<value>test-topic</value>
</property>
<property>
<name>Partition</name>
<value>test-partition</value>
</property>
<property>
<name>Kafka Key</name>
<value>kafka-key</value>
</property>
<property>
<name>Delivery Guarantee</name>
<value>1</value>
</property>
<property>
<name>Compression Codec</name>
<value>gzip</value>
</property>
</processor>
</processGroup>
<template encoding-version="1.3">
<snippet>
<processGroups>
<contents>
<processors>
<bundle>
<artifact>nifi-kafka-2-0-nar</artifact>
<group>org.apache.nifi</group>
<version>1.13.2.2.1.2.0-283</version>
</bundle>
<config>
<descriptors>
<entry>
<key>Known Brokers</key>
<value>
<name>Known Brokers</name>
</value>
</entry>
</descriptors>
<properties>
<entry>
<key>Known Brokers</key>
</entry>
</properties>
</config>
<type>org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0</type>
</processors>
<processors>
<bundle>
<artifact>nifi-kafka-0-10-nar</artifact>
<group>org.apache.nifi</group>
<version>1.13.2.2.1.2.0-283</version>
</bundle>
<config>
<descriptors>
<entry>
<key>Known Brokers</key>
<value>
<name>Known Brokers</name>
</value>
</entry>
</descriptors>
<properties>
<entry>
<key>Known Brokers</key>
</entry>
</properties>
</config>
<type>org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10</type>
</processors>
<processors>
<bundle>
<artifact>nifi-kafka-0-8-nar</artifact>
<group>org.apache.nifi</group>
<version>1.13.2.2.1.2.0-283</version>
</bundle>
<config>
<descriptors>
<entry>
<key>Known Brokers</key>
<value>
<name>Known Brokers</name>
</value>
</entry>
</descriptors>
<properties>
<entry>
<key>Known Brokers</key>
</entry>
</properties>
</config>
<type>org.apache.nifi.processors.kafka.PutKafka</type>
</processors>
</contents>
</processGroups>
</snippet>
</template>
</rootGroup>

View File

@ -33,6 +33,7 @@
<module>nifi-toolkit-flowanalyzer</module>
<module>nifi-toolkit-cli</module>
<module>nifi-toolkit-api</module>
<module>nifi-toolkit-kafka-migrator</module>
</modules>
<properties>
<toolkit.groovy.version>3.0.8</toolkit.groovy.version>