Merge -c 1423758 from trunk to branch-2 to fix YARN-230. RM Restart phase 1 - includes support for saving/restarting all applications on an RM bounce. Contributed by Bikas Saha.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1423759 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2012-12-19 04:34:54 +00:00
parent 7b7fc7b788
commit 50a515b45b
51 changed files with 1798 additions and 131 deletions

View File

@ -8,6 +8,9 @@ Release 2.0.3-alpha - Unreleased
YARN-145. Add a Web UI to the fair share scheduler. (Sandy Ryza via tomwhite)
YARN-230. RM Restart phase 1 - includes support for saving/restarting all
applications on an RM bounce. (Bikas Saha via acmurthy)
IMPROVEMENTS
YARN-78. Changed UnManagedAM application to use YarnClient. (Bikas Saha via

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/*
* Contains the state data that needs to be persisted for an ApplicationAttempt
*/
@Public
@Unstable
public interface ApplicationAttemptStateData {
/**
* The ApplicationAttemptId for the application attempt
* @return ApplicationAttemptId for the application attempt
*/
@Public
@Unstable
public ApplicationAttemptId getAttemptId();
public void setAttemptId(ApplicationAttemptId attemptId);
/*
* The master container running the application attempt
* @return Container that hosts the attempt
*/
@Public
@Unstable
public Container getMasterContainer();
public void setMasterContainer(Container container);
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Contains all the state data that needs to be stored persistently
* for an Application
*/
@Public
@Unstable
public interface ApplicationStateData {
/**
* The time at which the application was received by the Resource Manager
* @return submitTime
*/
@Public
@Unstable
public long getSubmitTime();
@Public
@Unstable
public void setSubmitTime(long submitTime);
/**
* The {@link ApplicationSubmissionContext} for the application
* {@link ApplicationId} can be obtained from the this
* @return ApplicationSubmissionContext
*/
@Public
@Unstable
public ApplicationSubmissionContext getApplicationSubmissionContext();
@Public
@Unstable
public void setApplicationSubmissionContext(
ApplicationSubmissionContext context);
}

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptStateDataProtoOrBuilder;
public class ApplicationAttemptStateDataPBImpl
extends ProtoBase<ApplicationAttemptStateDataProto>
implements ApplicationAttemptStateData {
ApplicationAttemptStateDataProto proto =
ApplicationAttemptStateDataProto.getDefaultInstance();
ApplicationAttemptStateDataProto.Builder builder = null;
boolean viaProto = false;
private ApplicationAttemptId attemptId = null;
private Container masterContainer = null;
public ApplicationAttemptStateDataPBImpl() {
builder = ApplicationAttemptStateDataProto.newBuilder();
}
public ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto proto) {
this.proto = proto;
viaProto = true;
}
public ApplicationAttemptStateDataProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (this.attemptId != null) {
builder.setAttemptId(((ApplicationAttemptIdPBImpl)attemptId).getProto());
}
if(this.masterContainer != null) {
builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto());
}
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ApplicationAttemptStateDataProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public ApplicationAttemptId getAttemptId() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
if(attemptId != null) {
return attemptId;
}
if (!p.hasAttemptId()) {
return null;
}
attemptId = new ApplicationAttemptIdPBImpl(p.getAttemptId());
return attemptId;
}
@Override
public void setAttemptId(ApplicationAttemptId attemptId) {
maybeInitBuilder();
if (attemptId == null) {
builder.clearAttemptId();
}
this.attemptId = attemptId;
}
@Override
public Container getMasterContainer() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
if(masterContainer != null) {
return masterContainer;
}
if (!p.hasMasterContainer()) {
return null;
}
masterContainer = new ContainerPBImpl(p.getMasterContainer());
return masterContainer;
}
@Override
public void setMasterContainer(Container container) {
maybeInitBuilder();
if (container == null) {
builder.clearMasterContainer();
}
this.masterContainer = container;
}
}

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.yarn.api.records.ApplicationStateData;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateDataProtoOrBuilder;
public class ApplicationStateDataPBImpl
extends ProtoBase<ApplicationStateDataProto>
implements ApplicationStateData {
ApplicationStateDataProto proto =
ApplicationStateDataProto.getDefaultInstance();
ApplicationStateDataProto.Builder builder = null;
boolean viaProto = false;
private ApplicationSubmissionContext applicationSubmissionContext = null;
public ApplicationStateDataPBImpl() {
builder = ApplicationStateDataProto.newBuilder();
}
public ApplicationStateDataPBImpl(
ApplicationStateDataProto proto) {
this.proto = proto;
viaProto = true;
}
public ApplicationStateDataProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (this.applicationSubmissionContext != null) {
builder.setApplicationSubmissionContext(
((ApplicationSubmissionContextPBImpl)applicationSubmissionContext)
.getProto());
}
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ApplicationStateDataProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public long getSubmitTime() {
ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasSubmitTime()) {
return -1;
}
return (p.getSubmitTime());
}
@Override
public void setSubmitTime(long submitTime) {
maybeInitBuilder();
builder.setSubmitTime(submitTime);
}
@Override
public ApplicationSubmissionContext getApplicationSubmissionContext() {
ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
if(applicationSubmissionContext != null) {
return applicationSubmissionContext;
}
if (!p.hasApplicationSubmissionContext()) {
return null;
}
applicationSubmissionContext =
new ApplicationSubmissionContextPBImpl(
p.getApplicationSubmissionContext());
return applicationSubmissionContext;
}
@Override
public void setApplicationSubmissionContext(
ApplicationSubmissionContext context) {
maybeInitBuilder();
if (context == null) {
builder.clearApplicationSubmissionContext();
}
this.applicationSubmissionContext = context;
}
}

View File

@ -210,7 +210,6 @@ public void setAMContainerSpec(ContainerLaunchContext amContainer) {
@Override
public boolean getUnmanagedAM() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
//There is a default so cancelTokens should never be null
return p.getUnmanagedAm();
}
@ -219,7 +218,7 @@ public void setUnmanagedAM(boolean value) {
maybeInitBuilder();
builder.setUnmanagedAm(value);
}
@Override
public boolean getCancelTokensWhenComplete() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;

View File

@ -329,3 +329,15 @@ message StringBytesMapProto {
optional bytes value = 2;
}
////////////////////////////////////////////////////////////////////////
////// From recovery////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
message ApplicationStateDataProto {
optional int64 submit_time = 1;
optional ApplicationSubmissionContextProto application_submission_context = 2;
}
message ApplicationAttemptStateDataProto {
optional ApplicationAttemptIdProto attemptId = 1;
optional ContainerProto master_container = 2;
}

View File

