NIFI-259: Construct QuorumPeer object instead of ZooKeeperServer object because we want to join part of a cluster when using embedded ZK Server

This commit is contained in:
Mark Payne 2016-01-25 14:33:22 -05:00
parent fcf837b5af
commit 329e1fe2f4
1 changed files with 27 additions and 19 deletions

View File

@ -26,10 +26,10 @@ import java.util.Properties;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerConfig; import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerMain; import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -43,7 +43,7 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
private ServerCnxnFactory connectionFactory; private ServerCnxnFactory connectionFactory;
private FileTxnSnapLog transactionLog; private FileTxnSnapLog transactionLog;
private ZooKeeperServer embeddedZkServer; private QuorumPeer quorumPeer;
private ZooKeeperStateServer(final Properties zkProperties) throws IOException, ConfigException { private ZooKeeperStateServer(final Properties zkProperties) throws IOException, ConfigException {
quorumPeerConfig = new QuorumPeerConfig(); quorumPeerConfig = new QuorumPeerConfig();
@ -53,25 +53,33 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
public synchronized void start() throws IOException { public synchronized void start() throws IOException {
logger.info("Starting Embedded ZooKeeper Server"); logger.info("Starting Embedded ZooKeeper Server");
final ServerConfig config = new ServerConfig();
config.readFrom(quorumPeerConfig);
try { try {
started = true; started = true;
transactionLog = new FileTxnSnapLog(new File(config.getDataLogDir()), new File(config.getDataDir())); transactionLog = new FileTxnSnapLog(new File(quorumPeerConfig.getDataLogDir()), new File(quorumPeerConfig.getDataDir()));
embeddedZkServer = new ZooKeeperServer();
embeddedZkServer.setTxnLogFactory(transactionLog);
embeddedZkServer.setTickTime(config.getTickTime());
embeddedZkServer.setMinSessionTimeout(config.getMinSessionTimeout());
embeddedZkServer.setMaxSessionTimeout(config.getMaxSessionTimeout());
connectionFactory = ServerCnxnFactory.createFactory(); connectionFactory = ServerCnxnFactory.createFactory();
connectionFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); connectionFactory.configure(quorumPeerConfig.getClientPortAddress(), quorumPeerConfig.getMaxClientCnxns());
connectionFactory.startup(embeddedZkServer);
} catch (InterruptedException e) { quorumPeer = new QuorumPeer();
Thread.currentThread().interrupt(); quorumPeer.setClientPortAddress(quorumPeerConfig.getClientPortAddress());
logger.warn("Embedded ZooKeeper Server interrupted", e); quorumPeer.setTxnFactory(new FileTxnSnapLog(new File(quorumPeerConfig.getDataLogDir()), new File(quorumPeerConfig.getDataDir())));
quorumPeer.setQuorumPeers(quorumPeerConfig.getServers());
quorumPeer.setElectionType(quorumPeerConfig.getElectionAlg());
quorumPeer.setMyid(quorumPeerConfig.getServerId());
quorumPeer.setTickTime(quorumPeerConfig.getTickTime());
quorumPeer.setMinSessionTimeout(quorumPeerConfig.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(quorumPeerConfig.getMaxSessionTimeout());
quorumPeer.setInitLimit(quorumPeerConfig.getInitLimit());
quorumPeer.setSyncLimit(quorumPeerConfig.getSyncLimit());
quorumPeer.setQuorumVerifier(quorumPeerConfig.getQuorumVerifier());
quorumPeer.setCnxnFactory(connectionFactory);
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setLearnerType(quorumPeerConfig.getPeerType());
quorumPeer.setSyncEnabled(quorumPeerConfig.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(quorumPeerConfig.getQuorumListenOnAllIPs());
quorumPeer.start();
} catch (final IOException ioe) { } catch (final IOException ioe) {
throw new IOException("Failed to start embedded ZooKeeper Server", ioe); throw new IOException("Failed to start embedded ZooKeeper Server", ioe);
} catch (final Exception e) { } catch (final Exception e) {
@ -96,8 +104,8 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
connectionFactory.shutdown(); connectionFactory.shutdown();
} }
if (embeddedZkServer != null && embeddedZkServer.isRunning()) { if (quorumPeer != null && quorumPeer.isRunning()) {
embeddedZkServer.shutdown(); quorumPeer.shutdown();
} }
} }
} }