From 329e1fe2f4f17832d38e5058f39cef483bde986c Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 25 Jan 2016 14:33:22 -0500 Subject: [PATCH] NIFI-259: Construct QuorumPeer object instead of ZooKeeperServer object because we want to join part of a cluster when using embedded ZK Server --- .../state/server/ZooKeeperStateServer.java | 46 +++++++++++-------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java index e65c3755c5..de4176427c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java @@ -26,10 +26,10 @@ import java.util.Properties; import org.apache.nifi.util.NiFiProperties; import org.apache.zookeeper.server.ServerCnxnFactory; -import org.apache.zookeeper.server.ServerConfig; -import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServerMain; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.quorum.QuorumPeer; import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; @@ -43,7 +43,7 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain { private ServerCnxnFactory connectionFactory; private FileTxnSnapLog transactionLog; - private ZooKeeperServer embeddedZkServer; + private QuorumPeer quorumPeer; private ZooKeeperStateServer(final Properties zkProperties) throws IOException, ConfigException { quorumPeerConfig = new QuorumPeerConfig(); @@ -53,25 +53,33 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain { public synchronized void start() throws IOException { logger.info("Starting Embedded ZooKeeper Server"); - final ServerConfig config = new ServerConfig(); - config.readFrom(quorumPeerConfig); try { started = true; - transactionLog = new FileTxnSnapLog(new File(config.getDataLogDir()), new File(config.getDataDir())); - - embeddedZkServer = new ZooKeeperServer(); - embeddedZkServer.setTxnLogFactory(transactionLog); - embeddedZkServer.setTickTime(config.getTickTime()); - embeddedZkServer.setMinSessionTimeout(config.getMinSessionTimeout()); - embeddedZkServer.setMaxSessionTimeout(config.getMaxSessionTimeout()); + transactionLog = new FileTxnSnapLog(new File(quorumPeerConfig.getDataLogDir()), new File(quorumPeerConfig.getDataDir())); connectionFactory = ServerCnxnFactory.createFactory(); - connectionFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); - connectionFactory.startup(embeddedZkServer); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.warn("Embedded ZooKeeper Server interrupted", e); + connectionFactory.configure(quorumPeerConfig.getClientPortAddress(), quorumPeerConfig.getMaxClientCnxns()); + + quorumPeer = new QuorumPeer(); + quorumPeer.setClientPortAddress(quorumPeerConfig.getClientPortAddress()); + quorumPeer.setTxnFactory(new FileTxnSnapLog(new File(quorumPeerConfig.getDataLogDir()), new File(quorumPeerConfig.getDataDir()))); + quorumPeer.setQuorumPeers(quorumPeerConfig.getServers()); + quorumPeer.setElectionType(quorumPeerConfig.getElectionAlg()); + quorumPeer.setMyid(quorumPeerConfig.getServerId()); + quorumPeer.setTickTime(quorumPeerConfig.getTickTime()); + quorumPeer.setMinSessionTimeout(quorumPeerConfig.getMinSessionTimeout()); + quorumPeer.setMaxSessionTimeout(quorumPeerConfig.getMaxSessionTimeout()); + quorumPeer.setInitLimit(quorumPeerConfig.getInitLimit()); + quorumPeer.setSyncLimit(quorumPeerConfig.getSyncLimit()); + quorumPeer.setQuorumVerifier(quorumPeerConfig.getQuorumVerifier()); + quorumPeer.setCnxnFactory(connectionFactory); + quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); + quorumPeer.setLearnerType(quorumPeerConfig.getPeerType()); + quorumPeer.setSyncEnabled(quorumPeerConfig.getSyncEnabled()); + quorumPeer.setQuorumListenOnAllIPs(quorumPeerConfig.getQuorumListenOnAllIPs()); + + quorumPeer.start(); } catch (final IOException ioe) { throw new IOException("Failed to start embedded ZooKeeper Server", ioe); } catch (final Exception e) { @@ -96,8 +104,8 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain { connectionFactory.shutdown(); } - if (embeddedZkServer != null && embeddedZkServer.isRunning()) { - embeddedZkServer.shutdown(); + if (quorumPeer != null && quorumPeer.isRunning()) { + quorumPeer.shutdown(); } } }