@ -28,8 +28,6 @@
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.junit.AfterClass;
import org.junit.Before;
@ -46,8 +44,7 @@ public class TestGetGroups extends GetGroupsTestBase {
@BeforeClass
public static void setUpResourceManager() throws IOException, InterruptedException {
conf = new YarnConfiguration();
RMStateStore store = StoreFactory.getStore(conf);
resourceManager = new ResourceManager(store) {
resourceManager = new ResourceManager() {
@Override
protected void doSecureLogin() throws IOException {
};

View File

@ -34,7 +34,7 @@ public void test() {
@Test
public void testClientStop() {
Configuration conf = new Configuration();
ResourceManager rm = new ResourceManager(null);
ResourceManager rm = new ResourceManager();
rm.init(conf);
rm.start();

View File

@ -225,10 +225,12 @@ public class YarnConfiguration extends Configuration {
public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
7*24*60*60*1000; // 7 days
public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;
/** The class to use as the persistent store.*/
public static final String RM_STORE = RM_PREFIX + "store.class";
/** The maximum number of completed applications RM keeps. */
public static final String RM_MAX_COMPLETED_APPLICATIONS =
RM_PREFIX + "max-completed-applications";

View File

@ -204,6 +204,13 @@
<value>8192</value>
</property>
<property>
<description>Enable RM to recover state after starting. If true, then
yarn.resourcemanager.store.class must be specified</description>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>false</value>
</property>
<property>
<description>The class to use as the persistent store.</description>
<name>yarn.resourcemanager.store.class</name>

View File

@ -263,6 +263,8 @@ public AllocateResponse allocate(AllocateRequest request)
} else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
// Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
// Reboot is not useful since after AM reboots, it will send register and
// get an exception. Might as well throw an exception here.
allocateResponse.setAMResponse(reboot);
return allocateResponse;
}

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
@ -75,6 +76,7 @@
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -254,6 +256,20 @@ public SubmitApplicationResponse submitApplication(
// So call handle directly and do not send an event.
rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
.currentTimeMillis()));
// If recovery is enabled then store the application information in a
// blocking call so make sure that RM has stored the information needed
// to restart the AM after RM restart without further client communication
RMStateStore stateStore = rmContext.getStateStore();
LOG.info("Storing Application with id " + applicationId);
try {
stateStore.storeApplication(rmContext.getRMApps().get(applicationId));
} catch (Exception e) {
// For HA this exception needs to be handled by giving up
// master status if we got fenced
LOG.error("Failed to store application:" + applicationId, e);
ExitUtil.terminate(1, e);
}
LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user);

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -36,6 +37,10 @@
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -48,7 +53,8 @@
/**
* This class manages the list of applications for the resource manager.
*/
public class RMAppManager implements EventHandler<RMAppManagerEvent> {
public class RMAppManager implements EventHandler<RMAppManagerEvent>,
Recoverable {
private static final Log LOG = LogFactory.getLog(RMAppManager.class);
@ -173,6 +179,10 @@ protected synchronized void finishApplication(ApplicationId applicationId) {
completedApps.add(applicationId);
writeAuditLog(applicationId);
// application completely done. Remove from state
RMStateStore store = rmContext.getStateStore();
store.removeApplication(rmContext.getRMApps().get(applicationId));
}
}
@ -306,6 +316,37 @@ private Credentials parseCredentials(ApplicationSubmissionContext application)
}
return credentials;
}
@Override
public void recover(RMState state) throws Exception {
RMStateStore store = rmContext.getStateStore();
assert store != null;
// recover applications
Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
LOG.info("Recovering " + appStates.size() + " applications");
for(ApplicationState appState : appStates.values()) {
// re-submit the application
// this is going to send an app start event but since the async dispatcher
// has not started that event will be queued until we have completed re
// populating the state
if(appState.getApplicationSubmissionContext().getUnmanagedAM()) {
// do not recover unmanaged applications since current recovery
// mechanism of restarting attempts does not work for them.
// This will need to be changed in work preserving recovery in which
// RM will re-connect with the running AM's instead of restarting them
LOG.info("Not recovering unmanaged application " + appState.getAppId());
store.removeApplication(appState);
} else {
LOG.info("Recovering application " + appState.getAppId());
submitApplication(appState.getApplicationSubmissionContext(),
appState.getSubmitTime());
// re-populate attempt information in application
RMAppImpl appImpl = (RMAppImpl) rmContext.getRMApps().get(
appState.getAppId());
appImpl.recover(state);
}
}
}
@Override
public void handle(RMAppManagerEvent event) {

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@ -38,6 +39,8 @@
public interface RMContext {
Dispatcher getDispatcher();
RMStateStore getStateStore();
ConcurrentMap<ApplicationId, RMApp> getRMApps();

View File

@ -23,7 +23,10 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@ -33,6 +36,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import com.google.common.annotations.VisibleForTesting;
public class RMContextImpl implements RMContext {
private final Dispatcher rmDispatcher;
@ -48,6 +53,7 @@ public class RMContextImpl implements RMContext {
private AMLivelinessMonitor amLivelinessMonitor;
private AMLivelinessMonitor amFinishingMonitor;
private RMStateStore stateStore = null;
private ContainerAllocationExpirer containerAllocationExpirer;
private final DelegationTokenRenewer tokenRenewer;
private final ApplicationTokenSecretManager appTokenSecretManager;
@ -55,6 +61,7 @@ public class RMContextImpl implements RMContext {
private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
public RMContextImpl(Dispatcher rmDispatcher,
RMStateStore store,
ContainerAllocationExpirer containerAllocationExpirer,
AMLivelinessMonitor amLivelinessMonitor,
AMLivelinessMonitor amFinishingMonitor,
@ -63,6 +70,7 @@ public RMContextImpl(Dispatcher rmDispatcher,
RMContainerTokenSecretManager containerTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientTokenSecretManager) {
this.rmDispatcher = rmDispatcher;
this.stateStore = store;
this.containerAllocationExpirer = containerAllocationExpirer;
this.amLivelinessMonitor = amLivelinessMonitor;
this.amFinishingMonitor = amFinishingMonitor;
@ -71,11 +79,39 @@ public RMContextImpl(Dispatcher rmDispatcher,
this.containerTokenSecretManager = containerTokenSecretManager;
this.clientToAMTokenSecretManager = clientTokenSecretManager;
}
@VisibleForTesting
// helper constructor for tests
public RMContextImpl(Dispatcher rmDispatcher,
ContainerAllocationExpirer containerAllocationExpirer,
AMLivelinessMonitor amLivelinessMonitor,
AMLivelinessMonitor amFinishingMonitor,
DelegationTokenRenewer tokenRenewer,
ApplicationTokenSecretManager appTokenSecretManager,
RMContainerTokenSecretManager containerTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientTokenSecretManager) {
this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor,
amFinishingMonitor, tokenRenewer, appTokenSecretManager,
containerTokenSecretManager, clientTokenSecretManager);
RMStateStore nullStore = new NullRMStateStore();
nullStore.setDispatcher(rmDispatcher);
try {
nullStore.init(new YarnConfiguration());
setStateStore(nullStore);
} catch (Exception e) {
assert false;
}
}
@Override
public Dispatcher getDispatcher() {
return this.rmDispatcher;
}
@Override
public RMStateStore getStateStore() {
return stateStore;
}
@Override
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
@ -126,4 +162,9 @@ public RMContainerTokenSecretManager getContainerTokenSecretManager() {
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
return this.clientToAMTokenSecretManager;
}
@VisibleForTesting
public void setStateStore(RMStateStore store) {
stateStore = store;
}
}

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
@ -45,10 +46,11 @@
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -80,6 +82,8 @@
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.WebApps.Builder;
import com.google.common.annotations.VisibleForTesting;
/**
* The ResourceManager is the main class that is a set of components.
* "I am the ResourceManager. All your resources are belong to us..."
@ -119,14 +123,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected RMDelegationTokenSecretManager rmDTSecretManager;
private WebApp webApp;
protected RMContext rmContext;
private final RMStateStore store;
protected ResourceTrackerService resourceTracker;
private boolean recoveryEnabled;
private Configuration conf;
public ResourceManager(RMStateStore store) {
public ResourceManager() {
super("ResourceManager");
this.store = store;
}
public RMContext getRMContext() {
@ -160,12 +163,34 @@ public synchronized void init(Configuration conf) {
this.containerTokenSecretManager = createContainerTokenSecretManager(conf);
boolean isRecoveryEnabled = conf.getBoolean(
YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
RMStateStore rmStore = null;
if(isRecoveryEnabled) {
recoveryEnabled = true;
rmStore = RMStateStoreFactory.getStore(conf);
} else {
recoveryEnabled = false;
rmStore = new NullRMStateStore();
}
try {
rmStore.init(conf);
rmStore.setDispatcher(rmDispatcher);
} catch (Exception e) {
// the Exception from stateStore.init() needs to be handled for
// HA and we need to give up master status if we got fenced
LOG.error("Failed to init state store", e);
ExitUtil.terminate(1, e);
}
this.rmContext =
new RMContextImpl(this.rmDispatcher,
new RMContextImpl(this.rmDispatcher, rmStore,
this.containerAllocationExpirer, amLivelinessMonitor,
amFinishingMonitor, tokenRenewer, this.appTokenSecretManager,
this.containerTokenSecretManager, this.clientToAMSecretManager);
// Register event handler for NodesListManager
this.nodesListManager = new NodesListManager(this.rmContext);
this.rmDispatcher.register(NodesListManagerEventType.class,
@ -226,9 +251,15 @@ public synchronized void init(Configuration conf) {
addService(applicationMasterLauncher);
new RMNMInfo(this.rmContext, this.scheduler);
super.init(conf);
}
@VisibleForTesting
protected void setRMStateStore(RMStateStore rmStore) {
rmStore.setDispatcher(rmDispatcher);
((RMContextImpl) rmContext).setStateStore(rmStore);
}
protected RMContainerTokenSecretManager createContainerTokenSecretManager(
Configuration conf) {
@ -502,6 +533,19 @@ public void start() {
this.appTokenSecretManager.start();
this.containerTokenSecretManager.start();
if(recoveryEnabled) {
try {
RMStateStore rmStore = rmContext.getStateStore();
RMState state = rmStore.loadState();
recover(state);
} catch (Exception e) {
// the Exception from loadState() needs to be handled for
// HA and we need to give up master status if we got fenced
LOG.error("Failed to load/recover state", e);
ExitUtil.terminate(1, e);
}
}
startWepApp();
DefaultMetricsSystem.initialize("ResourceManager");
JvmMetrics.initSingleton("ResourceManager", null);
@ -555,6 +599,13 @@ public void stop() {
DefaultMetricsSystem.shutdown();
RMStateStore store = rmContext.getStateStore();
try {
store.close();
} catch (Exception e) {
LOG.error("Error closing store.", e);
}
super.stop();
}
@ -643,6 +694,8 @@ public ApplicationTokenSecretManager getApplicationTokenSecretManager(){
@Override
public void recover(RMState state) throws Exception {
// recover applications
rmAppManager.recover(state);
}
public static void main(String argv[]) {
@ -650,13 +703,11 @@ public static void main(String argv[]) {
StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
try {
Configuration conf = new YarnConfiguration();
RMStateStore store = StoreFactory.getStore(conf);
ResourceManager resourceManager = new ResourceManager(store);
ResourceManager resourceManager = new ResourceManager();
ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(resourceManager),
SHUTDOWN_HOOK_PRIORITY);
resourceManager.init(conf);
//resourceManager.recover(store.restore());
resourceManager.start();
} catch (Throwable t) {
LOG.fatal("Error starting ResourceManager", t);

View File

@ -1,22 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
public class FileRMStateStore implements RMStateStore {
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
@Private
@Unstable
public class MemoryRMStateStore extends RMStateStore {
RMState state = new RMState();
@VisibleForTesting
public RMState getState() {
return state;
}
@Override
public synchronized RMState loadState() throws Exception {
// return a copy of the state to allow for modification of the real state
RMState returnState = new RMState();
returnState.appState.putAll(state.appState);
return returnState;
}
@Override
public synchronized void initInternal(Configuration conf) {
}
@Override
protected synchronized void closeInternal() throws Exception {
}
@Override
public void storeApplicationState(String appId,
ApplicationStateDataPBImpl appStateData)
throws Exception {
ApplicationState appState = new ApplicationState(
appStateData.getSubmitTime(),
appStateData.getApplicationSubmissionContext());
state.appState.put(appState.getAppId(), appState);
}
@Override
public synchronized void storeApplicationAttemptState(String attemptIdStr,
ApplicationAttemptStateDataPBImpl attemptStateData)
throws Exception {
ApplicationAttemptId attemptId = ConverterUtils
.toApplicationAttemptId(attemptIdStr);
ApplicationAttemptState attemptState = new ApplicationAttemptState(
attemptId, attemptStateData.getMasterContainer());
ApplicationState appState = state.getApplicationState().get(
attemptState.getAttemptId().getApplicationId());
assert appState != null;
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
@Override
public synchronized void removeApplicationState(ApplicationState appState)
throws Exception {
ApplicationId appId = appState.getAppId();
ApplicationState removed = state.appState.remove(appId);
assert removed != null;
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
public class NullRMStateStore extends RMStateStore {
@Override
protected void initInternal(Configuration conf) throws Exception {
// Do nothing
}
@Override
protected void closeInternal() throws Exception {
// Do nothing
}
@Override
public RMState loadState() throws Exception {
return null;
}
@Override
protected void storeApplicationState(String appId,
ApplicationStateDataPBImpl appStateData) throws Exception {
// Do nothing
}
@Override
protected void storeApplicationAttemptState(String attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
// Do nothing
}
@Override
protected void removeApplicationState(ApplicationState appState)
throws Exception {
// Do nothing
}
}

View File

@ -15,10 +15,313 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
public interface RMStateStore {
public interface RMState {
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
@Private
@Unstable
/**
* Base class to implement storage of ResourceManager state.
* Takes care of asynchronous notifications and interfacing with YARN objects.
* Real store implementations need to derive from it and implement blocking
* store and load methods to actually store and load the state.
*/
public abstract class RMStateStore {
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
/**
* State of an application attempt
*/
public static class ApplicationAttemptState {
final ApplicationAttemptId attemptId;
final Container masterContainer;
public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer) {
this.attemptId = attemptId;
this.masterContainer = masterContainer;
}
public Container getMasterContainer() {
return masterContainer;
}
public ApplicationAttemptId getAttemptId() {
return attemptId;
}
}
/**
* State of an application application
*/
public static class ApplicationState {
final ApplicationSubmissionContext context;
final long submitTime;
Map<ApplicationAttemptId, ApplicationAttemptState> attempts =
new HashMap<ApplicationAttemptId, ApplicationAttemptState>();
ApplicationState(long submitTime, ApplicationSubmissionContext context) {
this.submitTime = submitTime;
this.context = context;
}
public ApplicationId getAppId() {
return context.getApplicationId();
}
public long getSubmitTime() {
return submitTime;
}
public int getAttemptCount() {
return attempts.size();
}
public ApplicationSubmissionContext getApplicationSubmissionContext() {
return context;
}
public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) {
return attempts.get(attemptId);
}
}
/**
* State of the ResourceManager
*/
public static class RMState {
Map<ApplicationId, ApplicationState> appState =
new HashMap<ApplicationId, ApplicationState>();
public Map<ApplicationId, ApplicationState> getApplicationState() {
return appState;
}
}
private Dispatcher rmDispatcher;
/**
* Dispatcher used to send state operation completion events to
* ResourceManager services
*/
public void setDispatcher(Dispatcher dispatcher) {
this.rmDispatcher = dispatcher;
}
AsyncDispatcher dispatcher;
public synchronized void init(Configuration conf) throws Exception{
// create async handler
dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.register(RMStateStoreEventType.class,
new ForwardingEventHandler());
dispatcher.start();
initInternal(conf);
}
/**
* Derived classes initialize themselves using this method.
* The base class is initialized and the event dispatcher is ready to use at
* this point
*/
protected abstract void initInternal(Configuration conf) throws Exception;
public synchronized void close() throws Exception {
closeInternal();
dispatcher.stop();
}
/**
* Derived classes close themselves using this method.
* The base class will be closed and the event dispatcher will be shutdown
* after this
*/
protected abstract void closeInternal() throws Exception;
/**
* Blocking API
* The derived class must recover state from the store and return a new
* RMState object populated with that state
* This must not be called on the dispatcher thread
*/
public abstract RMState loadState() throws Exception;
/**
* Blocking API
* ResourceManager services use this to store the application's state
* This must not be called on the dispatcher thread
*/
public synchronized void storeApplication(RMApp app) throws Exception {
ApplicationSubmissionContext context = app
.getApplicationSubmissionContext();
assert context instanceof ApplicationSubmissionContextPBImpl;
ApplicationStateDataPBImpl appStateData = new ApplicationStateDataPBImpl();
appStateData.setSubmitTime(app.getSubmitTime());
appStateData.setApplicationSubmissionContext(context);
LOG.info("Storing info for app: " + context.getApplicationId());
storeApplicationState(app.getApplicationId().toString(), appStateData);
}
/**
* Blocking API
* Derived classes must implement this method to store the state of an
* application.
*/
protected abstract void storeApplicationState(String appId,
ApplicationStateDataPBImpl appStateData)
throws Exception;
@SuppressWarnings("unchecked")
/**
* Non-blocking API
* ResourceManager services call this to store state on an application attempt
* This does not block the dispatcher threads
* RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt
*/
public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) {
ApplicationAttemptState attemptState = new ApplicationAttemptState(
appAttempt.getAppAttemptId(), appAttempt.getMasterContainer());
dispatcher.getEventHandler().handle(
new RMStateStoreAppAttemptEvent(attemptState));
}
/**
* Blocking API
* Derived classes must implement this method to store the state of an
* application attempt
*/
protected abstract void storeApplicationAttemptState(String attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData)
throws Exception;
/**
* Non-blocking API
* ResourceManager services call this to remove an application from the state
* store
* This does not block the dispatcher threads
* There is no notification of completion for this operation.
*/
public synchronized void removeApplication(RMApp app) {
ApplicationState appState = new ApplicationState(
app.getSubmitTime(), app.getApplicationSubmissionContext());
for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
ApplicationAttemptState attemptState = new ApplicationAttemptState(
appAttempt.getAppAttemptId(), appAttempt.getMasterContainer());
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
removeApplication(appState);
}
@SuppressWarnings("unchecked")
/**
* Non-Blocking API
*/
public synchronized void removeApplication(ApplicationState appState) {
dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState));
}
/**
* Blocking API
* Derived classes must implement this method to remove the state of an
* application and its attempts
*/
protected abstract void removeApplicationState(ApplicationState appState)
throws Exception;
// Dispatcher related code
private synchronized void handleStoreEvent(RMStateStoreEvent event) {
switch(event.getType()) {
case STORE_APP_ATTEMPT:
{
ApplicationAttemptState attemptState =
((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
Exception storedException = null;
ApplicationAttemptStateDataPBImpl attemptStateData =
new ApplicationAttemptStateDataPBImpl();
attemptStateData.setAttemptId(attemptState.getAttemptId());
attemptStateData.setMasterContainer(attemptState.getMasterContainer());
LOG.info("Storing info for attempt: " + attemptState.getAttemptId());
try {
storeApplicationAttemptState(attemptState.getAttemptId().toString(),
attemptStateData);
} catch (Exception e) {
LOG.error("Error storing appAttempt: "
+ attemptState.getAttemptId(), e);
storedException = e;
} finally {
notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
storedException);
}
}
break;
case REMOVE_APP:
{
ApplicationState appState =
((RMStateStoreRemoveAppEvent) event).getAppState();
ApplicationId appId = appState.getAppId();
LOG.info("Removing info for app: " + appId);
try {
removeApplicationState(appState);
} catch (Exception e) {
LOG.error("Error removing app: " + appId, e);
}
}
break;
default:
LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
}
}
@SuppressWarnings("unchecked")
/**
* In (@link storeApplicationAttempt}, derived class can call this method to
* notify the application attempt about operation completion
* @param appAttempt attempt that has been saved
*/
private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
Exception storedException) {
rmDispatcher.getEventHandler().handle(
new RMAppAttemptStoredEvent(attemptId, storedException));
}
/**
* EventHandler implementation which forward events to the FSRMStateStore
* This hides the EventHandle methods of the store from its public interface
*/
private final class ForwardingEventHandler
implements EventHandler<RMStateStoreEvent> {
@Override
public void handle(RMStateStoreEvent event) {
handleStoreEvent(event);
}
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
public class RMStateStoreAppAttemptEvent extends RMStateStoreEvent {
ApplicationAttemptState attemptState;
public RMStateStoreAppAttemptEvent(ApplicationAttemptState attemptState) {
super(RMStateStoreEventType.STORE_APP_ATTEMPT);
this.attemptState = attemptState;
}
public ApplicationAttemptState getAppAttemptState() {
return attemptState;
}
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.event.AbstractEvent;
public class RMStateStoreEvent extends AbstractEvent<RMStateStoreEventType> {
public RMStateStoreEvent(RMStateStoreEventType type) {
super(type);
}
}

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
public enum RMStateStoreEventType {
STORE_APP_ATTEMPT,
REMOVE_APP
}

View File

@ -21,12 +21,12 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
public class StoreFactory {
public class RMStateStoreFactory {
public static RMStateStore getStore(Configuration conf) {
RMStateStore store = ReflectionUtils.newInstance(
conf.getClass(YarnConfiguration.RM_STORE,
FileRMStateStore.class, RMStateStore.class),
MemoryRMStateStore.class, RMStateStore.class),
conf);
return store;
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
public class RMStateStoreRemoveAppEvent extends RMStateStoreEvent {
ApplicationState appState;
RMStateStoreRemoveAppEvent(ApplicationState appState) {
super(RMStateStoreEventType.REMOVE_APP);
this.appState = appState;
}
public ApplicationState getAppState() {
return appState;
}
}

View File

@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.resourcemanager.recovery;

View File

@ -44,6 +44,12 @@ public interface RMApp extends EventHandler<RMAppEvent> {
* @return the {@link ApplicationId} for this {@link RMApp}.
*/
ApplicationId getApplicationId();
/**
* The application submission context for this {@link RMApp}
* @return the {@link ApplicationSubmissionContext} for this {@link RMApp}
*/
ApplicationSubmissionContext getApplicationSubmissionContext();
/**
* The current state of the {@link RMApp}.

View File

@ -50,6 +50,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@ -66,7 +69,7 @@
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
public class RMAppImpl implements RMApp {
public class RMAppImpl implements RMApp, Recoverable {
private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
private static final String UNAVAILABLE = "N/A";
@ -243,6 +246,11 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
public ApplicationId getApplicationId() {
return this.applicationId;
}
@Override
public ApplicationSubmissionContext getApplicationSubmissionContext() {
return this.submissionContext;
}
@Override
public FinalApplicationStatus getFinalApplicationStatus() {
@ -512,9 +520,22 @@ public void handle(RMAppEvent event) {
this.writeLock.unlock();
}
}
@Override
public void recover(RMState state) {
ApplicationState appState = state.getApplicationState().get(getApplicationId());
LOG.info("Recovering app: " + getApplicationId() + " with " +
+ appState.getAttemptCount() + " attempts");
for(int i=0; i<appState.getAttemptCount(); ++i) {
// create attempt
createNewAttempt(false);
// recover attempt
((RMAppAttemptImpl) currentAttempt).recover(state);
}
}
@SuppressWarnings("unchecked")
private void createNewAttempt() {
private void createNewAttempt(boolean startAttempt) {
ApplicationAttemptId appAttemptId = Records
.newRecord(ApplicationAttemptId.class);
appAttemptId.setApplicationId(applicationId);
@ -525,8 +546,10 @@ private void createNewAttempt() {
submissionContext, conf);
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
handler.handle(
new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
if(startAttempt) {
handler.handle(
new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
}
}
private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
@ -553,7 +576,7 @@ public void transition(RMAppImpl app, RMAppEvent event) {
private static final class StartAppAttemptTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
app.createNewAttempt();
app.createNewAttempt(true);
};
}
@ -647,7 +670,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
msg = "Unmanaged application " + app.getApplicationId()
+ " failed due to " + failedEvent.getDiagnostics()
+ ". Failing the application.";
} else if (app.attempts.size() == app.maxRetries) {
} else if (app.attempts.size() >= app.maxRetries) {
retryApp = false;
msg = "Application " + app.getApplicationId() + " failed "
+ app.maxRetries + " times due to " + failedEvent.getDiagnostics()
@ -655,7 +678,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
}
if (retryApp) {
app.createNewAttempt();
app.createNewAttempt(true);
return initialState;
} else {
LOG.info(msg);

View File

@ -39,9 +39,15 @@ public enum RMAppAttemptEventType {
CONTAINER_ACQUIRED,
CONTAINER_ALLOCATED,
CONTAINER_FINISHED,
// Source: RMStateStore
ATTEMPT_SAVED,
// Source: Scheduler
APP_REJECTED,
APP_ACCEPTED,
// Source: RMAttemptImpl.recover
RECOVER
}

View File

@ -38,6 +38,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@ -57,6 +58,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -69,6 +75,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@ -85,7 +92,7 @@
import org.apache.hadoop.yarn.util.BuilderUtils;
@SuppressWarnings({"unchecked", "rawtypes"})
public class RMAppAttemptImpl implements RMAppAttempt {
public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private static final Log LOG = LogFactory.getLog(RMAppAttemptImpl.class);
@ -153,12 +160,15 @@ RMAppAttemptEventType.START, new AttemptStartedTransition())
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FAILED,
RMAppAttemptEventType.REGISTERED,
new UnexpectedAMRegisteredTransition())
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.RECOVERED,
RMAppAttemptEventType.RECOVER)
// Transitions from SUBMITTED state
.addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
RMAppAttemptEventType.APP_REJECTED, new AppRejectedTransition())
.addTransition(RMAppAttemptState.SUBMITTED,
EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.SCHEDULED),
EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
RMAppAttemptState.SCHEDULED),
RMAppAttemptEventType.APP_ACCEPTED,
new ScheduleTransition())
.addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.KILLED,
@ -170,12 +180,42 @@ RMAppAttemptEventType.APP_REJECTED, new AppRejectedTransition())
// Transitions from SCHEDULED State
.addTransition(RMAppAttemptState.SCHEDULED,
RMAppAttemptState.ALLOCATED,
RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
new AMContainerAllocatedTransition())
.addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.KILLED,
RMAppAttemptEventType.KILL,
new BaseFinalTransition(RMAppAttemptState.KILLED))
// Transitions from ALLOCATED_SAVING State
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.ALLOCATED,
RMAppAttemptEventType.ATTEMPT_SAVED, new AttemptStoredTransition())
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptEventType.CONTAINER_ACQUIRED,
new ContainerAcquiredTransition())
// App could be killed by the client. So need to handle this.
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.KILLED,
RMAppAttemptEventType.KILL,
new BaseFinalTransition(RMAppAttemptState.KILLED))
// Transitions from LAUNCHED_UNMANAGED_SAVING State
.addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
RMAppAttemptState.LAUNCHED,
RMAppAttemptEventType.ATTEMPT_SAVED,
new UnmanagedAMAttemptSavedTransition())
// attempt should not try to register in this state
.addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
RMAppAttemptState.FAILED,
RMAppAttemptEventType.REGISTERED,
new UnexpectedAMRegisteredTransition())
// App could be killed by the client. So need to handle this.
.addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
RMAppAttemptState.KILLED,
RMAppAttemptEventType.KILL,
new BaseFinalTransition(RMAppAttemptState.KILLED))
// Transitions from ALLOCATED State
.addTransition(RMAppAttemptState.ALLOCATED,
@ -279,11 +319,30 @@ RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.REGISTERED,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
RMAppAttemptEventType.ATTEMPT_SAVED,
RMAppAttemptEventType.CONTAINER_FINISHED,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.STATUS_UPDATE))
// Transitions from RECOVERED State
.addTransition(
RMAppAttemptState.RECOVERED,
RMAppAttemptState.RECOVERED,
EnumSet.of(RMAppAttemptEventType.START,
RMAppAttemptEventType.APP_ACCEPTED,
RMAppAttemptEventType.APP_REJECTED,
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.LAUNCHED,
RMAppAttemptEventType.LAUNCH_FAILED,
RMAppAttemptEventType.REGISTERED,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
RMAppAttemptEventType.CONTAINER_ACQUIRED,
RMAppAttemptEventType.ATTEMPT_SAVED,
RMAppAttemptEventType.CONTAINER_FINISHED,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.STATUS_UPDATE))
.installTopology();
public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
@ -318,7 +377,7 @@ public ApplicationAttemptId getAppAttemptId() {
@Override
public ApplicationSubmissionContext getSubmissionContext() {
return this.submissionContext;
}
}
@Override
public FinalApplicationStatus getFinalApplicationStatus() {
@ -494,6 +553,10 @@ public Container getMasterContainer() {
}
}
private void setMasterContainer(Container container) {
masterContainer = container;
}
@Override
public void handle(RMAppAttemptEvent event) {
@ -561,6 +624,21 @@ public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
}
}
@Override
public void recover(RMState state) {
ApplicationState appState =
state.getApplicationState().get(getAppAttemptId().getApplicationId());
ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId());
assert attemptState != null;
setMasterContainer(attemptState.getMasterContainer());
LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId()
+ " AttemptId: " + getAppAttemptId()
+ " MasterContainer: " + masterContainer);
setDiagnostics("Attempt recovered after RM restart");
handle(new RMAppAttemptEvent(getAppAttemptId(),
RMAppAttemptEventType.RECOVER));
}
private static class BaseTransition implements
SingleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent> {
@ -625,13 +703,12 @@ private static final class ScheduleTransition
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
// Send the acceptance to the app
appAttempt.eventHandler.handle(new RMAppEvent(event
.getApplicationAttemptId().getApplicationId(),
RMAppEventType.APP_ACCEPTED));
if (!appAttempt.submissionContext.getUnmanagedAM()) {
// Send the acceptance to the app
appAttempt.eventHandler.handle(new RMAppEvent(event
.getApplicationAttemptId().getApplicationId(),
RMAppEventType.APP_ACCEPTED));
// Request a container for the AM.
ResourceRequest request = BuilderUtils.newResourceRequest(
AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
@ -647,35 +724,42 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
return RMAppAttemptState.SCHEDULED;
} else {
// RM not allocating container. AM is self launched.
// Directly go to LAUNCHED state
// Register with AMLivelinessMonitor
appAttempt.rmContext.getAMLivelinessMonitor().register(
appAttempt.applicationAttemptId);
return RMAppAttemptState.LAUNCHED;
RMStateStore store = appAttempt.rmContext.getStateStore();
// save state and then go to LAUNCHED state
appAttempt.storeAttempt(store);
return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
}
}
}
private static final class AMContainerAllocatedTransition extends BaseTransition {
private static final class AMContainerAllocatedTransition
extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
RMAppAttemptEvent event) {
// Acquire the AM container from the scheduler.
Allocation amContainerAllocation = appAttempt.scheduler.allocate(
appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST,
EMPTY_CONTAINER_RELEASE_LIST);
// Set the masterContainer
appAttempt.masterContainer = amContainerAllocation.getContainers().get(
0);
appAttempt.setMasterContainer(amContainerAllocation.getContainers().get(
0));
// Send event to launch the AM Container
appAttempt.eventHandler.handle(new AMLauncherEvent(
AMLauncherEventType.LAUNCH, appAttempt));
RMStateStore store = appAttempt.rmContext.getStateStore();
appAttempt.storeAttempt(store);
}
}
private static final class AttemptStoredTransition extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
appAttempt.checkAttemptStoreError(event);
appAttempt.launchAttempt();
}
}
private static class BaseFinalTransition extends BaseTransition {
private final RMAppAttemptState finalAttemptState;
@ -736,17 +820,34 @@ public void transition(RMAppAttemptImpl appAttempt,
}
}
private static final class AMLaunchedTransition extends BaseTransition {
private static class AMLaunchedTransition extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
RMAppAttemptEvent event) {
// Register with AMLivelinessMonitor
appAttempt.rmContext.getAMLivelinessMonitor().register(
appAttempt.applicationAttemptId);
appAttempt.attemptLaunched();
}
}
private static final class UnmanagedAMAttemptSavedTransition
extends AMLaunchedTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
appAttempt.checkAttemptStoreError(event);
// Send the acceptance to the app
// Ideally this should have been done when the scheduler accepted the app.
// But its here because until the attempt is saved the client should not
// launch the unmanaged AM. Client waits for the app status to be accepted
// before doing so. So we have to delay the accepted state until we have
// completed storing the attempt
appAttempt.eventHandler.handle(new RMAppEvent(event
.getApplicationAttemptId().getApplicationId(),
RMAppEventType.APP_ACCEPTED));
super.transition(appAttempt, event);
}
}
private static final class LaunchFailedTransition extends BaseFinalTransition {
@ -1040,4 +1141,37 @@ public long getStartTime() {
this.readLock.unlock();
}
}
private void launchAttempt(){
// Send event to launch the AM Container
eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));
}
private void attemptLaunched() {
// Register with AMLivelinessMonitor
rmContext.getAMLivelinessMonitor().register(getAppAttemptId());
}
private void checkAttemptStoreError(RMAppAttemptEvent event) {
RMAppAttemptStoredEvent storeEvent = (RMAppAttemptStoredEvent) event;
if(storeEvent.getStoredException() != null)
{
// This needs to be handled for HA and give up master status if we got
// fenced
LOG.error("Failed to store attempt: " + getAppAttemptId(),
storeEvent.getStoredException());
ExitUtil.terminate(1, storeEvent.getStoredException());
}
}
private void storeAttempt(RMStateStore store) {
// store attempt data in a non-blocking manner to prevent dispatcher
// thread starvation and wait for state to be saved
LOG.info("Storing attempt: AppId: " +
getAppAttemptId().getApplicationId()
+ " AttemptId: " +
getAppAttemptId()
+ " MasterContainer: " + masterContainer);
store.storeApplicationAttempt(this);
}
}

View File

@ -19,6 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
public enum RMAppAttemptState {
NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING,
FINISHING, FINISHED, KILLED,
NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING,
FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
public class RMAppAttemptStoredEvent extends RMAppAttemptEvent {
final Exception storedException;
public RMAppAttemptStoredEvent(ApplicationAttemptId appAttemptId,
Exception storedException) {
super(appAttemptId, RMAppAttemptEventType.ATTEMPT_SAVED);
this.storedException = storedException;
}
public Exception getStoredException() {
return storedException;
}
}

View File

@ -47,7 +47,7 @@ public class MockAM {
private volatile int responseId = 0;
private final ApplicationAttemptId attemptId;
private final RMContext context;
private final AMRMProtocol amRMProtocol;
private AMRMProtocol amRMProtocol;
private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
private final List<ContainerId> releases = new ArrayList<ContainerId>();
@ -58,6 +58,10 @@ public class MockAM {
this.amRMProtocol = amRMProtocol;
this.attemptId = attemptId;
}
void setAMRMProtocol(AMRMProtocol amRMProtocol) {
this.amRMProtocol = amRMProtocol;
}
public void waitForState(RMAppAttemptState finalState) throws Exception {
RMApp app = context.getRMApps().get(attemptId.getApplicationId());
@ -66,7 +70,8 @@ public void waitForState(RMAppAttemptState finalState) throws Exception {
while (!finalState.equals(attempt.getAppAttemptState())
&& timeoutSecs++ < 20) {
System.out
.println("AppAttempt State is : " + attempt.getAppAttemptState()
.println("AppAttempt : " + attemptId + " State is : "
+ attempt.getAppAttemptState()
+ " Waiting for state : " + finalState);
Thread.sleep(500);
}

View File

@ -46,7 +46,7 @@ public class MockNM {
private int responseId;
private NodeId nodeId;
private final int memory;
private final ResourceTrackerService resourceTracker;
private ResourceTrackerService resourceTracker;
private final int httpPort = 2;
private MasterKey currentMasterKey;
@ -66,6 +66,10 @@ public NodeId getNodeId() {
public int getHttpPort() {
return httpPort;
}
void setResourceTrackerService(ResourceTrackerService resourceTracker) {
this.resourceTracker = resourceTracker;
}
public void containerStatus(Container container) throws Exception {
Map<ApplicationId, List<ContainerStatus>> conts =

View File

@ -39,9 +39,10 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@ -63,10 +64,17 @@ public MockRM() {
}
public MockRM(Configuration conf) {
super(StoreFactory.getStore(conf));
this(conf, null);
}
public MockRM(Configuration conf, RMStateStore store) {
super();
init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
if(store != null) {
setRMStateStore(store);
}
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
rootLogger.setLevel(Level.DEBUG);
}
public void waitForState(ApplicationId appId, RMAppState finalState)
@ -75,7 +83,7 @@ public void waitForState(ApplicationId appId, RMAppState finalState)
Assert.assertNotNull("app shouldn't be null", app);
int timeoutSecs = 0;
while (!finalState.equals(app.getState()) && timeoutSecs++ < 20) {
System.out.println("App State is : " + app.getState()
System.out.println("App : " + appId + " State is : " + app.getState()
+ " Waiting for state : " + finalState);
Thread.sleep(500);
}
@ -83,6 +91,24 @@ public void waitForState(ApplicationId appId, RMAppState finalState)
Assert.assertEquals("App state is not correct (timedout)", finalState,
app.getState());
}
public void waitForState(ApplicationAttemptId attemptId,
RMAppAttemptState finalState)
throws Exception {
RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId());
Assert.assertNotNull("app shouldn't be null", app);
RMAppAttempt attempt = app.getCurrentAppAttempt();
int timeoutSecs = 0;
while (!finalState.equals(attempt.getAppAttemptState()) && timeoutSecs++ < 20) {
System.out.println("AppAttempt : " + attemptId
+ " State is : " + attempt.getAppAttemptState()
+ " Waiting for state : " + finalState);
Thread.sleep(500);
}
System.out.println("Attempt State is : " + attempt.getAppAttemptState());
Assert.assertEquals("Attempt state is not correct (timedout)", finalState,
attempt.getAppAttemptState());
}
// get new application id
public GetNewApplicationResponse getNewAppId() throws Exception {
@ -97,11 +123,16 @@ public RMApp submitApp(int masterMemory) throws Exception {
// client
public RMApp submitApp(int masterMemory, String name, String user) throws Exception {
return submitApp(masterMemory, name, user, null);
return submitApp(masterMemory, name, user, null, false);
}
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls) throws Exception {
return submitApp(masterMemory, name, user, acls, false);
}
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged) throws Exception {
ClientRMProtocol client = getClientRMService();
GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class));
@ -114,6 +145,9 @@ public RMApp submitApp(int masterMemory, String name, String user,
sub.setApplicationId(appId);
sub.setApplicationName(name);
sub.setUser(user);
if(unmanaged) {
sub.setUnmanagedAM(true);
}
ContainerLaunchContext clc = Records
.newRecord(ContainerLaunchContext.class);
Resource capability = Records.newRecord(Resource.class);

View File

@ -51,7 +51,7 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.apache.hadoop.yarn.util.BuilderUtils;
@ -85,7 +85,7 @@ public class TestApplicationACLs {
@BeforeClass
public static void setup() throws InterruptedException, IOException {
RMStateStore store = StoreFactory.getStore(conf);
RMStateStore store = RMStateStoreFactory.getStore(conf);
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
AccessControlList adminACL = new AccessControlList("");
adminACL.addGroup(SUPER_GROUP);

View File

@ -0,0 +1,308 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;
public class TestRMRestart {
@Test
public void testRMRestart() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE,
"org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
conf.set(YarnConfiguration.RM_SCHEDULER,
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
rmState.getApplicationState();
// PHASE 1: create state in an RM
// start RM
MockRM rm1 = new MockRM(conf, memStore);
// start like normal because state is empty
rm1.start();
MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
MockNM nm2 = new MockNM("h2:5678", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
nm2.registerNode(); // nm2 will not heartbeat with RM1
// create app that will not be saved because it will finish
RMApp app0 = rm1.submitApp(200);
RMAppAttempt attempt0 = app0.getCurrentAppAttempt();
// spot check that app is saved
Assert.assertEquals(1, rmAppState.size());
nm1.nodeHeartbeat(true);
MockAM am0 = rm1.sendAMLaunched(attempt0.getAppAttemptId());
am0.registerAppAttempt();
am0.unregisterAppAttempt();
nm1.nodeHeartbeat(attempt0.getAppAttemptId(), 1, ContainerState.COMPLETE);
am0.waitForState(RMAppAttemptState.FINISHED);
rm1.waitForState(app0.getApplicationId(), RMAppState.FINISHED);
// spot check that app is not saved anymore
Assert.assertEquals(0, rmAppState.size());
// create app that gets launched and does allocate before RM restart
RMApp app1 = rm1.submitApp(200);
// assert app1 info is saved
ApplicationState appState = rmAppState.get(app1.getApplicationId());
Assert.assertNotNull(appState);
Assert.assertEquals(0, appState.getAttemptCount());
Assert.assertEquals(appState.getApplicationSubmissionContext()
.getApplicationId(), app1.getApplicationSubmissionContext()
.getApplicationId());
//kick the scheduling to allocate AM container
nm1.nodeHeartbeat(true);
// assert app1 attempt is saved
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId();
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
Assert.assertEquals(1, appState.getAttemptCount());
ApplicationAttemptState attemptState =
appState.getAttempt(attemptId1);
Assert.assertNotNull(attemptState);
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId());
// launch the AM
MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
// AM request for containers
am1.allocate("h1" , 1000, 1, new ArrayList<ContainerId>());
// kick the scheduler
nm1.nodeHeartbeat(true);
List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (conts.size() == 0) {
nm1.nodeHeartbeat(true);
conts.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(500);
}
// create app that does not get launched by RM before RM restart
RMApp app2 = rm1.submitApp(200);
// assert app2 info is saved
appState = rmAppState.get(app2.getApplicationId());
Assert.assertNotNull(appState);
Assert.assertEquals(0, appState.getAttemptCount());
Assert.assertEquals(appState.getApplicationSubmissionContext()
.getApplicationId(), app2.getApplicationSubmissionContext()
.getApplicationId());
// create unmanaged app
RMApp appUnmanaged = rm1.submitApp(200, "", "", null, true);
ApplicationAttemptId unmanagedAttemptId =
appUnmanaged.getCurrentAppAttempt().getAppAttemptId();
// assert appUnmanaged info is saved
ApplicationId unmanagedAppId = appUnmanaged.getApplicationId();
appState = rmAppState.get(unmanagedAppId);
Assert.assertNotNull(appState);
// wait for attempt to reach LAUNCHED state
rm1.waitForState(unmanagedAttemptId, RMAppAttemptState.LAUNCHED);
rm1.waitForState(unmanagedAppId, RMAppState.ACCEPTED);
// assert unmanaged attempt info is saved
Assert.assertEquals(1, appState.getAttemptCount());
Assert.assertEquals(appState.getApplicationSubmissionContext()
.getApplicationId(), appUnmanaged.getApplicationSubmissionContext()
.getApplicationId());
// PHASE 2: create new RM and start from old state
// create new RM to represent restart and recover state
MockRM rm2 = new MockRM(conf, memStore);
// start new RM
rm2.start();
// change NM to point to new RM
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
nm2.setResourceTrackerService(rm2.getResourceTrackerService());
// verify load of old state
// only 2 apps are loaded since unmanaged app is not loaded back since it
// cannot be restarted by the RM this will change with work preserving RM
// restart in which AMs/NMs are not rebooted
Assert.assertEquals(2, rm2.getRMContext().getRMApps().size());
// verify correct number of attempts and other data
RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
Assert.assertNotNull(loadedApp1);
//Assert.assertEquals(1, loadedApp1.getAppAttempts().size());
Assert.assertEquals(app1.getApplicationSubmissionContext()
.getApplicationId(), loadedApp1.getApplicationSubmissionContext()
.getApplicationId());
RMApp loadedApp2 = rm2.getRMContext().getRMApps().get(app2.getApplicationId());
Assert.assertNotNull(loadedApp2);
//Assert.assertEquals(0, loadedApp2.getAppAttempts().size());
Assert.assertEquals(app2.getApplicationSubmissionContext()
.getApplicationId(), loadedApp2.getApplicationSubmissionContext()
.getApplicationId());
// verify state machine kicked into expected states
rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.ACCEPTED);
// verify new attempts created
Assert.assertEquals(2, loadedApp1.getAppAttempts().size());
Assert.assertEquals(1, loadedApp2.getAppAttempts().size());
// verify old AM is not accepted
// change running AM to talk to new RM
am1.setAMRMProtocol(rm2.getApplicationMasterService());
AMResponse amResponse = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
Assert.assertTrue(amResponse.getReboot());
// NM should be rebooted on heartbeat, even first heartbeat for nm2
HeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());
hbResponse = nm2.nodeHeartbeat(true);
Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());
// new NM to represent NM re-register
nm1 = rm2.registerNode("h1:1234", 15120);
nm2 = rm2.registerNode("h2:5678", 15120);
// verify no more reboot response sent
hbResponse = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction());
hbResponse = nm2.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction());
// assert app1 attempt is saved
attempt1 = loadedApp1.getCurrentAppAttempt();
attemptId1 = attempt1.getAppAttemptId();
rm2.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
appState = rmAppState.get(loadedApp1.getApplicationId());
attemptState = appState.getAttempt(attemptId1);
Assert.assertNotNull(attemptState);
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId());
// Nodes on which the AM's run
MockNM am1Node = nm1;
if(attemptState.getMasterContainer().getNodeId().toString().contains("h2")){
am1Node = nm2;
}
// assert app2 attempt is saved
RMAppAttempt attempt2 = loadedApp2.getCurrentAppAttempt();
ApplicationAttemptId attemptId2 = attempt2.getAppAttemptId();
rm2.waitForState(attemptId2, RMAppAttemptState.ALLOCATED);
appState = rmAppState.get(loadedApp2.getApplicationId());
attemptState = appState.getAttempt(attemptId2);
Assert.assertNotNull(attemptState);
Assert.assertEquals(BuilderUtils.newContainerId(attemptId2, 1),
attemptState.getMasterContainer().getId());
MockNM am2Node = nm1;
if(attemptState.getMasterContainer().getNodeId().toString().contains("h2")){
am2Node = nm2;
}
// start the AM's
am1 = rm2.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
MockAM am2 = rm2.sendAMLaunched(attempt2.getAppAttemptId());
am2.registerAppAttempt();
//request for containers
am1.allocate("h1" , 1000, 3, new ArrayList<ContainerId>());
am2.allocate("h2" , 1000, 1, new ArrayList<ContainerId>());
// verify container allocate continues to work
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
conts = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (conts.size() == 0) {
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
conts.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(500);
}
// finish the AM's
am1.unregisterAppAttempt();
am1Node.nodeHeartbeat(attempt1.getAppAttemptId(), 1, ContainerState.COMPLETE);
am1.waitForState(RMAppAttemptState.FINISHED);
am2.unregisterAppAttempt();
am2Node.nodeHeartbeat(attempt2.getAppAttemptId(), 1, ContainerState.COMPLETE);
am2.waitForState(RMAppAttemptState.FINISHED);
// stop RM's
rm2.stop();
rm1.stop();
// completed apps should be removed
Assert.assertEquals(0, rmAppState.size());
}
}

View File

@ -31,8 +31,6 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.junit.After;
@ -47,8 +45,7 @@ public class TestResourceManager {
@Before
public void setUp() throws Exception {
Configuration conf = new YarnConfiguration();
RMStateStore store = StoreFactory.getStore(conf);
resourceManager = new ResourceManager(store);
resourceManager = new ResourceManager();
resourceManager.init(conf);
}

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@ -152,6 +153,11 @@ public String getUser() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public ApplicationSubmissionContext getApplicationSubmissionContext() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public String getName() {
throw new UnsupportedOperationException("Not supported yet.");

View File

@ -35,7 +35,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.junit.After;
import org.junit.Before;

View File

@ -44,7 +44,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;

View File

@ -24,9 +24,11 @@
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -67,6 +69,11 @@ public MockRMApp(int newid, long time, RMAppState newState, String userName, Str
public ApplicationId getApplicationId() {
return id;
}
@Override
public ApplicationSubmissionContext getApplicationSubmissionContext() {
return new ApplicationSubmissionContextPBImpl();
}
@Override
public RMAppState getState() {
@ -118,7 +125,9 @@ public void setName(String name) {
public Map<ApplicationAttemptId, RMAppAttempt> getAppAttempts() {
Map<ApplicationAttemptId, RMAppAttempt> attempts =
new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
attempts.put(attempt.getAppAttemptId(), attempt);
if(attempt != null) {
attempts.put(attempt.getAppAttemptId(), attempt);
}
return attempts;
}

View File

@ -53,6 +53,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -65,6 +66,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@ -167,6 +169,9 @@ null, new ApplicationTokenSecretManager(conf),
new RMContainerTokenSecretManager(conf),
new ClientToAMTokenSecretManagerInRM());
RMStateStore store = mock(RMStateStore.class);
((RMContextImpl) rmContext).setStateStore(store);
scheduler = mock(YarnScheduler.class);
masterService = mock(ApplicationMasterService.class);
applicationMasterLauncher = mock(ApplicationMasterLauncher.class);
@ -295,6 +300,14 @@ private void testAppAttemptKilledState(Container amContainer,
assertEquals(0, applicationAttempt.getRanNodes().size());
assertNull(applicationAttempt.getFinalApplicationStatus());
}
/**
* {@link RMAppAttemptState#RECOVERED}
*/
private void testAppAttemptRecoveredState() {
assertEquals(RMAppAttemptState.RECOVERED,
applicationAttempt.getAppAttemptState());
}
/**
* {@link RMAppAttemptState#SCHEDULED}
@ -438,6 +451,15 @@ private void scheduleApplicationAttempt() {
new RMAppAttemptEvent(
applicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.APP_ACCEPTED));
if(unmanagedAM){
assertEquals(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
applicationAttempt.getAppAttemptState());
applicationAttempt.handle(
new RMAppAttemptStoredEvent(
applicationAttempt.getAppAttemptId(), null));
}
testAppAttemptScheduledState();
}
@ -463,6 +485,12 @@ private Container allocateApplicationAttempt() {
applicationAttempt.getAppAttemptId(),
container));
assertEquals(RMAppAttemptState.ALLOCATED_SAVING,
applicationAttempt.getAppAttemptState());
applicationAttempt.handle(
new RMAppAttemptStoredEvent(
applicationAttempt.getAppAttemptId(), null));
testAppAttemptAllocatedState(container);
return container;
@ -554,6 +582,15 @@ public void testNewToKilled() {
testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS);
}
@Test
public void testNewToRecovered() {
applicationAttempt.handle(
new RMAppAttemptEvent(
applicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.RECOVER));
testAppAttemptRecoveredState();
}
@Test
public void testSubmittedToFailed() {
submitApplicationAttempt();
@ -604,7 +641,7 @@ public void testAllocatedToFailed() {
diagnostics));
testAppAttemptFailedState(amContainer, diagnostics);
}
@Test
public void testRunningToFailed() {
Container amContainer = allocateApplicationAttempt();

View File

@ -27,7 +27,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@ -40,8 +39,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -76,8 +73,7 @@ public class TestCapacityScheduler {
@Before
public void setUp() throws Exception {
RMStateStore store = StoreFactory.getStore(new Configuration());
resourceManager = new ResourceManager(store);
resourceManager = new ResourceManager();
CapacitySchedulerConfiguration csConf
= new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf);

View File

@ -29,8 +29,6 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.junit.Before;
@ -47,8 +45,7 @@ public void setup() throws IOException {
Configuration conf = createConfiguration();
// All tests assume only one assignment per node update
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
RMStateStore store = StoreFactory.getStore(conf);
ResourceManager resourceManager = new ResourceManager(store);
ResourceManager resourceManager = new ResourceManager();
resourceManager.init(conf);
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
scheduler.reinitialize(conf, resourceManager.getRMContext());

View File

@ -51,8 +51,6 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -102,8 +100,7 @@ public void setUp() throws IOException {
Configuration conf = createConfiguration();
// All tests assume only one assignment per node update
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
RMStateStore store = StoreFactory.getStore(conf);
resourceManager = new ResourceManager(store);
resourceManager = new ResourceManager();
resourceManager.init(conf);
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
scheduler.reinitialize(conf, resourceManager.getRMContext());

View File

@ -27,8 +27,6 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.junit.After;
import org.junit.Before;
@ -50,8 +48,7 @@ public void setUp() throws IOException {
// All tests assume only one assignment per node update
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
RMStateStore store = StoreFactory.getStore(conf);
resourceManager = new ResourceManager(store);
resourceManager = new ResourceManager();
resourceManager.init(conf);
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
scheduler.reinitialize(conf, resourceManager.getRMContext());

View File

@ -38,8 +38,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -59,8 +57,7 @@ public class TestFifoScheduler {
@Before
public void setUp() throws Exception {
RMStateStore store = StoreFactory.getStore(new Configuration());
resourceManager = new ResourceManager(store);
resourceManager = new ResourceManager();
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER,
FifoScheduler.class, ResourceScheduler.class);

View File

@ -48,8 +48,6 @@
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
@ -154,8 +152,7 @@ public synchronized void start() {
getConfig().set(YarnConfiguration.RM_WEBAPP_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
}
RMStateStore store = StoreFactory.getStore(getConfig());
resourceManager = new ResourceManager(store) {
resourceManager = new ResourceManager() {
@Override
protected void doSecureLogin() throws IOException {
// Don't try to login using keytab in the testcase.

View File

@ -47,7 +47,7 @@ public void testNMUpdation() throws Exception {
// intervene
final DrainDispatcher dispatcher = new DrainDispatcher();
ResourceManager rm = new ResourceManager(null) {
ResourceManager rm = new ResourceManager() {
@Override
protected void doSecureLogin() throws IOException {
// Do nothing.