YARN-2404. Removed ApplicationAttemptState and ApplicationState class in RMStateStore. Contributed by Tsuyoshi OZAWA

(cherry picked from commit 5805a81efb)
This commit is contained in:
Jian He 2014-11-25 12:48:22 -08:00
parent 4b62d6d2fd
commit 2863056530
23 changed files with 353 additions and 537 deletions

View File

@ -65,6 +65,9 @@ Release 2.7.0 - UNRELEASED
YARN-2669. FairScheduler: queue names shouldn't allow periods
(Wei Yan via Sandy Ryza)
YARN-2404. Removed ApplicationAttemptState and ApplicationState class in
RMStateStore. (Tsuyoshi OZAWA via jianhe)
OPTIMIZATIONS
BUG FIXES

View File

@ -40,9 +40,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -306,11 +306,11 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
}
}
protected void recoverApplication(ApplicationState appState, RMState rmState)
throws Exception {
protected void recoverApplication(ApplicationStateData appState,
RMState rmState) throws Exception {
ApplicationSubmissionContext appContext =
appState.getApplicationSubmissionContext();
ApplicationId appId = appState.getAppId();
ApplicationId appId = appContext.getApplicationId();
// create and recover app.
RMAppImpl application =
@ -414,9 +414,10 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
RMStateStore store = rmContext.getStateStore();
assert store != null;
// recover applications
Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
Map<ApplicationId, ApplicationStateData> appStates =
state.getApplicationState();
LOG.info("Recovering " + appStates.size() + " applications");
for (ApplicationState appState : appStates.values()) {
for (ApplicationStateData appState : appStates.values()) {
recoverApplication(appState, state);
}
}

View File

@ -38,9 +38,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -223,8 +221,8 @@ public class FileSystemRMStateStore extends RMStateStore {
private void loadRMAppState(RMState rmState) throws Exception {
try {
List<ApplicationAttemptState> attempts =
new ArrayList<ApplicationAttemptState>();
List<ApplicationAttemptStateData> attempts =
new ArrayList<ApplicationAttemptStateData>();
for (FileStatus appDir : fs.listStatus(rmAppRoot)) {
checkAndResumeUpdateOperation(appDir.getPath());
@ -241,19 +239,11 @@ public class FileSystemRMStateStore extends RMStateStore {
if (LOG.isDebugEnabled()) {
LOG.debug("Loading application from node: " + childNodeName);
}
ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
ApplicationStateDataPBImpl appStateData =
ApplicationStateDataPBImpl appState =
new ApplicationStateDataPBImpl(
ApplicationStateDataProto.parseFrom(childData));
ApplicationState appState =
new ApplicationState(appStateData.getSubmitTime(),
appStateData.getStartTime(),
appStateData.getApplicationSubmissionContext(),
appStateData.getUser(),
appStateData.getState(),
appStateData.getDiagnostics(), appStateData.getFinishTime());
// assert child node name is same as actual applicationId
assert appId.equals(appState.context.getApplicationId());
ApplicationId appId =
appState.getApplicationSubmissionContext().getApplicationId();
rmState.appState.put(appId, appState);
} else if (childNodeName
.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
@ -262,33 +252,9 @@ public class FileSystemRMStateStore extends RMStateStore {
LOG.debug("Loading application attempt from node: "
+ childNodeName);
}
ApplicationAttemptId attemptId =
ConverterUtils.toApplicationAttemptId(childNodeName);
ApplicationAttemptStateDataPBImpl attemptStateData =
ApplicationAttemptStateDataPBImpl attemptState =
new ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto.parseFrom(childData));
Credentials credentials = null;
if (attemptStateData.getAppAttemptTokens() != null) {
credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
dibb.reset(attemptStateData.getAppAttemptTokens());
credentials.readTokenStorageStream(dibb);
}
ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime(),
attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus(),
attemptStateData.getFinishTime(),
attemptStateData.getMemorySeconds(),
attemptStateData.getVcoreSeconds());
// assert child node name is same as application attempt id
assert attemptId.equals(attemptState.getAttemptId());
attempts.add(attemptState);
} else {
LOG.info("Unknown child node with name: " + childNodeName);
@ -299,9 +265,9 @@ public class FileSystemRMStateStore extends RMStateStore {
// go through all attempts and add them to their apps, Ideally, each
// attempt node must have a corresponding app node, because remove
// directory operation remove both at the same time
for (ApplicationAttemptState attemptState : attempts) {
for (ApplicationAttemptStateData attemptState : attempts) {
ApplicationId appId = attemptState.getAttemptId().getApplicationId();
ApplicationState appState = rmState.appState.get(appId);
ApplicationStateData appState = rmState.appState.get(appId);
assert appState != null;
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
@ -398,10 +364,9 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override
public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateDataPB) throws Exception {
String appIdStr = appId.toString();
Path appDirPath = getAppDir(rmAppRoot, appIdStr);
Path appDirPath = getAppDir(rmAppRoot, appId);
fs.mkdirs(appDirPath);
Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
Path nodeCreatePath = getNodePath(appDirPath, appId.toString());
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
byte[] appStateData = appStateDataPB.getProto().toByteArray();
@ -418,9 +383,8 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override
public synchronized void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateDataPB) throws Exception {
String appIdStr = appId.toString();
Path appDirPath = getAppDir(rmAppRoot, appIdStr);
Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
Path appDirPath = getAppDir(rmAppRoot, appId);
Path nodeCreatePath = getNodePath(appDirPath, appId.toString());
LOG.info("Updating info for app: " + appId + " at: " + nodeCreatePath);
byte[] appStateData = appStateDataPB.getProto().toByteArray();
@ -440,7 +404,7 @@ public class FileSystemRMStateStore extends RMStateStore {
ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
Path appDirPath =
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
getAppDir(rmAppRoot, appAttemptId.getApplicationId());
Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
LOG.info("Storing info for attempt: " + appAttemptId + " at: "
+ nodeCreatePath);
@ -461,7 +425,7 @@ public class FileSystemRMStateStore extends RMStateStore {
ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
Path appDirPath =
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
getAppDir(rmAppRoot, appAttemptId.getApplicationId());
Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
LOG.info("Updating info for attempt: " + appAttemptId + " at: "
+ nodeCreatePath);
@ -477,9 +441,11 @@ public class FileSystemRMStateStore extends RMStateStore {
}
@Override
public synchronized void removeApplicationStateInternal(ApplicationState appState)
public synchronized void removeApplicationStateInternal(
ApplicationStateData appState)
throws Exception {
String appId = appState.getAppId().toString();
ApplicationId appId =
appState.getApplicationSubmissionContext().getApplicationId();
Path nodeRemovePath = getAppDir(rmAppRoot, appId);
LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
deleteFile(nodeRemovePath);
@ -572,8 +538,8 @@ public class FileSystemRMStateStore extends RMStateStore {
}
}
private Path getAppDir(Path root, String appId) {
return getNodePath(root, appId);
private Path getAppDir(Path root, ApplicationId appId) {
return getNodePath(root, appId.toString());
}
// FileSystem related code

View File

@ -25,8 +25,6 @@ import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -93,57 +91,30 @@ public class MemoryRMStateStore extends RMStateStore {
}
@Override
public void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateData)
public void storeApplicationStateInternal(
ApplicationId appId, ApplicationStateData appState)
throws Exception {
ApplicationState appState =
new ApplicationState(appStateData.getSubmitTime(),
appStateData.getStartTime(),
appStateData.getApplicationSubmissionContext(),
appStateData.getUser());
state.appState.put(appId, appState);
}
@Override
public void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateData) throws Exception {
ApplicationState updatedAppState =
new ApplicationState(appStateData.getSubmitTime(),
appStateData.getStartTime(),
appStateData.getApplicationSubmissionContext(),
appStateData.getUser(), appStateData.getState(),
appStateData.getDiagnostics(), appStateData.getFinishTime());
LOG.info("Updating final state " + appStateData.getState() + " for app: "
ApplicationStateData appState) throws Exception {
LOG.info("Updating final state " + appState.getState() + " for app: "
+ appId);
if (state.appState.get(appId) != null) {
// add the earlier attempts back
updatedAppState.attempts
.putAll(state.appState.get(appId).attempts);
appState.attempts.putAll(state.appState.get(appId).attempts);
}
state.appState.put(appId, updatedAppState);
state.appState.put(appId, appState);
}
@Override
public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
ApplicationAttemptStateData attemptStateData)
ApplicationAttemptStateData attemptState)
throws Exception {
Credentials credentials = null;
if(attemptStateData.getAppAttemptTokens() != null){
DataInputByteBuffer dibb = new DataInputByteBuffer();
credentials = new Credentials();
dibb.reset(attemptStateData.getAppAttemptTokens());
credentials.readTokenStorageStream(dibb);
}
ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttemptId,
attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime(),
attemptStateData.getMemorySeconds(),
attemptStateData.getVcoreSeconds());
ApplicationState appState = state.getApplicationState().get(
ApplicationStateData appState = state.getApplicationState().get(
attemptState.getAttemptId().getApplicationId());
if (appState == null) {
throw new YarnRuntimeException("Application doesn't exist");
@ -154,44 +125,25 @@ public class MemoryRMStateStore extends RMStateStore {
@Override
public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
ApplicationAttemptStateData attemptStateData)
ApplicationAttemptStateData attemptState)
throws Exception {
Credentials credentials = null;
if (attemptStateData.getAppAttemptTokens() != null) {
DataInputByteBuffer dibb = new DataInputByteBuffer();
credentials = new Credentials();
dibb.reset(attemptStateData.getAppAttemptTokens());
credentials.readTokenStorageStream(dibb);
}
ApplicationAttemptState updatedAttemptState =
new ApplicationAttemptState(appAttemptId,
attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime(), attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus(),
attemptStateData.getFinishTime(),
attemptStateData.getMemorySeconds(),
attemptStateData.getVcoreSeconds());
ApplicationState appState =
state.getApplicationState().get(
updatedAttemptState.getAttemptId().getApplicationId());
ApplicationStateData appState =
state.getApplicationState().get(appAttemptId.getApplicationId());
if (appState == null) {
throw new YarnRuntimeException("Application doesn't exist");
}
LOG.info("Updating final state " + updatedAttemptState.getState()
+ " for attempt: " + updatedAttemptState.getAttemptId());
appState.attempts.put(updatedAttemptState.getAttemptId(),
updatedAttemptState);
LOG.info("Updating final state " + attemptState.getState()
+ " for attempt: " + attemptState.getAttemptId());
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
@Override
public synchronized void removeApplicationStateInternal(
ApplicationState appState) throws Exception {
ApplicationId appId = appState.getAppId();
ApplicationState removed = state.appState.remove(appId);
ApplicationStateData appState) throws Exception {
ApplicationId appId =
appState.getApplicationSubmissionContext().getApplicationId();
ApplicationStateData removed = state.appState.remove(appId);
if (removed == null) {
throw new YarnRuntimeException("Removing non-exsisting application state");
}

View File

@ -71,7 +71,7 @@ public class NullRMStateStore extends RMStateStore {
}
@Override
protected void removeApplicationStateInternal(ApplicationState appState)
protected void removeApplicationStateInternal(ApplicationStateData appState)
throws Exception {
// Do nothing
}

View File

@ -38,9 +38,6 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -56,12 +53,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Applicatio
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
@ -129,13 +124,13 @@ public abstract class RMStateStore extends AbstractService {
LOG.error("Illegal event type: " + event.getClass());
return;
}
ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState();
ApplicationId appId = appState.getAppId();
ApplicationStateData appStateData = ApplicationStateData
.newInstance(appState);
ApplicationStateData appState =
((RMStateStoreAppEvent) event).getAppState();
ApplicationId appId =
appState.getApplicationSubmissionContext().getApplicationId();
LOG.info("Storing info for app: " + appId);
try {
store.storeApplicationStateInternal(appId, appStateData);
store.storeApplicationStateInternal(appId, appState);
store.notifyApplication(new RMAppEvent(appId,
RMAppEventType.APP_NEW_SAVED));
} catch (Exception e) {
@ -154,13 +149,13 @@ public abstract class RMStateStore extends AbstractService {
LOG.error("Illegal event type: " + event.getClass());
return;
}
ApplicationState appState = ((RMStateUpdateAppEvent) event).getAppState();
ApplicationId appId = appState.getAppId();
ApplicationStateData appStateData = ApplicationStateData
.newInstance(appState);
ApplicationStateData appState =
((RMStateUpdateAppEvent) event).getAppState();
ApplicationId appId =
appState.getApplicationSubmissionContext().getApplicationId();
LOG.info("Updating info for app: " + appId);
try {
store.updateApplicationStateInternal(appId, appStateData);
store.updateApplicationStateInternal(appId, appState);
store.notifyApplication(new RMAppEvent(appId,
RMAppEventType.APP_UPDATE_SAVED));
} catch (Exception e) {
@ -179,9 +174,10 @@ public abstract class RMStateStore extends AbstractService {
LOG.error("Illegal event type: " + event.getClass());
return;
}
ApplicationState appState = ((RMStateStoreRemoveAppEvent) event)
.getAppState();
ApplicationId appId = appState.getAppId();
ApplicationStateData appState =
((RMStateStoreRemoveAppEvent) event).getAppState();
ApplicationId appId =
appState.getApplicationSubmissionContext().getApplicationId();
LOG.info("Removing info for app: " + appId);
try {
store.removeApplicationStateInternal(appState);
@ -201,16 +197,14 @@ public abstract class RMStateStore extends AbstractService {
LOG.error("Illegal event type: " + event.getClass());
return;
}
ApplicationAttemptState attemptState =
ApplicationAttemptStateData attemptState =
((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
try {
ApplicationAttemptStateData attemptStateData =
ApplicationAttemptStateData.newInstance(attemptState);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
}
store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
attemptStateData);
attemptState);
store.notifyApplicationAttempt(new RMAppAttemptEvent
(attemptState.getAttemptId(),
RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
@ -230,16 +224,14 @@ public abstract class RMStateStore extends AbstractService {
LOG.error("Illegal event type: " + event.getClass());
return;
}
ApplicationAttemptState attemptState =
ApplicationAttemptStateData attemptState =
((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
try {
ApplicationAttemptStateData attemptStateData = ApplicationAttemptStateData
.newInstance(attemptState);
if (LOG.isDebugEnabled()) {
LOG.debug("Updating info for attempt: " + attemptState.getAttemptId());
}
store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
attemptStateData);
attemptState);
store.notifyApplicationAttempt(new RMAppAttemptEvent
(attemptState.getAttemptId(),
RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
@ -255,153 +247,6 @@ public abstract class RMStateStore extends AbstractService {
stateMachine = stateMachineFactory.make(this);
}
/**
* State of an application attempt
*/
public static class ApplicationAttemptState {
final ApplicationAttemptId attemptId;
final Container masterContainer;
final Credentials appAttemptCredentials;
long startTime = 0;
long finishTime = 0;
// fields set when attempt completes
RMAppAttemptState state;
String finalTrackingUrl = "N/A";
String diagnostics;
int exitStatus = ContainerExitStatus.INVALID;
FinalApplicationStatus amUnregisteredFinalStatus;
long memorySeconds;
long vcoreSeconds;
public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials,
long startTime, long memorySeconds, long vcoreSeconds) {
this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
null, "", null, ContainerExitStatus.INVALID, 0, memorySeconds, vcoreSeconds);
}
public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials,
long startTime, RMAppAttemptState state, String finalTrackingUrl,
String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
int exitStatus, long finishTime, long memorySeconds,
long vcoreSeconds) {
this.attemptId = attemptId;
this.masterContainer = masterContainer;
this.appAttemptCredentials = appAttemptCredentials;
this.startTime = startTime;
this.state = state;
this.finalTrackingUrl = finalTrackingUrl;
this.diagnostics = diagnostics == null ? "" : diagnostics;
this.amUnregisteredFinalStatus = amUnregisteredFinalStatus;
this.exitStatus = exitStatus;
this.finishTime = finishTime;
this.memorySeconds = memorySeconds;
this.vcoreSeconds = vcoreSeconds;
}
public Container getMasterContainer() {
return masterContainer;
}
public ApplicationAttemptId getAttemptId() {
return attemptId;
}
public Credentials getAppAttemptCredentials() {
return appAttemptCredentials;
}
public RMAppAttemptState getState(){
return state;
}
public String getFinalTrackingUrl() {
return finalTrackingUrl;
}
public String getDiagnostics() {
return diagnostics;
}
public long getStartTime() {
return startTime;
}
public FinalApplicationStatus getFinalApplicationStatus() {
return amUnregisteredFinalStatus;
}
public int getAMContainerExitStatus(){
return this.exitStatus;
}
public long getMemorySeconds() {
return memorySeconds;
}
public long getVcoreSeconds() {
return vcoreSeconds;
}
public long getFinishTime() {
return this.finishTime;
}
}
/**
* State of an application application
*/
public static class ApplicationState {
final ApplicationSubmissionContext context;
final long submitTime;
final long startTime;
final String user;
Map<ApplicationAttemptId, ApplicationAttemptState> attempts =
new HashMap<ApplicationAttemptId, ApplicationAttemptState>();
// fields set when application completes.
RMAppState state;
String diagnostics;
long finishTime;
public ApplicationState(long submitTime,
long startTime, ApplicationSubmissionContext context, String user) {
this(submitTime, startTime, context, user, null, "", 0);
}
public ApplicationState(long submitTime,
long startTime,ApplicationSubmissionContext context,
String user, RMAppState state, String diagnostics, long finishTime) {
this.submitTime = submitTime;
this.startTime = startTime;
this.context = context;
this.user = user;
this.state = state;
this.diagnostics = diagnostics == null ? "" : diagnostics;
this.finishTime = finishTime;
}
public ApplicationId getAppId() {
return context.getApplicationId();
}
public long getSubmitTime() {
return submitTime;
}
public long getStartTime() {
return startTime;
}
public int getAttemptCount() {
return attempts.size();
}
public ApplicationSubmissionContext getApplicationSubmissionContext() {
return context;
}
public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) {
return attempts.get(attemptId);
}
public String getUser() {
return user;
}
public RMAppState getState() {
return state;
}
public String getDiagnostics() {
return diagnostics;
}
public long getFinishTime() {
return finishTime;
}
}
public static class RMDTSecretManagerState {
// DTIdentifier -> renewDate
Map<RMDelegationTokenIdentifier, Long> delegationTokenState =
@ -429,14 +274,14 @@ public abstract class RMStateStore extends AbstractService {
* State of the ResourceManager
*/
public static class RMState {
Map<ApplicationId, ApplicationState> appState =
new TreeMap<ApplicationId, ApplicationState>();
Map<ApplicationId, ApplicationStateData> appState =
new TreeMap<ApplicationId, ApplicationStateData>();
RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();
AMRMTokenSecretManagerState amrmTokenSecretManagerState = null;
public Map<ApplicationId, ApplicationState> getApplicationState() {
public Map<ApplicationId, ApplicationStateData> getApplicationState() {
return appState;
}
@ -575,14 +420,15 @@ public abstract class RMStateStore extends AbstractService {
ApplicationSubmissionContext context = app
.getApplicationSubmissionContext();
assert context instanceof ApplicationSubmissionContextPBImpl;
ApplicationState appState =
new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
app.getUser());
ApplicationStateData appState =
ApplicationStateData.newInstance(
app.getSubmitTime(), app.getStartTime(), context, app.getUser());
dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
}
@SuppressWarnings("unchecked")
public synchronized void updateApplicationState(ApplicationState appState) {
public synchronized void updateApplicationState(
ApplicationStateData appState) {
dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState));
}
@ -609,11 +455,13 @@ public abstract class RMStateStore extends AbstractService {
AggregateAppResourceUsage resUsage =
appAttempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage();
ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttempt.getAppAttemptId(),
appAttempt.getMasterContainer(), credentials,
appAttempt.getStartTime(), resUsage.getMemorySeconds(),
resUsage.getVcoreSeconds());
ApplicationAttemptStateData attemptState =
ApplicationAttemptStateData.newInstance(
appAttempt.getAppAttemptId(),
appAttempt.getMasterContainer(),
credentials, appAttempt.getStartTime(),
resUsage.getMemorySeconds(),
resUsage.getVcoreSeconds());
dispatcher.getEventHandler().handle(
new RMStateStoreAppAttemptEvent(attemptState));
@ -621,7 +469,7 @@ public abstract class RMStateStore extends AbstractService {
@SuppressWarnings("unchecked")
public synchronized void updateApplicationAttemptState(
ApplicationAttemptState attemptState) {
ApplicationAttemptStateData attemptState) {
dispatcher.getEventHandler().handle(
new RMStateUpdateAppAttemptEvent(attemptState));
}
@ -761,16 +609,12 @@ public abstract class RMStateStore extends AbstractService {
*/
@SuppressWarnings("unchecked")
public synchronized void removeApplication(RMApp app) {
ApplicationState appState = new ApplicationState(
ApplicationStateData appState =
ApplicationStateData.newInstance(
app.getSubmitTime(), app.getStartTime(),
app.getApplicationSubmissionContext(), app.getUser());
for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttempt.getAppAttemptId(),
appAttempt.getMasterContainer(), credentials,
appAttempt.getStartTime(), 0, 0);
appState.attempts.put(attemptState.getAttemptId(), attemptState);
appState.attempts.put(appAttempt.getAppAttemptId(), null);
}
dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState));
@ -782,7 +626,7 @@ public abstract class RMStateStore extends AbstractService {
* application and its attempts
*/
protected abstract void removeApplicationStateInternal(
ApplicationState appState) throws Exception;
ApplicationStateData appState) throws Exception;
// TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
// YARN-1779

View File

@ -18,17 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
public class RMStateStoreAppAttemptEvent extends RMStateStoreEvent {
ApplicationAttemptState attemptState;
ApplicationAttemptStateData attemptState;
public RMStateStoreAppAttemptEvent(ApplicationAttemptState attemptState) {
public RMStateStoreAppAttemptEvent(ApplicationAttemptStateData attemptState) {
super(RMStateStoreEventType.STORE_APP_ATTEMPT);
this.attemptState = attemptState;
}
public ApplicationAttemptState getAppAttemptState() {
public ApplicationAttemptStateData getAppAttemptState() {
return attemptState;
}
}

View File

@ -18,18 +18,18 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
public class RMStateStoreAppEvent extends RMStateStoreEvent {
private final ApplicationState appState;
private final ApplicationStateData appState;
public RMStateStoreAppEvent(ApplicationState appState) {
public RMStateStoreAppEvent(ApplicationStateData appState) {
super(RMStateStoreEventType.STORE_APP);
this.appState = appState;
}
public ApplicationState getAppState() {
public ApplicationStateData getAppState() {
return appState;
}
}

View File

@ -18,17 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
public class RMStateStoreRemoveAppEvent extends RMStateStoreEvent {
ApplicationState appState;
ApplicationStateData appState;
RMStateStoreRemoveAppEvent(ApplicationState appState) {
RMStateStoreRemoveAppEvent(ApplicationStateData appState) {
super(RMStateStoreEventType.REMOVE_APP);
this.appState = appState;
}
public ApplicationState getAppState() {
public ApplicationStateData getAppState() {
return appState;
}
}

View File

@ -18,18 +18,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
public class RMStateUpdateAppAttemptEvent extends RMStateStoreEvent {
ApplicationAttemptState attemptState;
ApplicationAttemptStateData attemptState;
public RMStateUpdateAppAttemptEvent(ApplicationAttemptState attemptState) {
public RMStateUpdateAppAttemptEvent(
ApplicationAttemptStateData attemptState) {
super(RMStateStoreEventType.UPDATE_APP_ATTEMPT);
this.attemptState = attemptState;
}
public ApplicationAttemptState getAppAttemptState() {
public ApplicationAttemptStateData getAppAttemptState() {
return attemptState;
}
}

View File

@ -18,17 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
public class RMStateUpdateAppEvent extends RMStateStoreEvent {
private final ApplicationState appState;
private final ApplicationStateData appState;
public RMStateUpdateAppEvent(ApplicationState appState) {
public RMStateUpdateAppEvent(ApplicationStateData appState) {
super(RMStateStoreEventType.UPDATE_APP);
this.appState = appState;
}
public ApplicationState getAppState() {
public ApplicationStateData getAppState() {
return appState;
}
}

View File

@ -34,8 +34,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ZKUtil;
@ -562,17 +560,11 @@ public class ZKRMStateStore extends RMStateStore {
LOG.debug("Loading application from znode: " + childNodeName);
}
ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
ApplicationStateDataPBImpl appStateData =
ApplicationStateDataPBImpl appState =
new ApplicationStateDataPBImpl(
ApplicationStateDataProto.parseFrom(childData));
ApplicationState appState =
new ApplicationState(appStateData.getSubmitTime(),
appStateData.getStartTime(),
appStateData.getApplicationSubmissionContext(),
appStateData.getUser(),
appStateData.getState(),
appStateData.getDiagnostics(), appStateData.getFinishTime());
if (!appId.equals(appState.context.getApplicationId())) {
if (!appId.equals(
appState.getApplicationSubmissionContext().getApplicationId())) {
throw new YarnRuntimeException("The child node name is different " +
"from the application id");
}
@ -584,7 +576,7 @@ public class ZKRMStateStore extends RMStateStore {
}
}
private void loadApplicationAttemptState(ApplicationState appState,
private void loadApplicationAttemptState(ApplicationStateData appState,
ApplicationId appId)
throws Exception {
String appPath = getNodePath(rmAppRoot, appId.toString());
@ -594,31 +586,9 @@ public class ZKRMStateStore extends RMStateStore {
String attemptPath = getNodePath(appPath, attemptIDStr);
byte[] attemptData = getDataWithRetries(attemptPath, true);
ApplicationAttemptId attemptId =
ConverterUtils.toApplicationAttemptId(attemptIDStr);
ApplicationAttemptStateDataPBImpl attemptStateData =
ApplicationAttemptStateDataPBImpl attemptState =
new ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto.parseFrom(attemptData));
Credentials credentials = null;
if (attemptStateData.getAppAttemptTokens() != null) {
credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
dibb.reset(attemptStateData.getAppAttemptTokens());
credentials.readTokenStorageStream(dibb);
}
ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime(), attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus(),
attemptStateData.getAMContainerExitStatus(),
attemptStateData.getFinishTime(),
attemptStateData.getMemorySeconds(),
attemptStateData.getVcoreSeconds());
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
@ -705,9 +675,11 @@ public class ZKRMStateStore extends RMStateStore {
}
@Override
public synchronized void removeApplicationStateInternal(ApplicationState appState)
public synchronized void removeApplicationStateInternal(
ApplicationStateData appState)
throws Exception {
String appId = appState.getAppId().toString();
String appId = appState.getApplicationSubmissionContext().getApplicationId()
.toString();
String appIdRemovePath = getNodePath(rmAppRoot, appId);
ArrayList<Op> opList = new ArrayList<Op>();

View File

@ -18,18 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records;
@ -41,7 +37,7 @@ import org.apache.hadoop.yarn.util.Records;
public abstract class ApplicationAttemptStateData {
public static ApplicationAttemptStateData newInstance(
ApplicationAttemptId attemptId, Container container,
ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
Credentials attemptTokens, long startTime, RMAppAttemptState finalState,
String finalTrackingUrl, String diagnostics,
FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus,
long finishTime, long memorySeconds, long vcoreSeconds) {
@ -52,7 +48,7 @@ public abstract class ApplicationAttemptStateData {
attemptStateData.setAppAttemptTokens(attemptTokens);
attemptStateData.setState(finalState);
attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
attemptStateData.setDiagnostics(diagnostics);
attemptStateData.setDiagnostics(diagnostics == null ? "" : diagnostics);
attemptStateData.setStartTime(startTime);
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
attemptStateData.setAMContainerExitStatus(exitStatus);
@ -63,22 +59,14 @@ public abstract class ApplicationAttemptStateData {
}
public static ApplicationAttemptStateData newInstance(
ApplicationAttemptState attemptState) throws IOException {
Credentials credentials = attemptState.getAppAttemptCredentials();
ByteBuffer appAttemptTokens = null;
if (credentials != null) {
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
ApplicationAttemptId attemptId, Container masterContainer,
Credentials attemptTokens, long startTime, long memorySeconds,
long vcoreSeconds) {
return newInstance(attemptId, masterContainer, attemptTokens,
startTime, null, "N/A", "", null, ContainerExitStatus.INVALID, 0,
memorySeconds, vcoreSeconds);
}
return newInstance(attemptState.getAttemptId(),
attemptState.getMasterContainer(), appAttemptTokens,
attemptState.getStartTime(), attemptState.getState(),
attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(),
attemptState.getFinalApplicationStatus(),
attemptState.getAMContainerExitStatus(), attemptState.getFinishTime(),
attemptState.getMemorySeconds(), attemptState.getVcoreSeconds());
}
public abstract ApplicationAttemptStateDataProto getProto();
@ -108,9 +96,9 @@ public abstract class ApplicationAttemptStateData {
*/
@Public
@Unstable
public abstract ByteBuffer getAppAttemptTokens();
public abstract Credentials getAppAttemptTokens();
public abstract void setAppAttemptTokens(ByteBuffer attemptTokens);
public abstract void setAppAttemptTokens(Credentials attemptTokens);
/**
* Get the final state of the application attempt.

View File

@ -18,14 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.Records;
@ -36,6 +38,9 @@ import org.apache.hadoop.yarn.util.Records;
@Public
@Unstable
public abstract class ApplicationStateData {
public Map<ApplicationAttemptId, ApplicationAttemptStateData> attempts =
new HashMap<ApplicationAttemptId, ApplicationAttemptStateData>();
public static ApplicationStateData newInstance(long submitTime,
long startTime, String user,
ApplicationSubmissionContext submissionContext,
@ -51,12 +56,18 @@ public abstract class ApplicationStateData {
return appState;
}
public static ApplicationStateData newInstance(
ApplicationState appState) {
return newInstance(appState.getSubmitTime(), appState.getStartTime(),
appState.getUser(), appState.getApplicationSubmissionContext(),
appState.getState(), appState.getDiagnostics(),
appState.getFinishTime());
public static ApplicationStateData newInstance(long submitTime,
long startTime, ApplicationSubmissionContext context, String user) {
return newInstance(submitTime, startTime, user, context, null, "", 0);
}
public int getAttemptCount() {
return attempts.size();
}
public ApplicationAttemptStateData getAttempt(
ApplicationAttemptId attemptId) {
return attempts.get(attemptId);
}
public abstract ApplicationStateDataProto getProto();

View File

@ -18,8 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@ -37,6 +44,8 @@ import com.google.protobuf.TextFormat;
public class ApplicationAttemptStateDataPBImpl extends
ApplicationAttemptStateData {
private static Log LOG =
LogFactory.getLog(ApplicationAttemptStateDataPBImpl.class);
ApplicationAttemptStateDataProto proto =
ApplicationAttemptStateDataProto.getDefaultInstance();
ApplicationAttemptStateDataProto.Builder builder = null;
@ -137,26 +146,27 @@ public class ApplicationAttemptStateDataPBImpl extends
}
@Override
public ByteBuffer getAppAttemptTokens() {
public Credentials getAppAttemptTokens() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
if(appAttemptTokens != null) {
return appAttemptTokens;
return convertCredentialsFromByteBuffer(appAttemptTokens);
}
if(!p.hasAppAttemptTokens()) {
return null;
}
this.appAttemptTokens = ProtoUtils.convertFromProtoFormat(
p.getAppAttemptTokens());
return appAttemptTokens;
return convertCredentialsFromByteBuffer(appAttemptTokens);
}
@Override
public void setAppAttemptTokens(ByteBuffer attemptTokens) {
public void setAppAttemptTokens(Credentials attemptTokens) {
maybeInitBuilder();
if(attemptTokens == null) {
builder.clearAppAttemptTokens();
return;
}
this.appAttemptTokens = attemptTokens;
this.appAttemptTokens = convertCredentialsToByteBuffer(attemptTokens);
}
@Override
@ -330,4 +340,44 @@ public class ApplicationAttemptStateDataPBImpl extends
maybeInitBuilder();
builder.setFinishTime(finishTime);
}
private static ByteBuffer convertCredentialsToByteBuffer(
Credentials credentials) {
ByteBuffer appAttemptTokens = null;
DataOutputBuffer dob = new DataOutputBuffer();
try {
if (credentials != null) {
credentials.writeTokenStorageToStream(dob);
appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
}
return appAttemptTokens;
} catch (IOException e) {
LOG.error("Failed to convert Credentials to ByteBuffer.");
assert false;
return null;
} finally {
IOUtils.closeStream(dob);
}
}
private static Credentials convertCredentialsFromByteBuffer(
ByteBuffer appAttemptTokens) {
DataInputByteBuffer dibb = new DataInputByteBuffer();
try {
Credentials credentials = null;
if (appAttemptTokens != null) {
credentials = new Credentials();
appAttemptTokens.rewind();
dibb.reset(appAttemptTokens);
credentials.readTokenStorageStream(dibb);
}
return credentials;
} catch (IOException e) {
LOG.error("Failed to convert Credentials from ByteBuffer.");
assert false;
return null;
} finally {
IOUtils.closeStream(dibb);
}
}
}

View File

@ -66,9 +66,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -728,10 +728,12 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override
public void recover(RMState state) {
ApplicationState appState = state.getApplicationState().get(getApplicationId());
ApplicationStateData appState =
state.getApplicationState().get(getApplicationId());
this.recoveredFinalState = appState.getState();
LOG.info("Recovering app: " + getApplicationId() + " with " +
+ appState.getAttemptCount() + " attempts and final state = " + this.recoveredFinalState );
+ appState.getAttemptCount() + " attempts and final state = "
+ this.recoveredFinalState );
this.diagnostics.append(appState.getDiagnostics());
this.storedFinishTime = appState.getFinishTime();
this.startTime = appState.getStartTime();
@ -1019,10 +1021,10 @@ public class RMAppImpl implements RMApp, Recoverable {
default:
break;
}
ApplicationState appState =
new ApplicationState(this.submitTime, this.startTime,
this.submissionContext, this.user, stateToBeStored, diags,
this.storedFinishTime);
ApplicationStateData appState =
ApplicationStateData.newInstance(this.submitTime, this.startTime,
this.user, this.submissionContext,
stateToBeStored, diags, this.storedFinishTime);
this.rmContext.getStateStore().updateApplicationState(appState);
}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@ -70,9 +69,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -793,9 +792,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@Override
public void recover(RMState state) {
ApplicationState appState =
ApplicationStateData appState =
state.getApplicationState().get(getAppAttemptId().getApplicationId());
ApplicationAttemptState attemptState =
ApplicationAttemptStateData attemptState =
appState.getAttempt(getAppAttemptId());
assert attemptState != null;
LOG.info("Recovering attempt: " + getAppAttemptId() + " with final state: "
@ -806,9 +805,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
if (amContainerExitStatus == ContainerExitStatus.PREEMPTED) {
this.attemptMetrics.setIsPreempted();
}
Credentials credentials = attemptState.getAppAttemptTokens();
setMasterContainer(attemptState.getMasterContainer());
recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(),
attemptState.getState());
recoverAppAttemptCredentials(credentials, attemptState.getState());
this.recoveredFinalState = attemptState.getState();
this.originalTrackingUrl = attemptState.getFinalTrackingUrl();
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
@ -1123,10 +1123,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
this.attemptMetrics.getAggregateAppResourceUsage();
RMStateStore rmStore = rmContext.getStateStore();
setFinishTime(System.currentTimeMillis());
ApplicationAttemptState attemptState =
new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
rmStore.getCredentialsFromAppAttempt(this), startTime,
stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus,
ApplicationAttemptStateData attemptState =
ApplicationAttemptStateData.newInstance(
applicationAttemptId, getMasterContainer(),
rmStore.getCredentialsFromAppAttempt(this),
startTime, stateToBeStored, finalTrackingUrl, diags,
finalStatus, exitStatus,
getFinishTime(), resUsage.getMemorySeconds(),
resUsage.getVcoreSeconds());
LOG.info("Updating application attempt " + applicationAttemptId

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFencedException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -459,7 +460,8 @@ public class TestRMHA {
MemoryRMStateStore memStore = new MemoryRMStateStore() {
@Override
public synchronized void updateApplicationState(ApplicationState appState) {
public synchronized void updateApplicationState(
ApplicationStateData appState) {
notifyStoreOperationFailed(new StoreFencedException());
}
};

View File

@ -84,8 +84,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
@ -162,7 +160,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
@ -194,7 +192,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
// create app that gets launched and does allocate before RM restart
RMApp app1 = rm1.submitApp(200);
// assert app1 info is saved
ApplicationState appState = rmAppState.get(app1.getApplicationId());
ApplicationStateData appState = rmAppState.get(app1.getApplicationId());
Assert.assertNotNull(appState);
Assert.assertEquals(0, appState.getAttemptCount());
Assert.assertEquals(appState.getApplicationSubmissionContext()
@ -209,7 +207,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId();
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
Assert.assertEquals(1, appState.getAttemptCount());
ApplicationAttemptState attemptState =
ApplicationAttemptStateData attemptState =
appState.getAttempt(attemptId1);
Assert.assertNotNull(attemptState);
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
@ -429,7 +427,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
// start RM
@ -450,7 +448,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am0.waitForState(RMAppAttemptState.FAILED);
ApplicationState appState = rmAppState.get(app0.getApplicationId());
ApplicationStateData appState = rmAppState.get(app0.getApplicationId());
// assert the AM failed state is saved.
Assert.assertEquals(RMAppAttemptState.FAILED,
appState.getAttempt(am0.getApplicationAttemptId()).getState());
@ -486,7 +484,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
// start RM
@ -650,7 +648,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
};
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
// start RM
@ -689,7 +687,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
// start RM
@ -709,7 +707,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
// assert the app/attempt failed state is saved.
ApplicationState appState = rmAppState.get(app0.getApplicationId());
ApplicationStateData appState = rmAppState.get(app0.getApplicationId());
Assert.assertEquals(RMAppState.FAILED, appState.getState());
Assert.assertEquals(RMAppAttemptState.FAILED,
appState.getAttempt(am0.getApplicationAttemptId()).getState());
@ -737,7 +735,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
// start RM
@ -757,7 +755,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
rm1.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.KILLED);
// killed state is saved.
ApplicationState appState = rmAppState.get(app0.getApplicationId());
ApplicationStateData appState = rmAppState.get(app0.getApplicationId());
Assert.assertEquals(RMAppState.KILLED, appState.getState());
Assert.assertEquals(RMAppAttemptState.KILLED,
appState.getAttempt(am0.getApplicationAttemptId()).getState());
@ -823,7 +821,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
// start RM
@ -844,8 +842,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
finishApplicationMaster(app0, rm1, nm1, am0, req);
// check the state store about the unregistered info.
ApplicationState appState = rmAppState.get(app0.getApplicationId());
ApplicationAttemptState attemptState0 =
ApplicationStateData appState = rmAppState.get(app0.getApplicationId());
ApplicationAttemptStateData attemptState0 =
appState.getAttempt(am0.getApplicationAttemptId());
Assert.assertEquals("diagnostics", attemptState0.getDiagnostics());
Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
@ -995,7 +993,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
MockAM am, FinishApplicationMasterRequest req) throws Exception {
RMState rmState =
((MemoryRMStateStore) rm.getRMContext().getStateStore()).getState();
Map<ApplicationId, ApplicationState> rmAppState =
Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
am.unregisterAppAttempt(req,true);
am.waitForState(RMAppAttemptState.FINISHING);
@ -1003,7 +1001,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
am.waitForState(RMAppAttemptState.FINISHED);
rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
// check that app/attempt is saved with the final state
ApplicationState appState = rmAppState.get(rmApp.getApplicationId());
ApplicationStateData appState = rmAppState.get(rmApp.getApplicationId());
Assert
.assertEquals(RMAppState.FINISHED, appState.getState());
Assert.assertEquals(RMAppAttemptState.FINISHED,
@ -1019,7 +1017,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
MockRM rm1 = createMockRM(conf, memStore);
rm1.start();
@ -1037,7 +1035,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
null);
// assert app1 info is saved
ApplicationState appState = rmAppState.get(app1.getApplicationId());
ApplicationStateData appState = rmAppState.get(app1.getApplicationId());
Assert.assertNotNull(appState);
Assert.assertEquals(0, appState.getAttemptCount());
Assert.assertEquals(appState.getApplicationSubmissionContext()
@ -1050,7 +1048,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
ApplicationAttemptId attemptId1 = attempt.getAppAttemptId();
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
Assert.assertEquals(1, appState.getAttemptCount());
ApplicationAttemptState attemptState =
ApplicationAttemptStateData attemptState =
appState.getAttempt(attemptId1);
Assert.assertNotNull(attemptState);
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
@ -1092,7 +1090,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
rm1.start();
@ -1131,7 +1129,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
new HashMap<ApplicationAccessType, String>(), false, "default", 1, ts);
// assert app info is saved
ApplicationState appState = rmAppState.get(app.getApplicationId());
ApplicationStateData appState = rmAppState.get(app.getApplicationId());
Assert.assertNotNull(appState);
// assert delegation tokens exist in rm1 DelegationTokenRenewr
@ -1187,7 +1185,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
rm1.start();
@ -1201,7 +1199,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
new HashMap<ApplicationAccessType, String>(), "default");
// assert app info is saved
ApplicationState appState = rmAppState.get(app1.getApplicationId());
ApplicationStateData appState = rmAppState.get(app1.getApplicationId());
Assert.assertNotNull(appState);
// Allocate the AM
@ -1211,7 +1209,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
// assert attempt info is saved
ApplicationAttemptState attemptState = appState.getAttempt(attemptId1);
ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId1);
Assert.assertNotNull(attemptState);
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId());
@ -1222,7 +1220,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
attempt1.getClientTokenMasterKey().getEncoded();
// assert application credentials are saved
Credentials savedCredentials = attemptState.getAppAttemptCredentials();
Credentials savedCredentials = attemptState.getAppAttemptTokens();
Assert.assertArrayEquals("client token master key not saved",
clientTokenMasterKey, savedCredentials.getSecretKey(
RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
@ -1268,7 +1266,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
Map<RMDelegationTokenIdentifier, Long> rmDTState =
rmState.getRMDTSecretManagerState().getTokenState();
@ -1305,7 +1303,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
new HashMap<ApplicationAccessType, String>(), false, "default", 1, ts);
// assert app info is saved
ApplicationState appState = rmAppState.get(app.getApplicationId());
ApplicationStateData appState = rmAppState.get(app.getApplicationId());
Assert.assertNotNull(appState);
// assert all master keys are saved
@ -1479,7 +1477,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
// queue, and will be processed once rm.stop() is called.
// Nothing exist in state store before stop is called.
Map<ApplicationId, ApplicationState> rmAppState =
Map<ApplicationId, ApplicationStateData> rmAppState =
memStore.getState().getApplicationState();
Assert.assertTrue(rmAppState.size() == 0);
@ -1489,7 +1487,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
// Assert app info is still saved even if stop is called with pending saving
// request on dispatcher.
for (RMApp app : appList) {
ApplicationState appState = rmAppState.get(app.getApplicationId());
ApplicationStateData appState = rmAppState.get(app.getApplicationId());
Assert.assertNotNull(appState);
Assert.assertEquals(0, appState.getAttemptCount());
Assert.assertEquals(appState.getApplicationSubmissionContext()
@ -1523,7 +1521,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
nm1 = rm2.registerNode("127.0.0.1:1234", 15120);
Map<ApplicationId, ApplicationState> rmAppState =
Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
// app0 exits in both state store and rmContext
@ -1658,10 +1656,15 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
memStore.init(conf);
for (int i = 10; i > 0; i--) {
ApplicationState appState = mock(ApplicationState.class);
when(appState.getAppId()).thenReturn(ApplicationId.newInstance(1234, i));
memStore.getState().getApplicationState()
.put(appState.getAppId(), appState);
ApplicationStateData appState = mock(ApplicationStateData.class);
ApplicationSubmissionContext context =
mock(ApplicationSubmissionContext.class);
when(appState.getApplicationSubmissionContext()).thenReturn(context);
when(context.getApplicationId()).thenReturn(
ApplicationId.newInstance(1234, i));
memStore.getState().getApplicationState().put(
appState.getApplicationSubmissionContext().getApplicationId(),
appState);
}
MockRM rm1 = new MockRM(conf, memStore) {
@ -1681,12 +1684,14 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
}
@Override
protected void recoverApplication(ApplicationState appState,
protected void recoverApplication(ApplicationStateData appState,
RMState rmState) throws Exception {
// check application is recovered in order.
Assert.assertTrue(rmState.getApplicationState().size() > 0);
Assert.assertTrue(appState.getAppId().compareTo(prevId) > 0);
prevId = appState.getAppId();
Assert.assertTrue(appState.getApplicationSubmissionContext()
.getApplicationId().compareTo(prevId) > 0);
prevId =
appState.getApplicationSubmissionContext().getApplicationId();
}
}
};
@ -2030,4 +2035,5 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
// Do nothing.
}
}
}

View File

@ -42,7 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -386,7 +386,7 @@ public class TestAMRestart {
am1.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
ApplicationState appState =
ApplicationStateData appState =
memStore.getState().getApplicationState().get(app1.getApplicationId());
// AM should be restarted even though max-am-attempt is 1.
MockAM am2 =
@ -497,7 +497,7 @@ public class TestAMRestart {
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
// state store has 1 attempt stored.
ApplicationState appState =
ApplicationStateData appState =
memStore.getState().getApplicationState().get(app1.getApplicationId());
Assert.assertEquals(1, appState.getAttemptCount());
// attempt stored has the preempted container exit status.
@ -555,7 +555,7 @@ public class TestAMRestart {
// Restart rm.
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
ApplicationState appState =
ApplicationStateData appState =
memStore.getState().getApplicationState().get(app1.getApplicationId());
// re-register the NM
nm1.setResourceTrackerService(rm2.getResourceTrackerService());

View File

@ -58,8 +58,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
@ -243,6 +243,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
when(mockRemovedApp.getSubmitTime()).thenReturn(submitTime);
when(mockRemovedApp.getApplicationSubmissionContext()).thenReturn(context);
when(mockRemovedApp.getAppAttempts()).thenReturn(attempts);
when(mockRemovedApp.getUser()).thenReturn("user1");
RMAppAttempt mockRemovedAttempt = mock(RMAppAttempt.class);
when(mockRemovedAttempt.getAppAttemptId()).thenReturn(attemptIdRemoved);
when(mockRemovedAttempt.getRMAppAttemptMetrics())
@ -269,10 +270,10 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
store = stateStoreHelper.getRMStateStore();
store.setRMDispatcher(dispatcher);
RMState state = store.loadState();
Map<ApplicationId, ApplicationState> rmAppState =
Map<ApplicationId, ApplicationStateData> rmAppState =
state.getApplicationState();
ApplicationState appState = rmAppState.get(appId1);
ApplicationStateData appState = rmAppState.get(appId1);
// app is loaded
assertNotNull(appState);
// app is loaded correctly
@ -281,7 +282,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
// submission context is loaded correctly
assertEquals(appId1,
appState.getApplicationSubmissionContext().getApplicationId());
ApplicationAttemptState attemptState = appState.getAttempt(attemptId1);
ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId1);
// attempt1 is loaded correctly
assertNotNull(attemptState);
assertEquals(attemptId1, attemptState.getAttemptId());
@ -289,9 +290,10 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
// attempt1 container is loaded correctly
assertEquals(containerId1, attemptState.getMasterContainer().getId());
// attempt1 client token master key is loaded correctly
assertArrayEquals(clientTokenKey1.getEncoded(),
attemptState.getAppAttemptCredentials()
.getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
assertArrayEquals(
clientTokenKey1.getEncoded(),
attemptState.getAppAttemptTokens()
.getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
attemptState = appState.getAttempt(attemptId2);
// attempt2 is loaded correctly
@ -300,27 +302,30 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
// attempt2 container is loaded correctly
assertEquals(containerId2, attemptState.getMasterContainer().getId());
// attempt2 client token master key is loaded correctly
assertArrayEquals(clientTokenKey2.getEncoded(),
attemptState.getAppAttemptCredentials()
.getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
assertArrayEquals(
clientTokenKey2.getEncoded(),
attemptState.getAppAttemptTokens()
.getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
//******* update application/attempt state *******//
ApplicationState appState2 =
new ApplicationState(appState.submitTime, appState.startTime,
appState.context, appState.user, RMAppState.FINISHED,
"appDiagnostics", 1234);
ApplicationStateData appState2 =
ApplicationStateData.newInstance(appState.getSubmitTime(),
appState.getStartTime(), appState.getUser(),
appState.getApplicationSubmissionContext(), RMAppState.FINISHED,
"appDiagnostics", 1234);
appState2.attempts.putAll(appState.attempts);
store.updateApplicationState(appState2);
ApplicationAttemptState oldAttemptState = attemptState;
ApplicationAttemptState newAttemptState =
new ApplicationAttemptState(oldAttemptState.getAttemptId(),
oldAttemptState.getMasterContainer(),
oldAttemptState.getAppAttemptCredentials(),
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 100,
oldAttemptState.getFinishTime(), 0, 0);
ApplicationAttemptStateData oldAttemptState = attemptState;
ApplicationAttemptStateData newAttemptState =
ApplicationAttemptStateData.newInstance(
oldAttemptState.getAttemptId(),
oldAttemptState.getMasterContainer(),
oldAttemptState.getAppAttemptTokens(),
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 100,
oldAttemptState.getFinishTime(), 0, 0);
store.updateApplicationAttemptState(newAttemptState);
// test updating the state of an app/attempt whose initial state was not
@ -329,22 +334,22 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
ApplicationSubmissionContext dummyContext =
new ApplicationSubmissionContextPBImpl();
dummyContext.setApplicationId(dummyAppId);
ApplicationState dummyApp =
new ApplicationState(appState.submitTime, appState.startTime,
dummyContext, appState.user, RMAppState.FINISHED, "appDiagnostics",
1234);
ApplicationStateData dummyApp =
ApplicationStateData.newInstance(appState.getSubmitTime(),
appState.getStartTime(), appState.getUser(), dummyContext,
RMAppState.FINISHED, "appDiagnostics", 1234);
store.updateApplicationState(dummyApp);
ApplicationAttemptId dummyAttemptId =
ApplicationAttemptId.newInstance(dummyAppId, 6);
ApplicationAttemptState dummyAttempt =
new ApplicationAttemptState(dummyAttemptId,
oldAttemptState.getMasterContainer(),
oldAttemptState.getAppAttemptCredentials(),
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 111,
oldAttemptState.getFinishTime(), 0, 0);
ApplicationAttemptStateData dummyAttempt =
ApplicationAttemptStateData.newInstance(dummyAttemptId,
oldAttemptState.getMasterContainer(),
oldAttemptState.getAppAttemptTokens(),
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED, 111,
oldAttemptState.getFinishTime(), 0, 0);
store.updateApplicationAttemptState(dummyAttempt);
// let things settle down
@ -355,11 +360,13 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
store = stateStoreHelper.getRMStateStore();
store.setRMDispatcher(dispatcher);
RMState newRMState = store.loadState();
Map<ApplicationId, ApplicationState> newRMAppState =
Map<ApplicationId, ApplicationStateData> newRMAppState =
newRMState.getApplicationState();
assertNotNull(newRMAppState.get(dummyApp.getAppId()));
ApplicationState updatedAppState = newRMAppState.get(appId1);
assertEquals(appState.getAppId(),updatedAppState.getAppId());
assertNotNull(newRMAppState.get(
dummyApp.getApplicationSubmissionContext().getApplicationId()));
ApplicationStateData updatedAppState = newRMAppState.get(appId1);
assertEquals(appState.getApplicationSubmissionContext().getApplicationId(),
updatedAppState.getApplicationSubmissionContext().getApplicationId());
assertEquals(appState.getSubmitTime(), updatedAppState.getSubmitTime());
assertEquals(appState.getStartTime(), updatedAppState.getStartTime());
assertEquals(appState.getUser(), updatedAppState.getUser());
@ -369,16 +376,17 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
assertEquals(1234, updatedAppState.getFinishTime());
// check updated attempt state
assertNotNull(newRMAppState.get(dummyApp.getAppId()).getAttempt(
dummyAttemptId));
ApplicationAttemptState updatedAttemptState =
assertNotNull(newRMAppState.get(dummyApp.getApplicationSubmissionContext
().getApplicationId()).getAttempt(dummyAttemptId));
ApplicationAttemptStateData updatedAttemptState =
updatedAppState.getAttempt(newAttemptState.getAttemptId());
assertEquals(oldAttemptState.getAttemptId(),
updatedAttemptState.getAttemptId());
assertEquals(containerId2, updatedAttemptState.getMasterContainer().getId());
assertArrayEquals(clientTokenKey2.getEncoded(),
updatedAttemptState.getAppAttemptCredentials().getSecretKey(
RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
assertArrayEquals(
clientTokenKey2.getEncoded(),
attemptState.getAppAttemptTokens()
.getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
// new attempt state fields
assertEquals(RMAppAttemptState.FINISHED, updatedAttemptState.getState());
assertEquals("myTrackingUrl", updatedAttemptState.getFinalTrackingUrl());

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doReturn;
@ -62,7 +63,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -326,11 +326,13 @@ public class TestRMAppTransitions {
}
private void assertAppFinalStateSaved(RMApp application){
verify(store, times(1)).updateApplicationState(any(ApplicationState.class));
verify(store, times(1)).updateApplicationState(
any(ApplicationStateData.class));
}
private void assertAppFinalStateNotSaved(RMApp application){
verify(store, times(0)).updateApplicationState(any(ApplicationState.class));
verify(store, times(0)).updateApplicationState(
any(ApplicationStateData.class));
}
private void assertKilled(RMApp application) {
@ -395,11 +397,13 @@ public class TestRMAppTransitions {
RMApp application = createNewTestApp(submissionContext);
// NEW => SUBMITTED event RMAppEventType.RECOVER
RMState state = new RMState();
ApplicationState appState = new ApplicationState(123, 123, null, "user");
ApplicationStateData appState =
ApplicationStateData.newInstance(123, 123, null, "user");
state.getApplicationState().put(application.getApplicationId(), appState);
RMAppEvent event =
new RMAppRecoverEvent(application.getApplicationId(), state);
application.handle(event);
assertStartTimeSet(application);
assertAppState(RMAppState.SUBMITTED, application);
@ -946,22 +950,25 @@ public class TestRMAppTransitions {
@Test(timeout = 30000)
public void testAppsRecoveringStates() throws Exception {
RMState state = new RMState();
Map<ApplicationId, ApplicationState> applicationState =
Map<ApplicationId, ApplicationStateData> applicationState =
state.getApplicationState();
createRMStateForApplications(applicationState, RMAppState.FINISHED);
createRMStateForApplications(applicationState, RMAppState.KILLED);
createRMStateForApplications(applicationState, RMAppState.FAILED);
for (ApplicationState appState : applicationState.values()) {
for (ApplicationStateData appState : applicationState.values()) {
testRecoverApplication(appState, state);
}
}
public void testRecoverApplication(ApplicationState appState, RMState rmState)
public void testRecoverApplication(ApplicationStateData appState,
RMState rmState)
throws Exception {
ApplicationSubmissionContext submissionContext =
appState.getApplicationSubmissionContext();
RMAppImpl application =
new RMAppImpl(appState.getAppId(), rmContext, conf,
new RMAppImpl(
appState.getApplicationSubmissionContext().getApplicationId(),
rmContext, conf,
submissionContext.getApplicationName(), null,
submissionContext.getQueue(), submissionContext, null, null,
appState.getSubmitTime(), submissionContext.getApplicationType(),
@ -986,12 +993,12 @@ public class TestRMAppTransitions {
}
public void createRMStateForApplications(
Map<ApplicationId, ApplicationState> applicationState,
Map<ApplicationId, ApplicationStateData> applicationState,
RMAppState rmAppState) {
RMApp app = createNewTestApp(null);
ApplicationState appState =
new ApplicationState(app.getSubmitTime(), app.getStartTime(),
app.getApplicationSubmissionContext(), app.getUser(), rmAppState,
ApplicationStateData appState =
ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(),
app.getUser(), app.getApplicationSubmissionContext(), rmAppState,
null, app.getFinishTime());
applicationState.put(app.getApplicationId(), appState);
}

View File

@ -81,7 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventT
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -1520,7 +1520,7 @@ public class TestRMAppAttemptTransitions {
private void verifyAttemptFinalStateSaved() {
verify(store, times(1)).updateApplicationAttemptState(
any(ApplicationAttemptState.class));
any(ApplicationAttemptStateData.class));
}
private void verifyAMHostAndPortInvalidated() {