diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java index 1a646b0f9f..e65c3755c5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java @@ -25,8 +25,11 @@ import java.io.InputStream; import java.util.Properties; import org.apache.nifi.util.NiFiProperties; +import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; @@ -35,8 +38,12 @@ import org.slf4j.LoggerFactory; public class ZooKeeperStateServer extends ZooKeeperServerMain { private static final Logger logger = LoggerFactory.getLogger(ZooKeeperStateServer.class); - private QuorumPeerConfig quorumPeerConfig; - private boolean started = false; + private final QuorumPeerConfig quorumPeerConfig; + private volatile boolean started = false; + + private ServerCnxnFactory connectionFactory; + private FileTxnSnapLog transactionLog; + private ZooKeeperServer embeddedZkServer; private ZooKeeperStateServer(final Properties zkProperties) throws IOException, ConfigException { quorumPeerConfig = new QuorumPeerConfig(); @@ -46,11 +53,25 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain { public synchronized void start() throws IOException { logger.info("Starting Embedded ZooKeeper Server"); - final ServerConfig serverConfig = new ServerConfig(); - serverConfig.readFrom(quorumPeerConfig); + final ServerConfig config = new ServerConfig(); + config.readFrom(quorumPeerConfig); try { - runFromConfig(serverConfig); started = true; + + transactionLog = new FileTxnSnapLog(new File(config.getDataLogDir()), new File(config.getDataDir())); + + embeddedZkServer = new ZooKeeperServer(); + embeddedZkServer.setTxnLogFactory(transactionLog); + embeddedZkServer.setTickTime(config.getTickTime()); + embeddedZkServer.setMinSessionTimeout(config.getMinSessionTimeout()); + embeddedZkServer.setMaxSessionTimeout(config.getMaxSessionTimeout()); + + connectionFactory = ServerCnxnFactory.createFactory(); + connectionFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); + connectionFactory.startup(embeddedZkServer); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("Embedded ZooKeeper Server interrupted", e); } catch (final IOException ioe) { throw new IOException("Failed to start embedded ZooKeeper Server", ioe); } catch (final Exception e) { @@ -62,7 +83,22 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain { public synchronized void shutdown() { if (started) { started = false; - super.shutdown(); + + if (transactionLog != null) { + try { + transactionLog.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Transaction Log", ioe); + } + } + + if (connectionFactory != null) { + connectionFactory.shutdown(); + } + + if (embeddedZkServer != null && embeddedZkServer.isRunning()) { + embeddedZkServer.shutdown(); + } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java index 8379987871..dd6d880652 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java @@ -18,8 +18,6 @@ package org.apache.nifi.processors.standard; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.List; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -30,8 +28,6 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer; @@ -72,20 +68,6 @@ public class FetchSFTP extends FetchFileTransfer { return properties; } - @Override - protected Collection customValidate(final ValidationContext validationContext) { - if (!validationContext.getProperty(SFTPTransfer.PASSWORD).isSet() && !(validationContext.getProperty(SFTPTransfer.PRIVATE_KEY_PASSPHRASE).isSet() - && validationContext.getProperty(SFTPTransfer.PRIVATE_KEY_PATH).isSet())) { - return Collections.singleton(new ValidationResult.Builder() - .subject("Password") - .valid(false) - .explanation("Must set either password or Private Key Path & Passphrase") - .build()); - } - - return Collections.emptyList(); - } - @Override protected FileTransfer createFileTransfer(final ProcessContext context) { return new SFTPTransfer(context, getLogger());