Merge -c 1516337 from trunk to branch-2 to fix YARN-1082. Create base directories on HDFS after RM login to ensure RM recovery doesn't fail in secure mode. Contributed by Vinod K V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1516338 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2013-08-21 23:55:59 +00:00
parent a38dff29bc
commit a878fbceee
9 changed files with 70 additions and 29 deletions

View File

@ -67,6 +67,9 @@ Release 2.1.1-beta - UNRELEASED
YARN-881. Priority#compareTo method seems to be wrong. (Jian He via bikas)
YARN-1082. Create base directories on HDFS after RM login to ensure RM
recovery doesn't fail in secure mode. (vinodkv via acmurthy)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES

View File

@ -100,7 +100,7 @@ public class RMContextImpl implements RMContext {
containerTokenSecretManager, nmTokenSecretManager,
clientToAMTokenSecretManager);
RMStateStore nullStore = new NullRMStateStore();
nullStore.setDispatcher(rmDispatcher);
nullStore.setRMDispatcher(rmDispatcher);
try {
nullStore.init(new YarnConfiguration());
setStateStore(nullStore);

View File

@ -67,18 +67,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
@ -186,9 +186,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
recoveryEnabled = false;
rmStore = new NullRMStateStore();
}
try {
rmStore.init(conf);
rmStore.setDispatcher(rmDispatcher);
rmStore.setRMDispatcher(rmDispatcher);
} catch (Exception e) {
// the Exception from stateStore.init() needs to be handled for
// HA and we need to give up master status if we got fenced
@ -275,7 +276,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
@VisibleForTesting
protected void setRMStateStore(RMStateStore rmStore) {
rmStore.setDispatcher(rmDispatcher);
rmStore.setRMDispatcher(rmDispatcher);
((RMContextImpl) rmContext).setStateStore(rmStore);
}
@ -601,9 +602,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
this.containerTokenSecretManager.start();
this.nmTokenSecretManager.start();
RMStateStore rmStore = rmContext.getStateStore();
// The state store needs to start irrespective of recoveryEnabled as apps
// need events to move to further states.
rmStore.start();
if(recoveryEnabled) {
try {
RMStateStore rmStore = rmContext.getStateStore();
RMState state = rmStore.loadState();
recover(state);
} catch (Exception e) {

View File

@ -70,7 +70,7 @@ public class FileSystemRMStateStore extends RMStateStore {
private static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
"RMDTSequenceNumber_";
private FileSystem fs;
protected FileSystem fs;
private Path rootDirPath;
private Path rmDTSecretManagerRoot;
@ -80,6 +80,7 @@ public class FileSystemRMStateStore extends RMStateStore {
@VisibleForTesting
Path fsWorkingPath;
@Override
public synchronized void initInternal(Configuration conf)
throws Exception{
@ -87,9 +88,14 @@ public class FileSystemRMStateStore extends RMStateStore {
rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT);
rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
}
// create filesystem
fs = fsWorkingPath.getFileSystem(conf);
@Override
protected void startInternal() throws Exception {
// create filesystem only now, as part of service-start. By this time, RM is
// authenticated with kerberos so we are good to create a file-system
// handle.
fs = fsWorkingPath.getFileSystem(getConfig());
fs.mkdirs(rmDTSecretManagerRoot);
fs.mkdirs(rmAppRoot);
}

View File

@ -66,6 +66,10 @@ public class MemoryRMStateStore extends RMStateStore {
public synchronized void initInternal(Configuration conf) {
}
@Override
protected synchronized void startInternal() throws Exception {
}
@Override
protected synchronized void closeInternal() throws Exception {
}

View File

@ -21,10 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@Unstable
public class NullRMStateStore extends RMStateStore {
@ -34,6 +34,11 @@ public class NullRMStateStore extends RMStateStore {
// Do nothing
}
@Override
protected void startInternal() throws Exception {
// Do nothing
}
@Override
protected void closeInternal() throws Exception {
// Do nothing

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -60,9 +61,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
* Real store implementations need to derive from it and implement blocking
* store and load methods to actually store and load the state.
*/
public abstract class RMStateStore {
public abstract class RMStateStore extends AbstractService {
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
public RMStateStore() {
super(RMStateStore.class.getName());
}
/**
* State of an application attempt
*/
@ -174,31 +180,39 @@ public abstract class RMStateStore {
* Dispatcher used to send state operation completion events to
* ResourceManager services
*/
public void setDispatcher(Dispatcher dispatcher) {
public void setRMDispatcher(Dispatcher dispatcher) {
this.rmDispatcher = dispatcher;
}
AsyncDispatcher dispatcher;
public synchronized void init(Configuration conf) throws Exception{
public synchronized void serviceInit(Configuration conf) throws Exception{
// create async handler
dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.register(RMStateStoreEventType.class,
new ForwardingEventHandler());
dispatcher.start();
initInternal(conf);
}
protected synchronized void serviceStart() throws Exception {
dispatcher.start();
startInternal();
}
/**
* Derived classes initialize themselves using this method.
* The base class is initialized and the event dispatcher is ready to use at
* this point
*/
protected abstract void initInternal(Configuration conf) throws Exception;
public synchronized void close() throws Exception {
/**
* Derived classes start themselves using this method.
* The base class is started and the event dispatcher is ready to use at
* this point
*/
protected abstract void startInternal() throws Exception;
public synchronized void serviceStop() throws Exception {
closeInternal();
dispatcher.stop();
}

View File

@ -129,7 +129,10 @@ public class TestRMStateStore {
class TestFileSystemRMStore extends FileSystemRMStateStore {
TestFileSystemRMStore(Configuration conf) throws Exception {
init(conf);
Assert.assertNull(fs);
assertTrue(workingDirPathURI.equals(fsWorkingPath));
start();
Assert.assertNotNull(fs);
}
}
@ -218,7 +221,7 @@ public class TestRMStateStore {
Configuration conf = new YarnConfiguration();
RMStateStore store = stateStoreHelper.getRMStateStore();
TestDispatcher dispatcher = new TestDispatcher();
store.setDispatcher(dispatcher);
store.setRMDispatcher(dispatcher);
AMRMTokenSecretManager appTokenMgr =
new AMRMTokenSecretManager(conf);
@ -327,7 +330,7 @@ public class TestRMStateStore {
RMStateStoreHelper stateStoreHelper) throws Exception {
RMStateStore store = stateStoreHelper.getRMStateStore();
TestDispatcher dispatcher = new TestDispatcher();
store.setDispatcher(dispatcher);
store.setRMDispatcher(dispatcher);
// store RM delegation token;
RMDelegationTokenIdentifier dtId1 =

View File

@ -39,12 +39,10 @@ import javax.xml.parsers.ParserConfigurationException;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -57,11 +55,11 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -81,8 +79,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemoved
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -132,7 +128,12 @@ public class TestFairScheduler {
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
resourceManager = new ResourceManager();
resourceManager.init(conf);
// TODO: This test should really be using MockRM. For now starting stuff
// that is needed at a bare minimum.
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
resourceManager.getRMContext().getStateStore().start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// to initialize the master key
resourceManager.getRMContainerTokenSecretManager().rollMasterKey();