mirror of https://github.com/apache/nifi.git
NIFI-3300 Implemented usage of ZooKeeper chroot capability in the connect string
Updated ZooKeeper connect string parsing tests Updated admin doc for ZooKeeper Migrator migration of nifi root nodes, updated source and destination ZK check by servers in the connection string instead of the entire connection string Added check between source and destination ZooKeeper paths to allow data to be written to the same ZooKeeper with a different path Added test for writing to the same ZooKeeper with a different path Added type parameter to server list in for ZooKeeperEndpointConfig This closes #1456. Signed-off-by: Bryan Rosander <brosander@apache.org>
This commit is contained in:
parent
f77b5a5014
commit
af2861f105
|
@ -1678,7 +1678,7 @@ You can use the following command line options with the ZooKeeper Migrator:
|
|||
* `-k,--krb-conf <jaas-filename>` Allows the specification of a JAAS configuration file to allow authentication with a ZooKeeper configured to use Kerberos. This option is mutually exclusive with the `-a,--auth` option.
|
||||
* `-r,--receive` Receives data from ZooKeeper and writes to the given filename (if the `-f,--file` option is provided) or standard output. The data received will contain the full path to each node read from ZooKeeper. This option is mutually exclusive with the `-s,--send` option.
|
||||
* `-s,--send` Sends data to ZooKeeper that is read from the given filename (if the `-f,--file` option is provided) or standard input. The paths for each node in the data being sent to ZooKeeper are absolute paths, and will be stored in ZooKeeper under the *path* portion of the `-z,--zookeeper` argument. Typically, the *path* portion of the argument can be omitted, which will store the nodes at their absolute paths. This option is mutually exclusive with the `-r,--receive` option.
|
||||
* `-z,--zookeeper <zookeeper-endpoint>` The ZooKeeper server to use, specified by a connection string with path, in the format of _host:port/znode/path_.
|
||||
* `-z,--zookeeper <zookeeper-endpoint>` The ZooKeeper server(s) to use, specified by a connect string, comprised of one or more comma-separated host:port pairs followed by a path, in the format of _host:port[,host2:port...,hostn:port]/znode/path_.
|
||||
|
||||
[[migrating_between_source_destination_zookeepers]]
|
||||
==== Migrating Between Source and Destination ZooKeepers
|
||||
|
@ -1737,9 +1737,9 @@ If the state-management.xml specifies Open, no authentication is required.
|
|||
5. Migrate the ZooKeeper data to the destination ZooKeeper. If the source and destination ZooKeepers are the same, the `--ignore-source` option can be added to the following examples.
|
||||
|
||||
* For an open ZooKeeper:
|
||||
** zk-migrator.sh -s -z *destinationHostname:destinationClientPort*/ -f /*path*/*to*/*export*/*zk-source-data.json*
|
||||
** zk-migrator.sh -s -z *destinationHostname:destinationClientPort*/*destinationRootPath*/components -f /*path*/*to*/*export*/*zk-source-data.json*
|
||||
* For a ZooKeeper using Kerberos for authentication:
|
||||
** zk-migrator.sh -s -z *destinationHostname:destinationClientPort*/ -k /*path*/*to*/*jaasconfig*/*jaas-config.conf* -f /*path*/*to*/*export*/*zk-source-data.json*
|
||||
** zk-migrator.sh -s -z *destinationHostname:destinationClientPort*/*destinationRootPath*/components -k /*path*/*to*/*jaasconfig*/*jaas-config.conf* -f /*path*/*to*/*export*/*zk-source-data.json*
|
||||
|
||||
6. Once the migration has completed successfully, start the processors in the NiFi flow. Processing should continue from the point at which it was stopped when the NiFi flow was stopped.
|
||||
|
||||
|
|
|
@ -69,6 +69,12 @@
|
|||
<artifactId>curator-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.curator</groupId>
|
||||
<artifactId>curator-client</artifactId>
|
||||
<version>2.11.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -21,24 +21,37 @@ import com.google.common.base.MoreObjects;
|
|||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
class ZooKeeperEndpointConfig {
|
||||
private final String connectString;
|
||||
private final List<String> servers;
|
||||
private final String path;
|
||||
|
||||
ZooKeeperEndpointConfig(String connectString, String path) {
|
||||
ZooKeeperEndpointConfig(String connectString) {
|
||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(connectString), "connectString can not be null or empty");
|
||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(path), "path can not be null or empty");
|
||||
this.connectString = connectString;
|
||||
this.path = '/' + Joiner.on('/').join(Splitter.on('/').omitEmptyStrings().trimResults().split(path));
|
||||
|
||||
final String[] connectStringPath = connectString.split("/", 2);
|
||||
this.servers = Lists.newArrayList(connectStringPath[0].split(","));
|
||||
if (connectStringPath.length == 2) {
|
||||
this.path = '/' + Joiner.on('/').join(Splitter.on('/').omitEmptyStrings().trimResults().split(connectStringPath[1]));
|
||||
} else {
|
||||
path = "/";
|
||||
}
|
||||
}
|
||||
|
||||
public String getConnectString() {
|
||||
return connectString;
|
||||
}
|
||||
|
||||
public List getServers() {
|
||||
return servers;
|
||||
}
|
||||
|
||||
public String getPath() {
|
||||
return path;
|
||||
}
|
||||
|
@ -48,18 +61,21 @@ class ZooKeeperEndpointConfig {
|
|||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
ZooKeeperEndpointConfig that = (ZooKeeperEndpointConfig) o;
|
||||
return Objects.equals(connectString, that.connectString) && Objects.equals(path, that.path);
|
||||
return Objects.equals(connectString, that.connectString)
|
||||
&& Objects.equals(servers, that.servers)
|
||||
&& Objects.equals(path, that.path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(connectString, path);
|
||||
return Objects.hash(connectString, servers, path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return MoreObjects.toStringHelper(this)
|
||||
.add("connectString", connectString)
|
||||
.add("servers", servers)
|
||||
.add("path", path)
|
||||
.toString();
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ import java.io.InputStreamReader;
|
|||
import java.io.OutputStream;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Spliterators;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
@ -66,23 +67,14 @@ class ZooKeeperMigrator {
|
|||
|
||||
private final ZooKeeperEndpointConfig zooKeeperEndpointConfig;
|
||||
|
||||
ZooKeeperMigrator(String zookeeperEndpoint) {
|
||||
LOGGER.debug("ZooKeeper endpoint parameter: {}", zookeeperEndpoint);
|
||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(zookeeperEndpoint), "zookeeper endpoint must not be null");
|
||||
final String[] connectStringPath = zookeeperEndpoint.split("/", 2);
|
||||
Preconditions.checkArgument(connectStringPath.length >= 1, "invalid ZooKeeper endpoint: %s", zookeeperEndpoint);
|
||||
final String connectString = connectStringPath[0];
|
||||
final String path;
|
||||
if (connectStringPath.length == 2) {
|
||||
path = connectStringPath[1];
|
||||
} else {
|
||||
path = "";
|
||||
}
|
||||
this.zooKeeperEndpointConfig = new ZooKeeperEndpointConfig(connectString, "/" + path);
|
||||
ZooKeeperMigrator(String zooKeeperConnectString) {
|
||||
LOGGER.debug("ZooKeeper connect string parameter: {}", zooKeeperConnectString);
|
||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(zooKeeperConnectString), "ZooKeeper connect string must not be null");
|
||||
this.zooKeeperEndpointConfig = new ZooKeeperEndpointConfig(zooKeeperConnectString);
|
||||
}
|
||||
|
||||
void readZooKeeper(OutputStream zkData, AuthMode authMode, byte[] authData) throws IOException, KeeperException, InterruptedException, ExecutionException {
|
||||
ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig, authMode, authData);
|
||||
ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig.getConnectString(), authMode, authData);
|
||||
JsonWriter jsonWriter = new JsonWriter(new BufferedWriter(new OutputStreamWriter(zkData)));
|
||||
jsonWriter.setIndent(" ");
|
||||
JsonParser jsonParser = new JsonParser();
|
||||
|
@ -93,8 +85,8 @@ class ZooKeeperMigrator {
|
|||
// persist source ZooKeeperEndpointConfig
|
||||
gson.toJson(jsonParser.parse(gson.toJson(zooKeeperEndpointConfig)).getAsJsonObject(), jsonWriter);
|
||||
|
||||
LOGGER.info("Persisting data from source ZooKeeper: {}", zooKeeperEndpointConfig);
|
||||
final List<CompletableFuture<Void>> readFutures = streamPaths(getNode(zooKeeper, zooKeeperEndpointConfig.getPath()))
|
||||
LOGGER.info("Retrieving data from source ZooKeeper: {}", zooKeeperEndpointConfig);
|
||||
final List<CompletableFuture<Void>> readFutures = streamPaths(getNode(zooKeeper, "/"))
|
||||
.parallel()
|
||||
.map(node ->
|
||||
CompletableFuture.supplyAsync(() -> {
|
||||
|
@ -125,7 +117,12 @@ class ZooKeeperMigrator {
|
|||
}
|
||||
|
||||
void writeZooKeeper(InputStream zkData, AuthMode authMode, byte[] authData, boolean ignoreSource) throws IOException, ExecutionException, InterruptedException {
|
||||
ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig, authMode, authData);
|
||||
// ensure that the chroot path exists
|
||||
ZooKeeper zooKeeperRoot = getZooKeeper(Joiner.on(',').join(zooKeeperEndpointConfig.getServers()), authMode, authData);
|
||||
ensureNodeExists(zooKeeperRoot, zooKeeperEndpointConfig.getPath(), CreateMode.PERSISTENT);
|
||||
closeZooKeeper(zooKeeperRoot);
|
||||
|
||||
ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig.getConnectString(), authMode, authData);
|
||||
JsonReader jsonReader = new JsonReader(new BufferedReader(new InputStreamReader(zkData)));
|
||||
Gson gson = new GsonBuilder().create();
|
||||
|
||||
|
@ -134,10 +131,12 @@ class ZooKeeperMigrator {
|
|||
// determine source ZooKeeperEndpointConfig for this data
|
||||
final ZooKeeperEndpointConfig sourceZooKeeperEndpointConfig = gson.fromJson(jsonReader, ZooKeeperEndpointConfig.class);
|
||||
LOGGER.info("Source data was obtained from ZooKeeper: {}", sourceZooKeeperEndpointConfig);
|
||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getConnectString()) && !Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getPath()),
|
||||
"Source ZooKeeper %s from %s is invalid", sourceZooKeeperEndpointConfig, zkData);
|
||||
Preconditions.checkArgument(!(zooKeeperEndpointConfig.equals(sourceZooKeeperEndpointConfig) && !ignoreSource),
|
||||
"Source ZooKeeper config %s for the data provided can not be the same as the configured destination ZooKeeper config %s",
|
||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getConnectString()) && !Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getPath())
|
||||
&& sourceZooKeeperEndpointConfig.getServers() != null && sourceZooKeeperEndpointConfig.getServers().size() > 0, "Source ZooKeeper %s from %s is invalid",
|
||||
sourceZooKeeperEndpointConfig, zkData);
|
||||
Preconditions.checkArgument(Collections.disjoint(zooKeeperEndpointConfig.getServers(), sourceZooKeeperEndpointConfig.getServers())
|
||||
|| !zooKeeperEndpointConfig.getPath().equals(sourceZooKeeperEndpointConfig.getPath()) || ignoreSource,
|
||||
"Source ZooKeeper config %s for the data provided can not contain the same server and path as the configured destination ZooKeeper config %s",
|
||||
sourceZooKeeperEndpointConfig, zooKeeperEndpointConfig);
|
||||
|
||||
// stream through each node read from the json input
|
||||
|
@ -271,9 +270,8 @@ class ZooKeeperMigrator {
|
|||
}
|
||||
|
||||
private DataStatAclNode transformNode(DataStatAclNode node, AuthMode destinationAuthMode) {
|
||||
String migrationPath = '/' + Joiner.on('/').skipNulls().join(Splitter.on('/').omitEmptyStrings().trimResults().split(zooKeeperEndpointConfig.getPath() + node.getPath()));
|
||||
// For the NiFi use case, all nodes will be migrated to CREATOR_ALL_ACL
|
||||
final DataStatAclNode migratedNode = new DataStatAclNode(migrationPath, node.getData(), node.getStat(),
|
||||
final DataStatAclNode migratedNode = new DataStatAclNode(node.getPath(), node.getData(), node.getStat(),
|
||||
destinationAuthMode.equals(AuthMode.OPEN) ? ZooDefs.Ids.OPEN_ACL_UNSAFE : ZooDefs.Ids.CREATOR_ALL_ACL,
|
||||
node.getEphemeralOwner());
|
||||
LOGGER.info("transformed original node {} to {}", node, migratedNode);
|
||||
|
@ -298,11 +296,11 @@ class ZooKeeperMigrator {
|
|||
return node.getStat();
|
||||
}
|
||||
|
||||
private ZooKeeper getZooKeeper(ZooKeeperEndpointConfig zooKeeperEndpointConfig, AuthMode authMode, byte[] authData) throws IOException {
|
||||
private ZooKeeper getZooKeeper(String zooKeeperConnectString, AuthMode authMode, byte[] authData) throws IOException {
|
||||
CountDownLatch connectionLatch = new CountDownLatch(1);
|
||||
ZooKeeper zooKeeper = new ZooKeeper(zooKeeperEndpointConfig.getConnectString(), 3000, watchedEvent -> {
|
||||
ZooKeeper zooKeeper = new ZooKeeper(zooKeeperConnectString, 3000, watchedEvent -> {
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("ZooKeeper server state changed to {} in {}", watchedEvent.getState(), zooKeeperEndpointConfig);
|
||||
LOGGER.debug("ZooKeeper server state changed to {} in {}", watchedEvent.getState(), zooKeeperConnectString);
|
||||
}
|
||||
if (watchedEvent.getType().equals(Watcher.Event.EventType.None) && watchedEvent.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
|
||||
connectionLatch.countDown();
|
||||
|
@ -315,12 +313,12 @@ class ZooKeeperMigrator {
|
|||
} catch (InterruptedException e) {
|
||||
closeZooKeeper(zooKeeper);
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IOException(String.format("interrupted while waiting for ZooKeeper connection to %s", zooKeeperEndpointConfig), e);
|
||||
throw new IOException(String.format("interrupted while waiting for ZooKeeper connection to %s", zooKeeperConnectString), e);
|
||||
}
|
||||
|
||||
if (!connected) {
|
||||
closeZooKeeper(zooKeeper);
|
||||
throw new IOException(String.format("unable to connect to %s", zooKeeperEndpointConfig));
|
||||
throw new IOException(String.format("unable to connect to %s", zooKeeperConnectString));
|
||||
}
|
||||
|
||||
if (authMode.equals(AuthMode.DIGEST)) {
|
||||
|
|
|
@ -19,8 +19,10 @@ package org.apache.nifi.toolkit.zkmigrator
|
|||
import com.google.gson.Gson
|
||||
import com.google.gson.stream.JsonReader
|
||||
import org.apache.curator.test.TestingServer
|
||||
import org.apache.curator.utils.ZKPaths
|
||||
import org.apache.zookeeper.CreateMode
|
||||
import org.apache.zookeeper.WatchedEvent
|
||||
import org.apache.zookeeper.ZKUtil
|
||||
import org.apache.zookeeper.ZooDefs
|
||||
import org.apache.zookeeper.ZooKeeper
|
||||
import spock.lang.Ignore
|
||||
|
@ -49,21 +51,22 @@ class ZooKeeperMigratorTest extends Specification {
|
|||
noExceptionThrown()
|
||||
}
|
||||
|
||||
def "Receive from open ZooKeeper without ACL migration"() {
|
||||
def "Receive from open ZooKeeper"() {
|
||||
given:
|
||||
def server = new TestingServer()
|
||||
def client = new ZooKeeper(server.connectString, 3000, { WatchedEvent watchedEvent ->
|
||||
})
|
||||
def migrationPathRoot = '/nifi'
|
||||
client.create(migrationPathRoot, 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
|
||||
def subPath = '1'
|
||||
client.create("$migrationPathRoot/$subPath", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
|
||||
subPath = '1/a'
|
||||
client.create("$migrationPathRoot/$subPath", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
|
||||
subPath = '2'
|
||||
client.create("$migrationPathRoot/$subPath", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
|
||||
subPath = '3'
|
||||
client.create("$migrationPathRoot/$subPath", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
|
||||
def migrationPathRoot = '/nifi/components'
|
||||
ZKPaths.mkdirs(client, migrationPathRoot)
|
||||
client.setData(migrationPathRoot, 'some data'.bytes, 0)
|
||||
def componentName = '1'
|
||||
client.create("$migrationPathRoot/$componentName", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
|
||||
componentName = '1/a'
|
||||
client.create("$migrationPathRoot/$componentName", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
|
||||
componentName = '2'
|
||||
client.create("$migrationPathRoot/$componentName", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
|
||||
componentName = '3'
|
||||
client.create("$migrationPathRoot/$componentName", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
|
||||
def outputFilePath = 'target/test-data.json'
|
||||
|
||||
when:
|
||||
|
@ -72,7 +75,7 @@ class ZooKeeperMigratorTest extends Specification {
|
|||
then:
|
||||
noExceptionThrown()
|
||||
def persistedData = new Gson().fromJson(new JsonReader(new FileReader(outputFilePath)), List) as List
|
||||
6 == persistedData.size();
|
||||
persistedData.size() == 6
|
||||
}
|
||||
|
||||
def "Send to open ZooKeeper without ACL migration"() {
|
||||
|
@ -87,8 +90,8 @@ class ZooKeeperMigratorTest extends Specification {
|
|||
|
||||
then:
|
||||
noExceptionThrown()
|
||||
def nodes = getChildren(client, migrationPathRoot, [])
|
||||
6 == nodes.size()
|
||||
def nodes = ZKPaths.getSortedChildren(client, '/').collect { ZKUtil.listSubTreeBFS(client, "/$it") }.flatten()
|
||||
nodes.size() == 6
|
||||
}
|
||||
|
||||
def "Send to open ZooKeeper without ACL migration with new multi-node parent"() {
|
||||
|
@ -103,8 +106,8 @@ class ZooKeeperMigratorTest extends Specification {
|
|||
|
||||
then:
|
||||
noExceptionThrown()
|
||||
def nodes = getChildren(client, migrationPathRoot, [])
|
||||
6 == nodes.size()
|
||||
def nodes = ZKPaths.getSortedChildren(client, '/').collect { ZKUtil.listSubTreeBFS(client, "/$it") }.flatten()
|
||||
nodes.size() == 7
|
||||
}
|
||||
|
||||
def "Receive all nodes from ZooKeeper root"() {
|
||||
|
@ -123,7 +126,7 @@ class ZooKeeperMigratorTest extends Specification {
|
|||
then:
|
||||
noExceptionThrown()
|
||||
def persistedData = new Gson().fromJson(new JsonReader(new FileReader(outputFilePath)), List) as List
|
||||
5 == persistedData.size();
|
||||
persistedData.size() == 5
|
||||
}
|
||||
|
||||
def "Receive Zookeeper node created with username and password"() {
|
||||
|
@ -144,7 +147,7 @@ class ZooKeeperMigratorTest extends Specification {
|
|||
then:
|
||||
noExceptionThrown()
|
||||
def persistedData = new Gson().fromJson(new JsonReader(new FileReader(outputFilePath)), List) as List
|
||||
2 == persistedData.size();
|
||||
persistedData.size() == 2
|
||||
}
|
||||
|
||||
def "Send to Zookeeper a node created with username and password"() {
|
||||
|
@ -162,48 +165,59 @@ class ZooKeeperMigratorTest extends Specification {
|
|||
|
||||
then:
|
||||
noExceptionThrown()
|
||||
def nodes = getChildren(client, migrationPathRoot, [])
|
||||
2 == nodes.size()
|
||||
def nodes = ZKPaths.getSortedChildren(client, '/').collect { ZKUtil.listSubTreeBFS(client, "/$it") }.flatten()
|
||||
nodes.size() == 3
|
||||
}
|
||||
|
||||
def "Send to open Zookeeper root"() {
|
||||
def "Send to open Zookeeper with ACL migration"() {
|
||||
given:
|
||||
def server = new TestingServer()
|
||||
def client = new ZooKeeper(server.connectString, 3000, { WatchedEvent watchedEvent ->
|
||||
})
|
||||
def migrationPathRoot = '/'
|
||||
def migrationPathRoot = '/nifi-open'
|
||||
|
||||
when:
|
||||
ZooKeeperMigratorMain.main(['-s', '-z', "$server.connectString$migrationPathRoot", '-f', 'src/test/resources/test-data-user-pass.json'] as String[])
|
||||
|
||||
then:
|
||||
noExceptionThrown()
|
||||
def nodes = getChildren(client, migrationPathRoot, [])
|
||||
4 == nodes.size()
|
||||
def nodes = ZKPaths.getSortedChildren(client, '/').collect { ZKUtil.listSubTreeBFS(client, "/$it") }.flatten()
|
||||
nodes.size() == 3
|
||||
}
|
||||
|
||||
def "Parse Zookeeper connect string and path"() {
|
||||
when:
|
||||
def zooKeeperMigrator = new ZooKeeperMigrator("$connectStringAndPath")
|
||||
def tokens = connectStringAndPath.split('/', 2) as List
|
||||
def connectString = tokens[0]
|
||||
def path = '/' + (tokens.size() > 1 ? tokens[1] : '')
|
||||
def zooKeeperMigrator = new ZooKeeperMigrator("$connectString")
|
||||
|
||||
then:
|
||||
connectString == zooKeeperMigrator.getZooKeeperEndpointConfig().connectString
|
||||
path == zooKeeperMigrator.getZooKeeperEndpointConfig().path
|
||||
zooKeeperMigrator.zooKeeperEndpointConfig.connectString == connectString
|
||||
zooKeeperMigrator.zooKeeperEndpointConfig.servers == servers.split(',').collect()
|
||||
zooKeeperMigrator.zooKeeperEndpointConfig.path == path
|
||||
|
||||
where:
|
||||
connectStringAndPath || _
|
||||
'127.0.0.1' || _
|
||||
'127.0.0.1/' || _
|
||||
'127.0.0.1:2181' || _
|
||||
'127.0.0.1:2181/' || _
|
||||
'127.0.0.1/path' || _
|
||||
'127.0.0.1/path/node' || _
|
||||
'127.0.0.1:2181/' || _
|
||||
'127.0.0.1:2181/path' || _
|
||||
'127.0.0.1:2181/path/node' || _
|
||||
connectString | path | servers || _
|
||||
'127.0.0.1' | '/' | '127.0.0.1' || _
|
||||
'127.0.0.1,127.0.0.2' | '/' | '127.0.0.1,127.0.0.2' || _
|
||||
'127.0.0.1/' | '/' | '127.0.0.1' || _
|
||||
'127.0.0.1,127.0.0.2/' | '/' | '127.0.0.1,127.0.0.2' || _
|
||||
'127.0.0.1:2181' | '/' | '127.0.0.1:2181' || _
|
||||
'127.0.0.1,127.0.0.2:2181' | '/' | '127.0.0.1,127.0.0.2:2181' || _
|
||||
'127.0.0.1:2181/' | '/' | '127.0.0.1:2181' || _
|
||||
'127.0.0.1,127.0.0.2:2181/' | '/' | '127.0.0.1,127.0.0.2:2181' || _
|
||||
'127.0.0.1/path' | '/path' | '127.0.0.1' || _
|
||||
'127.0.0.1,127.0.0.2/path' | '/path' | '127.0.0.1,127.0.0.2' || _
|
||||
'127.0.0.1/path/node' | '/path/node' | '127.0.0.1' || _
|
||||
'127.0.0.1,127.0.0.2/path/node' | '/path/node' | '127.0.0.1,127.0.0.2' || _
|
||||
'127.0.0.1:2181/' | '/' | '127.0.0.1:2181' || _
|
||||
'127.0.0.1,127.0.0.2:2181/' | '/' | '127.0.0.1,127.0.0.2:2181' || _
|
||||
'127.0.0.1:2181/path' | '/path' | '127.0.0.1:2181' || _
|
||||
'127.0.0.1,127.0.0.2:2181/path' | '/path' | '127.0.0.1,127.0.0.2:2181' || _
|
||||
'127.0.0.1:2181/path/node' | '/path/node' | '127.0.0.1:2181' || _
|
||||
'127.0.0.1,127.0.0.2:2181/path/node' | '/path/node' | '127.0.0.1,127.0.0.2:2181' || _
|
||||
'127.0.0.1,127.0.0.2:2182,127.0.0.3:2183' | '/' | '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183' || _
|
||||
'127.0.0.1,127.0.0.2:2182,127.0.0.3:2183/' | '/' | '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183' || _
|
||||
'127.0.0.1,127.0.0.2:2182,127.0.0.3:2183/path' | '/path' | '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183' || _
|
||||
'127.0.0.1,127.0.0.2:2182,127.0.0.3:2183/path/node' | '/path/node' | '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183' || _
|
||||
}
|
||||
|
||||
def "Test ignore source"() {
|
||||
|
@ -215,7 +229,7 @@ class ZooKeeperMigratorTest extends Specification {
|
|||
when: "data is read from the source zookeeper"
|
||||
ZooKeeperMigratorMain.main(['-r', '-z', connectString, '-f', dataPath] as String[])
|
||||
|
||||
then: "verify the data has been written the output file"
|
||||
then: "verify the data has been written to the output file"
|
||||
new File(dataPath).exists()
|
||||
|
||||
when: "data is sent to the same zookeeper as the the source zookeeper without ignore source"
|
||||
|
@ -231,13 +245,21 @@ class ZooKeeperMigratorTest extends Specification {
|
|||
noExceptionThrown()
|
||||
}
|
||||
|
||||
def List<String> getChildren(ZooKeeper client, String path, List<String> ag) {
|
||||
def children = client.getChildren(path, null)
|
||||
ag.add path
|
||||
children.forEach {
|
||||
def childPath = "/${(path.tokenize('/') + it).join('/')}"
|
||||
getChildren(client, childPath, ag)
|
||||
}
|
||||
ag
|
||||
def "Send to same ZooKeeper with different path"() {
|
||||
def server = new TestingServer()
|
||||
def connectString = "$server.connectString"
|
||||
def dataPath = 'target/test-data-different-path.json'
|
||||
|
||||
when: "data is read from the source zookeeper"
|
||||
ZooKeeperMigratorMain.main(['-r', '-z', connectString, '-f', dataPath] as String[])
|
||||
|
||||
then: "verify the data has been written to the output file"
|
||||
new File(dataPath).exists()
|
||||
|
||||
when: "data is sent to the same zookeeper as the the source zookeeper with a different path"
|
||||
ZooKeeperMigratorMain.main(['-s', '-z', "$connectString/new-path", '-f', dataPath] as String[])
|
||||
|
||||
then: "no exceptions are thrown"
|
||||
noExceptionThrown()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,21 +1,13 @@
|
|||
[
|
||||
{
|
||||
"connectString": "127.0.0.1:62406",
|
||||
"path": "/nifi",
|
||||
"auth": [
|
||||
110,
|
||||
105,
|
||||
102,
|
||||
105,
|
||||
58,
|
||||
110,
|
||||
105,
|
||||
102,
|
||||
105
|
||||
]
|
||||
"connectString": "127.0.0.1:62317/nifi",
|
||||
"servers": [
|
||||
"127.0.0.1:62317"
|
||||
],
|
||||
"path": "/nifi"
|
||||
},
|
||||
{
|
||||
"path": "/nifi",
|
||||
"path": "/",
|
||||
"data": [
|
||||
115,
|
||||
111,
|
||||
|
@ -30,8 +22,8 @@
|
|||
"stat": {
|
||||
"czxid": 2,
|
||||
"mzxid": 2,
|
||||
"ctime": 1478010596964,
|
||||
"mtime": 1478010596964,
|
||||
"ctime": 1485792794977,
|
||||
"mtime": 1485792794977,
|
||||
"version": 0,
|
||||
"cversion": 0,
|
||||
"aversion": 0,
|
||||
|
|
|
@ -1,121 +1,13 @@
|
|||
[
|
||||
{
|
||||
"connectString": "127.0.0.1:0",
|
||||
"path": "/nifi"
|
||||
"connectString": "127.0.0.1:62295/nifi/components",
|
||||
"servers": [
|
||||
"127.0.0.1:62295"
|
||||
],
|
||||
"path": "/nifi/components"
|
||||
},
|
||||
{
|
||||
"path": "/nifi",
|
||||
"data": [
|
||||
115,
|
||||
111,
|
||||
109,
|
||||
101,
|
||||
32,
|
||||
100,
|
||||
97,
|
||||
116,
|
||||
97
|
||||
],
|
||||
"stat": {
|
||||
"czxid": 2,
|
||||
"mzxid": 2,
|
||||
"ctime": 1477602095884,
|
||||
"mtime": 1477602095884,
|
||||
"version": 0,
|
||||
"cversion": 3,
|
||||
"aversion": 0,
|
||||
"ephemeralOwner": 0,
|
||||
"dataLength": 9,
|
||||
"numChildren": 3,
|
||||
"pzxid": 6
|
||||
},
|
||||
"acls": [
|
||||
{
|
||||
"perms": 31,
|
||||
"id": {
|
||||
"scheme": "world",
|
||||
"id": "anyone"
|
||||
}
|
||||
}
|
||||
],
|
||||
"ephemeralOwner": 0
|
||||
},
|
||||
{
|
||||
"path": "/nifi/1/a",
|
||||
"data": [
|
||||
115,
|
||||
111,
|
||||
109,
|
||||
101,
|
||||
32,
|
||||
100,
|
||||
97,
|
||||
116,
|
||||
97
|
||||
],
|
||||
"stat": {
|
||||
"czxid": 4,
|
||||
"mzxid": 4,
|
||||
"ctime": 1477602095888,
|
||||
"mtime": 1477602095888,
|
||||
"version": 0,
|
||||
"cversion": 0,
|
||||
"aversion": 0,
|
||||
"ephemeralOwner": 0,
|
||||
"dataLength": 9,
|
||||
"numChildren": 0,
|
||||
"pzxid": 4
|
||||
},
|
||||
"acls": [
|
||||
{
|
||||
"perms": 31,
|
||||
"id": {
|
||||
"scheme": "world",
|
||||
"id": "anyone"
|
||||
}
|
||||
}
|
||||
],
|
||||
"ephemeralOwner": 0
|
||||
},
|
||||
{
|
||||
"path": "/nifi/2",
|
||||
"data": [
|
||||
115,
|
||||
111,
|
||||
109,
|
||||
101,
|
||||
32,
|
||||
100,
|
||||
97,
|
||||
116,
|
||||
97
|
||||
],
|
||||
"stat": {
|
||||
"czxid": 5,
|
||||
"mzxid": 5,
|
||||
"ctime": 1477602095889,
|
||||
"mtime": 1477602095889,
|
||||
"version": 0,
|
||||
"cversion": 0,
|
||||
"aversion": 0,
|
||||
"ephemeralOwner": 0,
|
||||
"dataLength": 9,
|
||||
"numChildren": 0,
|
||||
"pzxid": 5
|
||||
},
|
||||
"acls": [
|
||||
{
|
||||
"perms": 31,
|
||||
"id": {
|
||||
"scheme": "world",
|
||||
"id": "anyone"
|
||||
}
|
||||
}
|
||||
],
|
||||
"ephemeralOwner": 0
|
||||
},
|
||||
{
|
||||
"path": "/nifi/3",
|
||||
"path": "/1/a",
|
||||
"data": [
|
||||
115,
|
||||
111,
|
||||
|
@ -130,12 +22,12 @@
|
|||
"stat": {
|
||||
"czxid": 6,
|
||||
"mzxid": 6,
|
||||
"ctime": 1477602095890,
|
||||
"mtime": 1477602095890,
|
||||
"ctime": 1485792790772,
|
||||
"mtime": 1485792790772,
|
||||
"version": 0,
|
||||
"cversion": 0,
|
||||
"aversion": 0,
|
||||
"ephemeralOwner": 0,
|
||||
"ephemeralOwner": 97372916257193984,
|
||||
"dataLength": 9,
|
||||
"numChildren": 0,
|
||||
"pzxid": 6
|
||||
|
@ -149,10 +41,121 @@
|
|||
}
|
||||
}
|
||||
],
|
||||
"ephemeralOwner": 97372916257193984
|
||||
},
|
||||
{
|
||||
"path": "/1",
|
||||
"data": [
|
||||
115,
|
||||
111,
|
||||
109,
|
||||
101,
|
||||
32,
|
||||
100,
|
||||
97,
|
||||
116,
|
||||
97
|
||||
],
|
||||
"stat": {
|
||||
"czxid": 5,
|
||||
"mzxid": 5,
|
||||
"ctime": 1485792790771,
|
||||
"mtime": 1485792790771,
|
||||
"version": 0,
|
||||
"cversion": 1,
|
||||
"aversion": 0,
|
||||
"ephemeralOwner": 0,
|
||||
"dataLength": 9,
|
||||
"numChildren": 1,
|
||||
"pzxid": 6
|
||||
},
|
||||
"acls": [
|
||||
{
|
||||
"perms": 31,
|
||||
"id": {
|
||||
"scheme": "world",
|
||||
"id": "anyone"
|
||||
}
|
||||
}
|
||||
],
|
||||
"ephemeralOwner": 0
|
||||
},
|
||||
{
|
||||
"path": "/nifi/1",
|
||||
"path": "/3",
|
||||
"data": [
|
||||
115,
|
||||
111,
|
||||
109,
|
||||
101,
|
||||
32,
|
||||
100,
|
||||
97,
|
||||
116,
|
||||
97
|
||||
],
|
||||
"stat": {
|
||||
"czxid": 8,
|
||||
"mzxid": 8,
|
||||
"ctime": 1485792790773,
|
||||
"mtime": 1485792790773,
|
||||
"version": 0,
|
||||
"cversion": 0,
|
||||
"aversion": 0,
|
||||
"ephemeralOwner": 0,
|
||||
"dataLength": 9,
|
||||
"numChildren": 0,
|
||||
"pzxid": 8
|
||||
},
|
||||
"acls": [
|
||||
{
|
||||
"perms": 31,
|
||||
"id": {
|
||||
"scheme": "world",
|
||||
"id": "anyone"
|
||||
}
|
||||
}
|
||||
],
|
||||
"ephemeralOwner": 0
|
||||
},
|
||||
{
|
||||
"path": "/2",
|
||||
"data": [
|
||||
115,
|
||||
111,
|
||||
109,
|
||||
101,
|
||||
32,
|
||||
100,
|
||||
97,
|
||||
116,
|
||||
97
|
||||
],
|
||||
"stat": {
|
||||
"czxid": 7,
|
||||
"mzxid": 7,
|
||||
"ctime": 1485792790772,
|
||||
"mtime": 1485792790772,
|
||||
"version": 0,
|
||||
"cversion": 0,
|
||||
"aversion": 0,
|
||||
"ephemeralOwner": 0,
|
||||
"dataLength": 9,
|
||||
"numChildren": 0,
|
||||
"pzxid": 7
|
||||
},
|
||||
"acls": [
|
||||
{
|
||||
"perms": 31,
|
||||
"id": {
|
||||
"scheme": "world",
|
||||
"id": "anyone"
|
||||
}
|
||||
}
|
||||
],
|
||||
"ephemeralOwner": 0
|
||||
},
|
||||
{
|
||||
"path": "/",
|
||||
"data": [
|
||||
115,
|
||||
111,
|
||||
|
@ -166,16 +169,16 @@
|
|||
],
|
||||
"stat": {
|
||||
"czxid": 3,
|
||||
"mzxid": 3,
|
||||
"ctime": 1477602095888,
|
||||
"mtime": 1477602095888,
|
||||
"version": 0,
|
||||
"cversion": 1,
|
||||
"mzxid": 4,
|
||||
"ctime": 1485792790757,
|
||||
"mtime": 1485792790762,
|
||||
"version": 1,
|
||||
"cversion": 3,
|
||||
"aversion": 0,
|
||||
"ephemeralOwner": 0,
|
||||
"dataLength": 9,
|
||||
"numChildren": 1,
|
||||
"pzxid": 4
|
||||
"numChildren": 3,
|
||||
"pzxid": 8
|
||||
},
|
||||
"acls": [
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue