mirror of https://github.com/apache/nifi.git
NIFI-4255 - Added flag to allow migration of existing (source) acls to destination, update to documentation
Added build-helper-maven-plugin to nifi-toolkit-zookeeper-migrator pom to build groovy test code Moved spock spec from src/test/java to src/test/groovy Minor code formatting updates to ZooKeeperMigrator.java This closes #2065
This commit is contained in:
parent
cf57639396
commit
acee2627ca
|
@ -2272,6 +2272,7 @@ You can use the following command line options with the ZooKeeper Migrator:
|
|||
* `-k`,`--krb-conf <jaas-filename>` Allows the specification of a JAAS configuration file to allow authentication with a ZooKeeper configured to use Kerberos. This option is mutually exclusive with the `-a`,`--auth` option.
|
||||
* `-r`,`--receive` Receives data from ZooKeeper and writes to the given filename (if the `-f`,`--file` option is provided) or standard output. The data received will contain the full path to each node read from ZooKeeper. This option is mutually exclusive with the `-s`,`--send` option.
|
||||
* `-s`,`--send` Sends data to ZooKeeper that is read from the given filename (if the `-f`,`--file` option is provided) or standard input. The paths for each node in the data being sent to ZooKeeper are absolute paths, and will be stored in ZooKeeper under the *path* portion of the `-z`,`--zookeeper` argument. Typically, the *path* portion of the argument can be omitted, which will store the nodes at their absolute paths. This option is mutually exclusive with the `-r`,`--receive` option.
|
||||
* `--use-existing-acl` Allows the Zookeeper Migrator to write ACL values retrieved from the source Zookeeper server to destination server. Default action will apply Open rights for unsecured destinations or Creator Only rights for secured destinations.
|
||||
* `-z`,`--zookeeper <zookeeper-endpoint>` The ZooKeeper server(s) to use, specified by a connect string, comprised of one or more comma-separated host:port pairs followed by a path, in the format of _host:port[,host2:port...,hostn:port]/znode/path_.
|
||||
|
||||
[[migrating_between_source_destination_zookeepers]]
|
||||
|
|
|
@ -89,6 +89,37 @@
|
|||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.codehaus.mojo</groupId>
|
||||
<artifactId>build-helper-maven-plugin</artifactId>
|
||||
<version>1.5</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>add-source</id>
|
||||
<phase>generate-sources</phase>
|
||||
<goals>
|
||||
<goal>add-source</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<sources>
|
||||
<source>src/main/groovy</source>
|
||||
</sources>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>add-test-source</id>
|
||||
<phase>generate-test-sources</phase>
|
||||
<goals>
|
||||
<goal>add-test-source</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<sources>
|
||||
<source>src/test/groovy</source>
|
||||
</sources>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.google.gson.GsonBuilder;
|
|||
import com.google.gson.JsonParser;
|
||||
import com.google.gson.stream.JsonReader;
|
||||
import com.google.gson.stream.JsonWriter;
|
||||
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
|
@ -116,7 +117,7 @@ class ZooKeeperMigrator {
|
|||
closeZooKeeper(zooKeeper);
|
||||
}
|
||||
|
||||
void writeZooKeeper(InputStream zkData, AuthMode authMode, byte[] authData, boolean ignoreSource) throws IOException, ExecutionException, InterruptedException {
|
||||
void writeZooKeeper(InputStream zkData, AuthMode authMode, byte[] authData, boolean ignoreSource, boolean useExistingACL) throws IOException, ExecutionException, InterruptedException {
|
||||
// ensure that the chroot path exists
|
||||
ZooKeeper zooKeeperRoot = getZooKeeper(Joiner.on(',').join(zooKeeperEndpointConfig.getServers()), authMode, authData);
|
||||
ensureNodeExists(zooKeeperRoot, zooKeeperEndpointConfig.getPath(), CreateMode.PERSISTENT);
|
||||
|
@ -132,7 +133,7 @@ class ZooKeeperMigrator {
|
|||
final ZooKeeperEndpointConfig sourceZooKeeperEndpointConfig = gson.fromJson(jsonReader, ZooKeeperEndpointConfig.class);
|
||||
LOGGER.info("Source data was obtained from ZooKeeper: {}", sourceZooKeeperEndpointConfig);
|
||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getConnectString()) && !Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getPath())
|
||||
&& sourceZooKeeperEndpointConfig.getServers() != null && sourceZooKeeperEndpointConfig.getServers().size() > 0, "Source ZooKeeper %s from %s is invalid",
|
||||
&& sourceZooKeeperEndpointConfig.getServers() != null && sourceZooKeeperEndpointConfig.getServers().size() > 0, "Source ZooKeeper %s from %s is invalid",
|
||||
sourceZooKeeperEndpointConfig, zkData);
|
||||
Preconditions.checkArgument(Collections.disjoint(zooKeeperEndpointConfig.getServers(), sourceZooKeeperEndpointConfig.getServers())
|
||||
|| !zooKeeperEndpointConfig.getPath().equals(sourceZooKeeperEndpointConfig.getPath()) || ignoreSource,
|
||||
|
@ -161,16 +162,21 @@ class ZooKeeperMigrator {
|
|||
|
||||
final List<CompletableFuture<Stat>> writeFutures = stream.parallel().map(node -> {
|
||||
/*
|
||||
* create stage to migrate paths and ACLs based on the migration parent path plus the node path and the given AuthMode,
|
||||
* this stage must be run first
|
||||
* create stage to determine the acls that should be applied to the node.
|
||||
* this stage will be used to initialize the chain
|
||||
*/
|
||||
final CompletableFuture<DataStatAclNode> transformNodeStage = CompletableFuture.supplyAsync(() -> transformNode(node, authMode));
|
||||
final CompletableFuture<List<ACL>> determineACLStage = CompletableFuture.supplyAsync(() -> determineACLs(node, authMode, useExistingACL));
|
||||
/*
|
||||
* create stage to apply acls to nodes and transform node to DataStatAclNode object
|
||||
*/
|
||||
final Function<List<ACL>, CompletableFuture<DataStatAclNode>> transformNodeStage = acls -> CompletableFuture.supplyAsync(() -> transformNode(node, acls));
|
||||
/*
|
||||
* create stage to ensure that nodes exist for the entire path of the zookeeper node, must be invoked after the transformNode stage to
|
||||
* ensure that the node will exist after path migration
|
||||
*/
|
||||
final Function<DataStatAclNode, String> ensureNodeExistsStage = dataStatAclNode ->
|
||||
ensureNodeExists(zooKeeper, dataStatAclNode.getPath(), dataStatAclNode.getEphemeralOwner() == 0 ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL);
|
||||
final Function<DataStatAclNode, CompletionStage<String>> ensureNodeExistsStage = dataStatAclNode ->
|
||||
CompletableFuture.supplyAsync(() -> ensureNodeExists(zooKeeper, dataStatAclNode.getPath(),
|
||||
dataStatAclNode.getEphemeralOwner() == 0 ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL));
|
||||
/*
|
||||
* create stage that waits for both the transformNode and ensureNodeExists stages complete, and also provides that the given transformed node is
|
||||
* available to the next stage
|
||||
|
@ -180,12 +186,14 @@ class ZooKeeperMigrator {
|
|||
* create stage to transmit the node to the destination zookeeper endpoint, must be invoked after the node has been transformed and its path
|
||||
* has been created (or already exists) in the destination zookeeper
|
||||
*/
|
||||
final Function<DataStatAclNode, CompletionStage<Stat>> transmitNodeStage = dataStatNode ->
|
||||
CompletableFuture.supplyAsync(() -> transmitNode(zooKeeper, dataStatNode));
|
||||
final Function<DataStatAclNode, CompletionStage<Stat>> transmitNodeStage = dataStatNode -> CompletableFuture.supplyAsync(() -> transmitNode(zooKeeper, dataStatNode));
|
||||
/*
|
||||
* submit the stages chained together in the proper order to perform the processing on the given node
|
||||
*/
|
||||
return transformNodeStage.thenApply(ensureNodeExistsStage).thenCombine(transformNodeStage, combineEnsureNodeAndTransferNodeStage).thenCompose(transmitNodeStage);
|
||||
final CompletableFuture<DataStatAclNode> dataStatAclNodeCompletableFuture = determineACLStage.thenCompose(transformNodeStage);
|
||||
return dataStatAclNodeCompletableFuture.thenCompose(ensureNodeExistsStage)
|
||||
.thenCombine(dataStatAclNodeCompletableFuture, combineEnsureNodeAndTransferNodeStage)
|
||||
.thenCompose(transmitNodeStage);
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
CompletableFuture<Void> allWritesFuture = CompletableFuture.allOf(writeFutures.toArray(new CompletableFuture[writeFutures.size()]));
|
||||
|
@ -269,11 +277,13 @@ class ZooKeeperMigrator {
|
|||
}
|
||||
}
|
||||
|
||||
private DataStatAclNode transformNode(DataStatAclNode node, AuthMode destinationAuthMode) {
|
||||
// For the NiFi use case, all nodes will be migrated to CREATOR_ALL_ACL
|
||||
final DataStatAclNode migratedNode = new DataStatAclNode(node.getPath(), node.getData(), node.getStat(),
|
||||
destinationAuthMode.equals(AuthMode.OPEN) ? ZooDefs.Ids.OPEN_ACL_UNSAFE : ZooDefs.Ids.CREATOR_ALL_ACL,
|
||||
node.getEphemeralOwner());
|
||||
private List<ACL> determineACLs(DataStatAclNode node, AuthMode authMode, Boolean useExistingACL) {
|
||||
return useExistingACL ? node.getAcls() :
|
||||
(authMode.equals(AuthMode.OPEN) ? ZooDefs.Ids.OPEN_ACL_UNSAFE : ZooDefs.Ids.CREATOR_ALL_ACL);
|
||||
}
|
||||
|
||||
private DataStatAclNode transformNode(DataStatAclNode node, List<ACL> acls) {
|
||||
final DataStatAclNode migratedNode = new DataStatAclNode(node.getPath(), node.getData(), node.getStat(), acls, node.getEphemeralOwner());
|
||||
LOGGER.info("transformed original node {} to {}", node, migratedNode);
|
||||
return migratedNode;
|
||||
}
|
||||
|
|
|
@ -90,6 +90,10 @@ public class ZooKeeperMigratorMain {
|
|||
.longOpt("ignore-source")
|
||||
.desc("ignores the source ZooKeeper endpoint specified in the exported data")
|
||||
.build();
|
||||
private static final Option OPTION_USE_EXISTING_ACL = Option.builder()
|
||||
.longOpt("use-existing-acl")
|
||||
.desc("allow write of existing acl data to destination")
|
||||
.build();
|
||||
|
||||
private static Options createOptions() {
|
||||
final Options options = new Options();
|
||||
|
@ -98,6 +102,7 @@ public class ZooKeeperMigratorMain {
|
|||
options.addOption(OPTION_ZK_AUTH_INFO);
|
||||
options.addOption(OPTION_FILE);
|
||||
options.addOption(OPTION_IGNORE_SOURCE);
|
||||
options.addOption(OPTION_USE_EXISTING_ACL);
|
||||
final OptionGroup optionGroupAuth = new OptionGroup().addOption(OPTION_ZK_AUTH_INFO).addOption(OPTION_ZK_KRB_CONF_FILE);
|
||||
optionGroupAuth.setRequired(false);
|
||||
options.addOptionGroup(optionGroupAuth);
|
||||
|
@ -136,6 +141,7 @@ public class ZooKeeperMigratorMain {
|
|||
final String auth = commandLine.getOptionValue(OPTION_ZK_AUTH_INFO.getOpt());
|
||||
final String jaasFilename = commandLine.getOptionValue(OPTION_ZK_KRB_CONF_FILE.getOpt());
|
||||
final boolean ignoreSource = commandLine.hasOption(OPTION_IGNORE_SOURCE.getLongOpt());
|
||||
final boolean useExistingACL = commandLine.hasOption(OPTION_USE_EXISTING_ACL.getLongOpt());
|
||||
final AuthMode authMode;
|
||||
final byte[] authData;
|
||||
if (auth != null) {
|
||||
|
@ -157,7 +163,7 @@ public class ZooKeeperMigratorMain {
|
|||
}
|
||||
} else {
|
||||
try (InputStream zkData = filename != null ? new FileInputStream(Paths.get(filename).toFile()) : System.in) {
|
||||
zookeeperMigrator.writeZooKeeper(zkData, authMode, authData, ignoreSource);
|
||||
zookeeperMigrator.writeZooKeeper(zkData, authMode, authData, ignoreSource, useExistingACL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,8 @@ import org.apache.zookeeper.WatchedEvent
|
|||
import org.apache.zookeeper.ZKUtil
|
||||
import org.apache.zookeeper.ZooDefs
|
||||
import org.apache.zookeeper.ZooKeeper
|
||||
import org.apache.zookeeper.data.Stat
|
||||
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider
|
||||
import spock.lang.Ignore
|
||||
import spock.lang.Specification
|
||||
import spock.lang.Unroll
|
||||
|
@ -185,6 +187,29 @@ class ZooKeeperMigratorTest extends Specification {
|
|||
nodes.size() == 3
|
||||
}
|
||||
|
||||
def "Send to open Zookeeper using existing ACL"() {
|
||||
given:
|
||||
def server = new TestingServer()
|
||||
def securedClient = new ZooKeeper(server.connectString, 3000, { WatchedEvent watchedEvent -> })
|
||||
def userPass = "nifi:nifi"
|
||||
securedClient.addAuthInfo("digest",userPass.getBytes(StandardCharsets.UTF_8))
|
||||
def digest = DigestAuthenticationProvider.generateDigest(userPass)
|
||||
def migrationPathRoot = '/nifi'
|
||||
def stat = new Stat()
|
||||
|
||||
when:
|
||||
ZooKeeperMigratorMain.main(['-s', '-z', "$server.connectString$migrationPathRoot", '-f', 'src/test/resources/test-data-user-pass.json','--use-existing-acl'] as String[])
|
||||
|
||||
then:
|
||||
noExceptionThrown()
|
||||
def acl = securedClient.getACL("/nifi",stat)
|
||||
acl.get(0).id.scheme == "digest"
|
||||
acl.get(0).id.id == digest
|
||||
def nodes = ZKPaths.getSortedChildren(securedClient, '/nifi').collect { ZKUtil.listSubTreeBFS(securedClient, "/$it") }.flatten()
|
||||
nodes.size() == 0
|
||||
}
|
||||
|
||||
|
||||
def "Parse Zookeeper connect string and path"() {
|
||||
when:
|
||||
def zooKeeperMigrator = new ZooKeeperMigrator("$connectString")
|
Loading…
Reference in New Issue