diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 511bb6e8aad..be8fbed5bd1 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -65,6 +65,9 @@ Release 2.7.0 - UNRELEASED YARN-2669. FairScheduler: queue names shouldn't allow periods (Wei Yan via Sandy Ryza) + YARN-2404. Removed ApplicationAttemptState and ApplicationState class in + RMStateStore. (Tsuyoshi OZAWA via jianhe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index ab8df62947c..f38e128cfe2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -40,9 +40,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -306,11 +306,11 @@ public class RMAppManager implements EventHandler, } } - protected void recoverApplication(ApplicationState appState, RMState rmState) - throws Exception { + protected void recoverApplication(ApplicationStateData appState, + RMState rmState) throws Exception { ApplicationSubmissionContext appContext = appState.getApplicationSubmissionContext(); - ApplicationId appId = appState.getAppId(); + ApplicationId appId = appContext.getApplicationId(); // create and recover app. RMAppImpl application = @@ -414,9 +414,10 @@ public class RMAppManager implements EventHandler, RMStateStore store = rmContext.getStateStore(); assert store != null; // recover applications - Map appStates = state.getApplicationState(); + Map appStates = + state.getApplicationState(); LOG.info("Recovering " + appStates.size() + " applications"); - for (ApplicationState appState : appStates.values()) { + for (ApplicationStateData appState : appStates.values()) { recoverApplication(appState, state); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 2bbc5c2d2f4..299639225e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -38,9 +38,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -223,8 +221,8 @@ public class FileSystemRMStateStore extends RMStateStore { private void loadRMAppState(RMState rmState) throws Exception { try { - List attempts = - new ArrayList(); + List attempts = + new ArrayList(); for (FileStatus appDir : fs.listStatus(rmAppRoot)) { checkAndResumeUpdateOperation(appDir.getPath()); @@ -241,19 +239,11 @@ public class FileSystemRMStateStore extends RMStateStore { if (LOG.isDebugEnabled()) { LOG.debug("Loading application from node: " + childNodeName); } - ApplicationId appId = ConverterUtils.toApplicationId(childNodeName); - ApplicationStateDataPBImpl appStateData = + ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl( ApplicationStateDataProto.parseFrom(childData)); - ApplicationState appState = - new ApplicationState(appStateData.getSubmitTime(), - appStateData.getStartTime(), - appStateData.getApplicationSubmissionContext(), - appStateData.getUser(), - appStateData.getState(), - appStateData.getDiagnostics(), appStateData.getFinishTime()); - // assert child node name is same as actual applicationId - assert appId.equals(appState.context.getApplicationId()); + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); rmState.appState.put(appId, appState); } else if (childNodeName .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { @@ -262,33 +252,9 @@ public class FileSystemRMStateStore extends RMStateStore { LOG.debug("Loading application attempt from node: " + childNodeName); } - ApplicationAttemptId attemptId = - ConverterUtils.toApplicationAttemptId(childNodeName); - ApplicationAttemptStateDataPBImpl attemptStateData = + ApplicationAttemptStateDataPBImpl attemptState = new ApplicationAttemptStateDataPBImpl( ApplicationAttemptStateDataProto.parseFrom(childData)); - Credentials credentials = null; - if (attemptStateData.getAppAttemptTokens() != null) { - credentials = new Credentials(); - DataInputByteBuffer dibb = new DataInputByteBuffer(); - dibb.reset(attemptStateData.getAppAttemptTokens()); - credentials.readTokenStorageStream(dibb); - } - ApplicationAttemptState attemptState = - new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime(), - attemptStateData.getState(), - attemptStateData.getFinalTrackingUrl(), - attemptStateData.getDiagnostics(), - attemptStateData.getFinalApplicationStatus(), - attemptStateData.getAMContainerExitStatus(), - attemptStateData.getFinishTime(), - attemptStateData.getMemorySeconds(), - attemptStateData.getVcoreSeconds()); - - // assert child node name is same as application attempt id - assert attemptId.equals(attemptState.getAttemptId()); attempts.add(attemptState); } else { LOG.info("Unknown child node with name: " + childNodeName); @@ -299,9 +265,9 @@ public class FileSystemRMStateStore extends RMStateStore { // go through all attempts and add them to their apps, Ideally, each // attempt node must have a corresponding app node, because remove // directory operation remove both at the same time - for (ApplicationAttemptState attemptState : attempts) { + for (ApplicationAttemptStateData attemptState : attempts) { ApplicationId appId = attemptState.getAttemptId().getApplicationId(); - ApplicationState appState = rmState.appState.get(appId); + ApplicationStateData appState = rmState.appState.get(appId); assert appState != null; appState.attempts.put(attemptState.getAttemptId(), attemptState); } @@ -398,10 +364,9 @@ public class FileSystemRMStateStore extends RMStateStore { @Override public synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateDataPB) throws Exception { - String appIdStr = appId.toString(); - Path appDirPath = getAppDir(rmAppRoot, appIdStr); + Path appDirPath = getAppDir(rmAppRoot, appId); fs.mkdirs(appDirPath); - Path nodeCreatePath = getNodePath(appDirPath, appIdStr); + Path nodeCreatePath = getNodePath(appDirPath, appId.toString()); LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath); byte[] appStateData = appStateDataPB.getProto().toByteArray(); @@ -418,9 +383,8 @@ public class FileSystemRMStateStore extends RMStateStore { @Override public synchronized void updateApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateDataPB) throws Exception { - String appIdStr = appId.toString(); - Path appDirPath = getAppDir(rmAppRoot, appIdStr); - Path nodeCreatePath = getNodePath(appDirPath, appIdStr); + Path appDirPath = getAppDir(rmAppRoot, appId); + Path nodeCreatePath = getNodePath(appDirPath, appId.toString()); LOG.info("Updating info for app: " + appId + " at: " + nodeCreatePath); byte[] appStateData = appStateDataPB.getProto().toByteArray(); @@ -440,7 +404,7 @@ public class FileSystemRMStateStore extends RMStateStore { ApplicationAttemptStateData attemptStateDataPB) throws Exception { Path appDirPath = - getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); + getAppDir(rmAppRoot, appAttemptId.getApplicationId()); Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString()); LOG.info("Storing info for attempt: " + appAttemptId + " at: " + nodeCreatePath); @@ -461,7 +425,7 @@ public class FileSystemRMStateStore extends RMStateStore { ApplicationAttemptStateData attemptStateDataPB) throws Exception { Path appDirPath = - getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); + getAppDir(rmAppRoot, appAttemptId.getApplicationId()); Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString()); LOG.info("Updating info for attempt: " + appAttemptId + " at: " + nodeCreatePath); @@ -477,9 +441,11 @@ public class FileSystemRMStateStore extends RMStateStore { } @Override - public synchronized void removeApplicationStateInternal(ApplicationState appState) + public synchronized void removeApplicationStateInternal( + ApplicationStateData appState) throws Exception { - String appId = appState.getAppId().toString(); + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); Path nodeRemovePath = getAppDir(rmAppRoot, appId); LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath); deleteFile(nodeRemovePath); @@ -572,8 +538,8 @@ public class FileSystemRMStateStore extends RMStateStore { } } - private Path getAppDir(Path root, String appId) { - return getNodePath(root, appId); + private Path getAppDir(Path root, ApplicationId appId) { + return getNodePath(root, appId.toString()); } // FileSystem related code diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index a67da2cf245..917fdc13a38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -25,8 +25,6 @@ import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataInputByteBuffer; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -93,57 +91,30 @@ public class MemoryRMStateStore extends RMStateStore { } @Override - public void storeApplicationStateInternal(ApplicationId appId, - ApplicationStateData appStateData) + public void storeApplicationStateInternal( + ApplicationId appId, ApplicationStateData appState) throws Exception { - ApplicationState appState = - new ApplicationState(appStateData.getSubmitTime(), - appStateData.getStartTime(), - appStateData.getApplicationSubmissionContext(), - appStateData.getUser()); state.appState.put(appId, appState); } @Override public void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateData appStateData) throws Exception { - ApplicationState updatedAppState = - new ApplicationState(appStateData.getSubmitTime(), - appStateData.getStartTime(), - appStateData.getApplicationSubmissionContext(), - appStateData.getUser(), appStateData.getState(), - appStateData.getDiagnostics(), appStateData.getFinishTime()); - LOG.info("Updating final state " + appStateData.getState() + " for app: " + ApplicationStateData appState) throws Exception { + LOG.info("Updating final state " + appState.getState() + " for app: " + appId); if (state.appState.get(appId) != null) { // add the earlier attempts back - updatedAppState.attempts - .putAll(state.appState.get(appId).attempts); + appState.attempts.putAll(state.appState.get(appId).attempts); } - state.appState.put(appId, updatedAppState); + state.appState.put(appId, appState); } @Override public synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateData attemptStateData) + ApplicationAttemptStateData attemptState) throws Exception { - Credentials credentials = null; - if(attemptStateData.getAppAttemptTokens() != null){ - DataInputByteBuffer dibb = new DataInputByteBuffer(); - credentials = new Credentials(); - dibb.reset(attemptStateData.getAppAttemptTokens()); - credentials.readTokenStorageStream(dibb); - } - ApplicationAttemptState attemptState = - new ApplicationAttemptState(appAttemptId, - attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime(), - attemptStateData.getMemorySeconds(), - attemptStateData.getVcoreSeconds()); - - - ApplicationState appState = state.getApplicationState().get( + ApplicationStateData appState = state.getApplicationState().get( attemptState.getAttemptId().getApplicationId()); if (appState == null) { throw new YarnRuntimeException("Application doesn't exist"); @@ -154,44 +125,25 @@ public class MemoryRMStateStore extends RMStateStore { @Override public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateData attemptStateData) + ApplicationAttemptStateData attemptState) throws Exception { - Credentials credentials = null; - if (attemptStateData.getAppAttemptTokens() != null) { - DataInputByteBuffer dibb = new DataInputByteBuffer(); - credentials = new Credentials(); - dibb.reset(attemptStateData.getAppAttemptTokens()); - credentials.readTokenStorageStream(dibb); - } - ApplicationAttemptState updatedAttemptState = - new ApplicationAttemptState(appAttemptId, - attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime(), attemptStateData.getState(), - attemptStateData.getFinalTrackingUrl(), - attemptStateData.getDiagnostics(), - attemptStateData.getFinalApplicationStatus(), - attemptStateData.getAMContainerExitStatus(), - attemptStateData.getFinishTime(), - attemptStateData.getMemorySeconds(), - attemptStateData.getVcoreSeconds()); - - ApplicationState appState = - state.getApplicationState().get( - updatedAttemptState.getAttemptId().getApplicationId()); + ApplicationStateData appState = + state.getApplicationState().get(appAttemptId.getApplicationId()); if (appState == null) { throw new YarnRuntimeException("Application doesn't exist"); } - LOG.info("Updating final state " + updatedAttemptState.getState() - + " for attempt: " + updatedAttemptState.getAttemptId()); - appState.attempts.put(updatedAttemptState.getAttemptId(), - updatedAttemptState); + LOG.info("Updating final state " + attemptState.getState() + + " for attempt: " + attemptState.getAttemptId()); + appState.attempts.put(attemptState.getAttemptId(), attemptState); } @Override public synchronized void removeApplicationStateInternal( - ApplicationState appState) throws Exception { - ApplicationId appId = appState.getAppId(); - ApplicationState removed = state.appState.remove(appId); + ApplicationStateData appState) throws Exception { + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); + ApplicationStateData removed = state.appState.remove(appId); + if (removed == null) { throw new YarnRuntimeException("Removing non-exsisting application state"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index b957d12f564..f80c497e80a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -71,7 +71,7 @@ public class NullRMStateStore extends RMStateStore { } @Override - protected void removeApplicationStateInternal(ApplicationState appState) + protected void removeApplicationStateInternal(ApplicationStateData appState) throws Exception { // Do nothing } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 8948b54ed62..35a54c3ab6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -38,9 +38,6 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -56,12 +53,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Applicatio import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; @@ -129,13 +124,13 @@ public abstract class RMStateStore extends AbstractService { LOG.error("Illegal event type: " + event.getClass()); return; } - ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState(); - ApplicationId appId = appState.getAppId(); - ApplicationStateData appStateData = ApplicationStateData - .newInstance(appState); + ApplicationStateData appState = + ((RMStateStoreAppEvent) event).getAppState(); + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Storing info for app: " + appId); try { - store.storeApplicationStateInternal(appId, appStateData); + store.storeApplicationStateInternal(appId, appState); store.notifyApplication(new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED)); } catch (Exception e) { @@ -154,13 +149,13 @@ public abstract class RMStateStore extends AbstractService { LOG.error("Illegal event type: " + event.getClass()); return; } - ApplicationState appState = ((RMStateUpdateAppEvent) event).getAppState(); - ApplicationId appId = appState.getAppId(); - ApplicationStateData appStateData = ApplicationStateData - .newInstance(appState); + ApplicationStateData appState = + ((RMStateUpdateAppEvent) event).getAppState(); + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Updating info for app: " + appId); try { - store.updateApplicationStateInternal(appId, appStateData); + store.updateApplicationStateInternal(appId, appState); store.notifyApplication(new RMAppEvent(appId, RMAppEventType.APP_UPDATE_SAVED)); } catch (Exception e) { @@ -179,9 +174,10 @@ public abstract class RMStateStore extends AbstractService { LOG.error("Illegal event type: " + event.getClass()); return; } - ApplicationState appState = ((RMStateStoreRemoveAppEvent) event) - .getAppState(); - ApplicationId appId = appState.getAppId(); + ApplicationStateData appState = + ((RMStateStoreRemoveAppEvent) event).getAppState(); + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Removing info for app: " + appId); try { store.removeApplicationStateInternal(appState); @@ -201,16 +197,14 @@ public abstract class RMStateStore extends AbstractService { LOG.error("Illegal event type: " + event.getClass()); return; } - ApplicationAttemptState attemptState = + ApplicationAttemptStateData attemptState = ((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); try { - ApplicationAttemptStateData attemptStateData = - ApplicationAttemptStateData.newInstance(attemptState); if (LOG.isDebugEnabled()) { LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); } store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(), - attemptStateData); + attemptState); store.notifyApplicationAttempt(new RMAppAttemptEvent (attemptState.getAttemptId(), RMAppAttemptEventType.ATTEMPT_NEW_SAVED)); @@ -230,16 +224,14 @@ public abstract class RMStateStore extends AbstractService { LOG.error("Illegal event type: " + event.getClass()); return; } - ApplicationAttemptState attemptState = + ApplicationAttemptStateData attemptState = ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState(); try { - ApplicationAttemptStateData attemptStateData = ApplicationAttemptStateData - .newInstance(attemptState); if (LOG.isDebugEnabled()) { LOG.debug("Updating info for attempt: " + attemptState.getAttemptId()); } store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(), - attemptStateData); + attemptState); store.notifyApplicationAttempt(new RMAppAttemptEvent (attemptState.getAttemptId(), RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); @@ -255,153 +247,6 @@ public abstract class RMStateStore extends AbstractService { stateMachine = stateMachineFactory.make(this); } - /** - * State of an application attempt - */ - public static class ApplicationAttemptState { - final ApplicationAttemptId attemptId; - final Container masterContainer; - final Credentials appAttemptCredentials; - long startTime = 0; - long finishTime = 0; - // fields set when attempt completes - RMAppAttemptState state; - String finalTrackingUrl = "N/A"; - String diagnostics; - int exitStatus = ContainerExitStatus.INVALID; - FinalApplicationStatus amUnregisteredFinalStatus; - long memorySeconds; - long vcoreSeconds; - - public ApplicationAttemptState(ApplicationAttemptId attemptId, - Container masterContainer, Credentials appAttemptCredentials, - long startTime, long memorySeconds, long vcoreSeconds) { - this(attemptId, masterContainer, appAttemptCredentials, startTime, null, - null, "", null, ContainerExitStatus.INVALID, 0, memorySeconds, vcoreSeconds); - } - - public ApplicationAttemptState(ApplicationAttemptId attemptId, - Container masterContainer, Credentials appAttemptCredentials, - long startTime, RMAppAttemptState state, String finalTrackingUrl, - String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus, - int exitStatus, long finishTime, long memorySeconds, - long vcoreSeconds) { - this.attemptId = attemptId; - this.masterContainer = masterContainer; - this.appAttemptCredentials = appAttemptCredentials; - this.startTime = startTime; - this.state = state; - this.finalTrackingUrl = finalTrackingUrl; - this.diagnostics = diagnostics == null ? "" : diagnostics; - this.amUnregisteredFinalStatus = amUnregisteredFinalStatus; - this.exitStatus = exitStatus; - this.finishTime = finishTime; - this.memorySeconds = memorySeconds; - this.vcoreSeconds = vcoreSeconds; - } - - public Container getMasterContainer() { - return masterContainer; - } - public ApplicationAttemptId getAttemptId() { - return attemptId; - } - public Credentials getAppAttemptCredentials() { - return appAttemptCredentials; - } - public RMAppAttemptState getState(){ - return state; - } - public String getFinalTrackingUrl() { - return finalTrackingUrl; - } - public String getDiagnostics() { - return diagnostics; - } - public long getStartTime() { - return startTime; - } - public FinalApplicationStatus getFinalApplicationStatus() { - return amUnregisteredFinalStatus; - } - public int getAMContainerExitStatus(){ - return this.exitStatus; - } - public long getMemorySeconds() { - return memorySeconds; - } - public long getVcoreSeconds() { - return vcoreSeconds; - } - public long getFinishTime() { - return this.finishTime; - } - } - - /** - * State of an application application - */ - public static class ApplicationState { - final ApplicationSubmissionContext context; - final long submitTime; - final long startTime; - final String user; - Map attempts = - new HashMap(); - // fields set when application completes. - RMAppState state; - String diagnostics; - long finishTime; - - public ApplicationState(long submitTime, - long startTime, ApplicationSubmissionContext context, String user) { - this(submitTime, startTime, context, user, null, "", 0); - } - - public ApplicationState(long submitTime, - long startTime,ApplicationSubmissionContext context, - String user, RMAppState state, String diagnostics, long finishTime) { - this.submitTime = submitTime; - this.startTime = startTime; - this.context = context; - this.user = user; - this.state = state; - this.diagnostics = diagnostics == null ? "" : diagnostics; - this.finishTime = finishTime; - } - - public ApplicationId getAppId() { - return context.getApplicationId(); - } - public long getSubmitTime() { - return submitTime; - } - public long getStartTime() { - return startTime; - } - public int getAttemptCount() { - return attempts.size(); - } - public ApplicationSubmissionContext getApplicationSubmissionContext() { - return context; - } - public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) { - return attempts.get(attemptId); - } - public String getUser() { - return user; - } - public RMAppState getState() { - return state; - } - public String getDiagnostics() { - return diagnostics; - } - public long getFinishTime() { - return finishTime; - } - } - public static class RMDTSecretManagerState { // DTIdentifier -> renewDate Map delegationTokenState = @@ -429,14 +274,14 @@ public abstract class RMStateStore extends AbstractService { * State of the ResourceManager */ public static class RMState { - Map appState = - new TreeMap(); + Map appState = + new TreeMap(); RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState(); AMRMTokenSecretManagerState amrmTokenSecretManagerState = null; - public Map getApplicationState() { + public Map getApplicationState() { return appState; } @@ -575,14 +420,15 @@ public abstract class RMStateStore extends AbstractService { ApplicationSubmissionContext context = app .getApplicationSubmissionContext(); assert context instanceof ApplicationSubmissionContextPBImpl; - ApplicationState appState = - new ApplicationState(app.getSubmitTime(), app.getStartTime(), context, - app.getUser()); + ApplicationStateData appState = + ApplicationStateData.newInstance( + app.getSubmitTime(), app.getStartTime(), context, app.getUser()); dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); } @SuppressWarnings("unchecked") - public synchronized void updateApplicationState(ApplicationState appState) { + public synchronized void updateApplicationState( + ApplicationStateData appState) { dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState)); } @@ -609,11 +455,13 @@ public abstract class RMStateStore extends AbstractService { AggregateAppResourceUsage resUsage = appAttempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage(); - ApplicationAttemptState attemptState = - new ApplicationAttemptState(appAttempt.getAppAttemptId(), - appAttempt.getMasterContainer(), credentials, - appAttempt.getStartTime(), resUsage.getMemorySeconds(), - resUsage.getVcoreSeconds()); + ApplicationAttemptStateData attemptState = + ApplicationAttemptStateData.newInstance( + appAttempt.getAppAttemptId(), + appAttempt.getMasterContainer(), + credentials, appAttempt.getStartTime(), + resUsage.getMemorySeconds(), + resUsage.getVcoreSeconds()); dispatcher.getEventHandler().handle( new RMStateStoreAppAttemptEvent(attemptState)); @@ -621,7 +469,7 @@ public abstract class RMStateStore extends AbstractService { @SuppressWarnings("unchecked") public synchronized void updateApplicationAttemptState( - ApplicationAttemptState attemptState) { + ApplicationAttemptStateData attemptState) { dispatcher.getEventHandler().handle( new RMStateUpdateAppAttemptEvent(attemptState)); } @@ -761,16 +609,12 @@ public abstract class RMStateStore extends AbstractService { */ @SuppressWarnings("unchecked") public synchronized void removeApplication(RMApp app) { - ApplicationState appState = new ApplicationState( + ApplicationStateData appState = + ApplicationStateData.newInstance( app.getSubmitTime(), app.getStartTime(), app.getApplicationSubmissionContext(), app.getUser()); for(RMAppAttempt appAttempt : app.getAppAttempts().values()) { - Credentials credentials = getCredentialsFromAppAttempt(appAttempt); - ApplicationAttemptState attemptState = - new ApplicationAttemptState(appAttempt.getAppAttemptId(), - appAttempt.getMasterContainer(), credentials, - appAttempt.getStartTime(), 0, 0); - appState.attempts.put(attemptState.getAttemptId(), attemptState); + appState.attempts.put(appAttempt.getAppAttemptId(), null); } dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState)); @@ -782,7 +626,7 @@ public abstract class RMStateStore extends AbstractService { * application and its attempts */ protected abstract void removeApplicationStateInternal( - ApplicationState appState) throws Exception; + ApplicationStateData appState) throws Exception; // TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See // YARN-1779 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java index c4a04bc5771..3399431cf04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java @@ -18,17 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; public class RMStateStoreAppAttemptEvent extends RMStateStoreEvent { - ApplicationAttemptState attemptState; + ApplicationAttemptStateData attemptState; - public RMStateStoreAppAttemptEvent(ApplicationAttemptState attemptState) { + public RMStateStoreAppAttemptEvent(ApplicationAttemptStateData attemptState) { super(RMStateStoreEventType.STORE_APP_ATTEMPT); this.attemptState = attemptState; } - public ApplicationAttemptState getAppAttemptState() { + public ApplicationAttemptStateData getAppAttemptState() { return attemptState; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java index 99f8e37891a..50e59f72610 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java @@ -18,18 +18,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; public class RMStateStoreAppEvent extends RMStateStoreEvent { - private final ApplicationState appState; + private final ApplicationStateData appState; - public RMStateStoreAppEvent(ApplicationState appState) { + public RMStateStoreAppEvent(ApplicationStateData appState) { super(RMStateStoreEventType.STORE_APP); this.appState = appState; } - public ApplicationState getAppState() { + public ApplicationStateData getAppState() { return appState; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java index 402feb96ec9..fbba64c8783 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java @@ -18,17 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; public class RMStateStoreRemoveAppEvent extends RMStateStoreEvent { - ApplicationState appState; + ApplicationStateData appState; - RMStateStoreRemoveAppEvent(ApplicationState appState) { + RMStateStoreRemoveAppEvent(ApplicationStateData appState) { super(RMStateStoreEventType.REMOVE_APP); this.appState = appState; } - public ApplicationState getAppState() { + public ApplicationStateData getAppState() { return appState; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java index 9ded6732a2e..14f8e9d47fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java @@ -18,18 +18,19 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; public class RMStateUpdateAppAttemptEvent extends RMStateStoreEvent { - ApplicationAttemptState attemptState; + ApplicationAttemptStateData attemptState; - public RMStateUpdateAppAttemptEvent(ApplicationAttemptState attemptState) { + public RMStateUpdateAppAttemptEvent( + ApplicationAttemptStateData attemptState) { super(RMStateStoreEventType.UPDATE_APP_ATTEMPT); this.attemptState = attemptState; } - public ApplicationAttemptState getAppAttemptState() { + public ApplicationAttemptStateData getAppAttemptState() { return attemptState; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java index 9bb96e57256..cec364c138a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java @@ -18,17 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; public class RMStateUpdateAppEvent extends RMStateStoreEvent { - private final ApplicationState appState; + private final ApplicationStateData appState; - public RMStateUpdateAppEvent(ApplicationState appState) { + public RMStateUpdateAppEvent(ApplicationStateData appState) { super(RMStateStoreEventType.UPDATE_APP); this.appState = appState; } - public ApplicationState getAppState() { + public ApplicationStateData getAppState() { return appState; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index ab048cabc0d..a19ed30ecfc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -34,8 +34,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataInputByteBuffer; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ZKUtil; @@ -562,17 +560,11 @@ public class ZKRMStateStore extends RMStateStore { LOG.debug("Loading application from znode: " + childNodeName); } ApplicationId appId = ConverterUtils.toApplicationId(childNodeName); - ApplicationStateDataPBImpl appStateData = + ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl( ApplicationStateDataProto.parseFrom(childData)); - ApplicationState appState = - new ApplicationState(appStateData.getSubmitTime(), - appStateData.getStartTime(), - appStateData.getApplicationSubmissionContext(), - appStateData.getUser(), - appStateData.getState(), - appStateData.getDiagnostics(), appStateData.getFinishTime()); - if (!appId.equals(appState.context.getApplicationId())) { + if (!appId.equals( + appState.getApplicationSubmissionContext().getApplicationId())) { throw new YarnRuntimeException("The child node name is different " + "from the application id"); } @@ -584,7 +576,7 @@ public class ZKRMStateStore extends RMStateStore { } } - private void loadApplicationAttemptState(ApplicationState appState, + private void loadApplicationAttemptState(ApplicationStateData appState, ApplicationId appId) throws Exception { String appPath = getNodePath(rmAppRoot, appId.toString()); @@ -594,31 +586,9 @@ public class ZKRMStateStore extends RMStateStore { String attemptPath = getNodePath(appPath, attemptIDStr); byte[] attemptData = getDataWithRetries(attemptPath, true); - ApplicationAttemptId attemptId = - ConverterUtils.toApplicationAttemptId(attemptIDStr); - ApplicationAttemptStateDataPBImpl attemptStateData = + ApplicationAttemptStateDataPBImpl attemptState = new ApplicationAttemptStateDataPBImpl( ApplicationAttemptStateDataProto.parseFrom(attemptData)); - Credentials credentials = null; - if (attemptStateData.getAppAttemptTokens() != null) { - credentials = new Credentials(); - DataInputByteBuffer dibb = new DataInputByteBuffer(); - dibb.reset(attemptStateData.getAppAttemptTokens()); - credentials.readTokenStorageStream(dibb); - } - - ApplicationAttemptState attemptState = - new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime(), attemptStateData.getState(), - attemptStateData.getFinalTrackingUrl(), - attemptStateData.getDiagnostics(), - attemptStateData.getFinalApplicationStatus(), - attemptStateData.getAMContainerExitStatus(), - attemptStateData.getFinishTime(), - attemptStateData.getMemorySeconds(), - attemptStateData.getVcoreSeconds()); - appState.attempts.put(attemptState.getAttemptId(), attemptState); } @@ -705,9 +675,11 @@ public class ZKRMStateStore extends RMStateStore { } @Override - public synchronized void removeApplicationStateInternal(ApplicationState appState) + public synchronized void removeApplicationStateInternal( + ApplicationStateData appState) throws Exception { - String appId = appState.getAppId().toString(); + String appId = appState.getApplicationSubmissionContext().getApplicationId() + .toString(); String appIdRemovePath = getNodePath(rmAppRoot, appId); ArrayList opList = new ArrayList(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java index 63ef8f635c9..391783b9712 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java @@ -18,18 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; -import java.io.IOException; -import java.nio.ByteBuffer; - import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.Records; @@ -41,7 +37,7 @@ import org.apache.hadoop.yarn.util.Records; public abstract class ApplicationAttemptStateData { public static ApplicationAttemptStateData newInstance( ApplicationAttemptId attemptId, Container container, - ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, + Credentials attemptTokens, long startTime, RMAppAttemptState finalState, String finalTrackingUrl, String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus, long finishTime, long memorySeconds, long vcoreSeconds) { @@ -52,7 +48,7 @@ public abstract class ApplicationAttemptStateData { attemptStateData.setAppAttemptTokens(attemptTokens); attemptStateData.setState(finalState); attemptStateData.setFinalTrackingUrl(finalTrackingUrl); - attemptStateData.setDiagnostics(diagnostics); + attemptStateData.setDiagnostics(diagnostics == null ? "" : diagnostics); attemptStateData.setStartTime(startTime); attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); attemptStateData.setAMContainerExitStatus(exitStatus); @@ -63,22 +59,14 @@ public abstract class ApplicationAttemptStateData { } public static ApplicationAttemptStateData newInstance( - ApplicationAttemptState attemptState) throws IOException { - Credentials credentials = attemptState.getAppAttemptCredentials(); - ByteBuffer appAttemptTokens = null; - if (credentials != null) { - DataOutputBuffer dob = new DataOutputBuffer(); - credentials.writeTokenStorageToStream(dob); - appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + ApplicationAttemptId attemptId, Container masterContainer, + Credentials attemptTokens, long startTime, long memorySeconds, + long vcoreSeconds) { + return newInstance(attemptId, masterContainer, attemptTokens, + startTime, null, "N/A", "", null, ContainerExitStatus.INVALID, 0, + memorySeconds, vcoreSeconds); } - return newInstance(attemptState.getAttemptId(), - attemptState.getMasterContainer(), appAttemptTokens, - attemptState.getStartTime(), attemptState.getState(), - attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(), - attemptState.getFinalApplicationStatus(), - attemptState.getAMContainerExitStatus(), attemptState.getFinishTime(), - attemptState.getMemorySeconds(), attemptState.getVcoreSeconds()); - } + public abstract ApplicationAttemptStateDataProto getProto(); @@ -108,9 +96,9 @@ public abstract class ApplicationAttemptStateData { */ @Public @Unstable - public abstract ByteBuffer getAppAttemptTokens(); + public abstract Credentials getAppAttemptTokens(); - public abstract void setAppAttemptTokens(ByteBuffer attemptTokens); + public abstract void setAppAttemptTokens(Credentials attemptTokens); /** * Get the final state of the application attempt. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java index eff0445155d..43046a96a98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java @@ -18,14 +18,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.util.Records; @@ -36,6 +38,9 @@ import org.apache.hadoop.yarn.util.Records; @Public @Unstable public abstract class ApplicationStateData { + public Map attempts = + new HashMap(); + public static ApplicationStateData newInstance(long submitTime, long startTime, String user, ApplicationSubmissionContext submissionContext, @@ -51,12 +56,18 @@ public abstract class ApplicationStateData { return appState; } - public static ApplicationStateData newInstance( - ApplicationState appState) { - return newInstance(appState.getSubmitTime(), appState.getStartTime(), - appState.getUser(), appState.getApplicationSubmissionContext(), - appState.getState(), appState.getDiagnostics(), - appState.getFinishTime()); + public static ApplicationStateData newInstance(long submitTime, + long startTime, ApplicationSubmissionContext context, String user) { + return newInstance(submitTime, startTime, user, context, null, "", 0); + } + + public int getAttemptCount() { + return attempts.size(); + } + + public ApplicationAttemptStateData getAttempt( + ApplicationAttemptId attemptId) { + return attempts.get(attemptId); } public abstract ApplicationStateDataProto getProto(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java index 516af2d7785..bae3f9c8001 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java @@ -18,8 +18,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; +import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -37,6 +44,8 @@ import com.google.protobuf.TextFormat; public class ApplicationAttemptStateDataPBImpl extends ApplicationAttemptStateData { + private static Log LOG = + LogFactory.getLog(ApplicationAttemptStateDataPBImpl.class); ApplicationAttemptStateDataProto proto = ApplicationAttemptStateDataProto.getDefaultInstance(); ApplicationAttemptStateDataProto.Builder builder = null; @@ -137,26 +146,27 @@ public class ApplicationAttemptStateDataPBImpl extends } @Override - public ByteBuffer getAppAttemptTokens() { + public Credentials getAppAttemptTokens() { ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; if(appAttemptTokens != null) { - return appAttemptTokens; + return convertCredentialsFromByteBuffer(appAttemptTokens); } if(!p.hasAppAttemptTokens()) { return null; } this.appAttemptTokens = ProtoUtils.convertFromProtoFormat( p.getAppAttemptTokens()); - return appAttemptTokens; + return convertCredentialsFromByteBuffer(appAttemptTokens); } @Override - public void setAppAttemptTokens(ByteBuffer attemptTokens) { + public void setAppAttemptTokens(Credentials attemptTokens) { maybeInitBuilder(); if(attemptTokens == null) { builder.clearAppAttemptTokens(); + return; } - this.appAttemptTokens = attemptTokens; + this.appAttemptTokens = convertCredentialsToByteBuffer(attemptTokens); } @Override @@ -330,4 +340,44 @@ public class ApplicationAttemptStateDataPBImpl extends maybeInitBuilder(); builder.setFinishTime(finishTime); } + + private static ByteBuffer convertCredentialsToByteBuffer( + Credentials credentials) { + ByteBuffer appAttemptTokens = null; + DataOutputBuffer dob = new DataOutputBuffer(); + try { + if (credentials != null) { + credentials.writeTokenStorageToStream(dob); + appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + } + return appAttemptTokens; + } catch (IOException e) { + LOG.error("Failed to convert Credentials to ByteBuffer."); + assert false; + return null; + } finally { + IOUtils.closeStream(dob); + } + } + + private static Credentials convertCredentialsFromByteBuffer( + ByteBuffer appAttemptTokens) { + DataInputByteBuffer dibb = new DataInputByteBuffer(); + try { + Credentials credentials = null; + if (appAttemptTokens != null) { + credentials = new Credentials(); + appAttemptTokens.rewind(); + dibb.reset(appAttemptTokens); + credentials.readTokenStorageStream(dibb); + } + return credentials; + } catch (IOException e) { + LOG.error("Failed to convert Credentials from ByteBuffer."); + assert false; + return null; + } finally { + IOUtils.closeStream(dibb); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 751dbe44be5..33b62fe8ea2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -66,9 +66,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -728,10 +728,12 @@ public class RMAppImpl implements RMApp, Recoverable { @Override public void recover(RMState state) { - ApplicationState appState = state.getApplicationState().get(getApplicationId()); + ApplicationStateData appState = + state.getApplicationState().get(getApplicationId()); this.recoveredFinalState = appState.getState(); LOG.info("Recovering app: " + getApplicationId() + " with " + - + appState.getAttemptCount() + " attempts and final state = " + this.recoveredFinalState ); + + appState.getAttemptCount() + " attempts and final state = " + + this.recoveredFinalState ); this.diagnostics.append(appState.getDiagnostics()); this.storedFinishTime = appState.getFinishTime(); this.startTime = appState.getStartTime(); @@ -1019,10 +1021,10 @@ public class RMAppImpl implements RMApp, Recoverable { default: break; } - ApplicationState appState = - new ApplicationState(this.submitTime, this.startTime, - this.submissionContext, this.user, stateToBeStored, diags, - this.storedFinishTime); + ApplicationStateData appState = + ApplicationStateData.newInstance(this.submitTime, this.startTime, + this.user, this.submissionContext, + stateToBeStored, diags, this.storedFinishTime); this.rmContext.getStateStore().updateApplicationState(appState); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index a80167f9b2a..4c52d298a8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; import static org.apache.hadoop.yarn.util.StringHelper.pjoin; -import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -70,9 +69,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -793,9 +792,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { @Override public void recover(RMState state) { - ApplicationState appState = + ApplicationStateData appState = state.getApplicationState().get(getAppAttemptId().getApplicationId()); - ApplicationAttemptState attemptState = + ApplicationAttemptStateData attemptState = appState.getAttempt(getAppAttemptId()); assert attemptState != null; LOG.info("Recovering attempt: " + getAppAttemptId() + " with final state: " @@ -806,9 +805,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { if (amContainerExitStatus == ContainerExitStatus.PREEMPTED) { this.attemptMetrics.setIsPreempted(); } + + Credentials credentials = attemptState.getAppAttemptTokens(); setMasterContainer(attemptState.getMasterContainer()); - recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(), - attemptState.getState()); + recoverAppAttemptCredentials(credentials, attemptState.getState()); this.recoveredFinalState = attemptState.getState(); this.originalTrackingUrl = attemptState.getFinalTrackingUrl(); this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl); @@ -1123,10 +1123,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { this.attemptMetrics.getAggregateAppResourceUsage(); RMStateStore rmStore = rmContext.getStateStore(); setFinishTime(System.currentTimeMillis()); - ApplicationAttemptState attemptState = - new ApplicationAttemptState(applicationAttemptId, getMasterContainer(), - rmStore.getCredentialsFromAppAttempt(this), startTime, - stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus, + + ApplicationAttemptStateData attemptState = + ApplicationAttemptStateData.newInstance( + applicationAttemptId, getMasterContainer(), + rmStore.getCredentialsFromAppAttempt(this), + startTime, stateToBeStored, finalTrackingUrl, diags, + finalStatus, exitStatus, getFinishTime(), resUsage.getMemorySeconds(), resUsage.getVcoreSeconds()); LOG.info("Updating application attempt " + applicationAttemptId diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index 122eb60999f..0200e858ac2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFencedException; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -459,7 +460,8 @@ public class TestRMHA { MemoryRMStateStore memStore = new MemoryRMStateStore() { @Override - public synchronized void updateApplicationState(ApplicationState appState) { + public synchronized void updateApplicationState( + ApplicationStateData appState) { notifyStoreOperationFailed(new StoreFencedException()); } }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index a0f86272b78..a42170b2969 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -84,8 +84,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; @@ -162,7 +160,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); @@ -194,7 +192,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { // create app that gets launched and does allocate before RM restart RMApp app1 = rm1.submitApp(200); // assert app1 info is saved - ApplicationState appState = rmAppState.get(app1.getApplicationId()); + ApplicationStateData appState = rmAppState.get(app1.getApplicationId()); Assert.assertNotNull(appState); Assert.assertEquals(0, appState.getAttemptCount()); Assert.assertEquals(appState.getApplicationSubmissionContext() @@ -209,7 +207,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId(); rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); Assert.assertEquals(1, appState.getAttemptCount()); - ApplicationAttemptState attemptState = + ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId1); Assert.assertNotNull(attemptState); Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), @@ -429,7 +427,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); // start RM @@ -450,7 +448,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); am0.waitForState(RMAppAttemptState.FAILED); - ApplicationState appState = rmAppState.get(app0.getApplicationId()); + ApplicationStateData appState = rmAppState.get(app0.getApplicationId()); // assert the AM failed state is saved. Assert.assertEquals(RMAppAttemptState.FAILED, appState.getAttempt(am0.getApplicationAttemptId()).getState()); @@ -486,7 +484,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); // start RM @@ -650,7 +648,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { }; memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); // start RM @@ -689,7 +687,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); // start RM @@ -709,7 +707,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED); // assert the app/attempt failed state is saved. - ApplicationState appState = rmAppState.get(app0.getApplicationId()); + ApplicationStateData appState = rmAppState.get(app0.getApplicationId()); Assert.assertEquals(RMAppState.FAILED, appState.getState()); Assert.assertEquals(RMAppAttemptState.FAILED, appState.getAttempt(am0.getApplicationAttemptId()).getState()); @@ -737,7 +735,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); // start RM @@ -757,7 +755,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { rm1.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.KILLED); // killed state is saved. - ApplicationState appState = rmAppState.get(app0.getApplicationId()); + ApplicationStateData appState = rmAppState.get(app0.getApplicationId()); Assert.assertEquals(RMAppState.KILLED, appState.getState()); Assert.assertEquals(RMAppAttemptState.KILLED, appState.getAttempt(am0.getApplicationAttemptId()).getState()); @@ -823,7 +821,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); // start RM @@ -844,8 +842,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { finishApplicationMaster(app0, rm1, nm1, am0, req); // check the state store about the unregistered info. - ApplicationState appState = rmAppState.get(app0.getApplicationId()); - ApplicationAttemptState attemptState0 = + ApplicationStateData appState = rmAppState.get(app0.getApplicationId()); + ApplicationAttemptStateData attemptState0 = appState.getAttempt(am0.getApplicationAttemptId()); Assert.assertEquals("diagnostics", attemptState0.getDiagnostics()); Assert.assertEquals(FinalApplicationStatus.SUCCEEDED, @@ -995,7 +993,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { MockAM am, FinishApplicationMasterRequest req) throws Exception { RMState rmState = ((MemoryRMStateStore) rm.getRMContext().getStateStore()).getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); am.unregisterAppAttempt(req,true); am.waitForState(RMAppAttemptState.FINISHING); @@ -1003,7 +1001,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { am.waitForState(RMAppAttemptState.FINISHED); rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED); // check that app/attempt is saved with the final state - ApplicationState appState = rmAppState.get(rmApp.getApplicationId()); + ApplicationStateData appState = rmAppState.get(rmApp.getApplicationId()); Assert .assertEquals(RMAppState.FINISHED, appState.getState()); Assert.assertEquals(RMAppAttemptState.FINISHED, @@ -1019,7 +1017,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); MockRM rm1 = createMockRM(conf, memStore); rm1.start(); @@ -1037,7 +1035,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { null); // assert app1 info is saved - ApplicationState appState = rmAppState.get(app1.getApplicationId()); + ApplicationStateData appState = rmAppState.get(app1.getApplicationId()); Assert.assertNotNull(appState); Assert.assertEquals(0, appState.getAttemptCount()); Assert.assertEquals(appState.getApplicationSubmissionContext() @@ -1050,7 +1048,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { ApplicationAttemptId attemptId1 = attempt.getAppAttemptId(); rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); Assert.assertEquals(1, appState.getAttemptCount()); - ApplicationAttemptState attemptState = + ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId1); Assert.assertNotNull(attemptState); Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), @@ -1092,7 +1090,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); MockRM rm1 = new TestSecurityMockRM(conf, memStore); rm1.start(); @@ -1131,7 +1129,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { new HashMap(), false, "default", 1, ts); // assert app info is saved - ApplicationState appState = rmAppState.get(app.getApplicationId()); + ApplicationStateData appState = rmAppState.get(app.getApplicationId()); Assert.assertNotNull(appState); // assert delegation tokens exist in rm1 DelegationTokenRenewr @@ -1187,7 +1185,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); MockRM rm1 = new TestSecurityMockRM(conf, memStore); rm1.start(); @@ -1201,7 +1199,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { new HashMap(), "default"); // assert app info is saved - ApplicationState appState = rmAppState.get(app1.getApplicationId()); + ApplicationStateData appState = rmAppState.get(app1.getApplicationId()); Assert.assertNotNull(appState); // Allocate the AM @@ -1211,7 +1209,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); // assert attempt info is saved - ApplicationAttemptState attemptState = appState.getAttempt(attemptId1); + ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId1); Assert.assertNotNull(attemptState); Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), attemptState.getMasterContainer().getId()); @@ -1222,7 +1220,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { attempt1.getClientTokenMasterKey().getEncoded(); // assert application credentials are saved - Credentials savedCredentials = attemptState.getAppAttemptCredentials(); + Credentials savedCredentials = attemptState.getAppAttemptTokens(); Assert.assertArrayEquals("client token master key not saved", clientTokenMasterKey, savedCredentials.getSecretKey( RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); @@ -1268,7 +1266,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { memStore.init(conf); RMState rmState = memStore.getState(); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); Map rmDTState = rmState.getRMDTSecretManagerState().getTokenState(); @@ -1305,7 +1303,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { new HashMap(), false, "default", 1, ts); // assert app info is saved - ApplicationState appState = rmAppState.get(app.getApplicationId()); + ApplicationStateData appState = rmAppState.get(app.getApplicationId()); Assert.assertNotNull(appState); // assert all master keys are saved @@ -1479,7 +1477,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { // queue, and will be processed once rm.stop() is called. // Nothing exist in state store before stop is called. - Map rmAppState = + Map rmAppState = memStore.getState().getApplicationState(); Assert.assertTrue(rmAppState.size() == 0); @@ -1489,7 +1487,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { // Assert app info is still saved even if stop is called with pending saving // request on dispatcher. for (RMApp app : appList) { - ApplicationState appState = rmAppState.get(app.getApplicationId()); + ApplicationStateData appState = rmAppState.get(app.getApplicationId()); Assert.assertNotNull(appState); Assert.assertEquals(0, appState.getAttemptCount()); Assert.assertEquals(appState.getApplicationSubmissionContext() @@ -1523,7 +1521,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { nm1.setResourceTrackerService(rm2.getResourceTrackerService()); nm1 = rm2.registerNode("127.0.0.1:1234", 15120); - Map rmAppState = + Map rmAppState = rmState.getApplicationState(); // app0 exits in both state store and rmContext @@ -1658,10 +1656,15 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { memStore.init(conf); for (int i = 10; i > 0; i--) { - ApplicationState appState = mock(ApplicationState.class); - when(appState.getAppId()).thenReturn(ApplicationId.newInstance(1234, i)); - memStore.getState().getApplicationState() - .put(appState.getAppId(), appState); + ApplicationStateData appState = mock(ApplicationStateData.class); + ApplicationSubmissionContext context = + mock(ApplicationSubmissionContext.class); + when(appState.getApplicationSubmissionContext()).thenReturn(context); + when(context.getApplicationId()).thenReturn( + ApplicationId.newInstance(1234, i)); + memStore.getState().getApplicationState().put( + appState.getApplicationSubmissionContext().getApplicationId(), + appState); } MockRM rm1 = new MockRM(conf, memStore) { @@ -1681,12 +1684,14 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { } @Override - protected void recoverApplication(ApplicationState appState, + protected void recoverApplication(ApplicationStateData appState, RMState rmState) throws Exception { // check application is recovered in order. Assert.assertTrue(rmState.getApplicationState().size() > 0); - Assert.assertTrue(appState.getAppId().compareTo(prevId) > 0); - prevId = appState.getAppId(); + Assert.assertTrue(appState.getApplicationSubmissionContext() + .getApplicationId().compareTo(prevId) > 0); + prevId = + appState.getApplicationSubmissionContext().getApplicationId(); } } }; @@ -2030,4 +2035,5 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { // Do nothing. } } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index a93123e9187..49b18418c4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -42,7 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -386,7 +386,7 @@ public class TestAMRestart { am1.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); - ApplicationState appState = + ApplicationStateData appState = memStore.getState().getApplicationState().get(app1.getApplicationId()); // AM should be restarted even though max-am-attempt is 1. MockAM am2 = @@ -497,7 +497,7 @@ public class TestAMRestart { rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); // state store has 1 attempt stored. - ApplicationState appState = + ApplicationStateData appState = memStore.getState().getApplicationState().get(app1.getApplicationId()); Assert.assertEquals(1, appState.getAttemptCount()); // attempt stored has the preempted container exit status. @@ -555,7 +555,7 @@ public class TestAMRestart { // Restart rm. MockRM rm2 = new MockRM(conf, memStore); rm2.start(); - ApplicationState appState = + ApplicationStateData appState = memStore.getState().getApplicationState().get(app1.getApplicationId()); // re-register the NM nm1.setResourceTrackerService(rm2.getResourceTrackerService()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 00b60d303d1..8d6a7b69388 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -58,8 +58,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; @@ -243,6 +243,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ when(mockRemovedApp.getSubmitTime()).thenReturn(submitTime); when(mockRemovedApp.getApplicationSubmissionContext()).thenReturn(context); when(mockRemovedApp.getAppAttempts()).thenReturn(attempts); + when(mockRemovedApp.getUser()).thenReturn("user1"); RMAppAttempt mockRemovedAttempt = mock(RMAppAttempt.class); when(mockRemovedAttempt.getAppAttemptId()).thenReturn(attemptIdRemoved); when(mockRemovedAttempt.getRMAppAttemptMetrics()) @@ -269,10 +270,10 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ store = stateStoreHelper.getRMStateStore(); store.setRMDispatcher(dispatcher); RMState state = store.loadState(); - Map rmAppState = + Map rmAppState = state.getApplicationState(); - ApplicationState appState = rmAppState.get(appId1); + ApplicationStateData appState = rmAppState.get(appId1); // app is loaded assertNotNull(appState); // app is loaded correctly @@ -281,7 +282,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ // submission context is loaded correctly assertEquals(appId1, appState.getApplicationSubmissionContext().getApplicationId()); - ApplicationAttemptState attemptState = appState.getAttempt(attemptId1); + ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId1); // attempt1 is loaded correctly assertNotNull(attemptState); assertEquals(attemptId1, attemptState.getAttemptId()); @@ -289,9 +290,10 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ // attempt1 container is loaded correctly assertEquals(containerId1, attemptState.getMasterContainer().getId()); // attempt1 client token master key is loaded correctly - assertArrayEquals(clientTokenKey1.getEncoded(), - attemptState.getAppAttemptCredentials() - .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); + assertArrayEquals( + clientTokenKey1.getEncoded(), + attemptState.getAppAttemptTokens() + .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); attemptState = appState.getAttempt(attemptId2); // attempt2 is loaded correctly @@ -300,27 +302,30 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ // attempt2 container is loaded correctly assertEquals(containerId2, attemptState.getMasterContainer().getId()); // attempt2 client token master key is loaded correctly - assertArrayEquals(clientTokenKey2.getEncoded(), - attemptState.getAppAttemptCredentials() - .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); + assertArrayEquals( + clientTokenKey2.getEncoded(), + attemptState.getAppAttemptTokens() + .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); //******* update application/attempt state *******// - ApplicationState appState2 = - new ApplicationState(appState.submitTime, appState.startTime, - appState.context, appState.user, RMAppState.FINISHED, - "appDiagnostics", 1234); + ApplicationStateData appState2 = + ApplicationStateData.newInstance(appState.getSubmitTime(), + appState.getStartTime(), appState.getUser(), + appState.getApplicationSubmissionContext(), RMAppState.FINISHED, + "appDiagnostics", 1234); appState2.attempts.putAll(appState.attempts); store.updateApplicationState(appState2); - ApplicationAttemptState oldAttemptState = attemptState; - ApplicationAttemptState newAttemptState = - new ApplicationAttemptState(oldAttemptState.getAttemptId(), - oldAttemptState.getMasterContainer(), - oldAttemptState.getAppAttemptCredentials(), - oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, - "myTrackingUrl", "attemptDiagnostics", - FinalApplicationStatus.SUCCEEDED, 100, - oldAttemptState.getFinishTime(), 0, 0); + ApplicationAttemptStateData oldAttemptState = attemptState; + ApplicationAttemptStateData newAttemptState = + ApplicationAttemptStateData.newInstance( + oldAttemptState.getAttemptId(), + oldAttemptState.getMasterContainer(), + oldAttemptState.getAppAttemptTokens(), + oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, + "myTrackingUrl", "attemptDiagnostics", + FinalApplicationStatus.SUCCEEDED, 100, + oldAttemptState.getFinishTime(), 0, 0); store.updateApplicationAttemptState(newAttemptState); // test updating the state of an app/attempt whose initial state was not @@ -329,22 +334,22 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ ApplicationSubmissionContext dummyContext = new ApplicationSubmissionContextPBImpl(); dummyContext.setApplicationId(dummyAppId); - ApplicationState dummyApp = - new ApplicationState(appState.submitTime, appState.startTime, - dummyContext, appState.user, RMAppState.FINISHED, "appDiagnostics", - 1234); + ApplicationStateData dummyApp = + ApplicationStateData.newInstance(appState.getSubmitTime(), + appState.getStartTime(), appState.getUser(), dummyContext, + RMAppState.FINISHED, "appDiagnostics", 1234); store.updateApplicationState(dummyApp); ApplicationAttemptId dummyAttemptId = ApplicationAttemptId.newInstance(dummyAppId, 6); - ApplicationAttemptState dummyAttempt = - new ApplicationAttemptState(dummyAttemptId, - oldAttemptState.getMasterContainer(), - oldAttemptState.getAppAttemptCredentials(), - oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, - "myTrackingUrl", "attemptDiagnostics", - FinalApplicationStatus.SUCCEEDED, 111, - oldAttemptState.getFinishTime(), 0, 0); + ApplicationAttemptStateData dummyAttempt = + ApplicationAttemptStateData.newInstance(dummyAttemptId, + oldAttemptState.getMasterContainer(), + oldAttemptState.getAppAttemptTokens(), + oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, + "myTrackingUrl", "attemptDiagnostics", + FinalApplicationStatus.SUCCEEDED, 111, + oldAttemptState.getFinishTime(), 0, 0); store.updateApplicationAttemptState(dummyAttempt); // let things settle down @@ -355,11 +360,13 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ store = stateStoreHelper.getRMStateStore(); store.setRMDispatcher(dispatcher); RMState newRMState = store.loadState(); - Map newRMAppState = + Map newRMAppState = newRMState.getApplicationState(); - assertNotNull(newRMAppState.get(dummyApp.getAppId())); - ApplicationState updatedAppState = newRMAppState.get(appId1); - assertEquals(appState.getAppId(),updatedAppState.getAppId()); + assertNotNull(newRMAppState.get( + dummyApp.getApplicationSubmissionContext().getApplicationId())); + ApplicationStateData updatedAppState = newRMAppState.get(appId1); + assertEquals(appState.getApplicationSubmissionContext().getApplicationId(), + updatedAppState.getApplicationSubmissionContext().getApplicationId()); assertEquals(appState.getSubmitTime(), updatedAppState.getSubmitTime()); assertEquals(appState.getStartTime(), updatedAppState.getStartTime()); assertEquals(appState.getUser(), updatedAppState.getUser()); @@ -369,16 +376,17 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ assertEquals(1234, updatedAppState.getFinishTime()); // check updated attempt state - assertNotNull(newRMAppState.get(dummyApp.getAppId()).getAttempt( - dummyAttemptId)); - ApplicationAttemptState updatedAttemptState = + assertNotNull(newRMAppState.get(dummyApp.getApplicationSubmissionContext + ().getApplicationId()).getAttempt(dummyAttemptId)); + ApplicationAttemptStateData updatedAttemptState = updatedAppState.getAttempt(newAttemptState.getAttemptId()); assertEquals(oldAttemptState.getAttemptId(), updatedAttemptState.getAttemptId()); assertEquals(containerId2, updatedAttemptState.getMasterContainer().getId()); - assertArrayEquals(clientTokenKey2.getEncoded(), - updatedAttemptState.getAppAttemptCredentials().getSecretKey( - RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); + assertArrayEquals( + clientTokenKey2.getEncoded(), + attemptState.getAppAttemptTokens() + .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME)); // new attempt state fields assertEquals(RMAppAttemptState.FINISHED, updatedAttemptState.getState()); assertEquals("myTrackingUrl", updatedAttemptState.getFinalTrackingUrl()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 0a2f0d446e6..72f1dffa7d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doReturn; @@ -62,7 +63,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -326,11 +326,13 @@ public class TestRMAppTransitions { } private void assertAppFinalStateSaved(RMApp application){ - verify(store, times(1)).updateApplicationState(any(ApplicationState.class)); + verify(store, times(1)).updateApplicationState( + any(ApplicationStateData.class)); } private void assertAppFinalStateNotSaved(RMApp application){ - verify(store, times(0)).updateApplicationState(any(ApplicationState.class)); + verify(store, times(0)).updateApplicationState( + any(ApplicationStateData.class)); } private void assertKilled(RMApp application) { @@ -395,11 +397,13 @@ public class TestRMAppTransitions { RMApp application = createNewTestApp(submissionContext); // NEW => SUBMITTED event RMAppEventType.RECOVER RMState state = new RMState(); - ApplicationState appState = new ApplicationState(123, 123, null, "user"); + ApplicationStateData appState = + ApplicationStateData.newInstance(123, 123, null, "user"); state.getApplicationState().put(application.getApplicationId(), appState); RMAppEvent event = new RMAppRecoverEvent(application.getApplicationId(), state); + application.handle(event); assertStartTimeSet(application); assertAppState(RMAppState.SUBMITTED, application); @@ -946,22 +950,25 @@ public class TestRMAppTransitions { @Test(timeout = 30000) public void testAppsRecoveringStates() throws Exception { RMState state = new RMState(); - Map applicationState = + Map applicationState = state.getApplicationState(); createRMStateForApplications(applicationState, RMAppState.FINISHED); createRMStateForApplications(applicationState, RMAppState.KILLED); createRMStateForApplications(applicationState, RMAppState.FAILED); - for (ApplicationState appState : applicationState.values()) { + for (ApplicationStateData appState : applicationState.values()) { testRecoverApplication(appState, state); } } - public void testRecoverApplication(ApplicationState appState, RMState rmState) + public void testRecoverApplication(ApplicationStateData appState, + RMState rmState) throws Exception { ApplicationSubmissionContext submissionContext = appState.getApplicationSubmissionContext(); RMAppImpl application = - new RMAppImpl(appState.getAppId(), rmContext, conf, + new RMAppImpl( + appState.getApplicationSubmissionContext().getApplicationId(), + rmContext, conf, submissionContext.getApplicationName(), null, submissionContext.getQueue(), submissionContext, null, null, appState.getSubmitTime(), submissionContext.getApplicationType(), @@ -986,12 +993,12 @@ public class TestRMAppTransitions { } public void createRMStateForApplications( - Map applicationState, + Map applicationState, RMAppState rmAppState) { RMApp app = createNewTestApp(null); - ApplicationState appState = - new ApplicationState(app.getSubmitTime(), app.getStartTime(), - app.getApplicationSubmissionContext(), app.getUser(), rmAppState, + ApplicationStateData appState = + ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(), + app.getUser(), app.getApplicationSubmissionContext(), rmAppState, null, app.getFinishTime()); applicationState.put(app.getApplicationId(), appState); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 2b5c2b882b9..fc653dc25ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -81,7 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventT import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -1520,7 +1520,7 @@ public class TestRMAppAttemptTransitions { private void verifyAttemptFinalStateSaved() { verify(store, times(1)).updateApplicationAttemptState( - any(ApplicationAttemptState.class)); + any(ApplicationAttemptStateData.class)); } private void verifyAMHostAndPortInvalidated() {