Augmented RMStateStore with state machine. Contributed by Binglin Chang.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1601491 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
36383505c4
commit
c94f2cec3a
|
@ -149,6 +149,8 @@ Release 2.5.0 - UNRELEASED
|
||||||
YARN-2132. ZKRMStateStore.ZKAction#runWithRetries doesn't log the exception
|
YARN-2132. ZKRMStateStore.ZKAction#runWithRetries doesn't log the exception
|
||||||
it encounters. (Vamsee Yarlagadda via kasha)
|
it encounters. (Vamsee Yarlagadda via kasha)
|
||||||
|
|
||||||
|
YARN-2030. Augmented RMStateStore with state machine. (Binglin Chang via jianhe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -47,6 +47,8 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Appli
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
|
@ -314,7 +316,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void storeApplicationStateInternal(ApplicationId appId,
|
public synchronized void storeApplicationStateInternal(ApplicationId appId,
|
||||||
ApplicationStateDataPBImpl appStateDataPB) throws Exception {
|
ApplicationStateData appStateDataPB) throws Exception {
|
||||||
String appIdStr = appId.toString();
|
String appIdStr = appId.toString();
|
||||||
Path appDirPath = getAppDir(rmAppRoot, appIdStr);
|
Path appDirPath = getAppDir(rmAppRoot, appIdStr);
|
||||||
fs.mkdirs(appDirPath);
|
fs.mkdirs(appDirPath);
|
||||||
|
@ -334,7 +336,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void updateApplicationStateInternal(ApplicationId appId,
|
public synchronized void updateApplicationStateInternal(ApplicationId appId,
|
||||||
ApplicationStateDataPBImpl appStateDataPB) throws Exception {
|
ApplicationStateData appStateDataPB) throws Exception {
|
||||||
String appIdStr = appId.toString();
|
String appIdStr = appId.toString();
|
||||||
Path appDirPath = getAppDir(rmAppRoot, appIdStr);
|
Path appDirPath = getAppDir(rmAppRoot, appIdStr);
|
||||||
Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
|
Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
|
||||||
|
@ -354,7 +356,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
@Override
|
@Override
|
||||||
public synchronized void storeApplicationAttemptStateInternal(
|
public synchronized void storeApplicationAttemptStateInternal(
|
||||||
ApplicationAttemptId appAttemptId,
|
ApplicationAttemptId appAttemptId,
|
||||||
ApplicationAttemptStateDataPBImpl attemptStateDataPB)
|
ApplicationAttemptStateData attemptStateDataPB)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
Path appDirPath =
|
Path appDirPath =
|
||||||
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
|
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
|
||||||
|
@ -375,7 +377,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
@Override
|
@Override
|
||||||
public synchronized void updateApplicationAttemptStateInternal(
|
public synchronized void updateApplicationAttemptStateInternal(
|
||||||
ApplicationAttemptId appAttemptId,
|
ApplicationAttemptId appAttemptId,
|
||||||
ApplicationAttemptStateDataPBImpl attemptStateDataPB)
|
ApplicationAttemptStateData attemptStateDataPB)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
Path appDirPath =
|
Path appDirPath =
|
||||||
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
|
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
|
||||||
|
|
|
@ -32,9 +32,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
@ -80,7 +80,7 @@ public class MemoryRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void storeApplicationStateInternal(ApplicationId appId,
|
public void storeApplicationStateInternal(ApplicationId appId,
|
||||||
ApplicationStateDataPBImpl appStateData)
|
ApplicationStateData appStateData)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
ApplicationState appState =
|
ApplicationState appState =
|
||||||
new ApplicationState(appStateData.getSubmitTime(),
|
new ApplicationState(appStateData.getSubmitTime(),
|
||||||
|
@ -92,7 +92,7 @@ public class MemoryRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateApplicationStateInternal(ApplicationId appId,
|
public void updateApplicationStateInternal(ApplicationId appId,
|
||||||
ApplicationStateDataPBImpl appStateData) throws Exception {
|
ApplicationStateData appStateData) throws Exception {
|
||||||
ApplicationState updatedAppState =
|
ApplicationState updatedAppState =
|
||||||
new ApplicationState(appStateData.getSubmitTime(),
|
new ApplicationState(appStateData.getSubmitTime(),
|
||||||
appStateData.getStartTime(),
|
appStateData.getStartTime(),
|
||||||
|
@ -112,7 +112,7 @@ public class MemoryRMStateStore extends RMStateStore {
|
||||||
@Override
|
@Override
|
||||||
public synchronized void storeApplicationAttemptStateInternal(
|
public synchronized void storeApplicationAttemptStateInternal(
|
||||||
ApplicationAttemptId appAttemptId,
|
ApplicationAttemptId appAttemptId,
|
||||||
ApplicationAttemptStateDataPBImpl attemptStateData)
|
ApplicationAttemptStateData attemptStateData)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
Credentials credentials = null;
|
Credentials credentials = null;
|
||||||
if(attemptStateData.getAppAttemptTokens() != null){
|
if(attemptStateData.getAppAttemptTokens() != null){
|
||||||
|
@ -137,7 +137,7 @@ public class MemoryRMStateStore extends RMStateStore {
|
||||||
@Override
|
@Override
|
||||||
public synchronized void updateApplicationAttemptStateInternal(
|
public synchronized void updateApplicationAttemptStateInternal(
|
||||||
ApplicationAttemptId appAttemptId,
|
ApplicationAttemptId appAttemptId,
|
||||||
ApplicationAttemptStateDataPBImpl attemptStateData)
|
ApplicationAttemptStateData attemptStateData)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
Credentials credentials = null;
|
Credentials credentials = null;
|
||||||
if (attemptStateData.getAppAttemptTokens() != null) {
|
if (attemptStateData.getAppAttemptTokens() != null) {
|
||||||
|
|
|
@ -25,9 +25,9 @@ import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
|
||||||
|
|
||||||
@Unstable
|
@Unstable
|
||||||
public class NullRMStateStore extends RMStateStore {
|
public class NullRMStateStore extends RMStateStore {
|
||||||
|
@ -54,13 +54,13 @@ public class NullRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void storeApplicationStateInternal(ApplicationId appId,
|
protected void storeApplicationStateInternal(ApplicationId appId,
|
||||||
ApplicationStateDataPBImpl appStateData) throws Exception {
|
ApplicationStateData appStateData) throws Exception {
|
||||||
// Do nothing
|
// Do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
|
protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
|
||||||
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
|
ApplicationAttemptStateData attemptStateData) throws Exception {
|
||||||
// Do nothing
|
// Do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -102,13 +102,13 @@ public class NullRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void updateApplicationStateInternal(ApplicationId appId,
|
protected void updateApplicationStateInternal(ApplicationId appId,
|
||||||
ApplicationStateDataPBImpl appStateData) throws Exception {
|
ApplicationStateData appStateData) throws Exception {
|
||||||
// Do nothing
|
// Do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
|
protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
|
||||||
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
|
ApplicationAttemptStateData attemptStateData) throws Exception {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -31,7 +30,6 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
@ -50,6 +48,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
|
@ -61,6 +61,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
|
||||||
|
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
|
||||||
|
import org.apache.hadoop.yarn.state.SingleArcTransition;
|
||||||
|
import org.apache.hadoop.yarn.state.StateMachine;
|
||||||
|
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
|
@ -83,8 +87,163 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
|
|
||||||
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
|
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
|
||||||
|
|
||||||
|
private enum RMStateStoreState {
|
||||||
|
DEFAULT
|
||||||
|
};
|
||||||
|
|
||||||
|
private static final StateMachineFactory<RMStateStore,
|
||||||
|
RMStateStoreState,
|
||||||
|
RMStateStoreEventType,
|
||||||
|
RMStateStoreEvent>
|
||||||
|
stateMachineFactory = new StateMachineFactory<RMStateStore,
|
||||||
|
RMStateStoreState,
|
||||||
|
RMStateStoreEventType,
|
||||||
|
RMStateStoreEvent>(
|
||||||
|
RMStateStoreState.DEFAULT)
|
||||||
|
.addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
|
||||||
|
RMStateStoreEventType.STORE_APP, new StoreAppTransition())
|
||||||
|
.addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
|
||||||
|
RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
|
||||||
|
.addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
|
||||||
|
RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())
|
||||||
|
.addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
|
||||||
|
RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition())
|
||||||
|
.addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
|
||||||
|
RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition());
|
||||||
|
|
||||||
|
private final StateMachine<RMStateStoreState,
|
||||||
|
RMStateStoreEventType,
|
||||||
|
RMStateStoreEvent> stateMachine;
|
||||||
|
|
||||||
|
private static class StoreAppTransition
|
||||||
|
implements SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
||||||
|
@Override
|
||||||
|
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
||||||
|
if (!(event instanceof RMStateStoreAppEvent)) {
|
||||||
|
// should never happen
|
||||||
|
LOG.error("Illegal event type: " + event.getClass());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState();
|
||||||
|
ApplicationId appId = appState.getAppId();
|
||||||
|
ApplicationStateData appStateData = ApplicationStateData
|
||||||
|
.newInstance(appState);
|
||||||
|
LOG.info("Storing info for app: " + appId);
|
||||||
|
try {
|
||||||
|
store.storeApplicationStateInternal(appId, appStateData);
|
||||||
|
store.notifyDoneStoringApplication(appId, null);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error storing app: " + appId, e);
|
||||||
|
store.notifyStoreOperationFailed(e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class UpdateAppTransition implements
|
||||||
|
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
||||||
|
@Override
|
||||||
|
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
||||||
|
if (!(event instanceof RMStateUpdateAppEvent)) {
|
||||||
|
// should never happen
|
||||||
|
LOG.error("Illegal event type: " + event.getClass());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ApplicationState appState = ((RMStateUpdateAppEvent) event).getAppState();
|
||||||
|
ApplicationId appId = appState.getAppId();
|
||||||
|
ApplicationStateData appStateData = ApplicationStateData
|
||||||
|
.newInstance(appState);
|
||||||
|
LOG.info("Updating info for app: " + appId);
|
||||||
|
try {
|
||||||
|
store.updateApplicationStateInternal(appId, appStateData);
|
||||||
|
store.notifyDoneUpdatingApplication(appId, null);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error updating app: " + appId, e);
|
||||||
|
store.notifyStoreOperationFailed(e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class RemoveAppTransition implements
|
||||||
|
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
||||||
|
@Override
|
||||||
|
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
||||||
|
if (!(event instanceof RMStateStoreRemoveAppEvent)) {
|
||||||
|
// should never happen
|
||||||
|
LOG.error("Illegal event type: " + event.getClass());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ApplicationState appState = ((RMStateStoreRemoveAppEvent) event)
|
||||||
|
.getAppState();
|
||||||
|
ApplicationId appId = appState.getAppId();
|
||||||
|
LOG.info("Removing info for app: " + appId);
|
||||||
|
try {
|
||||||
|
store.removeApplicationStateInternal(appState);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error removing app: " + appId, e);
|
||||||
|
store.notifyStoreOperationFailed(e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class StoreAppAttemptTransition implements
|
||||||
|
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
||||||
|
@Override
|
||||||
|
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
||||||
|
if (!(event instanceof RMStateStoreAppAttemptEvent)) {
|
||||||
|
// should never happen
|
||||||
|
LOG.error("Illegal event type: " + event.getClass());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ApplicationAttemptState attemptState =
|
||||||
|
((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
|
||||||
|
try {
|
||||||
|
ApplicationAttemptStateData attemptStateData =
|
||||||
|
ApplicationAttemptStateData.newInstance(attemptState);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
|
||||||
|
}
|
||||||
|
store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
|
||||||
|
attemptStateData);
|
||||||
|
store.notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
|
||||||
|
null);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
|
||||||
|
store.notifyStoreOperationFailed(e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class UpdateAppAttemptTransition implements
|
||||||
|
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
||||||
|
@Override
|
||||||
|
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
||||||
|
if (!(event instanceof RMStateUpdateAppAttemptEvent)) {
|
||||||
|
// should never happen
|
||||||
|
LOG.error("Illegal event type: " + event.getClass());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ApplicationAttemptState attemptState =
|
||||||
|
((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
|
||||||
|
try {
|
||||||
|
ApplicationAttemptStateData attemptStateData = ApplicationAttemptStateData
|
||||||
|
.newInstance(attemptState);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Updating info for attempt: " + attemptState.getAttemptId());
|
||||||
|
}
|
||||||
|
store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
|
||||||
|
attemptStateData);
|
||||||
|
store.notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
|
||||||
|
null);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
|
||||||
|
store.notifyStoreOperationFailed(e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
public RMStateStore() {
|
public RMStateStore() {
|
||||||
super(RMStateStore.class.getName());
|
super(RMStateStore.class.getName());
|
||||||
|
stateMachine = stateMachineFactory.make(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -390,10 +549,10 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
* application.
|
* application.
|
||||||
*/
|
*/
|
||||||
protected abstract void storeApplicationStateInternal(ApplicationId appId,
|
protected abstract void storeApplicationStateInternal(ApplicationId appId,
|
||||||
ApplicationStateDataPBImpl appStateData) throws Exception;
|
ApplicationStateData appStateData) throws Exception;
|
||||||
|
|
||||||
protected abstract void updateApplicationStateInternal(ApplicationId appId,
|
protected abstract void updateApplicationStateInternal(ApplicationId appId,
|
||||||
ApplicationStateDataPBImpl appStateData) throws Exception;
|
ApplicationStateData appStateData) throws Exception;
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
/**
|
/**
|
||||||
|
@ -428,11 +587,11 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
*/
|
*/
|
||||||
protected abstract void storeApplicationAttemptStateInternal(
|
protected abstract void storeApplicationAttemptStateInternal(
|
||||||
ApplicationAttemptId attemptId,
|
ApplicationAttemptId attemptId,
|
||||||
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
|
ApplicationAttemptStateData attemptStateData) throws Exception;
|
||||||
|
|
||||||
protected abstract void updateApplicationAttemptStateInternal(
|
protected abstract void updateApplicationAttemptStateInternal(
|
||||||
ApplicationAttemptId attemptId,
|
ApplicationAttemptId attemptId,
|
||||||
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
|
ApplicationAttemptStateData attemptStateData) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RMDTSecretManager call this to store the state of a delegation token
|
* RMDTSecretManager call this to store the state of a delegation token
|
||||||
|
@ -596,105 +755,10 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
|
|
||||||
// Dispatcher related code
|
// Dispatcher related code
|
||||||
protected void handleStoreEvent(RMStateStoreEvent event) {
|
protected void handleStoreEvent(RMStateStoreEvent event) {
|
||||||
if (event.getType().equals(RMStateStoreEventType.STORE_APP)
|
try {
|
||||||
|| event.getType().equals(RMStateStoreEventType.UPDATE_APP)) {
|
this.stateMachine.doTransition(event.getType(), event);
|
||||||
ApplicationState appState = null;
|
} catch (InvalidStateTransitonException e) {
|
||||||
if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
|
LOG.error("Can't handle this event at current state", e);
|
||||||
appState = ((RMStateStoreAppEvent) event).getAppState();
|
|
||||||
} else {
|
|
||||||
assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
|
|
||||||
appState = ((RMStateUpdateAppEvent) event).getAppState();
|
|
||||||
}
|
|
||||||
|
|
||||||
Exception storedException = null;
|
|
||||||
ApplicationStateDataPBImpl appStateData =
|
|
||||||
(ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
|
|
||||||
.newApplicationStateData(appState.getSubmitTime(),
|
|
||||||
appState.getStartTime(), appState.getUser(),
|
|
||||||
appState.getApplicationSubmissionContext(), appState.getState(),
|
|
||||||
appState.getDiagnostics(), appState.getFinishTime());
|
|
||||||
|
|
||||||
ApplicationId appId =
|
|
||||||
appState.getApplicationSubmissionContext().getApplicationId();
|
|
||||||
|
|
||||||
LOG.info("Storing info for app: " + appId);
|
|
||||||
try {
|
|
||||||
if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
|
|
||||||
storeApplicationStateInternal(appId, appStateData);
|
|
||||||
notifyDoneStoringApplication(appId, storedException);
|
|
||||||
} else {
|
|
||||||
assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
|
|
||||||
updateApplicationStateInternal(appId, appStateData);
|
|
||||||
notifyDoneUpdatingApplication(appId, storedException);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Error storing/updating app: " + appId, e);
|
|
||||||
notifyStoreOperationFailed(e);
|
|
||||||
}
|
|
||||||
} else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)
|
|
||||||
|| event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT)) {
|
|
||||||
|
|
||||||
ApplicationAttemptState attemptState = null;
|
|
||||||
if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
|
|
||||||
attemptState =
|
|
||||||
((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
|
|
||||||
} else {
|
|
||||||
assert event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT);
|
|
||||||
attemptState =
|
|
||||||
((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
|
|
||||||
}
|
|
||||||
|
|
||||||
Exception storedException = null;
|
|
||||||
Credentials credentials = attemptState.getAppAttemptCredentials();
|
|
||||||
ByteBuffer appAttemptTokens = null;
|
|
||||||
try {
|
|
||||||
if (credentials != null) {
|
|
||||||
DataOutputBuffer dob = new DataOutputBuffer();
|
|
||||||
credentials.writeTokenStorageToStream(dob);
|
|
||||||
appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
|
||||||
}
|
|
||||||
ApplicationAttemptStateDataPBImpl attemptStateData =
|
|
||||||
(ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
|
|
||||||
.newApplicationAttemptStateData(attemptState.getAttemptId(),
|
|
||||||
attemptState.getMasterContainer(), appAttemptTokens,
|
|
||||||
attemptState.getStartTime(), attemptState.getState(),
|
|
||||||
attemptState.getFinalTrackingUrl(),
|
|
||||||
attemptState.getDiagnostics(),
|
|
||||||
attemptState.getFinalApplicationStatus());
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
|
|
||||||
}
|
|
||||||
if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
|
|
||||||
storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
|
|
||||||
attemptStateData);
|
|
||||||
notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
|
|
||||||
storedException);
|
|
||||||
} else {
|
|
||||||
assert event.getType().equals(
|
|
||||||
RMStateStoreEventType.UPDATE_APP_ATTEMPT);
|
|
||||||
updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
|
|
||||||
attemptStateData);
|
|
||||||
notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
|
|
||||||
storedException);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error(
|
|
||||||
"Error storing/updating appAttempt: " + attemptState.getAttemptId(), e);
|
|
||||||
notifyStoreOperationFailed(e);
|
|
||||||
}
|
|
||||||
} else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) {
|
|
||||||
ApplicationState appState =
|
|
||||||
((RMStateStoreRemoveAppEvent) event).getAppState();
|
|
||||||
ApplicationId appId = appState.getAppId();
|
|
||||||
LOG.info("Removing info for app: " + appId);
|
|
||||||
try {
|
|
||||||
removeApplicationStateInternal(appState);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Error removing app: " + appId, e);
|
|
||||||
notifyStoreOperationFailed(e);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Appli
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
|
@ -551,7 +553,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void storeApplicationStateInternal(ApplicationId appId,
|
public synchronized void storeApplicationStateInternal(ApplicationId appId,
|
||||||
ApplicationStateDataPBImpl appStateDataPB) throws Exception {
|
ApplicationStateData appStateDataPB) throws Exception {
|
||||||
String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
|
String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -565,7 +567,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void updateApplicationStateInternal(ApplicationId appId,
|
public synchronized void updateApplicationStateInternal(ApplicationId appId,
|
||||||
ApplicationStateDataPBImpl appStateDataPB) throws Exception {
|
ApplicationStateData appStateDataPB) throws Exception {
|
||||||
String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
|
String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -587,7 +589,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
@Override
|
@Override
|
||||||
public synchronized void storeApplicationAttemptStateInternal(
|
public synchronized void storeApplicationAttemptStateInternal(
|
||||||
ApplicationAttemptId appAttemptId,
|
ApplicationAttemptId appAttemptId,
|
||||||
ApplicationAttemptStateDataPBImpl attemptStateDataPB)
|
ApplicationAttemptStateData attemptStateDataPB)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
String appDirPath = getNodePath(rmAppRoot,
|
String appDirPath = getNodePath(rmAppRoot,
|
||||||
appAttemptId.getApplicationId().toString());
|
appAttemptId.getApplicationId().toString());
|
||||||
|
@ -605,7 +607,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
@Override
|
@Override
|
||||||
public synchronized void updateApplicationAttemptStateInternal(
|
public synchronized void updateApplicationAttemptStateInternal(
|
||||||
ApplicationAttemptId appAttemptId,
|
ApplicationAttemptId appAttemptId,
|
||||||
ApplicationAttemptStateDataPBImpl attemptStateDataPB)
|
ApplicationAttemptStateData attemptStateDataPB)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
String appIdStr = appAttemptId.getApplicationId().toString();
|
String appIdStr = appAttemptId.getApplicationId().toString();
|
||||||
String appAttemptIdStr = appAttemptId.toString();
|
String appAttemptIdStr = appAttemptId.toString();
|
||||||
|
|
|
@ -18,31 +18,73 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
|
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Contains the state data that needs to be persisted for an ApplicationAttempt
|
* Contains the state data that needs to be persisted for an ApplicationAttempt
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public interface ApplicationAttemptStateData {
|
public abstract class ApplicationAttemptStateData {
|
||||||
|
public static ApplicationAttemptStateData newInstance(
|
||||||
|
ApplicationAttemptId attemptId, Container container,
|
||||||
|
ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
|
||||||
|
String finalTrackingUrl, String diagnostics,
|
||||||
|
FinalApplicationStatus amUnregisteredFinalStatus) {
|
||||||
|
ApplicationAttemptStateData attemptStateData =
|
||||||
|
Records.newRecord(ApplicationAttemptStateData.class);
|
||||||
|
attemptStateData.setAttemptId(attemptId);
|
||||||
|
attemptStateData.setMasterContainer(container);
|
||||||
|
attemptStateData.setAppAttemptTokens(attemptTokens);
|
||||||
|
attemptStateData.setState(finalState);
|
||||||
|
attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
|
||||||
|
attemptStateData.setDiagnostics(diagnostics);
|
||||||
|
attemptStateData.setStartTime(startTime);
|
||||||
|
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
|
||||||
|
return attemptStateData;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ApplicationAttemptStateData newInstance(
|
||||||
|
ApplicationAttemptState attemptState) throws IOException {
|
||||||
|
Credentials credentials = attemptState.getAppAttemptCredentials();
|
||||||
|
ByteBuffer appAttemptTokens = null;
|
||||||
|
if (credentials != null) {
|
||||||
|
DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
|
credentials.writeTokenStorageToStream(dob);
|
||||||
|
appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||||
|
}
|
||||||
|
return newInstance(attemptState.getAttemptId(),
|
||||||
|
attemptState.getMasterContainer(), appAttemptTokens,
|
||||||
|
attemptState.getStartTime(), attemptState.getState(),
|
||||||
|
attemptState.getFinalTrackingUrl(),
|
||||||
|
attemptState.getDiagnostics(),
|
||||||
|
attemptState.getFinalApplicationStatus());
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract ApplicationAttemptStateDataProto getProto();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The ApplicationAttemptId for the application attempt
|
* The ApplicationAttemptId for the application attempt
|
||||||
* @return ApplicationAttemptId for the application attempt
|
* @return ApplicationAttemptId for the application attempt
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public ApplicationAttemptId getAttemptId();
|
public abstract ApplicationAttemptId getAttemptId();
|
||||||
|
|
||||||
public void setAttemptId(ApplicationAttemptId attemptId);
|
public abstract void setAttemptId(ApplicationAttemptId attemptId);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The master container running the application attempt
|
* The master container running the application attempt
|
||||||
|
@ -50,9 +92,9 @@ public interface ApplicationAttemptStateData {
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public Container getMasterContainer();
|
public abstract Container getMasterContainer();
|
||||||
|
|
||||||
public void setMasterContainer(Container container);
|
public abstract void setMasterContainer(Container container);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The application attempt tokens that belong to this attempt
|
* The application attempt tokens that belong to this attempt
|
||||||
|
@ -60,17 +102,17 @@ public interface ApplicationAttemptStateData {
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public ByteBuffer getAppAttemptTokens();
|
public abstract ByteBuffer getAppAttemptTokens();
|
||||||
|
|
||||||
public void setAppAttemptTokens(ByteBuffer attemptTokens);
|
public abstract void setAppAttemptTokens(ByteBuffer attemptTokens);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the final state of the application attempt.
|
* Get the final state of the application attempt.
|
||||||
* @return the final state of the application attempt.
|
* @return the final state of the application attempt.
|
||||||
*/
|
*/
|
||||||
public RMAppAttemptState getState();
|
public abstract RMAppAttemptState getState();
|
||||||
|
|
||||||
public void setState(RMAppAttemptState state);
|
public abstract void setState(RMAppAttemptState state);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the original not-proxied <em>final tracking url</em> for the
|
* Get the original not-proxied <em>final tracking url</em> for the
|
||||||
|
@ -79,34 +121,34 @@ public interface ApplicationAttemptStateData {
|
||||||
* @return the original not-proxied <em>final tracking url</em> for the
|
* @return the original not-proxied <em>final tracking url</em> for the
|
||||||
* application
|
* application
|
||||||
*/
|
*/
|
||||||
public String getFinalTrackingUrl();
|
public abstract String getFinalTrackingUrl();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the final tracking Url of the AM.
|
* Set the final tracking Url of the AM.
|
||||||
* @param url
|
* @param url
|
||||||
*/
|
*/
|
||||||
public void setFinalTrackingUrl(String url);
|
public abstract void setFinalTrackingUrl(String url);
|
||||||
/**
|
/**
|
||||||
* Get the <em>diagnositic information</em> of the attempt
|
* Get the <em>diagnositic information</em> of the attempt
|
||||||
* @return <em>diagnositic information</em> of the attempt
|
* @return <em>diagnositic information</em> of the attempt
|
||||||
*/
|
*/
|
||||||
public String getDiagnostics();
|
public abstract String getDiagnostics();
|
||||||
|
|
||||||
public void setDiagnostics(String diagnostics);
|
public abstract void setDiagnostics(String diagnostics);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the <em>start time</em> of the application.
|
* Get the <em>start time</em> of the application.
|
||||||
* @return <em>start time</em> of the application
|
* @return <em>start time</em> of the application
|
||||||
*/
|
*/
|
||||||
public long getStartTime();
|
public abstract long getStartTime();
|
||||||
|
|
||||||
public void setStartTime(long startTime);
|
public abstract void setStartTime(long startTime);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the <em>final finish status</em> of the application.
|
* Get the <em>final finish status</em> of the application.
|
||||||
* @return <em>final finish status</em> of the application
|
* @return <em>final finish status</em> of the application
|
||||||
*/
|
*/
|
||||||
public FinalApplicationStatus getFinalApplicationStatus();
|
public abstract FinalApplicationStatus getFinalApplicationStatus();
|
||||||
|
|
||||||
public void setFinalApplicationStatus(FinalApplicationStatus finishState);
|
public abstract void setFinalApplicationStatus(FinalApplicationStatus finishState);
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,10 @@ import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Contains all the state data that needs to be stored persistently
|
* Contains all the state data that needs to be stored persistently
|
||||||
|
@ -32,19 +35,43 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public interface ApplicationStateData {
|
public abstract class ApplicationStateData {
|
||||||
|
public static ApplicationStateData newInstance(long submitTime,
|
||||||
|
long startTime, String user,
|
||||||
|
ApplicationSubmissionContext submissionContext,
|
||||||
|
RMAppState state, String diagnostics, long finishTime) {
|
||||||
|
ApplicationStateData appState = Records.newRecord(ApplicationStateData.class);
|
||||||
|
appState.setSubmitTime(submitTime);
|
||||||
|
appState.setStartTime(startTime);
|
||||||
|
appState.setUser(user);
|
||||||
|
appState.setApplicationSubmissionContext(submissionContext);
|
||||||
|
appState.setState(state);
|
||||||
|
appState.setDiagnostics(diagnostics);
|
||||||
|
appState.setFinishTime(finishTime);
|
||||||
|
return appState;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ApplicationStateData newInstance(
|
||||||
|
ApplicationState appState) {
|
||||||
|
return newInstance(appState.getSubmitTime(), appState.getStartTime(),
|
||||||
|
appState.getUser(), appState.getApplicationSubmissionContext(),
|
||||||
|
appState.getState(), appState.getDiagnostics(),
|
||||||
|
appState.getFinishTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract ApplicationStateDataProto getProto();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The time at which the application was received by the Resource Manager
|
* The time at which the application was received by the Resource Manager
|
||||||
* @return submitTime
|
* @return submitTime
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public long getSubmitTime();
|
public abstract long getSubmitTime();
|
||||||
|
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public void setSubmitTime(long submitTime);
|
public abstract void setSubmitTime(long submitTime);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the <em>start time</em> of the application.
|
* Get the <em>start time</em> of the application.
|
||||||
|
@ -63,11 +90,11 @@ public interface ApplicationStateData {
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public void setUser(String user);
|
public abstract void setUser(String user);
|
||||||
|
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public String getUser();
|
public abstract String getUser();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The {@link ApplicationSubmissionContext} for the application
|
* The {@link ApplicationSubmissionContext} for the application
|
||||||
|
@ -76,34 +103,34 @@ public interface ApplicationStateData {
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public ApplicationSubmissionContext getApplicationSubmissionContext();
|
public abstract ApplicationSubmissionContext getApplicationSubmissionContext();
|
||||||
|
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public void setApplicationSubmissionContext(
|
public abstract void setApplicationSubmissionContext(
|
||||||
ApplicationSubmissionContext context);
|
ApplicationSubmissionContext context);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the final state of the application.
|
* Get the final state of the application.
|
||||||
* @return the final state of the application.
|
* @return the final state of the application.
|
||||||
*/
|
*/
|
||||||
public RMAppState getState();
|
public abstract RMAppState getState();
|
||||||
|
|
||||||
public void setState(RMAppState state);
|
public abstract void setState(RMAppState state);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the diagnostics information for the application master.
|
* Get the diagnostics information for the application master.
|
||||||
* @return the diagnostics information for the application master.
|
* @return the diagnostics information for the application master.
|
||||||
*/
|
*/
|
||||||
public String getDiagnostics();
|
public abstract String getDiagnostics();
|
||||||
|
|
||||||
public void setDiagnostics(String diagnostics);
|
public abstract void setDiagnostics(String diagnostics);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The finish time of the application.
|
* The finish time of the application.
|
||||||
* @return the finish time of the application.,
|
* @return the finish time of the application.,
|
||||||
*/
|
*/
|
||||||
public long getFinishTime();
|
public abstract long getFinishTime();
|
||||||
|
|
||||||
public void setFinishTime(long finishTime);
|
public abstract void setFinishTime(long finishTime);
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,10 +25,7 @@ import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
|
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder;
|
||||||
|
@ -36,12 +33,10 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMApp
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
|
||||||
public class ApplicationAttemptStateDataPBImpl
|
import com.google.protobuf.TextFormat;
|
||||||
extends ProtoBase<ApplicationAttemptStateDataProto>
|
|
||||||
implements ApplicationAttemptStateData {
|
|
||||||
private static final RecordFactory recordFactory = RecordFactoryProvider
|
|
||||||
.getRecordFactory(null);
|
|
||||||
|
|
||||||
|
public class ApplicationAttemptStateDataPBImpl extends
|
||||||
|
ApplicationAttemptStateData {
|
||||||
ApplicationAttemptStateDataProto proto =
|
ApplicationAttemptStateDataProto proto =
|
||||||
ApplicationAttemptStateDataProto.getDefaultInstance();
|
ApplicationAttemptStateDataProto.getDefaultInstance();
|
||||||
ApplicationAttemptStateDataProto.Builder builder = null;
|
ApplicationAttemptStateDataProto.Builder builder = null;
|
||||||
|
@ -60,7 +55,8 @@ implements ApplicationAttemptStateData {
|
||||||
this.proto = proto;
|
this.proto = proto;
|
||||||
viaProto = true;
|
viaProto = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public ApplicationAttemptStateDataProto getProto() {
|
public ApplicationAttemptStateDataProto getProto() {
|
||||||
mergeLocalToProto();
|
mergeLocalToProto();
|
||||||
proto = viaProto ? proto : builder.build();
|
proto = viaProto ? proto : builder.build();
|
||||||
|
@ -76,7 +72,8 @@ implements ApplicationAttemptStateData {
|
||||||
builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto());
|
builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto());
|
||||||
}
|
}
|
||||||
if(this.appAttemptTokens != null) {
|
if(this.appAttemptTokens != null) {
|
||||||
builder.setAppAttemptTokens(convertToProtoFormat(this.appAttemptTokens));
|
builder.setAppAttemptTokens(ProtoUtils.convertToProtoFormat(
|
||||||
|
this.appAttemptTokens));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -148,7 +145,8 @@ implements ApplicationAttemptStateData {
|
||||||
if(!p.hasAppAttemptTokens()) {
|
if(!p.hasAppAttemptTokens()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
this.appAttemptTokens = convertFromProtoFormat(p.getAppAttemptTokens());
|
this.appAttemptTokens = ProtoUtils.convertFromProtoFormat(
|
||||||
|
p.getAppAttemptTokens());
|
||||||
return appAttemptTokens;
|
return appAttemptTokens;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -249,24 +247,26 @@ implements ApplicationAttemptStateData {
|
||||||
builder.setFinalApplicationStatus(convertToProtoFormat(finishState));
|
builder.setFinalApplicationStatus(convertToProtoFormat(finishState));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ApplicationAttemptStateData newApplicationAttemptStateData(
|
@Override
|
||||||
ApplicationAttemptId attemptId, Container container,
|
public int hashCode() {
|
||||||
ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
|
return getProto().hashCode();
|
||||||
String finalTrackingUrl, String diagnostics,
|
|
||||||
FinalApplicationStatus amUnregisteredFinalStatus) {
|
|
||||||
ApplicationAttemptStateData attemptStateData =
|
|
||||||
recordFactory.newRecordInstance(ApplicationAttemptStateData.class);
|
|
||||||
attemptStateData.setAttemptId(attemptId);
|
|
||||||
attemptStateData.setMasterContainer(container);
|
|
||||||
attemptStateData.setAppAttemptTokens(attemptTokens);
|
|
||||||
attemptStateData.setState(finalState);
|
|
||||||
attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
|
|
||||||
attemptStateData.setDiagnostics(diagnostics);
|
|
||||||
attemptStateData.setStartTime(startTime);
|
|
||||||
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
|
|
||||||
return attemptStateData;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object other) {
|
||||||
|
if (other == null)
|
||||||
|
return false;
|
||||||
|
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||||
|
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return TextFormat.shortDebugString(getProto());
|
||||||
|
}
|
||||||
|
|
||||||
private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_";
|
private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_";
|
||||||
public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) {
|
public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) {
|
||||||
return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name());
|
return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name());
|
||||||
|
|
|
@ -20,21 +20,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
|
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
|
|
||||||
public class ApplicationStateDataPBImpl
|
import com.google.protobuf.TextFormat;
|
||||||
extends ProtoBase<ApplicationStateDataProto>
|
|
||||||
implements ApplicationStateData {
|
|
||||||
private static final RecordFactory recordFactory = RecordFactoryProvider
|
|
||||||
.getRecordFactory(null);
|
|
||||||
|
|
||||||
|
public class ApplicationStateDataPBImpl extends ApplicationStateData {
|
||||||
ApplicationStateDataProto proto =
|
ApplicationStateDataProto proto =
|
||||||
ApplicationStateDataProto.getDefaultInstance();
|
ApplicationStateDataProto.getDefaultInstance();
|
||||||
ApplicationStateDataProto.Builder builder = null;
|
ApplicationStateDataProto.Builder builder = null;
|
||||||
|
@ -51,7 +45,8 @@ implements ApplicationStateData {
|
||||||
this.proto = proto;
|
this.proto = proto;
|
||||||
viaProto = true;
|
viaProto = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public ApplicationStateDataProto getProto() {
|
public ApplicationStateDataProto getProto() {
|
||||||
mergeLocalToProto();
|
mergeLocalToProto();
|
||||||
proto = viaProto ? proto : builder.build();
|
proto = viaProto ? proto : builder.build();
|
||||||
|
@ -136,7 +131,7 @@ implements ApplicationStateData {
|
||||||
}
|
}
|
||||||
applicationSubmissionContext =
|
applicationSubmissionContext =
|
||||||
new ApplicationSubmissionContextPBImpl(
|
new ApplicationSubmissionContextPBImpl(
|
||||||
p.getApplicationSubmissionContext());
|
p.getApplicationSubmissionContext());
|
||||||
return applicationSubmissionContext;
|
return applicationSubmissionContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,21 +195,24 @@ implements ApplicationStateData {
|
||||||
builder.setFinishTime(finishTime);
|
builder.setFinishTime(finishTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ApplicationStateData newApplicationStateData(long submitTime,
|
@Override
|
||||||
long startTime, String user,
|
public int hashCode() {
|
||||||
ApplicationSubmissionContext submissionContext, RMAppState state,
|
return getProto().hashCode();
|
||||||
String diagnostics, long finishTime) {
|
}
|
||||||
|
|
||||||
ApplicationStateData appState =
|
@Override
|
||||||
recordFactory.newRecordInstance(ApplicationStateData.class);
|
public boolean equals(Object other) {
|
||||||
appState.setSubmitTime(submitTime);
|
if (other == null)
|
||||||
appState.setStartTime(startTime);
|
return false;
|
||||||
appState.setUser(user);
|
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||||
appState.setApplicationSubmissionContext(submissionContext);
|
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||||
appState.setState(state);
|
}
|
||||||
appState.setDiagnostics(diagnostics);
|
return false;
|
||||||
appState.setFinishTime(finishTime);
|
}
|
||||||
return appState;
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return TextFormat.shortDebugString(getProto());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String RM_APP_PREFIX = "RMAPP_";
|
private static String RM_APP_PREFIX = "RMAPP_";
|
||||||
|
|
|
@ -84,8 +84,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.Appli
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
@ -612,7 +612,7 @@ public class TestRMRestart {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateApplicationStateInternal(ApplicationId appId,
|
public void updateApplicationStateInternal(ApplicationId appId,
|
||||||
ApplicationStateDataPBImpl appStateData) throws Exception {
|
ApplicationStateData appStateData) throws Exception {
|
||||||
if (count == 0) {
|
if (count == 0) {
|
||||||
// do nothing; simulate app final state is not saved.
|
// do nothing; simulate app final state is not saved.
|
||||||
LOG.info(appId + " final state is not saved.");
|
LOG.info(appId + " final state is not saved.");
|
||||||
|
@ -760,14 +760,14 @@ public class TestRMRestart {
|
||||||
@Override
|
@Override
|
||||||
public synchronized void storeApplicationAttemptStateInternal(
|
public synchronized void storeApplicationAttemptStateInternal(
|
||||||
ApplicationAttemptId attemptId,
|
ApplicationAttemptId attemptId,
|
||||||
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
|
ApplicationAttemptStateData attemptStateData) throws Exception {
|
||||||
// ignore attempt saving request.
|
// ignore attempt saving request.
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void updateApplicationAttemptStateInternal(
|
public synchronized void updateApplicationAttemptStateInternal(
|
||||||
ApplicationAttemptId attemptId,
|
ApplicationAttemptId attemptId,
|
||||||
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
|
ApplicationAttemptStateData attemptStateData) throws Exception {
|
||||||
// ignore attempt saving request.
|
// ignore attempt saving request.
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -1862,7 +1862,7 @@ public class TestRMRestart {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateApplicationStateInternal(ApplicationId appId,
|
public void updateApplicationStateInternal(ApplicationId appId,
|
||||||
ApplicationStateDataPBImpl appStateData) throws Exception {
|
ApplicationStateData appStateData) throws Exception {
|
||||||
updateApp = ++count;
|
updateApp = ++count;
|
||||||
super.updateApplicationStateInternal(appId, appStateData);
|
super.updateApplicationStateInternal(appId, appStateData);
|
||||||
}
|
}
|
||||||
|
@ -1871,7 +1871,7 @@ public class TestRMRestart {
|
||||||
public synchronized void
|
public synchronized void
|
||||||
updateApplicationAttemptStateInternal(
|
updateApplicationAttemptStateInternal(
|
||||||
ApplicationAttemptId attemptId,
|
ApplicationAttemptId attemptId,
|
||||||
ApplicationAttemptStateDataPBImpl attemptStateData)
|
ApplicationAttemptStateData attemptStateData)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
updateAttempt = ++count;
|
updateAttempt = ++count;
|
||||||
super.updateApplicationAttemptStateInternal(attemptId,
|
super.updateApplicationAttemptStateInternal(attemptId,
|
||||||
|
|
|
@ -24,7 +24,6 @@ import java.io.IOException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -37,6 +36,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
|
||||||
|
@ -213,9 +213,8 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
|
||||||
try {
|
try {
|
||||||
store.storeApplicationStateInternal(
|
store.storeApplicationStateInternal(
|
||||||
ApplicationId.newInstance(100L, 1),
|
ApplicationId.newInstance(100L, 1),
|
||||||
(ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
|
ApplicationStateData.newInstance(111, 111, "user", null,
|
||||||
.newApplicationStateData(111, 111, "user", null,
|
RMAppState.ACCEPTED, "diagnostics", 333));
|
||||||
RMAppState.ACCEPTED, "diagnostics", 333));
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// TODO 0 datanode exception will not be retried by dfs client, fix
|
// TODO 0 datanode exception will not be retried by dfs client, fix
|
||||||
// that separately.
|
// that separately.
|
||||||
|
|
Loading…
Reference in New Issue