mirror of https://github.com/apache/nifi.git
NIFI-3059 Adds --ignore-source option to the ZooKeeper Migrator to allow data read from a source zookeeper to be written back to the same zookeeper
Added unit test to test the --ignore-source option This closes #1242
This commit is contained in:
parent
c4be800688
commit
06d7ecd324
|
@ -120,7 +120,7 @@ class ZooKeeperMigrator {
|
|||
}
|
||||
}
|
||||
|
||||
void writeZooKeeper(InputStream zkData, AuthMode authMode, byte[] authData) throws IOException, ExecutionException, InterruptedException {
|
||||
void writeZooKeeper(InputStream zkData, AuthMode authMode, byte[] authData, boolean ignoreSource) throws IOException, ExecutionException, InterruptedException {
|
||||
ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig, authMode, authData);
|
||||
JsonReader jsonReader = new JsonReader(new BufferedReader(new InputStreamReader(zkData)));
|
||||
Gson gson = new GsonBuilder().create();
|
||||
|
@ -132,7 +132,7 @@ class ZooKeeperMigrator {
|
|||
LOGGER.info("Source data was obtained from ZooKeeper: {}", sourceZooKeeperEndpointConfig);
|
||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getConnectString()) && !Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getPath()),
|
||||
"Source ZooKeeper %s from %s is invalid", sourceZooKeeperEndpointConfig, zkData);
|
||||
Preconditions.checkState(!zooKeeperEndpointConfig.equals(sourceZooKeeperEndpointConfig),
|
||||
Preconditions.checkArgument( !(zooKeeperEndpointConfig.equals(sourceZooKeeperEndpointConfig) && !ignoreSource),
|
||||
"Source ZooKeeper config %s for the data provided can not be the same as the configured destination ZooKeeper config %s",
|
||||
sourceZooKeeperEndpointConfig, zooKeeperEndpointConfig);
|
||||
|
||||
|
|
|
@ -40,8 +40,8 @@ public class ZooKeeperMigratorMain {
|
|||
private static final String JAVA_HOME = "JAVA_HOME";
|
||||
private static final String NIFI_TOOLKIT_HOME = "NIFI_TOOLKIT_HOME";
|
||||
private static final String HEADER = System.lineSeparator() + "A tool for importing and exporting data from ZooKeeper." + System.lineSeparator() + System.lineSeparator();
|
||||
private static final String FOOTER = new StringBuilder(System.lineSeparator()).append("Java home: ")
|
||||
.append(System.getenv(JAVA_HOME)).append(System.lineSeparator()).append("NiFi Toolkit home: ").append(System.getenv(NIFI_TOOLKIT_HOME)).toString();
|
||||
private static final String FOOTER = System.lineSeparator() + "Java home: " +
|
||||
System.getenv(JAVA_HOME) + System.lineSeparator() + "NiFi Toolkit home: " + System.getenv(NIFI_TOOLKIT_HOME);
|
||||
|
||||
private static final Option OPTION_ZK_MIGRATOR_HELP = Option.builder("h")
|
||||
.longOpt("help")
|
||||
|
@ -81,6 +81,10 @@ public class ZooKeeperMigratorMain {
|
|||
.hasArg()
|
||||
.argName("filename")
|
||||
.build();
|
||||
private static final Option OPTION_IGNORE_SOURCE = Option.builder()
|
||||
.longOpt("ignore-source")
|
||||
.desc("ignores the source ZooKeeper endpoint specified in the exported data")
|
||||
.build();
|
||||
|
||||
private static Options createOptions() {
|
||||
final Options options = new Options();
|
||||
|
@ -88,6 +92,7 @@ public class ZooKeeperMigratorMain {
|
|||
options.addOption(OPTION_ZK_ENDPOINT);
|
||||
options.addOption(OPTION_ZK_AUTH_INFO);
|
||||
options.addOption(OPTION_FILE);
|
||||
options.addOption(OPTION_IGNORE_SOURCE);
|
||||
final OptionGroup optionGroupAuth = new OptionGroup().addOption(OPTION_ZK_AUTH_INFO).addOption(OPTION_ZK_KRB_CONF_FILE);
|
||||
optionGroupAuth.setRequired(false);
|
||||
options.addOptionGroup(optionGroupAuth);
|
||||
|
@ -125,6 +130,7 @@ public class ZooKeeperMigratorMain {
|
|||
final String filename = commandLine.getOptionValue(OPTION_FILE.getOpt());
|
||||
final String auth = commandLine.getOptionValue(OPTION_ZK_AUTH_INFO.getOpt());
|
||||
final String jaasFilename = commandLine.getOptionValue(OPTION_ZK_KRB_CONF_FILE.getOpt());
|
||||
final boolean ignoreSource = commandLine.hasOption(OPTION_IGNORE_SOURCE.getLongOpt());
|
||||
final AuthMode authMode;
|
||||
final byte[] authData;
|
||||
if (auth != null) {
|
||||
|
@ -143,7 +149,7 @@ public class ZooKeeperMigratorMain {
|
|||
if (mode.equals(Mode.READ)) {
|
||||
zookeeperMigrator.readZooKeeper(filename != null ? new FileOutputStream(Paths.get(filename).toFile()) : output, authMode, authData);
|
||||
} else {
|
||||
zookeeperMigrator.writeZooKeeper(filename != null ? new FileInputStream(Paths.get(filename).toFile()) : System.in, authMode, authData);
|
||||
zookeeperMigrator.writeZooKeeper(filename != null ? new FileInputStream(Paths.get(filename).toFile()) : System.in, authMode, authData, ignoreSource);
|
||||
}
|
||||
}
|
||||
} catch (ParseException e) {
|
||||
|
|
|
@ -206,6 +206,32 @@ class ZooKeeperMigratorTest extends Specification {
|
|||
'127.0.0.1:2181/path/node' || _
|
||||
}
|
||||
|
||||
def "Test ignore source"() {
|
||||
given:
|
||||
def server = new TestingServer()
|
||||
def connectString = "$server.connectString"
|
||||
def dataPath = 'target/test-data-ignore-source.json'
|
||||
|
||||
when: "data is read from the source zookeeper"
|
||||
ZooKeeperMigratorMain.main(['-r', '-z', connectString, '-f', dataPath] as String[])
|
||||
|
||||
then: "verify the data has been written the output file"
|
||||
new File(dataPath).exists()
|
||||
|
||||
when: "data is sent to the same zookeeper as the the source zookeeper without ignore source"
|
||||
ZooKeeperMigratorMain.main(['-s', '-z', connectString, '-f', dataPath] as String[])
|
||||
|
||||
then: "verify that a runtime exception is thrown with an illegal argument exception as the cause"
|
||||
def e = thrown(RuntimeException)
|
||||
e.cause.class == IllegalArgumentException
|
||||
|
||||
when: "data is sent to the same zookeeper as the source zookeeper with ignore source option is set"
|
||||
ZooKeeperMigratorMain.main(['-s', '-z', connectString, '-f', dataPath, '--ignore-source'] as String[])
|
||||
|
||||
then: "no exceptions are thrown"
|
||||
noExceptionThrown()
|
||||
}
|
||||
|
||||
def List<String> getChildren(ZooKeeper client, String path, List<String> ag) {
|
||||
def children = client.getChildren(path, null)
|
||||
ag.add path
|
||||
|
|
Loading…
Reference in New Issue