YARN-4559. Make leader elector and zk store share the same curator
client. Contributed by Jian He (cherry picked from commit890a2ebd1a
) (cherry picked from commitbe3322792f
)
This commit is contained in:
parent
059d33bb8b
commit
e0401e0781
|
@ -275,6 +275,11 @@
|
|||
<Field name="numRetries" />
|
||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore" />
|
||||
<Field name="resourceManager"/>
|
||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||
</Match>
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer"/>
|
||||
<Field name="renewalTimer" />
|
||||
|
|
|
@ -19,14 +19,11 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||
import org.apache.curator.framework.recipes.leader.LeaderLatch;
|
||||
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
|
||||
import org.apache.curator.retry.RetryNTimes;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
|
@ -44,35 +41,23 @@ public class LeaderElectorService extends AbstractService implements
|
|||
private RMContext rmContext;
|
||||
private String latchPath;
|
||||
private String rmId;
|
||||
private ResourceManager rm;
|
||||
|
||||
public LeaderElectorService(RMContext rmContext) {
|
||||
public LeaderElectorService(RMContext rmContext, ResourceManager rm) {
|
||||
super(LeaderElectorService.class.getName());
|
||||
this.rmContext = rmContext;
|
||||
|
||||
this.rm = rm;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
|
||||
Preconditions.checkNotNull(zkHostPort,
|
||||
YarnConfiguration.RM_ZK_ADDRESS + " is not set");
|
||||
|
||||
rmId = HAUtil.getRMHAId(conf);
|
||||
String clusterId = YarnConfiguration.getClusterId(conf);
|
||||
|
||||
int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
|
||||
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
|
||||
int maxRetryNum = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
|
||||
YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
|
||||
|
||||
String zkBasePath = conf.get(
|
||||
YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
|
||||
YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
|
||||
latchPath = zkBasePath + "/" + clusterId;
|
||||
|
||||
curator = CuratorFrameworkFactory.builder().connectString(zkHostPort)
|
||||
.retryPolicy(new RetryNTimes(maxRetryNum, zkSessionTimeout)).build();
|
||||
curator.start();
|
||||
curator = rm.getCurator();
|
||||
initAndStartLeaderLatch();
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
|
|
@ -21,6 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.curator.framework.AuthInfo;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||
import org.apache.curator.retry.RetryNTimes;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
|
@ -28,7 +32,11 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
|||
import org.apache.hadoop.http.lib.StaticUserWebFilter;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||
import org.apache.hadoop.security.*;
|
||||
import org.apache.hadoop.security.AuthenticationFilterInitializer;
|
||||
import org.apache.hadoop.security.Groups;
|
||||
import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
|
||||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
|
@ -40,6 +48,7 @@ import org.apache.hadoop.util.JvmPauseMonitor;
|
|||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.ShutdownHookManager;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.ZKUtil;
|
||||
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -58,8 +67,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMaste
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
|
@ -78,7 +87,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||
|
@ -96,12 +107,15 @@ import org.apache.hadoop.yarn.webapp.WebApp;
|
|||
import org.apache.hadoop.yarn.webapp.WebApps;
|
||||
import org.apache.hadoop.yarn.webapp.WebApps.Builder;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.charset.Charset;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
@ -158,6 +172,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
protected ResourceTrackerService resourceTracker;
|
||||
private JvmPauseMonitor pauseMonitor;
|
||||
private boolean curatorEnabled = false;
|
||||
private CuratorFramework curator;
|
||||
private final String zkRootNodePassword =
|
||||
Long.toString(new SecureRandom().nextLong());
|
||||
private boolean recoveryEnabled;
|
||||
|
||||
@VisibleForTesting
|
||||
protected String webAppAddress;
|
||||
|
@ -237,7 +255,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
|
||||
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
|
||||
if (curatorEnabled) {
|
||||
LeaderElectorService elector = new LeaderElectorService(rmContext);
|
||||
this.curator = createAndStartCurator(conf);
|
||||
LeaderElectorService elector = new LeaderElectorService(rmContext, this);
|
||||
addService(elector);
|
||||
rmContext.setLeaderElectorService(elector);
|
||||
}
|
||||
|
@ -281,7 +300,58 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
|
||||
super.serviceInit(this.conf);
|
||||
}
|
||||
|
||||
|
||||
public CuratorFramework createAndStartCurator(Configuration conf)
|
||||
throws Exception {
|
||||
String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
|
||||
if (zkHostPort == null) {
|
||||
throw new YarnRuntimeException(
|
||||
YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
|
||||
}
|
||||
int numRetries = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
|
||||
YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
|
||||
int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
|
||||
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
|
||||
int zkRetryInterval = conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
|
||||
|
||||
// set up zk auths
|
||||
List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
|
||||
List<AuthInfo> authInfos = new ArrayList<>();
|
||||
for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
|
||||
authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
|
||||
}
|
||||
|
||||
if (HAUtil.isHAEnabled(conf) && HAUtil.getConfValueForRMInstance(
|
||||
YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf) == null) {
|
||||
String zkRootNodeUsername = HAUtil
|
||||
.getConfValueForRMInstance(YarnConfiguration.RM_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
|
||||
byte[] defaultFencingAuth =
|
||||
(zkRootNodeUsername + ":" + zkRootNodePassword)
|
||||
.getBytes(Charset.forName("UTF-8"));
|
||||
authInfos.add(new AuthInfo(new DigestAuthenticationProvider().getScheme(),
|
||||
defaultFencingAuth));
|
||||
}
|
||||
|
||||
CuratorFramework client = CuratorFrameworkFactory.builder()
|
||||
.connectString(zkHostPort)
|
||||
.sessionTimeoutMs(zkSessionTimeout)
|
||||
.retryPolicy(new RetryNTimes(numRetries, zkRetryInterval))
|
||||
.authorization(authInfos).build();
|
||||
client.start();
|
||||
return client;
|
||||
}
|
||||
|
||||
public CuratorFramework getCurator() {
|
||||
return this.curator;
|
||||
}
|
||||
|
||||
public String getZkRootNodePassword() {
|
||||
return this.zkRootNodePassword;
|
||||
}
|
||||
|
||||
|
||||
protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler,
|
||||
Configuration conf) {
|
||||
return new QueueACLsManager(scheduler, conf);
|
||||
|
@ -417,7 +487,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
private ApplicationMasterLauncher applicationMasterLauncher;
|
||||
private ContainerAllocationExpirer containerAllocationExpirer;
|
||||
private ResourceManager rm;
|
||||
private boolean recoveryEnabled;
|
||||
private RMActiveServiceContext activeServiceContext;
|
||||
|
||||
RMActiveServices(ResourceManager rm) {
|
||||
|
@ -458,29 +527,26 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
|
||||
}
|
||||
|
||||
boolean isRecoveryEnabled = conf.getBoolean(
|
||||
YarnConfiguration.RECOVERY_ENABLED,
|
||||
recoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
|
||||
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
|
||||
|
||||
RMStateStore rmStore = null;
|
||||
if (isRecoveryEnabled) {
|
||||
recoveryEnabled = true;
|
||||
if (recoveryEnabled) {
|
||||
rmStore = RMStateStoreFactory.getStore(conf);
|
||||
boolean isWorkPreservingRecoveryEnabled =
|
||||
conf.getBoolean(
|
||||
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
|
||||
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
|
||||
rmContext
|
||||
.setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
|
||||
.setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
|
||||
} else {
|
||||
recoveryEnabled = false;
|
||||
rmStore = new NullRMStateStore();
|
||||
}
|
||||
|
||||
try {
|
||||
rmStore.setResourceManager(rm);
|
||||
rmStore.init(conf);
|
||||
rmStore.setRMDispatcher(rmDispatcher);
|
||||
rmStore.setResourceManager(rm);
|
||||
} catch (Exception e) {
|
||||
// the Exception from stateStore.init() needs to be handled for
|
||||
// HA and we need to give up master status if we got fenced
|
||||
|
@ -1117,6 +1183,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
configurationProvider.close();
|
||||
}
|
||||
super.serviceStop();
|
||||
if (curator != null) {
|
||||
curator.close();
|
||||
}
|
||||
transitionToStandby(false);
|
||||
rmContext.setHAServiceState(HAServiceState.STOPPING);
|
||||
}
|
||||
|
@ -1164,7 +1233,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
public ClientRMService getClientRMService() {
|
||||
return this.clientRM;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* return the scheduler.
|
||||
* @return the scheduler for the Resource Manager.
|
||||
|
@ -1335,5 +1404,4 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
out.println(" "
|
||||
+ "[-remove-application-from-state-store <appId>]" + "\n");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -96,7 +96,7 @@ public abstract class RMStateStore extends AbstractService {
|
|||
"ReservationSystemRoot";
|
||||
protected static final String VERSION_NODE = "RMVersionNode";
|
||||
protected static final String EPOCH_NODE = "EpochNode";
|
||||
private ResourceManager resourceManager;
|
||||
protected ResourceManager resourceManager;
|
||||
private final ReadLock readLock;
|
||||
private final WriteLock writeLock;
|
||||
|
||||
|
|
|
@ -18,26 +18,13 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.curator.framework.AuthInfo;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||
import org.apache.curator.framework.api.transaction.CuratorTransaction;
|
||||
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
|
||||
import org.apache.curator.retry.RetryNTimes;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -77,7 +64,15 @@ import org.apache.zookeeper.data.Id;
|
|||
import org.apache.zookeeper.data.Stat;
|
||||
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* {@link RMStateStore} implementation backed by ZooKeeper.
|
||||
|
@ -140,12 +135,6 @@ public class ZKRMStateStore extends RMStateStore {
|
|||
private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
|
||||
"RMDTMasterKeysRoot";
|
||||
|
||||
private String zkHostPort = null;
|
||||
private int numRetries;
|
||||
private int zkSessionTimeout;
|
||||
@VisibleForTesting
|
||||
int zkRetryInterval;
|
||||
|
||||
/** Znode paths */
|
||||
private String zkRootNodePath;
|
||||
private String rmAppRoot;
|
||||
|
@ -160,17 +149,15 @@ public class ZKRMStateStore extends RMStateStore {
|
|||
|
||||
/** Fencing related variables */
|
||||
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
|
||||
private boolean useDefaultFencingScheme = false;
|
||||
private String fencingNodePath;
|
||||
private Thread verifyActiveStatusThread;
|
||||
private int zkSessionTimeout;
|
||||
|
||||
/** ACL and auth info */
|
||||
private List<ACL> zkAcl;
|
||||
private List<ZKUtil.ZKAuthInfo> zkAuths;
|
||||
@VisibleForTesting
|
||||
List<ACL> zkRootNodeAcl;
|
||||
private String zkRootNodeUsername;
|
||||
private final String zkRootNodePassword = Long.toString(random.nextLong());
|
||||
public static final int CREATE_DELETE_PERMS =
|
||||
ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE;
|
||||
private final String zkRootNodeAuthScheme =
|
||||
|
@ -204,45 +191,25 @@ public class ZKRMStateStore extends RMStateStore {
|
|||
YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
|
||||
Id rmId = new Id(zkRootNodeAuthScheme,
|
||||
DigestAuthenticationProvider.generateDigest(
|
||||
zkRootNodeUsername + ":" + zkRootNodePassword));
|
||||
zkRootNodeUsername + ":" + resourceManager.getZkRootNodePassword()));
|
||||
zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId));
|
||||
return zkRootNodeAcl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void initInternal(Configuration conf) throws Exception {
|
||||
zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
|
||||
if (zkHostPort == null) {
|
||||
throw new YarnRuntimeException("No server address specified for " +
|
||||
"zookeeper state store for Resource Manager recovery. " +
|
||||
YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
|
||||
}
|
||||
numRetries =
|
||||
conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
|
||||
YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
|
||||
|
||||
/* Initialize fencing related paths, acls, and ops */
|
||||
znodeWorkingPath =
|
||||
conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
|
||||
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
|
||||
zkSessionTimeout =
|
||||
conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
|
||||
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
|
||||
|
||||
if (HAUtil.isHAEnabled(conf)) {
|
||||
zkRetryInterval = zkSessionTimeout / numRetries;
|
||||
} else {
|
||||
zkRetryInterval =
|
||||
conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
|
||||
}
|
||||
zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
|
||||
fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
|
||||
rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
|
||||
zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
|
||||
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
|
||||
|
||||
zkAcl = RMZKUtils.getZKAcls(conf);
|
||||
zkAuths = RMZKUtils.getZKAuths(conf);
|
||||
|
||||
zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
|
||||
rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
|
||||
|
||||
/* Initialize fencing related paths, acls, and ops */
|
||||
fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
|
||||
if (HAUtil.isHAEnabled(conf)) {
|
||||
String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
|
||||
(YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf);
|
||||
|
@ -256,7 +223,6 @@ public class ZKRMStateStore extends RMStateStore {
|
|||
throw bafe;
|
||||
}
|
||||
} else {
|
||||
useDefaultFencingScheme = true;
|
||||
zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl);
|
||||
}
|
||||
}
|
||||
|
@ -272,19 +238,22 @@ public class ZKRMStateStore extends RMStateStore {
|
|||
amrmTokenSecretManagerRoot =
|
||||
getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
|
||||
reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
|
||||
curatorFramework = resourceManager.getCurator();
|
||||
if (curatorFramework == null) {
|
||||
curatorFramework = resourceManager.createAndStartCurator(conf);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void startInternal() throws Exception {
|
||||
// createConnection for future API calls
|
||||
createConnection();
|
||||
|
||||
// ensure root dirs exist
|
||||
createRootDirRecursively(znodeWorkingPath);
|
||||
create(zkRootNodePath);
|
||||
setRootNodeAcls();
|
||||
delete(fencingNodePath);
|
||||
if (HAUtil.isHAEnabled(getConfig())) {
|
||||
if (HAUtil.isHAEnabled(getConfig()) && !HAUtil
|
||||
.isAutomaticFailoverEnabled(getConfig())) {
|
||||
verifyActiveStatusThread = new VerifyActiveStatusThread();
|
||||
verifyActiveStatusThread.start();
|
||||
}
|
||||
|
@ -332,7 +301,9 @@ public class ZKRMStateStore extends RMStateStore {
|
|||
verifyActiveStatusThread.interrupt();
|
||||
verifyActiveStatusThread.join(1000);
|
||||
}
|
||||
IOUtils.closeStream(curatorFramework);
|
||||
if (!HAUtil.isHAEnabled(getConfig())) {
|
||||
IOUtils.closeStream(curatorFramework);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -890,34 +861,6 @@ public class ZKRMStateStore extends RMStateStore {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* ZK operations using curator
|
||||
*/
|
||||
private void createConnection() throws Exception {
|
||||
// Curator connection
|
||||
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
|
||||
builder = builder.connectString(zkHostPort)
|
||||
.connectionTimeoutMs(zkSessionTimeout)
|
||||
.retryPolicy(new RetryNTimes(numRetries, zkRetryInterval));
|
||||
|
||||
// Set up authorization based on fencing scheme
|
||||
List<AuthInfo> authInfos = new ArrayList<>();
|
||||
for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
|
||||
authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
|
||||
}
|
||||
if (useDefaultFencingScheme) {
|
||||
byte[] defaultFencingAuth =
|
||||
(zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(
|
||||
Charset.forName("UTF-8"));
|
||||
authInfos.add(new AuthInfo(zkRootNodeAuthScheme, defaultFencingAuth));
|
||||
}
|
||||
builder = builder.authorization(authInfos);
|
||||
|
||||
// Connect to ZK
|
||||
curatorFramework = builder.build();
|
||||
curatorFramework.start();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
byte[] getData(final String path) throws Exception {
|
||||
return curatorFramework.getData().forPath(path);
|
||||
|
|
|
@ -106,6 +106,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
|||
|
||||
public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
|
||||
throws Exception {
|
||||
setResourceManager(new ResourceManager());
|
||||
init(conf);
|
||||
start();
|
||||
assertTrue(znodeWorkingPath.equals(workingZnode));
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||
|
@ -103,6 +104,7 @@ public class TestZKRMStateStorePerf extends RMStateStoreTestBase
|
|||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
||||
|
||||
store = new ZKRMStateStore();
|
||||
store.setResourceManager(new ResourceManager());
|
||||
store.init(conf);
|
||||
store.start();
|
||||
when(rmContext.getStateStore()).thenReturn(store);
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.curator.test.TestingServer;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher;
|
||||
import org.apache.hadoop.util.ZKUtil;
|
||||
|
||||
|
@ -80,6 +81,7 @@ public class TestZKRMStateStoreZKClientConnections {
|
|||
|
||||
public TestZKRMStateStore(Configuration conf, String workingZnode)
|
||||
throws Exception {
|
||||
setResourceManager(new ResourceManager());
|
||||
init(conf);
|
||||
start();
|
||||
assertTrue(znodeWorkingPath.equals(workingZnode));
|
||||
|
@ -168,24 +170,4 @@ public class TestZKRMStateStoreZKClientConnections {
|
|||
|
||||
zkClientTester.getRMStateStore(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testZKRetryInterval() throws Exception {
|
||||
TestZKClient zkClientTester = new TestZKClient();
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
|
||||
ZKRMStateStore store =
|
||||
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
|
||||
assertEquals(YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS,
|
||||
store.zkRetryInterval);
|
||||
store.stop();
|
||||
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||
store =
|
||||
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
|
||||
assertEquals(YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS /
|
||||
YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES,
|
||||
store.zkRetryInterval);
|
||||
store.stop();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue