YARN-4559. Make leader elector and zk store share the same curator

client. Contributed by Jian He
This commit is contained in:
Xuan 2016-01-20 14:48:10 -08:00
parent 2ec438e8f7
commit 890a2ebd1a
9 changed files with 129 additions and 140 deletions

View File

@ -98,6 +98,9 @@ Release 2.9.0 - UNRELEASED
YARN-4526. Make SystemClock singleton so AppSchedulingInfo could use it. YARN-4526. Make SystemClock singleton so AppSchedulingInfo could use it.
(kasha) (kasha)
YARN-4559. Make leader elector and zk store share the same curator client.
(Jian He via xgong)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -271,6 +271,11 @@
<Field name="numRetries" /> <Field name="numRetries" />
<Bug pattern="IS2_INCONSISTENT_SYNC" /> <Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match> </Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore" />
<Field name="resourceManager"/>
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match> <Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer"/> <Class name="org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer"/>
<Field name="renewalTimer" /> <Field name="renewalTimer" />

View File

@ -19,14 +19,11 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
@ -44,35 +41,23 @@ public class LeaderElectorService extends AbstractService implements
private RMContext rmContext; private RMContext rmContext;
private String latchPath; private String latchPath;
private String rmId; private String rmId;
private ResourceManager rm;
public LeaderElectorService(RMContext rmContext) { public LeaderElectorService(RMContext rmContext, ResourceManager rm) {
super(LeaderElectorService.class.getName()); super(LeaderElectorService.class.getName());
this.rmContext = rmContext; this.rmContext = rmContext;
this.rm = rm;
} }
@Override @Override
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
Preconditions.checkNotNull(zkHostPort,
YarnConfiguration.RM_ZK_ADDRESS + " is not set");
rmId = HAUtil.getRMHAId(conf); rmId = HAUtil.getRMHAId(conf);
String clusterId = YarnConfiguration.getClusterId(conf); String clusterId = YarnConfiguration.getClusterId(conf);
int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
int maxRetryNum = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
String zkBasePath = conf.get( String zkBasePath = conf.get(
YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH, YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH); YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
latchPath = zkBasePath + "/" + clusterId; latchPath = zkBasePath + "/" + clusterId;
curator = rm.getCurator();
curator = CuratorFrameworkFactory.builder().connectString(zkHostPort)
.retryPolicy(new RetryNTimes(maxRetryNum, zkSessionTimeout)).build();
curator.start();
initAndStartLeaderLatch(); initAndStartLeaderLatch();
super.serviceInit(conf); super.serviceInit(conf);
} }

View File

@ -21,6 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol;
@ -28,7 +32,11 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.http.lib.StaticUserWebFilter; import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.security.*; import org.apache.hadoop.security.AuthenticationFilterInitializer;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler; import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
@ -40,6 +48,7 @@ import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -58,8 +67,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMaste
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@ -78,7 +87,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@ -96,12 +107,15 @@ import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps; import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.WebApps.Builder; import org.apache.hadoop.yarn.webapp.WebApps.Builder;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.PrintStream; import java.io.PrintStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.security.SecureRandom;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
@ -158,6 +172,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected ResourceTrackerService resourceTracker; protected ResourceTrackerService resourceTracker;
private JvmPauseMonitor pauseMonitor; private JvmPauseMonitor pauseMonitor;
private boolean curatorEnabled = false; private boolean curatorEnabled = false;
private CuratorFramework curator;
private final String zkRootNodePassword =
Long.toString(new SecureRandom().nextLong());
private boolean recoveryEnabled;
@VisibleForTesting @VisibleForTesting
protected String webAppAddress; protected String webAppAddress;
@ -232,7 +250,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED); YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) { if (curatorEnabled) {
LeaderElectorService elector = new LeaderElectorService(rmContext); this.curator = createAndStartCurator(conf);
LeaderElectorService elector = new LeaderElectorService(rmContext, this);
addService(elector); addService(elector);
rmContext.setLeaderElectorService(elector); rmContext.setLeaderElectorService(elector);
} }
@ -277,6 +296,57 @@ public class ResourceManager extends CompositeService implements Recoverable {
super.serviceInit(this.conf); super.serviceInit(this.conf);
} }
public CuratorFramework createAndStartCurator(Configuration conf)
throws Exception {
String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
if (zkHostPort == null) {
throw new YarnRuntimeException(
YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
}
int numRetries = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
int zkRetryInterval = conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
// set up zk auths
List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
List<AuthInfo> authInfos = new ArrayList<>();
for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
}
if (HAUtil.isHAEnabled(conf) && HAUtil.getConfValueForRMInstance(
YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf) == null) {
String zkRootNodeUsername = HAUtil
.getConfValueForRMInstance(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
byte[] defaultFencingAuth =
(zkRootNodeUsername + ":" + zkRootNodePassword)
.getBytes(Charset.forName("UTF-8"));
authInfos.add(new AuthInfo(new DigestAuthenticationProvider().getScheme(),
defaultFencingAuth));
}
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zkHostPort)
.sessionTimeoutMs(zkSessionTimeout)
.retryPolicy(new RetryNTimes(numRetries, zkRetryInterval))
.authorization(authInfos).build();
client.start();
return client;
}
public CuratorFramework getCurator() {
return this.curator;
}
public String getZkRootNodePassword() {
return this.zkRootNodePassword;
}
protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler, protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler,
Configuration conf) { Configuration conf) {
return new QueueACLsManager(scheduler, conf); return new QueueACLsManager(scheduler, conf);
@ -412,7 +482,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
private ApplicationMasterLauncher applicationMasterLauncher; private ApplicationMasterLauncher applicationMasterLauncher;
private ContainerAllocationExpirer containerAllocationExpirer; private ContainerAllocationExpirer containerAllocationExpirer;
private ResourceManager rm; private ResourceManager rm;
private boolean recoveryEnabled;
private RMActiveServiceContext activeServiceContext; private RMActiveServiceContext activeServiceContext;
RMActiveServices(ResourceManager rm) { RMActiveServices(ResourceManager rm) {
@ -453,29 +522,26 @@ public class ResourceManager extends CompositeService implements Recoverable {
rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater); rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
} }
boolean isRecoveryEnabled = conf.getBoolean( recoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
RMStateStore rmStore = null; RMStateStore rmStore = null;
if (isRecoveryEnabled) { if (recoveryEnabled) {
recoveryEnabled = true;
rmStore = RMStateStoreFactory.getStore(conf); rmStore = RMStateStoreFactory.getStore(conf);
boolean isWorkPreservingRecoveryEnabled = boolean isWorkPreservingRecoveryEnabled =
conf.getBoolean( conf.getBoolean(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED); YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
rmContext rmContext
.setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled); .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
} else { } else {
recoveryEnabled = false;
rmStore = new NullRMStateStore(); rmStore = new NullRMStateStore();
} }
try { try {
rmStore.setResourceManager(rm);
rmStore.init(conf); rmStore.init(conf);
rmStore.setRMDispatcher(rmDispatcher); rmStore.setRMDispatcher(rmDispatcher);
rmStore.setResourceManager(rm);
} catch (Exception e) { } catch (Exception e) {
// the Exception from stateStore.init() needs to be handled for // the Exception from stateStore.init() needs to be handled for
// HA and we need to give up master status if we got fenced // HA and we need to give up master status if we got fenced
@ -1130,6 +1196,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
configurationProvider.close(); configurationProvider.close();
} }
super.serviceStop(); super.serviceStop();
if (curator != null) {
curator.close();
}
transitionToStandby(false); transitionToStandby(false);
rmContext.setHAServiceState(HAServiceState.STOPPING); rmContext.setHAServiceState(HAServiceState.STOPPING);
} }
@ -1348,5 +1417,4 @@ public class ResourceManager extends CompositeService implements Recoverable {
out.println(" " out.println(" "
+ "[-remove-application-from-state-store <appId>]" + "\n"); + "[-remove-application-from-state-store <appId>]" + "\n");
} }
} }

View File

