NIFI-259: Update ZooKeeperStateServer so that the thread starting it doesn't block indefinitely, as this was causing NiFi to not shut down properly; applied patch for NIFI-1415

This commit is contained in:
Mark Payne 2016-01-20 11:52:41 -05:00
parent 593f1288d8
commit d696391f76
2 changed files with 42 additions and 24 deletions

View File

@ -25,8 +25,11 @@ import java.io.InputStream;
import java.util.Properties;
import org.apache.nifi.util.NiFiProperties;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
@ -35,8 +38,12 @@ import org.slf4j.LoggerFactory;
public class ZooKeeperStateServer extends ZooKeeperServerMain {
private static final Logger logger = LoggerFactory.getLogger(ZooKeeperStateServer.class);
private QuorumPeerConfig quorumPeerConfig;
private boolean started = false;
private final QuorumPeerConfig quorumPeerConfig;
private volatile boolean started = false;
private ServerCnxnFactory connectionFactory;
private FileTxnSnapLog transactionLog;
private ZooKeeperServer embeddedZkServer;
private ZooKeeperStateServer(final Properties zkProperties) throws IOException, ConfigException {
quorumPeerConfig = new QuorumPeerConfig();
@ -46,11 +53,25 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
public synchronized void start() throws IOException {
logger.info("Starting Embedded ZooKeeper Server");
final ServerConfig serverConfig = new ServerConfig();
serverConfig.readFrom(quorumPeerConfig);
final ServerConfig config = new ServerConfig();
config.readFrom(quorumPeerConfig);
try {
runFromConfig(serverConfig);
started = true;
transactionLog = new FileTxnSnapLog(new File(config.getDataLogDir()), new File(config.getDataDir()));
embeddedZkServer = new ZooKeeperServer();
embeddedZkServer.setTxnLogFactory(transactionLog);
embeddedZkServer.setTickTime(config.getTickTime());
embeddedZkServer.setMinSessionTimeout(config.getMinSessionTimeout());
embeddedZkServer.setMaxSessionTimeout(config.getMaxSessionTimeout());
connectionFactory = ServerCnxnFactory.createFactory();
connectionFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
connectionFactory.startup(embeddedZkServer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Embedded ZooKeeper Server interrupted", e);
} catch (final IOException ioe) {
throw new IOException("Failed to start embedded ZooKeeper Server", ioe);
} catch (final Exception e) {
@ -62,7 +83,22 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
public synchronized void shutdown() {
if (started) {
started = false;
super.shutdown();
if (transactionLog != null) {
try {
transactionLog.close();
} catch (final IOException ioe) {
logger.warn("Failed to close Transaction Log", ioe);
}
}
if (connectionFactory != null) {
connectionFactory.shutdown();
}
if (embeddedZkServer != null && embeddedZkServer.isRunning()) {
embeddedZkServer.shutdown();
}
}
}

View File

@ -18,8 +18,6 @@
package org.apache.nifi.processors.standard;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -30,8 +28,6 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
@ -72,20 +68,6 @@ public class FetchSFTP extends FetchFileTransfer {
return properties;
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
if (!validationContext.getProperty(SFTPTransfer.PASSWORD).isSet() && !(validationContext.getProperty(SFTPTransfer.PRIVATE_KEY_PASSPHRASE).isSet()
&& validationContext.getProperty(SFTPTransfer.PRIVATE_KEY_PATH).isSet())) {
return Collections.singleton(new ValidationResult.Builder()
.subject("Password")
.valid(false)
.explanation("Must set either password or Private Key Path & Passphrase")
.build());
}
return Collections.emptyList();
}
@Override
protected FileTransfer createFileTransfer(final ProcessContext context) {
return new SFTPTransfer(context, getLogger());