mirror of https://github.com/apache/nifi.git
NIFI-11539 Removed ZooKeeper Migrator Toolkit (#7237)
This commit is contained in:
parent
bec6ceda1f
commit
b9f4d02094
|
@ -31,7 +31,7 @@ import static org.mockito.Mockito.when;
|
||||||
@ExtendWith(MockitoExtension.class)
|
@ExtendWith(MockitoExtension.class)
|
||||||
public class WebUtilsTest {
|
public class WebUtilsTest {
|
||||||
|
|
||||||
@Mock
|
@Mock(strictness = Mock.Strictness.LENIENT)
|
||||||
private HttpServletRequest request;
|
private HttpServletRequest request;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -3046,22 +3046,6 @@ nifi.state.management.configuration.file=./conf/state-management.xml
|
||||||
nifi.state.management.provider.cluster=zk-provider
|
nifi.state.management.provider.cluster=zk-provider
|
||||||
--
|
--
|
||||||
|
|
||||||
[[zookeeper_migrator]]
|
|
||||||
=== ZooKeeper Migrator
|
|
||||||
You can use the `zk-migrator` tool to perform the following tasks:
|
|
||||||
|
|
||||||
* Moving ZooKeeper information from one ZooKeeper cluster to another
|
|
||||||
* Migrating ZooKeeper node ownership
|
|
||||||
|
|
||||||
For example, you may want to use the ZooKeeper Migrator when you are:
|
|
||||||
|
|
||||||
* Upgrading from NiFi 0.x to NiFi 1.x in which embedded ZooKeepers are used
|
|
||||||
* Migrating from an embedded ZooKeeper in NiFi 0.x or 1.x to an external ZooKeeper
|
|
||||||
* Upgrading from NiFi 0.x with an external ZooKeeper to NiFi 1.x with the same external ZooKeeper
|
|
||||||
* Migrating from an external ZooKeeper to an embedded ZooKeeper in NiFi 1.x
|
|
||||||
|
|
||||||
For more information, see the <<toolkit-guide.adoc#zookeeper_migrator,ZooKeeper Migrator>> section in the link:toolkit-guide.html[NiFi Toolkit Guide].
|
|
||||||
|
|
||||||
[[bootstrap_properties]]
|
[[bootstrap_properties]]
|
||||||
== Bootstrap Properties
|
== Bootstrap Properties
|
||||||
The _bootstrap.conf_ file in the `conf` directory allows users to configure settings for how NiFi should be started.
|
The _bootstrap.conf_ file in the `conf` directory allows users to configure settings for how NiFi should be started.
|
||||||
|
|
|
@ -25,9 +25,6 @@ The NiFi Toolkit contains several command line utilities to setup and support Ni
|
||||||
* CLI -- The `cli` tool enables administrators to interact with NiFi and NiFi Registry instances to automate tasks such as deploying versioned flows and managing process groups and cluster nodes.
|
* CLI -- The `cli` tool enables administrators to interact with NiFi and NiFi Registry instances to automate tasks such as deploying versioned flows and managing process groups and cluster nodes.
|
||||||
* Encrypt Config -- The `encrypt-config` tool encrypts the sensitive keys in the _nifi.properties_ file to facilitate the setup of a secure NiFi instance.
|
* Encrypt Config -- The `encrypt-config` tool encrypts the sensitive keys in the _nifi.properties_ file to facilitate the setup of a secure NiFi instance.
|
||||||
* TLS Toolkit -- The `tls-toolkit` utility generates the required keystores, truststore, and relevant configuration files to facilitate the setup of a secure NiFi instance.
|
* TLS Toolkit -- The `tls-toolkit` utility generates the required keystores, truststore, and relevant configuration files to facilitate the setup of a secure NiFi instance.
|
||||||
* ZooKeeper Migrator -- The `zk-migrator` tool enables administrators to:
|
|
||||||
** move ZooKeeper information from one ZooKeeper cluster to another
|
|
||||||
** migrate ZooKeeper node ownership
|
|
||||||
|
|
||||||
The utilities are executed with scripts found in the `bin` folder of your NiFi Toolkit installation.
|
The utilities are executed with scripts found in the `bin` folder of your NiFi Toolkit installation.
|
||||||
|
|
||||||
|
@ -1324,98 +1321,3 @@ $ ./bin/tls-toolkit.sh standalone -n 'node1.nifi.apache.org' \
|
||||||
* The following command will create the JKS keystore (`keystore.jks`). The `-destalias` flag is optional, as NiFi does not currently read from a specific alias in the keystore. The user will be prompted for a keystore password, which must be set and have minimum 8 characters, and a key password, which can be the same as the keystore password or different:
|
* The following command will create the JKS keystore (`keystore.jks`). The `-destalias` flag is optional, as NiFi does not currently read from a specific alias in the keystore. The user will be prompted for a keystore password, which must be set and have minimum 8 characters, and a key password, which can be the same as the keystore password or different:
|
||||||
** `keytool -importkeystore -srckeystore keystore.p12 -srcstoretype pkcs12 -destkeystore keystore.jks
|
** `keytool -importkeystore -srckeystore keystore.p12 -srcstoretype pkcs12 -destkeystore keystore.jks
|
||||||
-deststoretype jks -destalias nifi-key`
|
-deststoretype jks -destalias nifi-key`
|
||||||
|
|
||||||
[[zookeeper_migrator]]
|
|
||||||
== ZooKeeper Migrator
|
|
||||||
You can use the `zk-migrator` tool to perform the following tasks:
|
|
||||||
|
|
||||||
* Moving ZooKeeper information from one ZooKeeper cluster to another
|
|
||||||
* Migrating ZooKeeper node ownership
|
|
||||||
|
|
||||||
For example, you may want to use the ZooKeeper Migrator when you are:
|
|
||||||
|
|
||||||
* Upgrading from NiFi 0.x to NiFi 1.x in which embedded ZooKeepers are used
|
|
||||||
* Migrating from an embedded ZooKeeper in NiFi 0.x or 1.x to an external ZooKeeper
|
|
||||||
* Upgrading from NiFi 0.x with an external ZooKeeper to NiFi 1.x with the same external ZooKeeper
|
|
||||||
* Migrating from an external ZooKeeper to an embedded ZooKeeper in NiFi 1.x
|
|
||||||
|
|
||||||
=== Usage
|
|
||||||
The `zk-migrator` tool is invoked as `./bin/zk-migrator.sh` or `bin\zk-migrator.bat`.
|
|
||||||
|
|
||||||
To show help:
|
|
||||||
|
|
||||||
./bin/zk-migrator.sh -h
|
|
||||||
|
|
||||||
The following are available options:
|
|
||||||
|
|
||||||
* `-a`,`--auth <username:password>` Allows the specification of a username and password for authentication with ZooKeeper. This option is mutually exclusive with the `-k`,`--krb-conf` option.
|
|
||||||
* `-f`,`--file <filename>` The file used for ZooKeeper data serialized as JSON. When used with the `-r`,`--receive` option, data read from ZooKeeper will be stored in the given filename. When used with the `-s`,`--send` option, the data in the file will be sent to ZooKeeper.
|
|
||||||
* `-h`,`--help` Prints help, displays available parameters with descriptions
|
|
||||||
* `--ignore-source` Allows the ZooKeeper Migrator to write to the ZooKeeper and path from which the data was obtained.
|
|
||||||
* `-k`,`--krb-conf <jaas-filename>` Allows the specification of a JAAS configuration file to allow authentication with a ZooKeeper configured to use Kerberos. This option is mutually exclusive with the `-a`,`--auth` option.
|
|
||||||
* `-r`,`--receive` Receives data from ZooKeeper and writes to the given filename (if the `-f`,`--file` option is provided) or standard output. The data received will contain the full path to each node read from ZooKeeper. This option is mutually exclusive with the `-s`,`--send` option.
|
|
||||||
* `-s`,`--send` Sends data to ZooKeeper that is read from the given filename (if the `-f`,`--file` option is provided) or standard input. The paths for each node in the data being sent to ZooKeeper are absolute paths, and will be stored in ZooKeeper under the *path* portion of the `-z`,`--zookeeper` argument. Typically, the *path* portion of the argument can be omitted, which will store the nodes at their absolute paths. This option is mutually exclusive with the `-r`,`--receive` option.
|
|
||||||
* `--use-existing-acl` Allows the ZooKeeper Migrator to write ACL values retrieved from the source ZooKeeper server to destination server. Default action will apply Open rights for unsecured destinations or Creator Only rights for secured destinations.
|
|
||||||
* `-z`,`--zookeeper <zookeeper-endpoint>` The ZooKeeper server(s) to use, specified by a connect string, comprised of one or more comma-separated host:port pairs followed by a path, in the format of _host:port[,host2:port...,hostn:port]/znode/path_.
|
|
||||||
|
|
||||||
=== Migrating Between Source and Destination ZooKeepers
|
|
||||||
Before you begin, confirm that:
|
|
||||||
|
|
||||||
* You have installed the destination ZooKeeper cluster.
|
|
||||||
* You have installed and configured a NiFi cluster to use the destination ZooKeeper cluster.
|
|
||||||
* If you are migrating ZooKeepers due to upgrading NiFi from 0.x to 1.x,, you have already followed appropriate NiFi upgrade steps.
|
|
||||||
* You have configured Kerberos as needed.
|
|
||||||
* You have not started processing any dataflow (to avoid duplicate data processing).
|
|
||||||
* If one of the ZooKeeper clusters you are using is configured with Kerberos, you are running the ZooKeeper Migrator from a host that has access to NiFi’s ZooKeeper client jaas configuration file (see the <<administration-guide.adoc#zk_kerberos_client,Kerberizing NiFi's ZooKeeper Client>> section in the System Administrator's Guide for more information).
|
|
||||||
|
|
||||||
==== ZooKeeper Migration Steps
|
|
||||||
1. Collect the following information:
|
|
||||||
+
|
|
||||||
|====
|
|
||||||
|*Required Information*|*Description*
|
|
||||||
|Source ZooKeeper hostname (*sourceHostname*)|The hostname must be one of the hosts running in the ZooKeeper ensemble, which can be found in _<NiFi installation dir>/conf/zookeeper.properties_. Any of the hostnames declared in the `server.N` properties can be used.
|
|
||||||
|Destination ZooKeeper hostname (*destinationHostname*)|The hostname must be one of the hosts running in the ZooKeeper ensemble, which can be found in _<NiFi installation dir>/conf/zookeeper.properties_. Any of the hostnames declared in the `server.N` properties can be used.
|
|
||||||
|Source ZooKeeper port (*sourceClientPort*)|This can be found in _<NiFi installation dir>/conf/zookeeper.properties_. The port is specified in the `clientPort` property or at the end of the server string.
|
|
||||||
|Destination ZooKeeper port (*destinationClientPort*)|This can be found in _<NiFi installation dir>/conf/zookeeper.properties_. The port is specified in the `clientPort` property or at the end of the server string.
|
|
||||||
|Export data path|Determine the path that will store a json file containing the export of data from ZooKeeper. It must be readable and writable by the user running the zk-migrator tool.
|
|
||||||
|Source ZooKeeper Authentication Information|This information is in _<NiFi installation dir>/conf/state-management.xml_. For NiFi 0.x, if Creator Only is specified in _state-management.xml_, you need to supply authentication information using the `-a,--auth` argument with the values from the Username and Password properties in _state-management.xml_. For NiFi 1.x, supply authentication information using the `-k,--krb-conf` argument.
|
|
||||||
|
|
||||||
If the _state-management.xml_ specifies Open, no authentication is required.
|
|
||||||
|Destination ZooKeeper Authentication Information|This information is in _<NiFi installation dir>/conf/state-management.xml_. For NiFi 0.x, if Creator Only is specified in _state-management.xml_, you need to supply authentication information using the `-a,--auth` argument with the values from the Username and Password properties in state-management.xml. For NiFi 1.x, supply authentication information using the `-k,--krb-conf` argument.
|
|
||||||
|
|
||||||
If the _state-management.xml_ specifies Open, no authentication is required.
|
|
||||||
|Root path to which NiFi writes data in Source ZooKeeper (*sourceRootPath*)|This information can be found in `<NiFi installation dir>/conf/state-management.xml` under the Root Node property in the cluster-provider element. (default: `/nifi`)
|
|
||||||
|Root path to which NiFi writes data in Destination ZooKeeper (*destinationRootPath*)|This information can be found in _<NiFi installation dir>/conf/state-management.xml_ under the Root Node property in the cluster-provider element.
|
|
||||||
|====
|
|
||||||
NOTE: As of NiFi 1.10.x, because of an upgrade to ZooKeeper 3.5.x, the migrator may have a permission error `"NoAuthException: KeeperErrorCode = NoAuth for /zookeeper/config"` when attempting to use `/` as the destination root path. This is because the `/zookeeper/config` znode has read-only permissions. Instead use a destination path of `/nifi/components` or similar.
|
|
||||||
|
|
||||||
2. Stop all processors in the NiFi flow. If you are migrating between two NiFi installations, the flows on both must be stopped.
|
|
||||||
3. Export the NiFi component data from the source ZooKeeper. The following command reads from the specified ZooKeeper running on the given hostname:port, using the provided path to the data, and authenticates with ZooKeeper using the given username and password. The data read from ZooKeeper is written to the file provided.
|
|
||||||
|
|
||||||
* For NiFi 0.x
|
|
||||||
** For an open ZooKeeper:
|
|
||||||
*** `zk-migrator.sh -r -z sourceHostname:sourceClientPort/sourceRootPath/components -f /path/to/export/zk-source-data.json`
|
|
||||||
** For a ZooKeeper using username:password for authentication:
|
|
||||||
*** `zk-migrator.sh -r -z sourceHostname:sourceClientPort/sourceRootPath/components -a <username:password> -f /path/to/export/zk-source-data.json`
|
|
||||||
|
|
||||||
* For NiFi 1.x
|
|
||||||
** For an open ZooKeeper:
|
|
||||||
*** `zk-migrator.sh -r -z sourceHostname:sourceClientPort/sourceRootPath/components -f /path/to/export/zk-source-data.json`
|
|
||||||
** For a ZooKeeper using Kerberos for authentication:
|
|
||||||
*** `zk-migrator.sh -r -z sourceHostname:sourceClientPort/sourceRootPath/components -k /path/to/jaasconfig/jaas-config.conf -f /path/to/export/zk-source-data.json`
|
|
||||||
|
|
||||||
4. (Optional) If you have used the new NiFi installation to do any processing, you can also export its ZooKeeper data as a backup prior to performing the migration.
|
|
||||||
|
|
||||||
* For an open ZooKeeper:
|
|
||||||
** `zk-migrator.sh -r -z destinationHostname:destinationClientPort/destinationRootPath/components -f /path/to/export/zk-destination-backup-data.json`
|
|
||||||
* For a ZooKeeper using Kerberos for authentication:
|
|
||||||
** `zk-migrator.sh -r -z destinationHostname:destinationClientPort/destinationRootPath/components -k /path/to/jaasconfig/jaas-config.conf -f /path/to/export/zk-destination-backup-data.json`
|
|
||||||
|
|
||||||
5. Migrate the ZooKeeper data to the destination ZooKeeper. If the source and destination ZooKeepers are the same, the `--ignore-source` option can be added to the following examples.
|
|
||||||
|
|
||||||
* For an open ZooKeeper:
|
|
||||||
** `zk-migrator.sh -s -z destinationHostname:destinationClientPort/destinationRootPath/components -f /path/to/export/zk-source-data.json`
|
|
||||||
* For a ZooKeeper using Kerberos for authentication:
|
|
||||||
** `zk-migrator.sh -s -z destinationHostname:destinationClientPort/destinationRootPath/components -k /path/to/jaasconfig/jaas-config.conf -f /path/to/export/zk-source-data.json`
|
|
||||||
|
|
||||||
6. Once the migration has completed successfully, start the processors in the NiFi flow. Processing should continue from the point at which it was stopped when the NiFi flow was stopped.
|
|
|
@ -13,15 +13,6 @@ echo "Testing return values on invalid input for all commands:"
|
||||||
docker run --rm $IMAGE encrypt-config invalid 1>/dev/null 2>&1
|
docker run --rm $IMAGE encrypt-config invalid 1>/dev/null 2>&1
|
||||||
test 2 -eq $? || exit 1
|
test 2 -eq $? || exit 1
|
||||||
|
|
||||||
docker run --rm $IMAGE s2s invalid 1>/dev/null 2>&1
|
|
||||||
test 0 -eq $? || exit 1
|
|
||||||
|
|
||||||
docker run --rm $IMAGE zk-migrator invalid 1>/dev/null 2>&1
|
|
||||||
test 0 -eq $? || exit 1
|
|
||||||
|
|
||||||
docker run --rm $IMAGE node-manager invalid 1>/dev/null 2>&1
|
|
||||||
test 1 -eq $? || exit 1
|
|
||||||
|
|
||||||
docker run --rm $IMAGE cli invalid 1>/dev/null 2>&1
|
docker run --rm $IMAGE cli invalid 1>/dev/null 2>&1
|
||||||
test 255 -eq $? || exit 1
|
test 255 -eq $? || exit 1
|
||||||
|
|
||||||
|
|
|
@ -73,11 +73,6 @@ language governing permissions and limitations under the License. -->
|
||||||
<artifactId>nifi-toolkit-encrypt-config</artifactId>
|
<artifactId>nifi-toolkit-encrypt-config</artifactId>
|
||||||
<version>2.0.0-SNAPSHOT</version>
|
<version>2.0.0-SNAPSHOT</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.nifi</groupId>
|
|
||||||
<artifactId>nifi-toolkit-zookeeper-migrator</artifactId>
|
|
||||||
<version>2.0.0-SNAPSHOT</version>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
<artifactId>nifi-toolkit-cli</artifactId>
|
<artifactId>nifi-toolkit-cli</artifactId>
|
||||||
|
|
|
@ -1,41 +0,0 @@
|
||||||
@echo off
|
|
||||||
rem
|
|
||||||
rem Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
rem contributor license agreements. See the NOTICE file distributed with
|
|
||||||
rem this work for additional information regarding copyright ownership.
|
|
||||||
rem The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
rem (the "License"); you may not use this file except in compliance with
|
|
||||||
rem the License. You may obtain a copy of the License at
|
|
||||||
rem
|
|
||||||
rem http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
rem
|
|
||||||
rem Unless required by applicable law or agreed to in writing, software
|
|
||||||
rem distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
rem See the License for the specific language governing permissions and
|
|
||||||
rem limitations under the License.
|
|
||||||
rem
|
|
||||||
|
|
||||||
rem Use JAVA_HOME if it's set; otherwise, just use java
|
|
||||||
|
|
||||||
if "%JAVA_HOME%" == "" goto noJavaHome
|
|
||||||
if not exist "%JAVA_HOME%\bin\java.exe" goto noJavaHome
|
|
||||||
set JAVA_EXE=%JAVA_HOME%\bin\java.exe
|
|
||||||
goto startConfig
|
|
||||||
|
|
||||||
:noJavaHome
|
|
||||||
echo The JAVA_HOME environment variable is not defined correctly.
|
|
||||||
echo Instead the PATH will be used to find the java executable.
|
|
||||||
echo.
|
|
||||||
set JAVA_EXE=java
|
|
||||||
goto startConfig
|
|
||||||
|
|
||||||
:startConfig
|
|
||||||
set LIB_DIR=%~dp0..\classpath;%~dp0..\lib
|
|
||||||
|
|
||||||
if "%JAVA_OPTS%" == "" set JAVA_OPTS=-Xms12m -Xmx24m
|
|
||||||
|
|
||||||
SET JAVA_PARAMS=-cp %LIB_DIR%\* %JAVA_OPTS% org.apache.nifi.toolkit.zkmigrator.ZooKeeperMigratorMain
|
|
||||||
|
|
||||||
cmd.exe /C ""%JAVA_EXE%" %JAVA_PARAMS% %* ""
|
|
||||||
|
|
|
@ -1,119 +0,0 @@
|
||||||
#!/bin/sh
|
|
||||||
#
|
|
||||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
# contributor license agreements. See the NOTICE file distributed with
|
|
||||||
# this work for additional information regarding copyright ownership.
|
|
||||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
# (the "License"); you may not use this file except in compliance with
|
|
||||||
# the License. You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
#
|
|
||||||
#
|
|
||||||
|
|
||||||
# Script structure inspired from Apache Karaf and other Apache projects with similar startup approaches
|
|
||||||
|
|
||||||
SCRIPT_DIR=$(dirname "$0")
|
|
||||||
SCRIPT_NAME=$(basename "$0")
|
|
||||||
NIFI_TOOLKIT_HOME=$(cd "${SCRIPT_DIR}" && cd .. && pwd)
|
|
||||||
PROGNAME=$(basename "$0")
|
|
||||||
|
|
||||||
|
|
||||||
warn() {
|
|
||||||
(>&2 echo "${PROGNAME}: $*")
|
|
||||||
}
|
|
||||||
|
|
||||||
die() {
|
|
||||||
warn "$*"
|
|
||||||
exit 1
|
|
||||||
}
|
|
||||||
|
|
||||||
detectOS() {
|
|
||||||
# OS specific support (must be 'true' or 'false').
|
|
||||||
cygwin=false;
|
|
||||||
aix=false;
|
|
||||||
os400=false;
|
|
||||||
darwin=false;
|
|
||||||
case "$(uname)" in
|
|
||||||
CYGWIN*)
|
|
||||||
cygwin=true
|
|
||||||
;;
|
|
||||||
AIX*)
|
|
||||||
aix=true
|
|
||||||
;;
|
|
||||||
OS400*)
|
|
||||||
os400=true
|
|
||||||
;;
|
|
||||||
Darwin)
|
|
||||||
darwin=true
|
|
||||||
;;
|
|
||||||
esac
|
|
||||||
# For AIX, set an environment variable
|
|
||||||
if ${aix}; then
|
|
||||||
export LDR_CNTRL=MAXDATA=0xB0000000@DSA
|
|
||||||
echo ${LDR_CNTRL}
|
|
||||||
fi
|
|
||||||
}
|
|
||||||
|
|
||||||
locateJava() {
|
|
||||||
# Setup the Java Virtual Machine
|
|
||||||
if $cygwin ; then
|
|
||||||
[ -n "${JAVA}" ] && JAVA=$(cygpath --unix "${JAVA}")
|
|
||||||
[ -n "${JAVA_HOME}" ] && JAVA_HOME=$(cygpath --unix "${JAVA_HOME}")
|
|
||||||
fi
|
|
||||||
|
|
||||||
if [ "x${JAVA}" = "x" ] && [ -r /etc/gentoo-release ] ; then
|
|
||||||
JAVA_HOME=$(java-config --jre-home)
|
|
||||||
fi
|
|
||||||
if [ "x${JAVA}" = "x" ]; then
|
|
||||||
if [ "x${JAVA_HOME}" != "x" ]; then
|
|
||||||
if [ ! -d "${JAVA_HOME}" ]; then
|
|
||||||
die "JAVA_HOME is not valid: ${JAVA_HOME}"
|
|
||||||
fi
|
|
||||||
JAVA="${JAVA_HOME}/bin/java"
|
|
||||||
else
|
|
||||||
warn "JAVA_HOME not set; results may vary"
|
|
||||||
JAVA=$(type java)
|
|
||||||
JAVA=$(expr "${JAVA}" : '.* \(/.*\)$')
|
|
||||||
if [ "x${JAVA}" = "x" ]; then
|
|
||||||
die "java command not found"
|
|
||||||
fi
|
|
||||||
fi
|
|
||||||
fi
|
|
||||||
}
|
|
||||||
|
|
||||||
init() {
|
|
||||||
# Determine if there is special OS handling we must perform
|
|
||||||
detectOS
|
|
||||||
|
|
||||||
# Locate the Java VM to execute
|
|
||||||
locateJava "$1"
|
|
||||||
}
|
|
||||||
|
|
||||||
run() {
|
|
||||||
LIBS="${NIFI_TOOLKIT_HOME}/lib/*"
|
|
||||||
|
|
||||||
sudo_cmd_prefix=""
|
|
||||||
if $cygwin; then
|
|
||||||
NIFI_TOOLKIT_HOME=$(cygpath --path --windows "${NIFI_TOOLKIT_HOME}")
|
|
||||||
CLASSPATH="$NIFI_TOOLKIT_HOME/classpath;$(cygpath --path --windows "${LIBS}")"
|
|
||||||
else
|
|
||||||
CLASSPATH="$NIFI_TOOLKIT_HOME/classpath:${LIBS}"
|
|
||||||
fi
|
|
||||||
|
|
||||||
export JAVA_HOME="$JAVA_HOME"
|
|
||||||
export NIFI_TOOLKIT_HOME="$NIFI_TOOLKIT_HOME"
|
|
||||||
|
|
||||||
umask 0077
|
|
||||||
exec "${JAVA}" -cp "${CLASSPATH}" ${JAVA_OPTS:--Xms12m -Xmx24m} org.apache.nifi.toolkit.zkmigrator.ZooKeeperMigratorMain "$@"
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
init "$1"
|
|
||||||
run "$@"
|
|
|
@ -1,143 +0,0 @@
|
||||||
<?xml version="1.0" encoding="UTF-8"?>
|
|
||||||
<!--
|
|
||||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
contributor license agreements. See the NOTICE file distributed with
|
|
||||||
this work for additional information regarding copyright ownership.
|
|
||||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
(the "License"); you may not use this file except in compliance with
|
|
||||||
the License. You may obtain a copy of the License at
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
-->
|
|
||||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
|
||||||
<modelVersion>4.0.0</modelVersion>
|
|
||||||
|
|
||||||
<artifactId>nifi-toolkit-zookeeper-migrator</artifactId>
|
|
||||||
|
|
||||||
<parent>
|
|
||||||
<groupId>org.apache.nifi</groupId>
|
|
||||||
<artifactId>nifi-toolkit</artifactId>
|
|
||||||
<version>2.0.0-SNAPSHOT</version>
|
|
||||||
</parent>
|
|
||||||
<properties>
|
|
||||||
<curator.version>5.5.0</curator.version>
|
|
||||||
</properties>
|
|
||||||
<dependencies>
|
|
||||||
<dependency>
|
|
||||||
<groupId>commons-cli</groupId>
|
|
||||||
<artifactId>commons-cli</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.google.guava</groupId>
|
|
||||||
<artifactId>guava</artifactId>
|
|
||||||
<version>31.1-jre</version>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.slf4j</groupId>
|
|
||||||
<artifactId>slf4j-api</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.zookeeper</groupId>
|
|
||||||
<artifactId>zookeeper</artifactId>
|
|
||||||
<version>${zookeeper.version}</version>
|
|
||||||
<exclusions>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>log4j</groupId>
|
|
||||||
<artifactId>log4j</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>org.slf4j</groupId>
|
|
||||||
<artifactId>slf4j-api</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>org.slf4j</groupId>
|
|
||||||
<artifactId>slf4j-log4j12</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
</exclusions>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.google.code.gson</groupId>
|
|
||||||
<artifactId>gson</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.codehaus.groovy</groupId>
|
|
||||||
<artifactId>groovy-all</artifactId>
|
|
||||||
<version>${toolkit.groovy.version}</version>
|
|
||||||
<type>pom</type>
|
|
||||||
<scope>test</scope>
|
|
||||||
<exclusions>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>org.codehaus.groovy</groupId>
|
|
||||||
<artifactId>groovy-groovysh</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
</exclusions>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.spockframework</groupId>
|
|
||||||
<artifactId>spock-core</artifactId>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.curator</groupId>
|
|
||||||
<artifactId>curator-test</artifactId>
|
|
||||||
<version>${curator.version}</version>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.curator</groupId>
|
|
||||||
<artifactId>curator-client</artifactId>
|
|
||||||
<version>${curator.version}</version>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
|
||||||
|
|
||||||
<build>
|
|
||||||
<plugins>
|
|
||||||
<plugin>
|
|
||||||
<groupId>org.apache.rat</groupId>
|
|
||||||
<artifactId>apache-rat-plugin</artifactId>
|
|
||||||
<configuration>
|
|
||||||
<excludes combine.children="append">
|
|
||||||
<exclude>src/test/resources/test-data.json</exclude>
|
|
||||||
<exclude>src/test/resources/test-data-user-pass.json</exclude>
|
|
||||||
</excludes>
|
|
||||||
</configuration>
|
|
||||||
</plugin>
|
|
||||||
<plugin>
|
|
||||||
<groupId>org.codehaus.mojo</groupId>
|
|
||||||
<artifactId>build-helper-maven-plugin</artifactId>
|
|
||||||
<version>1.5</version>
|
|
||||||
<executions>
|
|
||||||
<execution>
|
|
||||||
<id>add-source</id>
|
|
||||||
<phase>generate-sources</phase>
|
|
||||||
<goals>
|
|
||||||
<goal>add-source</goal>
|
|
||||||
</goals>
|
|
||||||
<configuration>
|
|
||||||
<sources>
|
|
||||||
<source>src/main/groovy</source>
|
|
||||||
</sources>
|
|
||||||
</configuration>
|
|
||||||
</execution>
|
|
||||||
<execution>
|
|
||||||
<id>add-test-source</id>
|
|
||||||
<phase>generate-test-sources</phase>
|
|
||||||
<goals>
|
|
||||||
<goal>add-test-source</goal>
|
|
||||||
</goals>
|
|
||||||
<configuration>
|
|
||||||
<sources>
|
|
||||||
<source>src/test/groovy</source>
|
|
||||||
</sources>
|
|
||||||
</configuration>
|
|
||||||
</execution>
|
|
||||||
</executions>
|
|
||||||
</plugin>
|
|
||||||
</plugins>
|
|
||||||
</build>
|
|
||||||
</project>
|
|
|
@ -1,85 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.toolkit.zkmigrator;
|
|
||||||
|
|
||||||
import com.google.common.base.MoreObjects;
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
import org.apache.zookeeper.data.ACL;
|
|
||||||
import org.apache.zookeeper.data.Stat;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Objects;
|
|
||||||
|
|
||||||
class DataStatAclNode {
|
|
||||||
|
|
||||||
private final String path;
|
|
||||||
private final byte[] data;
|
|
||||||
private final Stat stat;
|
|
||||||
private final List<ACL> acls;
|
|
||||||
private final long ephemeralOwner;
|
|
||||||
|
|
||||||
DataStatAclNode(String path, byte[] data, Stat stat, List<ACL> acls, long ephemeralOwner) {
|
|
||||||
this.path = Preconditions.checkNotNull(path, "path can not be null");
|
|
||||||
this.data = data;
|
|
||||||
this.stat = Preconditions.checkNotNull(stat, "stat can not be null");
|
|
||||||
this.acls = acls == null ? ImmutableList.of() : ImmutableList.copyOf(acls);
|
|
||||||
this.ephemeralOwner = ephemeralOwner;
|
|
||||||
}
|
|
||||||
|
|
||||||
String getPath() {
|
|
||||||
return path;
|
|
||||||
}
|
|
||||||
|
|
||||||
byte[] getData() {
|
|
||||||
return data;
|
|
||||||
}
|
|
||||||
|
|
||||||
Stat getStat() {
|
|
||||||
return stat;
|
|
||||||
}
|
|
||||||
|
|
||||||
List<ACL> getAcls() {
|
|
||||||
return acls;
|
|
||||||
}
|
|
||||||
|
|
||||||
long getEphemeralOwner() {
|
|
||||||
return ephemeralOwner;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object o) {
|
|
||||||
if (this == o) return true;
|
|
||||||
if (o == null || getClass() != o.getClass()) return false;
|
|
||||||
DataStatAclNode that = (DataStatAclNode) o;
|
|
||||||
return Objects.equals(path, that.path);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
return Objects.hash(path);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return MoreObjects.toStringHelper(this)
|
|
||||||
.add("path", path)
|
|
||||||
.add("acls", acls)
|
|
||||||
.add("ephemeralOwner", ephemeralOwner)
|
|
||||||
.toString();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,82 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.toolkit.zkmigrator;
|
|
||||||
|
|
||||||
import com.google.common.base.Joiner;
|
|
||||||
import com.google.common.base.MoreObjects;
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import com.google.common.base.Splitter;
|
|
||||||
import com.google.common.base.Strings;
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Objects;
|
|
||||||
|
|
||||||
class ZooKeeperEndpointConfig {
|
|
||||||
private final String connectString;
|
|
||||||
private final List<String> servers;
|
|
||||||
private final String path;
|
|
||||||
|
|
||||||
ZooKeeperEndpointConfig(String connectString) {
|
|
||||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(connectString), "connectString can not be null or empty");
|
|
||||||
this.connectString = connectString;
|
|
||||||
|
|
||||||
final String[] connectStringPath = connectString.split("/", 2);
|
|
||||||
this.servers = Lists.newArrayList(connectStringPath[0].split(","));
|
|
||||||
if (connectStringPath.length == 2) {
|
|
||||||
this.path = '/' + Joiner.on('/').join(Splitter.on('/').omitEmptyStrings().trimResults().split(connectStringPath[1]));
|
|
||||||
} else {
|
|
||||||
path = "/";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getConnectString() {
|
|
||||||
return connectString;
|
|
||||||
}
|
|
||||||
|
|
||||||
public List getServers() {
|
|
||||||
return servers;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getPath() {
|
|
||||||
return path;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object o) {
|
|
||||||
if (this == o) return true;
|
|
||||||
if (o == null || getClass() != o.getClass()) return false;
|
|
||||||
ZooKeeperEndpointConfig that = (ZooKeeperEndpointConfig) o;
|
|
||||||
return Objects.equals(connectString, that.connectString)
|
|
||||||
&& Objects.equals(servers, that.servers)
|
|
||||||
&& Objects.equals(path, that.path);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
return Objects.hash(connectString, servers, path);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return MoreObjects.toStringHelper(this)
|
|
||||||
.add("connectString", connectString)
|
|
||||||
.add("servers", servers)
|
|
||||||
.add("path", path)
|
|
||||||
.toString();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,352 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.toolkit.zkmigrator;
|
|
||||||
|
|
||||||
import com.google.common.base.Joiner;
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import com.google.common.base.Splitter;
|
|
||||||
import com.google.common.base.Strings;
|
|
||||||
import com.google.gson.Gson;
|
|
||||||
import com.google.gson.GsonBuilder;
|
|
||||||
import com.google.gson.JsonParser;
|
|
||||||
import com.google.gson.stream.JsonReader;
|
|
||||||
import com.google.gson.stream.JsonWriter;
|
|
||||||
|
|
||||||
import org.apache.zookeeper.CreateMode;
|
|
||||||
import org.apache.zookeeper.KeeperException;
|
|
||||||
import org.apache.zookeeper.Watcher;
|
|
||||||
import org.apache.zookeeper.ZooDefs;
|
|
||||||
import org.apache.zookeeper.ZooKeeper;
|
|
||||||
import org.apache.zookeeper.data.ACL;
|
|
||||||
import org.apache.zookeeper.data.Stat;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
|
||||||
import java.io.BufferedWriter;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.InputStreamReader;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.io.OutputStreamWriter;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Spliterators;
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
|
||||||
import java.util.concurrent.CompletionStage;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.function.BiFunction;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
import java.util.function.Function;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import java.util.stream.Stream;
|
|
||||||
import java.util.stream.StreamSupport;
|
|
||||||
|
|
||||||
class ZooKeeperMigrator {
|
|
||||||
|
|
||||||
enum AuthMode {OPEN, DIGEST, SASL}
|
|
||||||
|
|
||||||
private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperMigrator.class);
|
|
||||||
private static final String SCHEME_DIGEST = AuthMode.DIGEST.name().toLowerCase();
|
|
||||||
|
|
||||||
private final ZooKeeperEndpointConfig zooKeeperEndpointConfig;
|
|
||||||
|
|
||||||
ZooKeeperMigrator(String zooKeeperConnectString) {
|
|
||||||
LOGGER.debug("ZooKeeper connect string parameter: {}", zooKeeperConnectString);
|
|
||||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(zooKeeperConnectString), "ZooKeeper connect string must not be null");
|
|
||||||
this.zooKeeperEndpointConfig = new ZooKeeperEndpointConfig(zooKeeperConnectString);
|
|
||||||
}
|
|
||||||
|
|
||||||
void readZooKeeper(OutputStream zkData, AuthMode authMode, byte[] authData) throws IOException, KeeperException, InterruptedException, ExecutionException {
|
|
||||||
ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig.getConnectString(), authMode, authData);
|
|
||||||
JsonWriter jsonWriter = new JsonWriter(new BufferedWriter(new OutputStreamWriter(zkData)));
|
|
||||||
jsonWriter.setIndent(" ");
|
|
||||||
JsonParser jsonParser = new JsonParser();
|
|
||||||
Gson gson = new GsonBuilder().create();
|
|
||||||
|
|
||||||
jsonWriter.beginArray();
|
|
||||||
|
|
||||||
// persist source ZooKeeperEndpointConfig
|
|
||||||
gson.toJson(jsonParser.parse(gson.toJson(zooKeeperEndpointConfig)).getAsJsonObject(), jsonWriter);
|
|
||||||
|
|
||||||
LOGGER.info("Retrieving data from source ZooKeeper: {}", zooKeeperEndpointConfig);
|
|
||||||
final List<CompletableFuture<Void>> readFutures = streamPaths(getNode(zooKeeper, "/"))
|
|
||||||
.parallel()
|
|
||||||
.map(node ->
|
|
||||||
CompletableFuture.supplyAsync(() -> {
|
|
||||||
final DataStatAclNode dataStatAclNode = retrieveNode(zooKeeper, node);
|
|
||||||
LOGGER.debug("retrieved node {} from {}", dataStatAclNode, zooKeeperEndpointConfig);
|
|
||||||
return dataStatAclNode;
|
|
||||||
}).thenAccept(dataStatAclNode -> {
|
|
||||||
// persist each zookeeper node
|
|
||||||
synchronized (jsonWriter) {
|
|
||||||
gson.toJson(jsonParser.parse(gson.toJson(dataStatAclNode)).getAsJsonObject(), jsonWriter);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
).collect(Collectors.toList());
|
|
||||||
|
|
||||||
CompletableFuture<Void> allReadsFuture = CompletableFuture.allOf(readFutures.toArray(new CompletableFuture[readFutures.size()]));
|
|
||||||
final CompletableFuture<List<Void>> finishedReads = allReadsFuture
|
|
||||||
.thenApply(v -> readFutures.stream()
|
|
||||||
.map(CompletableFuture::join)
|
|
||||||
.collect(Collectors.toList()));
|
|
||||||
final List<Void> readsDone = finishedReads.get();
|
|
||||||
jsonWriter.endArray();
|
|
||||||
jsonWriter.close();
|
|
||||||
if (LOGGER.isInfoEnabled()) {
|
|
||||||
final int readCount = readsDone.size();
|
|
||||||
LOGGER.info("{} {} read from {}", readCount, readCount == 1 ? "node" : "nodes", zooKeeperEndpointConfig);
|
|
||||||
}
|
|
||||||
closeZooKeeper(zooKeeper);
|
|
||||||
}
|
|
||||||
|
|
||||||
void writeZooKeeper(InputStream zkData, AuthMode authMode, byte[] authData, boolean ignoreSource, boolean useExistingACL) throws IOException, ExecutionException, InterruptedException {
|
|
||||||
// ensure that the chroot path exists
|
|
||||||
ZooKeeper zooKeeperRoot = getZooKeeper(Joiner.on(',').join(zooKeeperEndpointConfig.getServers()), authMode, authData);
|
|
||||||
ensureNodeExists(zooKeeperRoot, zooKeeperEndpointConfig.getPath(), CreateMode.PERSISTENT);
|
|
||||||
closeZooKeeper(zooKeeperRoot);
|
|
||||||
|
|
||||||
ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig.getConnectString(), authMode, authData);
|
|
||||||
JsonReader jsonReader = new JsonReader(new BufferedReader(new InputStreamReader(zkData)));
|
|
||||||
Gson gson = new GsonBuilder().create();
|
|
||||||
|
|
||||||
jsonReader.beginArray();
|
|
||||||
|
|
||||||
// determine source ZooKeeperEndpointConfig for this data
|
|
||||||
final ZooKeeperEndpointConfig sourceZooKeeperEndpointConfig = gson.fromJson(jsonReader, ZooKeeperEndpointConfig.class);
|
|
||||||
LOGGER.info("Source data was obtained from ZooKeeper: {}", sourceZooKeeperEndpointConfig);
|
|
||||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getConnectString()) && !Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getPath())
|
|
||||||
&& sourceZooKeeperEndpointConfig.getServers() != null && sourceZooKeeperEndpointConfig.getServers().size() > 0, "Source ZooKeeper %s from %s is invalid",
|
|
||||||
sourceZooKeeperEndpointConfig, zkData);
|
|
||||||
Preconditions.checkArgument(Collections.disjoint(zooKeeperEndpointConfig.getServers(), sourceZooKeeperEndpointConfig.getServers())
|
|
||||||
|| !zooKeeperEndpointConfig.getPath().equals(sourceZooKeeperEndpointConfig.getPath()) || ignoreSource,
|
|
||||||
"Source ZooKeeper config %s for the data provided can not contain the same server and path as the configured destination ZooKeeper config %s",
|
|
||||||
sourceZooKeeperEndpointConfig, zooKeeperEndpointConfig);
|
|
||||||
|
|
||||||
// stream through each node read from the json input
|
|
||||||
final Stream<DataStatAclNode> stream = StreamSupport.stream(new Spliterators.AbstractSpliterator<DataStatAclNode>(0, 0) {
|
|
||||||
@Override
|
|
||||||
public boolean tryAdvance(Consumer<? super DataStatAclNode> action) {
|
|
||||||
try {
|
|
||||||
// stream each DataStatAclNode from configured json file
|
|
||||||
synchronized (jsonReader) {
|
|
||||||
if (jsonReader.hasNext()) {
|
|
||||||
action.accept(gson.fromJson(jsonReader, DataStatAclNode.class));
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException("unable to read nodes from json", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, false);
|
|
||||||
|
|
||||||
final List<CompletableFuture<Stat>> writeFutures = stream.parallel().map(node -> {
|
|
||||||
/*
|
|
||||||
* create stage to determine the acls that should be applied to the node.
|
|
||||||
* this stage will be used to initialize the chain
|
|
||||||
*/
|
|
||||||
final CompletableFuture<List<ACL>> determineACLStage = CompletableFuture.supplyAsync(() -> determineACLs(node, authMode, useExistingACL));
|
|
||||||
/*
|
|
||||||
* create stage to apply acls to nodes and transform node to DataStatAclNode object
|
|
||||||
*/
|
|
||||||
final Function<List<ACL>, CompletableFuture<DataStatAclNode>> transformNodeStage = acls -> CompletableFuture.supplyAsync(() -> transformNode(node, acls));
|
|
||||||
/*
|
|
||||||
* create stage to ensure that nodes exist for the entire path of the zookeeper node, must be invoked after the transformNode stage to
|
|
||||||
* ensure that the node will exist after path migration
|
|
||||||
*/
|
|
||||||
final Function<DataStatAclNode, CompletionStage<String>> ensureNodeExistsStage = dataStatAclNode ->
|
|
||||||
CompletableFuture.supplyAsync(() -> ensureNodeExists(zooKeeper, dataStatAclNode.getPath(),
|
|
||||||
dataStatAclNode.getEphemeralOwner() == 0 ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL));
|
|
||||||
/*
|
|
||||||
* create stage that waits for both the transformNode and ensureNodeExists stages complete, and also provides that the given transformed node is
|
|
||||||
* available to the next stage
|
|
||||||
*/
|
|
||||||
final BiFunction<String, DataStatAclNode, DataStatAclNode> combineEnsureNodeAndTransferNodeStage = (u, dataStatAclNode) -> dataStatAclNode;
|
|
||||||
/*
|
|
||||||
* create stage to transmit the node to the destination zookeeper endpoint, must be invoked after the node has been transformed and its path
|
|
||||||
* has been created (or already exists) in the destination zookeeper
|
|
||||||
*/
|
|
||||||
final Function<DataStatAclNode, CompletionStage<Stat>> transmitNodeStage = dataStatNode -> CompletableFuture.supplyAsync(() -> transmitNode(zooKeeper, dataStatNode));
|
|
||||||
/*
|
|
||||||
* submit the stages chained together in the proper order to perform the processing on the given node
|
|
||||||
*/
|
|
||||||
final CompletableFuture<DataStatAclNode> dataStatAclNodeCompletableFuture = determineACLStage.thenCompose(transformNodeStage);
|
|
||||||
return dataStatAclNodeCompletableFuture.thenCompose(ensureNodeExistsStage)
|
|
||||||
.thenCombine(dataStatAclNodeCompletableFuture, combineEnsureNodeAndTransferNodeStage)
|
|
||||||
.thenCompose(transmitNodeStage);
|
|
||||||
}).collect(Collectors.toList());
|
|
||||||
|
|
||||||
CompletableFuture<Void> allWritesFuture = CompletableFuture.allOf(writeFutures.toArray(new CompletableFuture[writeFutures.size()]));
|
|
||||||
final CompletableFuture<List<Stat>> finishedWrites = allWritesFuture
|
|
||||||
.thenApply(v -> writeFutures.stream()
|
|
||||||
.map(CompletableFuture::join)
|
|
||||||
.collect(Collectors.toList()));
|
|
||||||
final List<Stat> writesDone = finishedWrites.get();
|
|
||||||
if (LOGGER.isInfoEnabled()) {
|
|
||||||
final int writeCount = writesDone.size();
|
|
||||||
LOGGER.info("{} {} transferred to {}", writeCount, writeCount == 1 ? "node" : "nodes", zooKeeperEndpointConfig);
|
|
||||||
}
|
|
||||||
jsonReader.close();
|
|
||||||
closeZooKeeper(zooKeeper);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Stream<String> streamPaths(ZooKeeperNode node) {
|
|
||||||
return Stream.concat(Stream.of(node.getPath()), node.getChildren().stream().flatMap(this::streamPaths));
|
|
||||||
}
|
|
||||||
|
|
||||||
private ZooKeeperNode getNode(ZooKeeper zooKeeper, String path) throws KeeperException, InterruptedException {
|
|
||||||
LOGGER.debug("retrieving node and children at {}", path);
|
|
||||||
final List<String> children = zooKeeper.getChildren(path, false);
|
|
||||||
return new ZooKeeperNode(path, children.stream().map(s -> {
|
|
||||||
final String childPath = Joiner.on('/').skipNulls().join(path.equals("/") ? "" : path, s);
|
|
||||||
try {
|
|
||||||
return getNode(zooKeeper, childPath);
|
|
||||||
} catch (InterruptedException | KeeperException e) {
|
|
||||||
if (e instanceof InterruptedException) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
throw new RuntimeException(String.format("unable to discover sub-tree from %s", childPath), e);
|
|
||||||
}
|
|
||||||
}).collect(Collectors.toList()));
|
|
||||||
}
|
|
||||||
|
|
||||||
private DataStatAclNode retrieveNode(ZooKeeper zooKeeper, String path) {
|
|
||||||
Preconditions.checkNotNull(zooKeeper, "ZooKeeper client must not be null");
|
|
||||||
Preconditions.checkNotNull(path, "path must not be null");
|
|
||||||
final Stat stat = new Stat();
|
|
||||||
final byte[] data;
|
|
||||||
final List<ACL> acls;
|
|
||||||
final long ephemeralOwner;
|
|
||||||
try {
|
|
||||||
data = zooKeeper.getData(path, false, stat);
|
|
||||||
acls = zooKeeper.getACL(path, stat);
|
|
||||||
ephemeralOwner = stat.getEphemeralOwner();
|
|
||||||
} catch (InterruptedException | KeeperException e) {
|
|
||||||
if (e instanceof InterruptedException) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
throw new RuntimeException(String.format("unable to get data, ACLs, and stats from %s for node at path %s", zooKeeper, path), e);
|
|
||||||
}
|
|
||||||
return new DataStatAclNode(path, data, stat, acls, ephemeralOwner);
|
|
||||||
}
|
|
||||||
|
|
||||||
private String ensureNodeExists(ZooKeeper zooKeeper, String path, CreateMode createMode) {
|
|
||||||
try {
|
|
||||||
LOGGER.debug("attempting to create node at {}", path);
|
|
||||||
final ArrayList<ACL> acls = ZooDefs.Ids.OPEN_ACL_UNSAFE;
|
|
||||||
final String createNodePath = zooKeeper.create(path, new byte[0], acls, createMode);
|
|
||||||
LOGGER.info("created node at {}, acls: {}, createMode: {}", createNodePath, acls, createMode);
|
|
||||||
return createNodePath;
|
|
||||||
} catch (KeeperException e) {
|
|
||||||
if (KeeperException.Code.NONODE.equals(e.code())) {
|
|
||||||
final List<String> pathTokens = Splitter.on('/').omitEmptyStrings().trimResults().splitToList(path);
|
|
||||||
final String parentPath = "/" + Joiner.on('/').skipNulls().join(pathTokens.subList(0, pathTokens.size() - 1));
|
|
||||||
LOGGER.debug("node doesn't exist, recursively attempting to create node at {}", parentPath);
|
|
||||||
ensureNodeExists(zooKeeper, parentPath, CreateMode.PERSISTENT);
|
|
||||||
LOGGER.debug("recursively created node at {}", parentPath);
|
|
||||||
LOGGER.debug("retrying attempt to create node at {}", path);
|
|
||||||
return ensureNodeExists(zooKeeper, path, createMode);
|
|
||||||
} else if (KeeperException.Code.NODEEXISTS.equals(e.code())) {
|
|
||||||
return path;
|
|
||||||
} else {
|
|
||||||
throw new RuntimeException(String.format("unable to create node at path %s, ZooKeeper returned %s", path, e.code()), e);
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new RuntimeException(String.format("unable to create node at path %s", path), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<ACL> determineACLs(DataStatAclNode node, AuthMode authMode, Boolean useExistingACL) {
|
|
||||||
return useExistingACL ? node.getAcls() :
|
|
||||||
(authMode.equals(AuthMode.OPEN) ? ZooDefs.Ids.OPEN_ACL_UNSAFE : ZooDefs.Ids.CREATOR_ALL_ACL);
|
|
||||||
}
|
|
||||||
|
|
||||||
private DataStatAclNode transformNode(DataStatAclNode node, List<ACL> acls) {
|
|
||||||
final DataStatAclNode migratedNode = new DataStatAclNode(node.getPath(), node.getData(), node.getStat(), acls, node.getEphemeralOwner());
|
|
||||||
LOGGER.info("transformed original node {} to {}", node, migratedNode);
|
|
||||||
return migratedNode;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Stat transmitNode(ZooKeeper zooKeeper, DataStatAclNode node) {
|
|
||||||
Preconditions.checkNotNull(zooKeeper, "zooKeeper must not be null");
|
|
||||||
Preconditions.checkNotNull(node, "node must not be null");
|
|
||||||
try {
|
|
||||||
LOGGER.debug("attempting to transfer node to {} with ACL {}: {}", zooKeeperEndpointConfig, node.getAcls(), node);
|
|
||||||
// set data without caring what the previous version of the data at that path
|
|
||||||
zooKeeper.setData(node.getPath(), node.getData(), -1);
|
|
||||||
zooKeeper.setACL(node.getPath(), node.getAcls(), -1);
|
|
||||||
LOGGER.info("transferred node {} in {}", node, zooKeeperEndpointConfig);
|
|
||||||
} catch (InterruptedException | KeeperException e) {
|
|
||||||
if (e instanceof InterruptedException) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
throw new RuntimeException(String.format("unable to transmit data to %s for path %s", zooKeeper, node.getPath()), e);
|
|
||||||
}
|
|
||||||
return node.getStat();
|
|
||||||
}
|
|
||||||
|
|
||||||
private ZooKeeper getZooKeeper(String zooKeeperConnectString, AuthMode authMode, byte[] authData) throws IOException {
|
|
||||||
CountDownLatch connectionLatch = new CountDownLatch(1);
|
|
||||||
ZooKeeper zooKeeper = new ZooKeeper(zooKeeperConnectString, 3000, watchedEvent -> {
|
|
||||||
if (LOGGER.isDebugEnabled()) {
|
|
||||||
LOGGER.debug("ZooKeeper server state changed to {} in {}", watchedEvent.getState(), zooKeeperConnectString);
|
|
||||||
}
|
|
||||||
if (watchedEvent.getType().equals(Watcher.Event.EventType.None) && watchedEvent.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
|
|
||||||
connectionLatch.countDown();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
final boolean connected;
|
|
||||||
try {
|
|
||||||
connected = connectionLatch.await(5, TimeUnit.SECONDS);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
closeZooKeeper(zooKeeper);
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new IOException(String.format("interrupted while waiting for ZooKeeper connection to %s", zooKeeperConnectString), e);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!connected) {
|
|
||||||
closeZooKeeper(zooKeeper);
|
|
||||||
throw new IOException(String.format("unable to connect to %s", zooKeeperConnectString));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (authMode.equals(AuthMode.DIGEST)) {
|
|
||||||
zooKeeper.addAuthInfo(SCHEME_DIGEST, authData);
|
|
||||||
}
|
|
||||||
return zooKeeper;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void closeZooKeeper(ZooKeeper zooKeeper) {
|
|
||||||
try {
|
|
||||||
zooKeeper.close();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOGGER.warn("could not close ZooKeeper client due to interrupt", e);
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ZooKeeperEndpointConfig getZooKeeperEndpointConfig() {
|
|
||||||
return zooKeeperEndpointConfig;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,176 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.toolkit.zkmigrator;
|
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import com.google.common.base.Strings;
|
|
||||||
import org.apache.commons.cli.CommandLine;
|
|
||||||
import org.apache.commons.cli.DefaultParser;
|
|
||||||
import org.apache.commons.cli.HelpFormatter;
|
|
||||||
import org.apache.commons.cli.Option;
|
|
||||||
import org.apache.commons.cli.OptionGroup;
|
|
||||||
import org.apache.commons.cli.Options;
|
|
||||||
import org.apache.commons.cli.ParseException;
|
|
||||||
import org.apache.nifi.toolkit.zkmigrator.ZooKeeperMigrator.AuthMode;
|
|
||||||
import org.apache.zookeeper.KeeperException;
|
|
||||||
|
|
||||||
import java.io.FileInputStream;
|
|
||||||
import java.io.FileOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.io.PrintStream;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.nio.file.Paths;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
|
|
||||||
public class ZooKeeperMigratorMain {
|
|
||||||
|
|
||||||
enum Mode {READ, WRITE}
|
|
||||||
|
|
||||||
private static final String JAVA_HOME = "JAVA_HOME";
|
|
||||||
private static final String NIFI_TOOLKIT_HOME = "NIFI_TOOLKIT_HOME";
|
|
||||||
private static final String HEADER = System.lineSeparator() + "A tool for importing and exporting data from ZooKeeper." + System.lineSeparator() + System.lineSeparator();
|
|
||||||
private static final String FOOTER = System.lineSeparator() + "Java home: " +
|
|
||||||
System.getenv(JAVA_HOME) + System.lineSeparator() + "NiFi Toolkit home: " + System.getenv(NIFI_TOOLKIT_HOME);
|
|
||||||
|
|
||||||
private static final Option OPTION_ZK_MIGRATOR_HELP = Option.builder("h")
|
|
||||||
.longOpt("help")
|
|
||||||
.desc("display help/usage info")
|
|
||||||
.build();
|
|
||||||
private static final Option OPTION_ZK_ENDPOINT = Option.builder("z")
|
|
||||||
.longOpt("zookeeper")
|
|
||||||
.desc("ZooKeeper endpoint string (ex. host:port/path)")
|
|
||||||
.hasArg()
|
|
||||||
.argName("zookeeper-endpoint")
|
|
||||||
.required()
|
|
||||||
.build();
|
|
||||||
private static final Option OPTION_RECEIVE = Option.builder("r")
|
|
||||||
.longOpt("receive")
|
|
||||||
.desc("receives data from zookeeper and writes to the given filename or standard output")
|
|
||||||
.build();
|
|
||||||
private static final Option OPTION_SEND = Option.builder("s")
|
|
||||||
.longOpt("send")
|
|
||||||
.desc("sends data to zookeeper read from the given filename or standard input")
|
|
||||||
.build();
|
|
||||||
private static final Option OPTION_ZK_AUTH_INFO = Option.builder("a")
|
|
||||||
.longOpt("auth")
|
|
||||||
.desc("username and password for the given ZooKeeper path")
|
|
||||||
.hasArg()
|
|
||||||
.argName("username:password")
|
|
||||||
.build();
|
|
||||||
private static final Option OPTION_ZK_KRB_CONF_FILE = Option.builder("k")
|
|
||||||
.longOpt("krb-conf")
|
|
||||||
.desc("JAAS file containing Kerberos config")
|
|
||||||
.hasArg()
|
|
||||||
.argName("jaas-filename")
|
|
||||||
.numberOfArgs(1)
|
|
||||||
.build();
|
|
||||||
private static final Option OPTION_FILE = Option.builder("f")
|
|
||||||
.longOpt("file")
|
|
||||||
.desc("file to be used for ZooKeeper data")
|
|
||||||
.hasArg()
|
|
||||||
.argName("filename")
|
|
||||||
.build();
|
|
||||||
private static final Option OPTION_IGNORE_SOURCE = Option.builder()
|
|
||||||
.longOpt("ignore-source")
|
|
||||||
.desc("ignores the source ZooKeeper endpoint specified in the exported data")
|
|
||||||
.build();
|
|
||||||
private static final Option OPTION_USE_EXISTING_ACL = Option.builder()
|
|
||||||
.longOpt("use-existing-acl")
|
|
||||||
.desc("allow write of existing acl data to destination")
|
|
||||||
.build();
|
|
||||||
|
|
||||||
private static Options createOptions() {
|
|
||||||
final Options options = new Options();
|
|
||||||
options.addOption(OPTION_ZK_MIGRATOR_HELP);
|
|
||||||
options.addOption(OPTION_ZK_ENDPOINT);
|
|
||||||
options.addOption(OPTION_ZK_AUTH_INFO);
|
|
||||||
options.addOption(OPTION_FILE);
|
|
||||||
options.addOption(OPTION_IGNORE_SOURCE);
|
|
||||||
options.addOption(OPTION_USE_EXISTING_ACL);
|
|
||||||
final OptionGroup optionGroupAuth = new OptionGroup().addOption(OPTION_ZK_AUTH_INFO).addOption(OPTION_ZK_KRB_CONF_FILE);
|
|
||||||
optionGroupAuth.setRequired(false);
|
|
||||||
options.addOptionGroup(optionGroupAuth);
|
|
||||||
final OptionGroup optionGroupReadWrite = new OptionGroup().addOption(OPTION_RECEIVE).addOption(OPTION_SEND);
|
|
||||||
optionGroupReadWrite.setRequired(true);
|
|
||||||
options.addOptionGroup(optionGroupReadWrite);
|
|
||||||
|
|
||||||
return options;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void printUsage(String errorMessage, Options options) {
|
|
||||||
Preconditions.checkNotNull(options, "command line options were not specified");
|
|
||||||
if (errorMessage != null) {
|
|
||||||
System.out.println(errorMessage + System.lineSeparator());
|
|
||||||
}
|
|
||||||
HelpFormatter helpFormatter = new HelpFormatter();
|
|
||||||
helpFormatter.setWidth(160);
|
|
||||||
helpFormatter.setDescPadding(0);
|
|
||||||
helpFormatter.printHelp(ZooKeeperMigratorMain.class.getCanonicalName(), HEADER, options, FOOTER, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void main(String[] args) throws IOException {
|
|
||||||
PrintStream output = System.out;
|
|
||||||
System.setOut(System.err);
|
|
||||||
|
|
||||||
final Options options = createOptions();
|
|
||||||
final CommandLine commandLine;
|
|
||||||
try {
|
|
||||||
commandLine = new DefaultParser().parse(options, args);
|
|
||||||
if (commandLine.hasOption(OPTION_ZK_MIGRATOR_HELP.getLongOpt())) {
|
|
||||||
printUsage(null, options);
|
|
||||||
} else {
|
|
||||||
final String zookeeperUri = commandLine.getOptionValue(OPTION_ZK_ENDPOINT.getOpt());
|
|
||||||
final Mode mode = commandLine.hasOption(OPTION_RECEIVE.getOpt()) ? Mode.READ : Mode.WRITE;
|
|
||||||
final String filename = commandLine.getOptionValue(OPTION_FILE.getOpt());
|
|
||||||
final String auth = commandLine.getOptionValue(OPTION_ZK_AUTH_INFO.getOpt());
|
|
||||||
final String jaasFilename = commandLine.getOptionValue(OPTION_ZK_KRB_CONF_FILE.getOpt());
|
|
||||||
final boolean ignoreSource = commandLine.hasOption(OPTION_IGNORE_SOURCE.getLongOpt());
|
|
||||||
final boolean useExistingACL = commandLine.hasOption(OPTION_USE_EXISTING_ACL.getLongOpt());
|
|
||||||
final AuthMode authMode;
|
|
||||||
final byte[] authData;
|
|
||||||
if (auth != null) {
|
|
||||||
authMode = AuthMode.DIGEST;
|
|
||||||
authData = auth.getBytes(StandardCharsets.UTF_8);
|
|
||||||
} else {
|
|
||||||
authData = null;
|
|
||||||
if (!Strings.isNullOrEmpty(jaasFilename)) {
|
|
||||||
authMode = AuthMode.SASL;
|
|
||||||
System.setProperty("java.security.auth.login.config", jaasFilename);
|
|
||||||
} else {
|
|
||||||
authMode = AuthMode.OPEN;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
final ZooKeeperMigrator zookeeperMigrator = new ZooKeeperMigrator(zookeeperUri);
|
|
||||||
if (mode.equals(Mode.READ)) {
|
|
||||||
try (OutputStream zkData = filename != null ? new FileOutputStream(Paths.get(filename).toFile()) : output) {
|
|
||||||
zookeeperMigrator.readZooKeeper(zkData, authMode, authData);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
try (InputStream zkData = filename != null ? new FileInputStream(Paths.get(filename).toFile()) : System.in) {
|
|
||||||
zookeeperMigrator.writeZooKeeper(zkData, authMode, authData, ignoreSource, useExistingACL);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (ParseException e) {
|
|
||||||
printUsage(e.getLocalizedMessage(), options);
|
|
||||||
} catch (IOException | KeeperException | InterruptedException | ExecutionException e) {
|
|
||||||
throw new IOException(String.format("unable to perform operation: %s", e.getLocalizedMessage()), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,66 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.toolkit.zkmigrator;
|
|
||||||
|
|
||||||
import com.google.common.base.MoreObjects;
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import com.google.common.base.Strings;
|
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Objects;
|
|
||||||
|
|
||||||
class ZooKeeperNode {
|
|
||||||
|
|
||||||
private final String path;
|
|
||||||
private final List<ZooKeeperNode> children;
|
|
||||||
|
|
||||||
public ZooKeeperNode(String path, List<ZooKeeperNode> children) {
|
|
||||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(path), "path can not be null or empty");
|
|
||||||
this.path = path;
|
|
||||||
this.children = children == null ? ImmutableList.of() : ImmutableList.copyOf(children);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getPath() {
|
|
||||||
return path;
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<ZooKeeperNode> getChildren() {
|
|
||||||
return children;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object o) {
|
|
||||||
if (this == o) return true;
|
|
||||||
if (o == null || getClass() != o.getClass()) return false;
|
|
||||||
ZooKeeperNode node = (ZooKeeperNode) o;
|
|
||||||
return Objects.equals(path, node.path);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
return Objects.hash(path);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return MoreObjects.toStringHelper(this)
|
|
||||||
.add("path", path)
|
|
||||||
.add("children", children)
|
|
||||||
.toString();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,39 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
import ch.qos.logback.classic.encoder.PatternLayoutEncoder
|
|
||||||
import ch.qos.logback.core.ConsoleAppender
|
|
||||||
import ch.qos.logback.core.status.NopStatusListener
|
|
||||||
|
|
||||||
statusListener(NopStatusListener)
|
|
||||||
|
|
||||||
appender('stdout', ConsoleAppender) {
|
|
||||||
target = 'System.out'
|
|
||||||
encoder(PatternLayoutEncoder) {
|
|
||||||
pattern = "%date %level [%thread] %logger{40} %msg%n"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
appender('stderr', ConsoleAppender) {
|
|
||||||
target = 'System.err'
|
|
||||||
encoder(PatternLayoutEncoder) {
|
|
||||||
pattern = "%date %level [%thread] %logger{40} %msg%n"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
logger("org.apache.nifi.toolkit.zkmigrator", INFO)
|
|
||||||
logger("org.apache.zookeeper", WARN)
|
|
||||||
root(WARN, ['stderr'])
|
|
|
@ -1,46 +0,0 @@
|
||||||
[
|
|
||||||
{
|
|
||||||
"connectString": "127.0.0.1:62317/nifi",
|
|
||||||
"servers": [
|
|
||||||
"127.0.0.1:62317"
|
|
||||||
],
|
|
||||||
"path": "/nifi"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "/",
|
|
||||||
"data": [
|
|
||||||
115,
|
|
||||||
111,
|
|
||||||
109,
|
|
||||||
101,
|
|
||||||
32,
|
|
||||||
100,
|
|
||||||
97,
|
|
||||||
116,
|
|
||||||
97
|
|
||||||
],
|
|
||||||
"stat": {
|
|
||||||
"czxid": 2,
|
|
||||||
"mzxid": 2,
|
|
||||||
"ctime": 1485792794977,
|
|
||||||
"mtime": 1485792794977,
|
|
||||||
"version": 0,
|
|
||||||
"cversion": 0,
|
|
||||||
"aversion": 0,
|
|
||||||
"ephemeralOwner": 0,
|
|
||||||
"dataLength": 9,
|
|
||||||
"numChildren": 0,
|
|
||||||
"pzxid": 2
|
|
||||||
},
|
|
||||||
"acls": [
|
|
||||||
{
|
|
||||||
"perms": 31,
|
|
||||||
"id": {
|
|
||||||
"scheme": "digest",
|
|
||||||
"id": "nifi:RuSeH3tpzgba3p9WrG/UpiSIsGg\u003d"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"ephemeralOwner": 0
|
|
||||||
}
|
|
||||||
]
|
|
|
@ -1,194 +0,0 @@
|
||||||
[
|
|
||||||
{
|
|
||||||
"connectString": "127.0.0.1:62295/nifi/components",
|
|
||||||
"servers": [
|
|
||||||
"127.0.0.1:62295"
|
|
||||||
],
|
|
||||||
"path": "/nifi/components"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "/1/a",
|
|
||||||
"data": [
|
|
||||||
115,
|
|
||||||
111,
|
|
||||||
109,
|
|
||||||
101,
|
|
||||||
32,
|
|
||||||
100,
|
|
||||||
97,
|
|
||||||
116,
|
|
||||||
97
|
|
||||||
],
|
|
||||||
"stat": {
|
|
||||||
"czxid": 6,
|
|
||||||
"mzxid": 6,
|
|
||||||
"ctime": 1485792790772,
|
|
||||||
"mtime": 1485792790772,
|
|
||||||
"version": 0,
|
|
||||||
"cversion": 0,
|
|
||||||
"aversion": 0,
|
|
||||||
"ephemeralOwner": 97372916257193984,
|
|
||||||
"dataLength": 9,
|
|
||||||
"numChildren": 0,
|
|
||||||
"pzxid": 6
|
|
||||||
},
|
|
||||||
"acls": [
|
|
||||||
{
|
|
||||||
"perms": 31,
|
|
||||||
"id": {
|
|
||||||
"scheme": "world",
|
|
||||||
"id": "anyone"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"ephemeralOwner": 97372916257193984
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "/1",
|
|
||||||
"data": [
|
|
||||||
115,
|
|
||||||
111,
|
|
||||||
109,
|
|
||||||
101,
|
|
||||||
32,
|
|
||||||
100,
|
|
||||||
97,
|
|
||||||
116,
|
|
||||||
97
|
|
||||||
],
|
|
||||||
"stat": {
|
|
||||||
"czxid": 5,
|
|
||||||
"mzxid": 5,
|
|
||||||
"ctime": 1485792790771,
|
|
||||||
"mtime": 1485792790771,
|
|
||||||
"version": 0,
|
|
||||||
"cversion": 1,
|
|
||||||
"aversion": 0,
|
|
||||||
"ephemeralOwner": 0,
|
|
||||||
"dataLength": 9,
|
|
||||||
"numChildren": 1,
|
|
||||||
"pzxid": 6
|
|
||||||
},
|
|
||||||
"acls": [
|
|
||||||
{
|
|
||||||
"perms": 31,
|
|
||||||
"id": {
|
|
||||||
"scheme": "world",
|
|
||||||
"id": "anyone"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"ephemeralOwner": 0
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "/3",
|
|
||||||
"data": [
|
|
||||||
115,
|
|
||||||
111,
|
|
||||||
109,
|
|
||||||
101,
|
|
||||||
32,
|
|
||||||
100,
|
|
||||||
97,
|
|
||||||
116,
|
|
||||||
97
|
|
||||||
],
|
|
||||||
"stat": {
|
|
||||||
"czxid": 8,
|
|
||||||
"mzxid": 8,
|
|
||||||
"ctime": 1485792790773,
|
|
||||||
"mtime": 1485792790773,
|
|
||||||
"version": 0,
|
|
||||||
"cversion": 0,
|
|
||||||
"aversion": 0,
|
|
||||||
"ephemeralOwner": 0,
|
|
||||||
"dataLength": 9,
|
|
||||||
"numChildren": 0,
|
|
||||||
"pzxid": 8
|
|
||||||
},
|
|
||||||
"acls": [
|
|
||||||
{
|
|
||||||
"perms": 31,
|
|
||||||
"id": {
|
|
||||||
"scheme": "world",
|
|
||||||
"id": "anyone"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"ephemeralOwner": 0
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "/2",
|
|
||||||
"data": [
|
|
||||||
115,
|
|
||||||
111,
|
|
||||||
109,
|
|
||||||
101,
|
|
||||||
32,
|
|
||||||
100,
|
|
||||||
97,
|
|
||||||
116,
|
|
||||||
97
|
|
||||||
],
|
|
||||||
"stat": {
|
|
||||||
"czxid": 7,
|
|
||||||
"mzxid": 7,
|
|
||||||
"ctime": 1485792790772,
|
|
||||||
"mtime": 1485792790772,
|
|
||||||
"version": 0,
|
|
||||||
"cversion": 0,
|
|
||||||
"aversion": 0,
|
|
||||||
"ephemeralOwner": 0,
|
|
||||||
"dataLength": 9,
|
|
||||||
"numChildren": 0,
|
|
||||||
"pzxid": 7
|
|
||||||
},
|
|
||||||
"acls": [
|
|
||||||
{
|
|
||||||
"perms": 31,
|
|
||||||
"id": {
|
|
||||||
"scheme": "world",
|
|
||||||
"id": "anyone"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"ephemeralOwner": 0
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "/",
|
|
||||||
"data": [
|
|
||||||
115,
|
|
||||||
111,
|
|
||||||
109,
|
|
||||||
101,
|
|
||||||
32,
|
|
||||||
100,
|
|
||||||
97,
|
|
||||||
116,
|
|
||||||
97
|
|
||||||
],
|
|
||||||
"stat": {
|
|
||||||
"czxid": 3,
|
|
||||||
"mzxid": 4,
|
|
||||||
"ctime": 1485792790757,
|
|
||||||
"mtime": 1485792790762,
|
|
||||||
"version": 1,
|
|
||||||
"cversion": 3,
|
|
||||||
"aversion": 0,
|
|
||||||
"ephemeralOwner": 0,
|
|
||||||
"dataLength": 9,
|
|
||||||
"numChildren": 3,
|
|
||||||
"pzxid": 8
|
|
||||||
},
|
|
||||||
"acls": [
|
|
||||||
{
|
|
||||||
"perms": 31,
|
|
||||||
"id": {
|
|
||||||
"scheme": "world",
|
|
||||||
"id": "anyone"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"ephemeralOwner": 0
|
|
||||||
}
|
|
||||||
]
|
|
|
@ -25,7 +25,6 @@
|
||||||
<modules>
|
<modules>
|
||||||
<module>nifi-toolkit-tls</module>
|
<module>nifi-toolkit-tls</module>
|
||||||
<module>nifi-toolkit-encrypt-config</module>
|
<module>nifi-toolkit-encrypt-config</module>
|
||||||
<module>nifi-toolkit-zookeeper-migrator</module>
|
|
||||||
<module>nifi-toolkit-flowfile-repo</module>
|
<module>nifi-toolkit-flowfile-repo</module>
|
||||||
<module>nifi-toolkit-assembly</module>
|
<module>nifi-toolkit-assembly</module>
|
||||||
<module>nifi-toolkit-cli</module>
|
<module>nifi-toolkit-cli</module>
|
||||||
|
|
Loading…
Reference in New Issue