YARN-4559. Make leader elector and zk store share the same curator

client. Contributed by Jian He

(cherry picked from commit 890a2ebd1a)
This commit is contained in:
Xuan 2016-01-20 14:48:10 -08:00
parent c57f55223c
commit be3322792f
9 changed files with 129 additions and 140 deletions

View File

@ -40,6 +40,9 @@ Release 2.9.0 - UNRELEASED
YARN-4526. Make SystemClock singleton so AppSchedulingInfo could use it.
(kasha)
YARN-4559. Make leader elector and zk store share the same curator client.
(Jian He via xgong)
OPTIMIZATIONS
BUG FIXES

View File

@ -271,6 +271,11 @@
<Field name="numRetries" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore" />
<Field name="resourceManager"/>
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer"/>
<Field name="renewalTimer" />

View File

@ -19,14 +19,11 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.service.AbstractService;
@ -44,35 +41,23 @@ public class LeaderElectorService extends AbstractService implements
private RMContext rmContext;
private String latchPath;
private String rmId;
private ResourceManager rm;
public LeaderElectorService(RMContext rmContext) {
public LeaderElectorService(RMContext rmContext, ResourceManager rm) {
super(LeaderElectorService.class.getName());
this.rmContext = rmContext;
this.rm = rm;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
Preconditions.checkNotNull(zkHostPort,
YarnConfiguration.RM_ZK_ADDRESS + " is not set");
rmId = HAUtil.getRMHAId(conf);
String clusterId = YarnConfiguration.getClusterId(conf);
int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
int maxRetryNum = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
String zkBasePath = conf.get(
YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
latchPath = zkBasePath + "/" + clusterId;
curator = CuratorFrameworkFactory.builder().connectString(zkHostPort)
.retryPolicy(new RetryNTimes(maxRetryNum, zkSessionTimeout)).build();
curator.start();
curator = rm.getCurator();
initAndStartLeaderLatch();
super.serviceInit(conf);
}

View File

@ -21,6 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
@ -28,7 +32,11 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.security.*;
import org.apache.hadoop.security.AuthenticationFilterInitializer;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.AbstractService;
@ -40,6 +48,7 @@ import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -58,8 +67,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMaste
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@ -78,7 +87,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@ -96,12 +107,15 @@ import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.WebApps.Builder;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@ -158,6 +172,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected ResourceTrackerService resourceTracker;
private JvmPauseMonitor pauseMonitor;
private boolean curatorEnabled = false;
private CuratorFramework curator;
private final String zkRootNodePassword =
Long.toString(new SecureRandom().nextLong());
private boolean recoveryEnabled;
@VisibleForTesting
protected String webAppAddress;
@ -232,7 +250,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) {
LeaderElectorService elector = new LeaderElectorService(rmContext);
this.curator = createAndStartCurator(conf);
LeaderElectorService elector = new LeaderElectorService(rmContext, this);
addService(elector);
rmContext.setLeaderElectorService(elector);
}
@ -276,7 +295,58 @@ public class ResourceManager extends CompositeService implements Recoverable {
super.serviceInit(this.conf);
}
public CuratorFramework createAndStartCurator(Configuration conf)
throws Exception {
String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
if (zkHostPort == null) {
throw new YarnRuntimeException(
YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
}
int numRetries = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
int zkRetryInterval = conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
// set up zk auths
List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
List<AuthInfo> authInfos = new ArrayList<>();
for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
}
if (HAUtil.isHAEnabled(conf) && HAUtil.getConfValueForRMInstance(
YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf) == null) {
String zkRootNodeUsername = HAUtil
.getConfValueForRMInstance(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
byte[] defaultFencingAuth =
(zkRootNodeUsername + ":" + zkRootNodePassword)
.getBytes(Charset.forName("UTF-8"));
authInfos.add(new AuthInfo(new DigestAuthenticationProvider().getScheme(),
defaultFencingAuth));
}
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zkHostPort)
.sessionTimeoutMs(zkSessionTimeout)
.retryPolicy(new RetryNTimes(numRetries, zkRetryInterval))
.authorization(authInfos).build();
client.start();
return client;
}
public CuratorFramework getCurator() {
return this.curator;
}
public String getZkRootNodePassword() {
return this.zkRootNodePassword;
}
protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler,
Configuration conf) {
return new QueueACLsManager(scheduler, conf);
@ -412,7 +482,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
private ApplicationMasterLauncher applicationMasterLauncher;
private ContainerAllocationExpirer containerAllocationExpirer;
private ResourceManager rm;
private boolean recoveryEnabled;
private RMActiveServiceContext activeServiceContext;
RMActiveServices(ResourceManager rm) {
@ -453,29 +522,26 @@ public class ResourceManager extends CompositeService implements Recoverable {
rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
}
boolean isRecoveryEnabled = conf.getBoolean(
YarnConfiguration.RECOVERY_ENABLED,
recoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
RMStateStore rmStore = null;
if (isRecoveryEnabled) {
recoveryEnabled = true;
if (recoveryEnabled) {
rmStore = RMStateStoreFactory.getStore(conf);
boolean isWorkPreservingRecoveryEnabled =
conf.getBoolean(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
rmContext
.setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
.setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
} else {
recoveryEnabled = false;
rmStore = new NullRMStateStore();
}
try {
rmStore.setResourceManager(rm);
rmStore.init(conf);
rmStore.setRMDispatcher(rmDispatcher);
rmStore.setResourceManager(rm);
} catch (Exception e) {
// the Exception from stateStore.init() needs to be handled for
// HA and we need to give up master status if we got fenced
@ -1130,6 +1196,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
configurationProvider.close();
}
super.serviceStop();
if (curator != null) {
curator.close();
}
transitionToStandby(false);
rmContext.setHAServiceState(HAServiceState.STOPPING);
}
@ -1177,7 +1246,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
public ClientRMService getClientRMService() {
return this.clientRM;
}
/**
* return the scheduler.
* @return the scheduler for the Resource Manager.
@ -1348,5 +1417,4 @@ public class ResourceManager extends CompositeService implements Recoverable {
out.println(" "
+ "[-remove-application-from-state-store <appId>]" + "\n");
}
}

View File

@ -95,7 +95,7 @@ public abstract class RMStateStore extends AbstractService {
"ReservationSystemRoot";
protected static final String VERSION_NODE = "RMVersionNode";
protected static final String EPOCH_NODE = "EpochNode";
private ResourceManager resourceManager;
protected ResourceManager resourceManager;
private final ReadLock readLock;
private final WriteLock writeLock;

View File

@ -18,26 +18,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.nio.charset.Charset;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@ -77,7 +64,15 @@ import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* {@link RMStateStore} implementation backed by ZooKeeper.
@ -140,12 +135,6 @@ public class ZKRMStateStore extends RMStateStore {
private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
"RMDTMasterKeysRoot";
private String zkHostPort = null;
private int numRetries;
private int zkSessionTimeout;
@VisibleForTesting
int zkRetryInterval;
/** Znode paths */
private String zkRootNodePath;
private String rmAppRoot;
@ -160,17 +149,15 @@ public class ZKRMStateStore extends RMStateStore {
/** Fencing related variables */
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
private boolean useDefaultFencingScheme = false;
private String fencingNodePath;
private Thread verifyActiveStatusThread;
private int zkSessionTimeout;
/** ACL and auth info */
private List<ACL> zkAcl;
private List<ZKUtil.ZKAuthInfo> zkAuths;
@VisibleForTesting
List<ACL> zkRootNodeAcl;
private String zkRootNodeUsername;
private final String zkRootNodePassword = Long.toString(random.nextLong());
public static final int CREATE_DELETE_PERMS =
ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE;
private final String zkRootNodeAuthScheme =
@ -204,45 +191,25 @@ public class ZKRMStateStore extends RMStateStore {
YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
Id rmId = new Id(zkRootNodeAuthScheme,
DigestAuthenticationProvider.generateDigest(
zkRootNodeUsername + ":" + zkRootNodePassword));
zkRootNodeUsername + ":" + resourceManager.getZkRootNodePassword()));
zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId));
return zkRootNodeAcl;
}
@Override
public synchronized void initInternal(Configuration conf) throws Exception {
zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
if (zkHostPort == null) {
throw new YarnRuntimeException("No server address specified for " +
"zookeeper state store for Resource Manager recovery. " +
YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
}
numRetries =
conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
/* Initialize fencing related paths, acls, and ops */
znodeWorkingPath =
conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
zkSessionTimeout =
conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
if (HAUtil.isHAEnabled(conf)) {
zkRetryInterval = zkSessionTimeout / numRetries;
} else {
zkRetryInterval =
conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
}
zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
zkAcl = RMZKUtils.getZKAcls(conf);
zkAuths = RMZKUtils.getZKAuths(conf);
zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
/* Initialize fencing related paths, acls, and ops */
fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
if (HAUtil.isHAEnabled(conf)) {
String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
(YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf);
@ -256,7 +223,6 @@ public class ZKRMStateStore extends RMStateStore {
throw bafe;
}
} else {
useDefaultFencingScheme = true;
zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl);
}
}
@ -272,19 +238,22 @@ public class ZKRMStateStore extends RMStateStore {
amrmTokenSecretManagerRoot =
getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
curatorFramework = resourceManager.getCurator();
if (curatorFramework == null) {
curatorFramework = resourceManager.createAndStartCurator(conf);
}
}
@Override
public synchronized void startInternal() throws Exception {
// createConnection for future API calls
createConnection();
// ensure root dirs exist
createRootDirRecursively(znodeWorkingPath);
create(zkRootNodePath);
setRootNodeAcls();
delete(fencingNodePath);
if (HAUtil.isHAEnabled(getConfig())) {
if (HAUtil.isHAEnabled(getConfig()) && !HAUtil
.isAutomaticFailoverEnabled(getConfig())) {
verifyActiveStatusThread = new VerifyActiveStatusThread();
verifyActiveStatusThread.start();
}
@ -332,7 +301,9 @@ public class ZKRMStateStore extends RMStateStore {
verifyActiveStatusThread.interrupt();
verifyActiveStatusThread.join(1000);
}
IOUtils.closeStream(curatorFramework);
if (!HAUtil.isHAEnabled(getConfig())) {
IOUtils.closeStream(curatorFramework);
}
}
@Override
@ -909,34 +880,6 @@ public class ZKRMStateStore extends RMStateStore {
}
}
/*
* ZK operations using curator
*/
private void createConnection() throws Exception {
// Curator connection
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
builder = builder.connectString(zkHostPort)
.connectionTimeoutMs(zkSessionTimeout)
.retryPolicy(new RetryNTimes(numRetries, zkRetryInterval));
// Set up authorization based on fencing scheme
List<AuthInfo> authInfos = new ArrayList<>();
for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
}
if (useDefaultFencingScheme) {
byte[] defaultFencingAuth =
(zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(
Charset.forName("UTF-8"));
authInfos.add(new AuthInfo(zkRootNodeAuthScheme, defaultFencingAuth));
}
builder = builder.authorization(authInfos);
// Connect to ZK
curatorFramework = builder.build();
curatorFramework.start();
}
@VisibleForTesting
byte[] getData(final String path) throws Exception {
return curatorFramework.getData().forPath(path);

View File

@ -105,6 +105,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
throws Exception {
setResourceManager(new ResourceManager());
init(conf);
start();
assertTrue(znodeWorkingPath.equals(workingZnode));

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
@ -103,6 +104,7 @@ public class TestZKRMStateStorePerf extends RMStateStoreTestBase
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
store = new ZKRMStateStore();
store.setResourceManager(new ResourceManager());
store.init(conf);
store.start();
when(rmContext.getStateStore()).thenReturn(store);

View File

@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher;
import org.apache.hadoop.util.ZKUtil;
@ -80,6 +81,7 @@ public class TestZKRMStateStoreZKClientConnections {
public TestZKRMStateStore(Configuration conf, String workingZnode)
throws Exception {
setResourceManager(new ResourceManager());
init(conf);
start();
assertTrue(znodeWorkingPath.equals(workingZnode));
@ -168,24 +170,4 @@ public class TestZKRMStateStoreZKClientConnections {
zkClientTester.getRMStateStore(conf);
}
@Test
public void testZKRetryInterval() throws Exception {
TestZKClient zkClientTester = new TestZKClient();
YarnConfiguration conf = new YarnConfiguration();
ZKRMStateStore store =
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
assertEquals(YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS,
store.zkRetryInterval);
store.stop();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
store =
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
assertEquals(YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS /
YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES,
store.zkRetryInterval);
store.stop();
}
}