mirror of https://github.com/apache/nifi.git
NIFI-3158 Added cleanup of resources and refactored exception handling
This closes #1427. Signed-off-by: Bryan Rosander <brosander@apache.org>
This commit is contained in:
parent
7f0171ffa2
commit
516075de02
|
@ -121,6 +121,7 @@ class ZooKeeperMigrator {
|
||||||
final int readCount = readsDone.size();
|
final int readCount = readsDone.size();
|
||||||
LOGGER.info("{} {} read from {}", readCount, readCount == 1 ? "node" : "nodes", zooKeeperEndpointConfig);
|
LOGGER.info("{} {} read from {}", readCount, readCount == 1 ? "node" : "nodes", zooKeeperEndpointConfig);
|
||||||
}
|
}
|
||||||
|
closeZooKeeper(zooKeeper);
|
||||||
}
|
}
|
||||||
|
|
||||||
void writeZooKeeper(InputStream zkData, AuthMode authMode, byte[] authData, boolean ignoreSource) throws IOException, ExecutionException, InterruptedException {
|
void writeZooKeeper(InputStream zkData, AuthMode authMode, byte[] authData, boolean ignoreSource) throws IOException, ExecutionException, InterruptedException {
|
||||||
|
@ -199,6 +200,7 @@ class ZooKeeperMigrator {
|
||||||
LOGGER.info("{} {} transferred to {}", writeCount, writeCount == 1 ? "node" : "nodes", zooKeeperEndpointConfig);
|
LOGGER.info("{} {} transferred to {}", writeCount, writeCount == 1 ? "node" : "nodes", zooKeeperEndpointConfig);
|
||||||
}
|
}
|
||||||
jsonReader.close();
|
jsonReader.close();
|
||||||
|
closeZooKeeper(zooKeeper);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Stream<String> streamPaths(ZooKeeperNode node) {
|
private Stream<String> streamPaths(ZooKeeperNode node) {
|
||||||
|
@ -212,7 +214,10 @@ class ZooKeeperMigrator {
|
||||||
final String childPath = Joiner.on('/').skipNulls().join(path.equals("/") ? "" : path, s);
|
final String childPath = Joiner.on('/').skipNulls().join(path.equals("/") ? "" : path, s);
|
||||||
try {
|
try {
|
||||||
return getNode(zooKeeper, childPath);
|
return getNode(zooKeeper, childPath);
|
||||||
} catch (Exception e) {
|
} catch (InterruptedException | KeeperException e) {
|
||||||
|
if (e instanceof InterruptedException) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
throw new RuntimeException(String.format("unable to discover sub-tree from %s", childPath), e);
|
throw new RuntimeException(String.format("unable to discover sub-tree from %s", childPath), e);
|
||||||
}
|
}
|
||||||
}).collect(Collectors.toList()));
|
}).collect(Collectors.toList()));
|
||||||
|
@ -229,7 +234,10 @@ class ZooKeeperMigrator {
|
||||||
data = zooKeeper.getData(path, false, stat);
|
data = zooKeeper.getData(path, false, stat);
|
||||||
acls = zooKeeper.getACL(path, stat);
|
acls = zooKeeper.getACL(path, stat);
|
||||||
ephemeralOwner = stat.getEphemeralOwner();
|
ephemeralOwner = stat.getEphemeralOwner();
|
||||||
} catch (Exception e) {
|
} catch (InterruptedException | KeeperException e) {
|
||||||
|
if (e instanceof InterruptedException) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
throw new RuntimeException(String.format("unable to get data, ACLs, and stats from %s for node at path %s", zooKeeper, path), e);
|
throw new RuntimeException(String.format("unable to get data, ACLs, and stats from %s for node at path %s", zooKeeper, path), e);
|
||||||
}
|
}
|
||||||
return new DataStatAclNode(path, data, stat, acls, ephemeralOwner);
|
return new DataStatAclNode(path, data, stat, acls, ephemeralOwner);
|
||||||
|
@ -257,6 +265,7 @@ class ZooKeeperMigrator {
|
||||||
throw new RuntimeException(String.format("unable to create node at path %s, ZooKeeper returned %s", path, e.code()), e);
|
throw new RuntimeException(String.format("unable to create node at path %s, ZooKeeper returned %s", path, e.code()), e);
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
throw new RuntimeException(String.format("unable to create node at path %s", path), e);
|
throw new RuntimeException(String.format("unable to create node at path %s", path), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -280,7 +289,10 @@ class ZooKeeperMigrator {
|
||||||
zooKeeper.setData(node.getPath(), node.getData(), -1);
|
zooKeeper.setData(node.getPath(), node.getData(), -1);
|
||||||
zooKeeper.setACL(node.getPath(), node.getAcls(), -1);
|
zooKeeper.setACL(node.getPath(), node.getAcls(), -1);
|
||||||
LOGGER.info("transferred node {} in {}", node, zooKeeperEndpointConfig);
|
LOGGER.info("transferred node {} in {}", node, zooKeeperEndpointConfig);
|
||||||
} catch (Exception e) {
|
} catch (InterruptedException | KeeperException e) {
|
||||||
|
if (e instanceof InterruptedException) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
throw new RuntimeException(String.format("unable to transmit data to %s for path %s", zooKeeper, node.getPath()), e);
|
throw new RuntimeException(String.format("unable to transmit data to %s for path %s", zooKeeper, node.getPath()), e);
|
||||||
}
|
}
|
||||||
return node.getStat();
|
return node.getStat();
|
||||||
|
@ -302,13 +314,13 @@ class ZooKeeperMigrator {
|
||||||
connected = connectionLatch.await(5, TimeUnit.SECONDS);
|
connected = connectionLatch.await(5, TimeUnit.SECONDS);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
closeZooKeeper(zooKeeper);
|
closeZooKeeper(zooKeeper);
|
||||||
Thread.currentThread().interrupt(); // preserve interrupt
|
Thread.currentThread().interrupt();
|
||||||
throw new IOException(String.format("interrupted while waiting for ZooKeeper connection to %s", zooKeeperEndpointConfig), e);
|
throw new IOException(String.format("interrupted while waiting for ZooKeeper connection to %s", zooKeeperEndpointConfig), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!connected) {
|
if (!connected) {
|
||||||
closeZooKeeper(zooKeeper);
|
closeZooKeeper(zooKeeper);
|
||||||
throw new IOException(String.format("unable to connect to %s, state is %s", zooKeeperEndpointConfig, zooKeeper.getState()));
|
throw new IOException(String.format("unable to connect to %s", zooKeeperEndpointConfig));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (authMode.equals(AuthMode.DIGEST)) {
|
if (authMode.equals(AuthMode.DIGEST)) {
|
||||||
|
@ -322,7 +334,7 @@ class ZooKeeperMigrator {
|
||||||
zooKeeper.close();
|
zooKeeper.close();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOGGER.warn("could not close ZooKeeper client due to interrupt", e);
|
LOGGER.warn("could not close ZooKeeper client due to interrupt", e);
|
||||||
Thread.currentThread().interrupt(); // preserve interrupt
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,12 +26,17 @@ import org.apache.commons.cli.OptionGroup;
|
||||||
import org.apache.commons.cli.Options;
|
import org.apache.commons.cli.Options;
|
||||||
import org.apache.commons.cli.ParseException;
|
import org.apache.commons.cli.ParseException;
|
||||||
import org.apache.nifi.toolkit.zkmigrator.ZooKeeperMigrator.AuthMode;
|
import org.apache.nifi.toolkit.zkmigrator.ZooKeeperMigrator.AuthMode;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
public class ZooKeeperMigratorMain {
|
public class ZooKeeperMigratorMain {
|
||||||
|
|
||||||
|
@ -114,7 +119,7 @@ public class ZooKeeperMigratorMain {
|
||||||
helpFormatter.printHelp(ZooKeeperMigratorMain.class.getCanonicalName(), HEADER, options, FOOTER, true);
|
helpFormatter.printHelp(ZooKeeperMigratorMain.class.getCanonicalName(), HEADER, options, FOOTER, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) throws IOException {
|
||||||
PrintStream output = System.out;
|
PrintStream output = System.out;
|
||||||
System.setOut(System.err);
|
System.setOut(System.err);
|
||||||
|
|
||||||
|
@ -147,15 +152,19 @@ public class ZooKeeperMigratorMain {
|
||||||
}
|
}
|
||||||
final ZooKeeperMigrator zookeeperMigrator = new ZooKeeperMigrator(zookeeperUri);
|
final ZooKeeperMigrator zookeeperMigrator = new ZooKeeperMigrator(zookeeperUri);
|
||||||
if (mode.equals(Mode.READ)) {
|
if (mode.equals(Mode.READ)) {
|
||||||
zookeeperMigrator.readZooKeeper(filename != null ? new FileOutputStream(Paths.get(filename).toFile()) : output, authMode, authData);
|
try (OutputStream zkData = filename != null ? new FileOutputStream(Paths.get(filename).toFile()) : output) {
|
||||||
|
zookeeperMigrator.readZooKeeper(zkData, authMode, authData);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
zookeeperMigrator.writeZooKeeper(filename != null ? new FileInputStream(Paths.get(filename).toFile()) : System.in, authMode, authData, ignoreSource);
|
try (InputStream zkData = filename != null ? new FileInputStream(Paths.get(filename).toFile()) : System.in) {
|
||||||
|
zookeeperMigrator.writeZooKeeper(zkData, authMode, authData, ignoreSource);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (ParseException e) {
|
} catch (ParseException e) {
|
||||||
printUsage(e.getLocalizedMessage(), options);
|
printUsage(e.getLocalizedMessage(), options);
|
||||||
} catch (Exception e) {
|
} catch (IOException | KeeperException | InterruptedException | ExecutionException e) {
|
||||||
throw new RuntimeException(String.format("unable to perform operation: %s", e.getLocalizedMessage()), e);
|
throw new IOException(String.format("unable to perform operation: %s", e.getLocalizedMessage()), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -221,9 +221,8 @@ class ZooKeeperMigratorTest extends Specification {
|
||||||
when: "data is sent to the same zookeeper as the the source zookeeper without ignore source"
|
when: "data is sent to the same zookeeper as the the source zookeeper without ignore source"
|
||||||
ZooKeeperMigratorMain.main(['-s', '-z', connectString, '-f', dataPath] as String[])
|
ZooKeeperMigratorMain.main(['-s', '-z', connectString, '-f', dataPath] as String[])
|
||||||
|
|
||||||
then: "verify that a runtime exception is thrown with an illegal argument exception as the cause"
|
then: "verify that an illegal argument exception is thrown"
|
||||||
def e = thrown(RuntimeException)
|
thrown(IllegalArgumentException)
|
||||||
e.cause.class == IllegalArgumentException
|
|
||||||
|
|
||||||
when: "data is sent to the same zookeeper as the source zookeeper with ignore source option is set"
|
when: "data is sent to the same zookeeper as the source zookeeper with ignore source option is set"
|
||||||
ZooKeeperMigratorMain.main(['-s', '-z', connectString, '-f', dataPath, '--ignore-source'] as String[])
|
ZooKeeperMigratorMain.main(['-s', '-z', connectString, '-f', dataPath, '--ignore-source'] as String[])
|
||||||
|
|
|
@ -61,7 +61,7 @@
|
||||||
"version": 0,
|
"version": 0,
|
||||||
"cversion": 0,
|
"cversion": 0,
|
||||||
"aversion": 0,
|
"aversion": 0,
|
||||||
"ephemeralOwner": 96836130884026368,
|
"ephemeralOwner": 0,
|
||||||
"dataLength": 9,
|
"dataLength": 9,
|
||||||
"numChildren": 0,
|
"numChildren": 0,
|
||||||
"pzxid": 4
|
"pzxid": 4
|
||||||
|
@ -75,7 +75,7 @@
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"ephemeralOwner": 96836130884026368
|
"ephemeralOwner": 0
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"path": "/nifi/2",
|
"path": "/nifi/2",
|
||||||
|
|
Loading…
Reference in New Issue