diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 7fbcabe66c8..178ad41e510 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -40,6 +40,9 @@ Release 2.9.0 - UNRELEASED
YARN-4526. Make SystemClock singleton so AppSchedulingInfo could use it.
(kasha)
+ YARN-4559. Make leader elector and zk store share the same curator client.
+ (Jian He via xgong)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 6fb39450d3b..c12377b36c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -271,6 +271,11 @@
+
+
+
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
index 3766676f81b..8c1a6eb0fc4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
@@ -19,14 +19,11 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
-import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.service.AbstractService;
@@ -44,35 +41,23 @@ public class LeaderElectorService extends AbstractService implements
private RMContext rmContext;
private String latchPath;
private String rmId;
+ private ResourceManager rm;
- public LeaderElectorService(RMContext rmContext) {
+ public LeaderElectorService(RMContext rmContext, ResourceManager rm) { // rm is the source of the curator client
super(LeaderElectorService.class.getName());
this.rmContext = rmContext;
-
+ this.rm = rm; // kept so serviceInit can call rm.getCurator()
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
- String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
- Preconditions.checkNotNull(zkHostPort,
- YarnConfiguration.RM_ZK_ADDRESS + " is not set");
-
rmId = HAUtil.getRMHAId(conf);
String clusterId = YarnConfiguration.getClusterId(conf);
-
- int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
- YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
- int maxRetryNum = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
- YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
-
String zkBasePath = conf.get(
YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
latchPath = zkBasePath + "/" + clusterId;
-
- curator = CuratorFrameworkFactory.builder().connectString(zkHostPort)
- .retryPolicy(new RetryNTimes(maxRetryNum, zkSessionTimeout)).build();
- curator.start();
+ curator = rm.getCurator(); // client is created and started by ResourceManager, not built here
initAndStartLeaderLatch();
super.serviceInit(conf);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 33431f87133..40d627ec481 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -21,6 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.AuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
@@ -28,7 +32,11 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
-import org.apache.hadoop.security.*;
+import org.apache.hadoop.security.AuthenticationFilterInitializer;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.AbstractService;
@@ -40,6 +48,7 @@ import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -58,8 +67,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMaste
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@@ -78,7 +87,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -96,12 +107,15 @@ import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.WebApps.Builder;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
+import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@@ -158,6 +172,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected ResourceTrackerService resourceTracker;
private JvmPauseMonitor pauseMonitor;
private boolean curatorEnabled = false;
+ private CuratorFramework curator;
+ private final String zkRootNodePassword =
+ Long.toString(new SecureRandom().nextLong());
+ private boolean recoveryEnabled;
@VisibleForTesting
protected String webAppAddress;
@@ -232,7 +250,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) {
- LeaderElectorService elector = new LeaderElectorService(rmContext);
+ this.curator = createAndStartCurator(conf);
+ LeaderElectorService elector = new LeaderElectorService(rmContext, this);
addService(elector);
rmContext.setLeaderElectorService(elector);
}
@@ -276,7 +295,58 @@ public class ResourceManager extends CompositeService implements Recoverable {
super.serviceInit(this.conf);
}
-
+
+  public CuratorFramework createAndStartCurator(Configuration conf)
+      throws Exception { // builds and starts a new ZK client on every call
+    String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
+    if (zkHostPort == null) { // no ZK address configured: fail before building
+      throw new YarnRuntimeException(
+          YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
+    }
+    int numRetries = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
+        YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
+    int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
+        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
+    int zkRetryInterval = conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
+
+    // set up zk auths: wrap each configured ZK auth in a curator AuthInfo
+    List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
+    List<AuthInfo> authInfos = new ArrayList<>();
+    for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
+      authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
+    }
+    // HA, no root-node ACL set: add the <rm-address>:<password> fencing auth
+    if (HAUtil.isHAEnabled(conf) && HAUtil.getConfValueForRMInstance(
+        YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf) == null) {
+      String zkRootNodeUsername = HAUtil
+          .getConfValueForRMInstance(YarnConfiguration.RM_ADDRESS,
+              YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
+      byte[] defaultFencingAuth =
+          (zkRootNodeUsername + ":" + zkRootNodePassword)
+              .getBytes(Charset.forName("UTF-8"));
+      authInfos.add(new AuthInfo(new DigestAuthenticationProvider().getScheme(),
+          defaultFencingAuth));
+    }
+    // The started client is returned as-is; this method never closes it.
+    CuratorFramework client = CuratorFrameworkFactory.builder()
+        .connectString(zkHostPort)
+        .sessionTimeoutMs(zkSessionTimeout)
+        .retryPolicy(new RetryNTimes(numRetries, zkRetryInterval))
+        .authorization(authInfos).build();
+    client.start();
+    return client;
+  }
+
+ public CuratorFramework getCurator() {
+ return this.curator; // assigned only when curatorEnabled is true, so this may be null; callers must check
+ }
+
+ public String getZkRootNodePassword() {
+ return this.zkRootNodePassword; // per-instance SecureRandom long as a decimal string; feeds the root-node digest auth/ACL
+ }
+
+
protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler,
Configuration conf) {
return new QueueACLsManager(scheduler, conf);
@@ -412,7 +482,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
private ApplicationMasterLauncher applicationMasterLauncher;
private ContainerAllocationExpirer containerAllocationExpirer;
private ResourceManager rm;
- private boolean recoveryEnabled;
private RMActiveServiceContext activeServiceContext;
RMActiveServices(ResourceManager rm) {
@@ -453,29 +522,26 @@ public class ResourceManager extends CompositeService implements Recoverable {
rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
}
- boolean isRecoveryEnabled = conf.getBoolean(
- YarnConfiguration.RECOVERY_ENABLED,
+ recoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
RMStateStore rmStore = null;
- if (isRecoveryEnabled) {
- recoveryEnabled = true;
+ if (recoveryEnabled) {
rmStore = RMStateStoreFactory.getStore(conf);
boolean isWorkPreservingRecoveryEnabled =
conf.getBoolean(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
rmContext
- .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
+ .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
} else {
- recoveryEnabled = false;
rmStore = new NullRMStateStore();
}
try {
+ rmStore.setResourceManager(rm);
rmStore.init(conf);
rmStore.setRMDispatcher(rmDispatcher);
- rmStore.setResourceManager(rm);
} catch (Exception e) {
// the Exception from stateStore.init() needs to be handled for
// HA and we need to give up master status if we got fenced
@@ -1130,6 +1196,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
configurationProvider.close();
}
super.serviceStop();
+ if (curator != null) {
+ curator.close();
+ }
transitionToStandby(false);
rmContext.setHAServiceState(HAServiceState.STOPPING);
}
@@ -1177,7 +1246,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
public ClientRMService getClientRMService() {
return this.clientRM;
}
-
+
/**
* return the scheduler.
* @return the scheduler for the Resource Manager.
@@ -1348,5 +1417,4 @@ public class ResourceManager extends CompositeService implements Recoverable {
out.println(" "
+ "[-remove-application-from-state-store ]" + "\n");
}
-
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index ae17aaa4234..159c11c42fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -95,7 +95,7 @@ public abstract class RMStateStore extends AbstractService {
"ReservationSystemRoot";
protected static final String VERSION_NODE = "RMVersionNode";
protected static final String EPOCH_NODE = "EpochNode";
- private ResourceManager resourceManager;
+ protected ResourceManager resourceManager;
private final ReadLock readLock;
private final WriteLock writeLock;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index ddb8a0bdc46..51e5829b1d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -18,26 +18,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.nio.charset.Charset;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
-import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@@ -77,7 +64,15 @@ import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
-import com.google.common.annotations.VisibleForTesting;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
/**
* {@link RMStateStore} implementation backed by ZooKeeper.
@@ -140,12 +135,6 @@ public class ZKRMStateStore extends RMStateStore {
private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
"RMDTMasterKeysRoot";
- private String zkHostPort = null;
- private int numRetries;
- private int zkSessionTimeout;
- @VisibleForTesting
- int zkRetryInterval;
-
/** Znode paths */
private String zkRootNodePath;
private String rmAppRoot;
@@ -160,17 +149,15 @@ public class ZKRMStateStore extends RMStateStore {
/** Fencing related variables */
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
- private boolean useDefaultFencingScheme = false;
private String fencingNodePath;
private Thread verifyActiveStatusThread;
+ private int zkSessionTimeout;
/** ACL and auth info */
private List zkAcl;
- private List zkAuths;
@VisibleForTesting
List zkRootNodeAcl;
private String zkRootNodeUsername;
- private final String zkRootNodePassword = Long.toString(random.nextLong());
public static final int CREATE_DELETE_PERMS =
ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE;
private final String zkRootNodeAuthScheme =
@@ -204,45 +191,25 @@ public class ZKRMStateStore extends RMStateStore {
YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
Id rmId = new Id(zkRootNodeAuthScheme,
DigestAuthenticationProvider.generateDigest(
- zkRootNodeUsername + ":" + zkRootNodePassword));
+ zkRootNodeUsername + ":" + resourceManager.getZkRootNodePassword()));
zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId));
return zkRootNodeAcl;
}
@Override
public synchronized void initInternal(Configuration conf) throws Exception {
- zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
- if (zkHostPort == null) {
- throw new YarnRuntimeException("No server address specified for " +
- "zookeeper state store for Resource Manager recovery. " +
- YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
- }
- numRetries =
- conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
- YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
+
+ /* Initialize fencing related paths, acls, and ops */
znodeWorkingPath =
conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
- zkSessionTimeout =
- conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
- YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
-
- if (HAUtil.isHAEnabled(conf)) {
- zkRetryInterval = zkSessionTimeout / numRetries;
- } else {
- zkRetryInterval =
- conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
- YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
- }
+ zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
+ fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
+ rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
+ zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
+ YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
zkAcl = RMZKUtils.getZKAcls(conf);
- zkAuths = RMZKUtils.getZKAuths(conf);
-
- zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
- rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
-
- /* Initialize fencing related paths, acls, and ops */
- fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
if (HAUtil.isHAEnabled(conf)) {
String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
(YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf);
@@ -256,7 +223,6 @@ public class ZKRMStateStore extends RMStateStore {
throw bafe;
}
} else {
- useDefaultFencingScheme = true;
zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl);
}
}
@@ -272,19 +238,22 @@ public class ZKRMStateStore extends RMStateStore {
amrmTokenSecretManagerRoot =
getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
+ curatorFramework = resourceManager.getCurator();
+ if (curatorFramework == null) {
+ curatorFramework = resourceManager.createAndStartCurator(conf);
+ }
}
@Override
public synchronized void startInternal() throws Exception {
- // createConnection for future API calls
- createConnection();
// ensure root dirs exist
createRootDirRecursively(znodeWorkingPath);
create(zkRootNodePath);
setRootNodeAcls();
delete(fencingNodePath);
- if (HAUtil.isHAEnabled(getConfig())) {
+ if (HAUtil.isHAEnabled(getConfig()) && !HAUtil
+ .isAutomaticFailoverEnabled(getConfig())) {
verifyActiveStatusThread = new VerifyActiveStatusThread();
verifyActiveStatusThread.start();
}
@@ -332,7 +301,9 @@ public class ZKRMStateStore extends RMStateStore {
verifyActiveStatusThread.interrupt();
verifyActiveStatusThread.join(1000);
}
- IOUtils.closeStream(curatorFramework);
+ if (!HAUtil.isHAEnabled(getConfig())) {
+ IOUtils.closeStream(curatorFramework);
+ }
}
@Override
@@ -909,34 +880,6 @@ public class ZKRMStateStore extends RMStateStore {
}
}
- /*
- * ZK operations using curator
- */
- private void createConnection() throws Exception {
- // Curator connection
- CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
- builder = builder.connectString(zkHostPort)
- .connectionTimeoutMs(zkSessionTimeout)
- .retryPolicy(new RetryNTimes(numRetries, zkRetryInterval));
-
- // Set up authorization based on fencing scheme
- List authInfos = new ArrayList<>();
- for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
- authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
- }
- if (useDefaultFencingScheme) {
- byte[] defaultFencingAuth =
- (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(
- Charset.forName("UTF-8"));
- authInfos.add(new AuthInfo(zkRootNodeAuthScheme, defaultFencingAuth));
- }
- builder = builder.authorization(authInfos);
-
- // Connect to ZK
- curatorFramework = builder.build();
- curatorFramework.start();
- }
-
@VisibleForTesting
byte[] getData(final String path) throws Exception {
return curatorFramework.getData().forPath(path);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index 406fcf69be3..7df31cf43ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -105,6 +105,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
throws Exception {
+ setResourceManager(new ResourceManager());
init(conf);
start();
assertTrue(znodeWorkingPath.equals(workingZnode));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
index e2704044c27..4b0b06a633e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
@@ -103,6 +104,7 @@ public class TestZKRMStateStorePerf extends RMStateStoreTestBase
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
store = new ZKRMStateStore();
+ store.setResourceManager(new ResourceManager());
store.init(conf);
store.start();
when(rmContext.getStateStore()).thenReturn(store);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
index d1884505611..6b19be3904b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher;
import org.apache.hadoop.util.ZKUtil;
@@ -80,6 +81,7 @@ public class TestZKRMStateStoreZKClientConnections {
public TestZKRMStateStore(Configuration conf, String workingZnode)
throws Exception {
+ setResourceManager(new ResourceManager());
init(conf);
start();
assertTrue(znodeWorkingPath.equals(workingZnode));
@@ -168,24 +170,4 @@ public class TestZKRMStateStoreZKClientConnections {
zkClientTester.getRMStateStore(conf);
}
-
- @Test
- public void testZKRetryInterval() throws Exception {
- TestZKClient zkClientTester = new TestZKClient();
- YarnConfiguration conf = new YarnConfiguration();
-
- ZKRMStateStore store =
- (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
- assertEquals(YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS,
- store.zkRetryInterval);
- store.stop();
-
- conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
- store =
- (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
- assertEquals(YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS /
- YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES,
- store.zkRetryInterval);
- store.stop();
- }
}