@ -95,7 +95,7 @@ public abstract class RMStateStore extends AbstractService {
"ReservationSystemRoot"; "ReservationSystemRoot";
protected static final String VERSION_NODE = "RMVersionNode"; protected static final String VERSION_NODE = "RMVersionNode";
protected static final String EPOCH_NODE = "EpochNode"; protected static final String EPOCH_NODE = "EpochNode";
private ResourceManager resourceManager; protected ResourceManager resourceManager;
private final ReadLock readLock; private final ReadLock readLock;
private final WriteLock writeLock; private final WriteLock writeLock;

View File

@ -18,26 +18,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery; package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.io.ByteArrayInputStream; import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.nio.charset.Charset;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.transaction.CuratorTransaction; import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -77,7 +64,15 @@ import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import com.google.common.annotations.VisibleForTesting; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/** /**
* {@link RMStateStore} implementation backed by ZooKeeper. * {@link RMStateStore} implementation backed by ZooKeeper.
@ -140,12 +135,6 @@ public class ZKRMStateStore extends RMStateStore {
private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME = private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
"RMDTMasterKeysRoot"; "RMDTMasterKeysRoot";
private String zkHostPort = null;
private int numRetries;
private int zkSessionTimeout;
@VisibleForTesting
int zkRetryInterval;
/** Znode paths */ /** Znode paths */
private String zkRootNodePath; private String zkRootNodePath;
private String rmAppRoot; private String rmAppRoot;
@ -160,17 +149,15 @@ public class ZKRMStateStore extends RMStateStore {
/** Fencing related variables */ /** Fencing related variables */
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK"; private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
private boolean useDefaultFencingScheme = false;
private String fencingNodePath; private String fencingNodePath;
private Thread verifyActiveStatusThread; private Thread verifyActiveStatusThread;
private int zkSessionTimeout;
/** ACL and auth info */ /** ACL and auth info */
private List<ACL> zkAcl; private List<ACL> zkAcl;
private List<ZKUtil.ZKAuthInfo> zkAuths;
@VisibleForTesting @VisibleForTesting
List<ACL> zkRootNodeAcl; List<ACL> zkRootNodeAcl;
private String zkRootNodeUsername; private String zkRootNodeUsername;
private final String zkRootNodePassword = Long.toString(random.nextLong());
public static final int CREATE_DELETE_PERMS = public static final int CREATE_DELETE_PERMS =
ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE; ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE;
private final String zkRootNodeAuthScheme = private final String zkRootNodeAuthScheme =
@ -204,45 +191,25 @@ public class ZKRMStateStore extends RMStateStore {
YarnConfiguration.DEFAULT_RM_ADDRESS, conf); YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
Id rmId = new Id(zkRootNodeAuthScheme, Id rmId = new Id(zkRootNodeAuthScheme,
DigestAuthenticationProvider.generateDigest( DigestAuthenticationProvider.generateDigest(
zkRootNodeUsername + ":" + zkRootNodePassword)); zkRootNodeUsername + ":" + resourceManager.getZkRootNodePassword()));
zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId)); zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId));
return zkRootNodeAcl; return zkRootNodeAcl;
} }
@Override @Override
public synchronized void initInternal(Configuration conf) throws Exception { public synchronized void initInternal(Configuration conf) throws Exception {
zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
if (zkHostPort == null) { /* Initialize fencing related paths, acls, and ops */
throw new YarnRuntimeException("No server address specified for " +
"zookeeper state store for Resource Manager recovery. " +
YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
}
numRetries =
conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
znodeWorkingPath = znodeWorkingPath =
conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH); YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
zkSessionTimeout = zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
if (HAUtil.isHAEnabled(conf)) { YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
zkRetryInterval = zkSessionTimeout / numRetries;
} else {
zkRetryInterval =
conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
}
zkAcl = RMZKUtils.getZKAcls(conf); zkAcl = RMZKUtils.getZKAcls(conf);
zkAuths = RMZKUtils.getZKAuths(conf);
zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
/* Initialize fencing related paths, acls, and ops */
fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
if (HAUtil.isHAEnabled(conf)) { if (HAUtil.isHAEnabled(conf)) {
String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
(YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf); (YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf);
@ -256,7 +223,6 @@ public class ZKRMStateStore extends RMStateStore {
throw bafe; throw bafe;
} }
} else { } else {
useDefaultFencingScheme = true;
zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl); zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl);
} }
} }
@ -272,19 +238,22 @@ public class ZKRMStateStore extends RMStateStore {
amrmTokenSecretManagerRoot = amrmTokenSecretManagerRoot =
getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT); getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT); reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
curatorFramework = resourceManager.getCurator();
if (curatorFramework == null) {
curatorFramework = resourceManager.createAndStartCurator(conf);
}
} }
@Override @Override
public synchronized void startInternal() throws Exception { public synchronized void startInternal() throws Exception {
// createConnection for future API calls
createConnection();
// ensure root dirs exist // ensure root dirs exist
createRootDirRecursively(znodeWorkingPath); createRootDirRecursively(znodeWorkingPath);
create(zkRootNodePath); create(zkRootNodePath);
setRootNodeAcls(); setRootNodeAcls();
delete(fencingNodePath); delete(fencingNodePath);
if (HAUtil.isHAEnabled(getConfig())) { if (HAUtil.isHAEnabled(getConfig()) && !HAUtil
.isAutomaticFailoverEnabled(getConfig())) {
verifyActiveStatusThread = new VerifyActiveStatusThread(); verifyActiveStatusThread = new VerifyActiveStatusThread();
verifyActiveStatusThread.start(); verifyActiveStatusThread.start();
} }
@ -332,7 +301,9 @@ public class ZKRMStateStore extends RMStateStore {
verifyActiveStatusThread.interrupt(); verifyActiveStatusThread.interrupt();
verifyActiveStatusThread.join(1000); verifyActiveStatusThread.join(1000);
} }
IOUtils.closeStream(curatorFramework); if (!HAUtil.isHAEnabled(getConfig())) {
IOUtils.closeStream(curatorFramework);
}
} }
@Override @Override
@ -909,34 +880,6 @@ public class ZKRMStateStore extends RMStateStore {
} }
} }
/*
* ZK operations using curator
*/
private void createConnection() throws Exception {
// Curator connection
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
builder = builder.connectString(zkHostPort)
.connectionTimeoutMs(zkSessionTimeout)
.retryPolicy(new RetryNTimes(numRetries, zkRetryInterval));
// Set up authorization based on fencing scheme
List<AuthInfo> authInfos = new ArrayList<>();
for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
}
if (useDefaultFencingScheme) {
byte[] defaultFencingAuth =
(zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(
Charset.forName("UTF-8"));
authInfos.add(new AuthInfo(zkRootNodeAuthScheme, defaultFencingAuth));
}
builder = builder.authorization(authInfos);
// Connect to ZK
curatorFramework = builder.build();
curatorFramework.start();
}
@VisibleForTesting @VisibleForTesting
byte[] getData(final String path) throws Exception { byte[] getData(final String path) throws Exception {
return curatorFramework.getData().forPath(path); return curatorFramework.getData().forPath(path);

View File

@ -105,6 +105,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
public TestZKRMStateStoreInternal(Configuration conf, String workingZnode) public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
throws Exception { throws Exception {
setResourceManager(new ResourceManager());
init(conf); init(conf);
start(); start();
assertTrue(znodeWorkingPath.equals(workingZnode)); assertTrue(znodeWorkingPath.equals(workingZnode));

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
@ -103,6 +104,7 @@ public class TestZKRMStateStorePerf extends RMStateStoreTestBase
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
store = new ZKRMStateStore(); store = new ZKRMStateStore();
store.setResourceManager(new ResourceManager());
store.init(conf); store.init(conf);
store.start(); store.start();
when(rmContext.getStateStore()).thenReturn(store); when(rmContext.getStateStore()).thenReturn(store);

View File

@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.curator.test.TestingServer; import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher;
import org.apache.hadoop.util.ZKUtil; import org.apache.hadoop.util.ZKUtil;
@ -80,6 +81,7 @@ public class TestZKRMStateStoreZKClientConnections {
public TestZKRMStateStore(Configuration conf, String workingZnode) public TestZKRMStateStore(Configuration conf, String workingZnode)
throws Exception { throws Exception {
setResourceManager(new ResourceManager());
init(conf); init(conf);
start(); start();
assertTrue(znodeWorkingPath.equals(workingZnode)); assertTrue(znodeWorkingPath.equals(workingZnode));
@ -168,24 +170,4 @@ public class TestZKRMStateStoreZKClientConnections {
zkClientTester.getRMStateStore(conf); zkClientTester.getRMStateStore(conf);
} }
@Test
public void testZKRetryInterval() throws Exception {
TestZKClient zkClientTester = new TestZKClient();
YarnConfiguration conf = new YarnConfiguration();
ZKRMStateStore store =
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
assertEquals(YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS,
store.zkRetryInterval);
store.stop();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
store =
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
assertEquals(YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS /
YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES,
store.zkRetryInterval);
store.stop();
}
} }