From af2861f1057bc71b450fca883449588a79a7f058 Mon Sep 17 00:00:00 2001 From: Jeff Storck Date: Thu, 26 Jan 2017 14:30:32 -0500 Subject: [PATCH] NIFI-3300 Implemented usage of ZooKeeper chroot capability in the connect string Updated ZooKeeper connect string parsing tests Updated admin doc for ZooKeeper Migrator migration of nifi root nodes, updated source and destination ZK check by servers in the connection string instead of the entire connection string Added check between source and destination ZooKeeper paths to allow data to be written to the same ZooKeeper with a different path Added test for writing to the same ZooKeeper with a different path Added type parameter to server list in for ZooKeeperEndpointConfig This closes #1456. Signed-off-by: Bryan Rosander --- .../main/asciidoc/administration-guide.adoc | 6 +- .../nifi-toolkit-zookeeper-migrator/pom.xml | 6 + .../zkmigrator/ZooKeeperEndpointConfig.java | 26 +- .../toolkit/zkmigrator/ZooKeeperMigrator.java | 54 ++-- .../zkmigrator/ZooKeeperMigratorTest.groovy | 120 +++++---- .../test/resources/test-data-user-pass.json | 24 +- .../src/test/resources/test-data.json | 253 +++++++++--------- 7 files changed, 263 insertions(+), 226 deletions(-) diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index f970232a7b..25b2f11906 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -1678,7 +1678,7 @@ You can use the following command line options with the ZooKeeper Migrator: * `-k,--krb-conf ` Allows the specification of a JAAS configuration file to allow authentication with a ZooKeeper configured to use Kerberos. This option is mutually exclusive with the `-a,--auth` option. * `-r,--receive` Receives data from ZooKeeper and writes to the given filename (if the `-f,--file` option is provided) or standard output. The data received will contain the full path to each node read from ZooKeeper. This option is mutually exclusive with the `-s,--send` option. * `-s,--send` Sends data to ZooKeeper that is read from the given filename (if the `-f,--file` option is provided) or standard input. The paths for each node in the data being sent to ZooKeeper are absolute paths, and will be stored in ZooKeeper under the *path* portion of the `-z,--zookeeper` argument. Typically, the *path* portion of the argument can be omitted, which will store the nodes at their absolute paths. This option is mutually exclusive with the `-r,--receive` option. -* `-z,--zookeeper ` The ZooKeeper server to use, specified by a connection string with path, in the format of _host:port/znode/path_. +* `-z,--zookeeper ` The ZooKeeper server(s) to use, specified by a connect string, comprised of one or more comma-separated host:port pairs followed by a path, in the format of _host:port[,host2:port...,hostn:port]/znode/path_. [[migrating_between_source_destination_zookeepers]] ==== Migrating Between Source and Destination ZooKeepers @@ -1737,9 +1737,9 @@ If the state-management.xml specifies Open, no authentication is required. 5. Migrate the ZooKeeper data to the destination ZooKeeper. If the source and destination ZooKeepers are the same, the `--ignore-source` option can be added to the following examples. * For an open ZooKeeper: -** zk-migrator.sh -s -z *destinationHostname:destinationClientPort*/ -f /*path*/*to*/*export*/*zk-source-data.json* +** zk-migrator.sh -s -z *destinationHostname:destinationClientPort*/*destinationRootPath*/components -f /*path*/*to*/*export*/*zk-source-data.json* * For a ZooKeeper using Kerberos for authentication: -** zk-migrator.sh -s -z *destinationHostname:destinationClientPort*/ -k /*path*/*to*/*jaasconfig*/*jaas-config.conf* -f /*path*/*to*/*export*/*zk-source-data.json* +** zk-migrator.sh -s -z *destinationHostname:destinationClientPort*/*destinationRootPath*/components -k /*path*/*to*/*jaasconfig*/*jaas-config.conf* -f /*path*/*to*/*export*/*zk-source-data.json* 6. Once the migration has completed successfully, start the processors in the NiFi flow. Processing should continue from the point at which it was stopped when the NiFi flow was stopped. diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/pom.xml b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/pom.xml index 5c7d06185f..c964910acd 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/pom.xml +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/pom.xml @@ -69,6 +69,12 @@ curator-test test + + org.apache.curator + curator-client + 2.11.0 + test + diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperEndpointConfig.java b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperEndpointConfig.java index c5b40b2177..8578a587eb 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperEndpointConfig.java +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperEndpointConfig.java @@ -21,24 +21,37 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import java.util.List; import java.util.Objects; class ZooKeeperEndpointConfig { private final String connectString; + private final List servers; private final String path; - ZooKeeperEndpointConfig(String connectString, String path) { + ZooKeeperEndpointConfig(String connectString) { Preconditions.checkArgument(!Strings.isNullOrEmpty(connectString), "connectString can not be null or empty"); - Preconditions.checkArgument(!Strings.isNullOrEmpty(path), "path can not be null or empty"); this.connectString = connectString; - this.path = '/' + Joiner.on('/').join(Splitter.on('/').omitEmptyStrings().trimResults().split(path)); + + final String[] connectStringPath = connectString.split("/", 2); + this.servers = Lists.newArrayList(connectStringPath[0].split(",")); + if (connectStringPath.length == 2) { + this.path = '/' + Joiner.on('/').join(Splitter.on('/').omitEmptyStrings().trimResults().split(connectStringPath[1])); + } else { + path = "/"; + } } public String getConnectString() { return connectString; } + public List getServers() { + return servers; + } + public String getPath() { return path; } @@ -48,18 +61,21 @@ class ZooKeeperEndpointConfig { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ZooKeeperEndpointConfig that = (ZooKeeperEndpointConfig) o; - return Objects.equals(connectString, that.connectString) && Objects.equals(path, that.path); + return Objects.equals(connectString, that.connectString) + && Objects.equals(servers, that.servers) + && Objects.equals(path, that.path); } @Override public int hashCode() { - return Objects.hash(connectString, path); + return Objects.hash(connectString, servers, path); } @Override public String toString() { return MoreObjects.toStringHelper(this) .add("connectString", connectString) + .add("servers", servers) .add("path", path) .toString(); } diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java index fc7f647370..fa71ae0d8f 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java @@ -43,6 +43,7 @@ import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Spliterators; import java.util.concurrent.CompletableFuture; @@ -66,23 +67,14 @@ class ZooKeeperMigrator { private final ZooKeeperEndpointConfig zooKeeperEndpointConfig; - ZooKeeperMigrator(String zookeeperEndpoint) { - LOGGER.debug("ZooKeeper endpoint parameter: {}", zookeeperEndpoint); - Preconditions.checkArgument(!Strings.isNullOrEmpty(zookeeperEndpoint), "zookeeper endpoint must not be null"); - final String[] connectStringPath = zookeeperEndpoint.split("/", 2); - Preconditions.checkArgument(connectStringPath.length >= 1, "invalid ZooKeeper endpoint: %s", zookeeperEndpoint); - final String connectString = connectStringPath[0]; - final String path; - if (connectStringPath.length == 2) { - path = connectStringPath[1]; - } else { - path = ""; - } - this.zooKeeperEndpointConfig = new ZooKeeperEndpointConfig(connectString, "/" + path); + ZooKeeperMigrator(String zooKeeperConnectString) { + LOGGER.debug("ZooKeeper connect string parameter: {}", zooKeeperConnectString); + Preconditions.checkArgument(!Strings.isNullOrEmpty(zooKeeperConnectString), "ZooKeeper connect string must not be null"); + this.zooKeeperEndpointConfig = new ZooKeeperEndpointConfig(zooKeeperConnectString); } void readZooKeeper(OutputStream zkData, AuthMode authMode, byte[] authData) throws IOException, KeeperException, InterruptedException, ExecutionException { - ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig, authMode, authData); + ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig.getConnectString(), authMode, authData); JsonWriter jsonWriter = new JsonWriter(new BufferedWriter(new OutputStreamWriter(zkData))); jsonWriter.setIndent(" "); JsonParser jsonParser = new JsonParser(); @@ -93,8 +85,8 @@ class ZooKeeperMigrator { // persist source ZooKeeperEndpointConfig gson.toJson(jsonParser.parse(gson.toJson(zooKeeperEndpointConfig)).getAsJsonObject(), jsonWriter); - LOGGER.info("Persisting data from source ZooKeeper: {}", zooKeeperEndpointConfig); - final List> readFutures = streamPaths(getNode(zooKeeper, zooKeeperEndpointConfig.getPath())) + LOGGER.info("Retrieving data from source ZooKeeper: {}", zooKeeperEndpointConfig); + final List> readFutures = streamPaths(getNode(zooKeeper, "/")) .parallel() .map(node -> CompletableFuture.supplyAsync(() -> { @@ -125,7 +117,12 @@ class ZooKeeperMigrator { } void writeZooKeeper(InputStream zkData, AuthMode authMode, byte[] authData, boolean ignoreSource) throws IOException, ExecutionException, InterruptedException { - ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig, authMode, authData); + // ensure that the chroot path exists + ZooKeeper zooKeeperRoot = getZooKeeper(Joiner.on(',').join(zooKeeperEndpointConfig.getServers()), authMode, authData); + ensureNodeExists(zooKeeperRoot, zooKeeperEndpointConfig.getPath(), CreateMode.PERSISTENT); + closeZooKeeper(zooKeeperRoot); + + ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig.getConnectString(), authMode, authData); JsonReader jsonReader = new JsonReader(new BufferedReader(new InputStreamReader(zkData))); Gson gson = new GsonBuilder().create(); @@ -134,10 +131,12 @@ class ZooKeeperMigrator { // determine source ZooKeeperEndpointConfig for this data final ZooKeeperEndpointConfig sourceZooKeeperEndpointConfig = gson.fromJson(jsonReader, ZooKeeperEndpointConfig.class); LOGGER.info("Source data was obtained from ZooKeeper: {}", sourceZooKeeperEndpointConfig); - Preconditions.checkArgument(!Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getConnectString()) && !Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getPath()), - "Source ZooKeeper %s from %s is invalid", sourceZooKeeperEndpointConfig, zkData); - Preconditions.checkArgument(!(zooKeeperEndpointConfig.equals(sourceZooKeeperEndpointConfig) && !ignoreSource), - "Source ZooKeeper config %s for the data provided can not be the same as the configured destination ZooKeeper config %s", + Preconditions.checkArgument(!Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getConnectString()) && !Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getPath()) + && sourceZooKeeperEndpointConfig.getServers() != null && sourceZooKeeperEndpointConfig.getServers().size() > 0, "Source ZooKeeper %s from %s is invalid", + sourceZooKeeperEndpointConfig, zkData); + Preconditions.checkArgument(Collections.disjoint(zooKeeperEndpointConfig.getServers(), sourceZooKeeperEndpointConfig.getServers()) + || !zooKeeperEndpointConfig.getPath().equals(sourceZooKeeperEndpointConfig.getPath()) || ignoreSource, + "Source ZooKeeper config %s for the data provided can not contain the same server and path as the configured destination ZooKeeper config %s", sourceZooKeeperEndpointConfig, zooKeeperEndpointConfig); // stream through each node read from the json input @@ -271,9 +270,8 @@ class ZooKeeperMigrator { } private DataStatAclNode transformNode(DataStatAclNode node, AuthMode destinationAuthMode) { - String migrationPath = '/' + Joiner.on('/').skipNulls().join(Splitter.on('/').omitEmptyStrings().trimResults().split(zooKeeperEndpointConfig.getPath() + node.getPath())); // For the NiFi use case, all nodes will be migrated to CREATOR_ALL_ACL - final DataStatAclNode migratedNode = new DataStatAclNode(migrationPath, node.getData(), node.getStat(), + final DataStatAclNode migratedNode = new DataStatAclNode(node.getPath(), node.getData(), node.getStat(), destinationAuthMode.equals(AuthMode.OPEN) ? ZooDefs.Ids.OPEN_ACL_UNSAFE : ZooDefs.Ids.CREATOR_ALL_ACL, node.getEphemeralOwner()); LOGGER.info("transformed original node {} to {}", node, migratedNode); @@ -298,11 +296,11 @@ class ZooKeeperMigrator { return node.getStat(); } - private ZooKeeper getZooKeeper(ZooKeeperEndpointConfig zooKeeperEndpointConfig, AuthMode authMode, byte[] authData) throws IOException { + private ZooKeeper getZooKeeper(String zooKeeperConnectString, AuthMode authMode, byte[] authData) throws IOException { CountDownLatch connectionLatch = new CountDownLatch(1); - ZooKeeper zooKeeper = new ZooKeeper(zooKeeperEndpointConfig.getConnectString(), 3000, watchedEvent -> { + ZooKeeper zooKeeper = new ZooKeeper(zooKeeperConnectString, 3000, watchedEvent -> { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("ZooKeeper server state changed to {} in {}", watchedEvent.getState(), zooKeeperEndpointConfig); + LOGGER.debug("ZooKeeper server state changed to {} in {}", watchedEvent.getState(), zooKeeperConnectString); } if (watchedEvent.getType().equals(Watcher.Event.EventType.None) && watchedEvent.getState().equals(Watcher.Event.KeeperState.SyncConnected)) { connectionLatch.countDown(); @@ -315,12 +313,12 @@ class ZooKeeperMigrator { } catch (InterruptedException e) { closeZooKeeper(zooKeeper); Thread.currentThread().interrupt(); - throw new IOException(String.format("interrupted while waiting for ZooKeeper connection to %s", zooKeeperEndpointConfig), e); + throw new IOException(String.format("interrupted while waiting for ZooKeeper connection to %s", zooKeeperConnectString), e); } if (!connected) { closeZooKeeper(zooKeeper); - throw new IOException(String.format("unable to connect to %s", zooKeeperEndpointConfig)); + throw new IOException(String.format("unable to connect to %s", zooKeeperConnectString)); } if (authMode.equals(AuthMode.DIGEST)) { diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorTest.groovy b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorTest.groovy index 5562734889..299fda521b 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorTest.groovy +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorTest.groovy @@ -19,8 +19,10 @@ package org.apache.nifi.toolkit.zkmigrator import com.google.gson.Gson import com.google.gson.stream.JsonReader import org.apache.curator.test.TestingServer +import org.apache.curator.utils.ZKPaths import org.apache.zookeeper.CreateMode import org.apache.zookeeper.WatchedEvent +import org.apache.zookeeper.ZKUtil import org.apache.zookeeper.ZooDefs import org.apache.zookeeper.ZooKeeper import spock.lang.Ignore @@ -49,21 +51,22 @@ class ZooKeeperMigratorTest extends Specification { noExceptionThrown() } - def "Receive from open ZooKeeper without ACL migration"() { + def "Receive from open ZooKeeper"() { given: def server = new TestingServer() def client = new ZooKeeper(server.connectString, 3000, { WatchedEvent watchedEvent -> }) - def migrationPathRoot = '/nifi' - client.create(migrationPathRoot, 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) - def subPath = '1' - client.create("$migrationPathRoot/$subPath", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) - subPath = '1/a' - client.create("$migrationPathRoot/$subPath", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) - subPath = '2' - client.create("$migrationPathRoot/$subPath", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) - subPath = '3' - client.create("$migrationPathRoot/$subPath", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) + def migrationPathRoot = '/nifi/components' + ZKPaths.mkdirs(client, migrationPathRoot) + client.setData(migrationPathRoot, 'some data'.bytes, 0) + def componentName = '1' + client.create("$migrationPathRoot/$componentName", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) + componentName = '1/a' + client.create("$migrationPathRoot/$componentName", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) + componentName = '2' + client.create("$migrationPathRoot/$componentName", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) + componentName = '3' + client.create("$migrationPathRoot/$componentName", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) def outputFilePath = 'target/test-data.json' when: @@ -72,7 +75,7 @@ class ZooKeeperMigratorTest extends Specification { then: noExceptionThrown() def persistedData = new Gson().fromJson(new JsonReader(new FileReader(outputFilePath)), List) as List - 6 == persistedData.size(); + persistedData.size() == 6 } def "Send to open ZooKeeper without ACL migration"() { @@ -87,8 +90,8 @@ class ZooKeeperMigratorTest extends Specification { then: noExceptionThrown() - def nodes = getChildren(client, migrationPathRoot, []) - 6 == nodes.size() + def nodes = ZKPaths.getSortedChildren(client, '/').collect { ZKUtil.listSubTreeBFS(client, "/$it") }.flatten() + nodes.size() == 6 } def "Send to open ZooKeeper without ACL migration with new multi-node parent"() { @@ -103,8 +106,8 @@ class ZooKeeperMigratorTest extends Specification { then: noExceptionThrown() - def nodes = getChildren(client, migrationPathRoot, []) - 6 == nodes.size() + def nodes = ZKPaths.getSortedChildren(client, '/').collect { ZKUtil.listSubTreeBFS(client, "/$it") }.flatten() + nodes.size() == 7 } def "Receive all nodes from ZooKeeper root"() { @@ -123,7 +126,7 @@ class ZooKeeperMigratorTest extends Specification { then: noExceptionThrown() def persistedData = new Gson().fromJson(new JsonReader(new FileReader(outputFilePath)), List) as List - 5 == persistedData.size(); + persistedData.size() == 5 } def "Receive Zookeeper node created with username and password"() { @@ -144,7 +147,7 @@ class ZooKeeperMigratorTest extends Specification { then: noExceptionThrown() def persistedData = new Gson().fromJson(new JsonReader(new FileReader(outputFilePath)), List) as List - 2 == persistedData.size(); + persistedData.size() == 2 } def "Send to Zookeeper a node created with username and password"() { @@ -162,48 +165,59 @@ class ZooKeeperMigratorTest extends Specification { then: noExceptionThrown() - def nodes = getChildren(client, migrationPathRoot, []) - 2 == nodes.size() + def nodes = ZKPaths.getSortedChildren(client, '/').collect { ZKUtil.listSubTreeBFS(client, "/$it") }.flatten() + nodes.size() == 3 } - def "Send to open Zookeeper root"() { + def "Send to open Zookeeper with ACL migration"() { given: def server = new TestingServer() def client = new ZooKeeper(server.connectString, 3000, { WatchedEvent watchedEvent -> }) - def migrationPathRoot = '/' + def migrationPathRoot = '/nifi-open' when: ZooKeeperMigratorMain.main(['-s', '-z', "$server.connectString$migrationPathRoot", '-f', 'src/test/resources/test-data-user-pass.json'] as String[]) then: noExceptionThrown() - def nodes = getChildren(client, migrationPathRoot, []) - 4 == nodes.size() + def nodes = ZKPaths.getSortedChildren(client, '/').collect { ZKUtil.listSubTreeBFS(client, "/$it") }.flatten() + nodes.size() == 3 } def "Parse Zookeeper connect string and path"() { when: - def zooKeeperMigrator = new ZooKeeperMigrator("$connectStringAndPath") - def tokens = connectStringAndPath.split('/', 2) as List - def connectString = tokens[0] - def path = '/' + (tokens.size() > 1 ? tokens[1] : '') + def zooKeeperMigrator = new ZooKeeperMigrator("$connectString") then: - connectString == zooKeeperMigrator.getZooKeeperEndpointConfig().connectString - path == zooKeeperMigrator.getZooKeeperEndpointConfig().path + zooKeeperMigrator.zooKeeperEndpointConfig.connectString == connectString + zooKeeperMigrator.zooKeeperEndpointConfig.servers == servers.split(',').collect() + zooKeeperMigrator.zooKeeperEndpointConfig.path == path where: - connectStringAndPath || _ - '127.0.0.1' || _ - '127.0.0.1/' || _ - '127.0.0.1:2181' || _ - '127.0.0.1:2181/' || _ - '127.0.0.1/path' || _ - '127.0.0.1/path/node' || _ - '127.0.0.1:2181/' || _ - '127.0.0.1:2181/path' || _ - '127.0.0.1:2181/path/node' || _ + connectString | path | servers || _ + '127.0.0.1' | '/' | '127.0.0.1' || _ + '127.0.0.1,127.0.0.2' | '/' | '127.0.0.1,127.0.0.2' || _ + '127.0.0.1/' | '/' | '127.0.0.1' || _ + '127.0.0.1,127.0.0.2/' | '/' | '127.0.0.1,127.0.0.2' || _ + '127.0.0.1:2181' | '/' | '127.0.0.1:2181' || _ + '127.0.0.1,127.0.0.2:2181' | '/' | '127.0.0.1,127.0.0.2:2181' || _ + '127.0.0.1:2181/' | '/' | '127.0.0.1:2181' || _ + '127.0.0.1,127.0.0.2:2181/' | '/' | '127.0.0.1,127.0.0.2:2181' || _ + '127.0.0.1/path' | '/path' | '127.0.0.1' || _ + '127.0.0.1,127.0.0.2/path' | '/path' | '127.0.0.1,127.0.0.2' || _ + '127.0.0.1/path/node' | '/path/node' | '127.0.0.1' || _ + '127.0.0.1,127.0.0.2/path/node' | '/path/node' | '127.0.0.1,127.0.0.2' || _ + '127.0.0.1:2181/' | '/' | '127.0.0.1:2181' || _ + '127.0.0.1,127.0.0.2:2181/' | '/' | '127.0.0.1,127.0.0.2:2181' || _ + '127.0.0.1:2181/path' | '/path' | '127.0.0.1:2181' || _ + '127.0.0.1,127.0.0.2:2181/path' | '/path' | '127.0.0.1,127.0.0.2:2181' || _ + '127.0.0.1:2181/path/node' | '/path/node' | '127.0.0.1:2181' || _ + '127.0.0.1,127.0.0.2:2181/path/node' | '/path/node' | '127.0.0.1,127.0.0.2:2181' || _ + '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183' | '/' | '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183' || _ + '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183/' | '/' | '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183' || _ + '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183/path' | '/path' | '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183' || _ + '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183/path/node' | '/path/node' | '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183' || _ } def "Test ignore source"() { @@ -215,7 +229,7 @@ class ZooKeeperMigratorTest extends Specification { when: "data is read from the source zookeeper" ZooKeeperMigratorMain.main(['-r', '-z', connectString, '-f', dataPath] as String[]) - then: "verify the data has been written the output file" + then: "verify the data has been written to the output file" new File(dataPath).exists() when: "data is sent to the same zookeeper as the the source zookeeper without ignore source" @@ -231,13 +245,21 @@ class ZooKeeperMigratorTest extends Specification { noExceptionThrown() } - def List getChildren(ZooKeeper client, String path, List ag) { - def children = client.getChildren(path, null) - ag.add path - children.forEach { - def childPath = "/${(path.tokenize('/') + it).join('/')}" - getChildren(client, childPath, ag) - } - ag + def "Send to same ZooKeeper with different path"() { + def server = new TestingServer() + def connectString = "$server.connectString" + def dataPath = 'target/test-data-different-path.json' + + when: "data is read from the source zookeeper" + ZooKeeperMigratorMain.main(['-r', '-z', connectString, '-f', dataPath] as String[]) + + then: "verify the data has been written to the output file" + new File(dataPath).exists() + + when: "data is sent to the same zookeeper as the the source zookeeper with a different path" + ZooKeeperMigratorMain.main(['-s', '-z', "$connectString/new-path", '-f', dataPath] as String[]) + + then: "no exceptions are thrown" + noExceptionThrown() } } diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/resources/test-data-user-pass.json b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/resources/test-data-user-pass.json index e38bd91b18..87cb130d24 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/resources/test-data-user-pass.json +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/resources/test-data-user-pass.json @@ -1,21 +1,13 @@ [ { - "connectString": "127.0.0.1:62406", - "path": "/nifi", - "auth": [ - 110, - 105, - 102, - 105, - 58, - 110, - 105, - 102, - 105 - ] + "connectString": "127.0.0.1:62317/nifi", + "servers": [ + "127.0.0.1:62317" + ], + "path": "/nifi" }, { - "path": "/nifi", + "path": "/", "data": [ 115, 111, @@ -30,8 +22,8 @@ "stat": { "czxid": 2, "mzxid": 2, - "ctime": 1478010596964, - "mtime": 1478010596964, + "ctime": 1485792794977, + "mtime": 1485792794977, "version": 0, "cversion": 0, "aversion": 0, diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/resources/test-data.json b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/resources/test-data.json index 758270a0f6..a7e5edc5d1 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/resources/test-data.json +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/resources/test-data.json @@ -1,121 +1,13 @@ [ { - "connectString": "127.0.0.1:0", - "path": "/nifi" + "connectString": "127.0.0.1:62295/nifi/components", + "servers": [ + "127.0.0.1:62295" + ], + "path": "/nifi/components" }, { - "path": "/nifi", - "data": [ - 115, - 111, - 109, - 101, - 32, - 100, - 97, - 116, - 97 - ], - "stat": { - "czxid": 2, - "mzxid": 2, - "ctime": 1477602095884, - "mtime": 1477602095884, - "version": 0, - "cversion": 3, - "aversion": 0, - "ephemeralOwner": 0, - "dataLength": 9, - "numChildren": 3, - "pzxid": 6 - }, - "acls": [ - { - "perms": 31, - "id": { - "scheme": "world", - "id": "anyone" - } - } - ], - "ephemeralOwner": 0 - }, - { - "path": "/nifi/1/a", - "data": [ - 115, - 111, - 109, - 101, - 32, - 100, - 97, - 116, - 97 - ], - "stat": { - "czxid": 4, - "mzxid": 4, - "ctime": 1477602095888, - "mtime": 1477602095888, - "version": 0, - "cversion": 0, - "aversion": 0, - "ephemeralOwner": 0, - "dataLength": 9, - "numChildren": 0, - "pzxid": 4 - }, - "acls": [ - { - "perms": 31, - "id": { - "scheme": "world", - "id": "anyone" - } - } - ], - "ephemeralOwner": 0 - }, - { - "path": "/nifi/2", - "data": [ - 115, - 111, - 109, - 101, - 32, - 100, - 97, - 116, - 97 - ], - "stat": { - "czxid": 5, - "mzxid": 5, - "ctime": 1477602095889, - "mtime": 1477602095889, - "version": 0, - "cversion": 0, - "aversion": 0, - "ephemeralOwner": 0, - "dataLength": 9, - "numChildren": 0, - "pzxid": 5 - }, - "acls": [ - { - "perms": 31, - "id": { - "scheme": "world", - "id": "anyone" - } - } - ], - "ephemeralOwner": 0 - }, - { - "path": "/nifi/3", + "path": "/1/a", "data": [ 115, 111, @@ -130,12 +22,12 @@ "stat": { "czxid": 6, "mzxid": 6, - "ctime": 1477602095890, - "mtime": 1477602095890, + "ctime": 1485792790772, + "mtime": 1485792790772, "version": 0, "cversion": 0, "aversion": 0, - "ephemeralOwner": 0, + "ephemeralOwner": 97372916257193984, "dataLength": 9, "numChildren": 0, "pzxid": 6 @@ -149,10 +41,121 @@ } } ], + "ephemeralOwner": 97372916257193984 + }, + { + "path": "/1", + "data": [ + 115, + 111, + 109, + 101, + 32, + 100, + 97, + 116, + 97 + ], + "stat": { + "czxid": 5, + "mzxid": 5, + "ctime": 1485792790771, + "mtime": 1485792790771, + "version": 0, + "cversion": 1, + "aversion": 0, + "ephemeralOwner": 0, + "dataLength": 9, + "numChildren": 1, + "pzxid": 6 + }, + "acls": [ + { + "perms": 31, + "id": { + "scheme": "world", + "id": "anyone" + } + } + ], "ephemeralOwner": 0 }, { - "path": "/nifi/1", + "path": "/3", + "data": [ + 115, + 111, + 109, + 101, + 32, + 100, + 97, + 116, + 97 + ], + "stat": { + "czxid": 8, + "mzxid": 8, + "ctime": 1485792790773, + "mtime": 1485792790773, + "version": 0, + "cversion": 0, + "aversion": 0, + "ephemeralOwner": 0, + "dataLength": 9, + "numChildren": 0, + "pzxid": 8 + }, + "acls": [ + { + "perms": 31, + "id": { + "scheme": "world", + "id": "anyone" + } + } + ], + "ephemeralOwner": 0 + }, + { + "path": "/2", + "data": [ + 115, + 111, + 109, + 101, + 32, + 100, + 97, + 116, + 97 + ], + "stat": { + "czxid": 7, + "mzxid": 7, + "ctime": 1485792790772, + "mtime": 1485792790772, + "version": 0, + "cversion": 0, + "aversion": 0, + "ephemeralOwner": 0, + "dataLength": 9, + "numChildren": 0, + "pzxid": 7 + }, + "acls": [ + { + "perms": 31, + "id": { + "scheme": "world", + "id": "anyone" + } + } + ], + "ephemeralOwner": 0 + }, + { + "path": "/", "data": [ 115, 111, @@ -166,16 +169,16 @@ ], "stat": { "czxid": 3, - "mzxid": 3, - "ctime": 1477602095888, - "mtime": 1477602095888, - "version": 0, - "cversion": 1, + "mzxid": 4, + "ctime": 1485792790757, + "mtime": 1485792790762, + "version": 1, + "cversion": 3, "aversion": 0, "ephemeralOwner": 0, "dataLength": 9, - "numChildren": 1, - "pzxid": 4 + "numChildren": 3, + "pzxid": 8 }, "acls": [ {