Merge r1601491 from trunk. YARN-2030. Augmented RMStateStore with state machine. Contributed by Binglin Chang

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1601492 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-06-09 19:46:48 +00:00
parent d156c785a6
commit e6a03e2fc0
12 changed files with 355 additions and 219 deletions

View File

@ -134,6 +134,8 @@ Release 2.5.0 - UNRELEASED
YARN-2132. ZKRMStateStore.ZKAction#runWithRetries doesn't log the exception YARN-2132. ZKRMStateStore.ZKAction#runWithRetries doesn't log the exception
it encounters. (Vamsee Yarlagadda via kasha) it encounters. (Vamsee Yarlagadda via kasha)
YARN-2030. Augmented RMStateStore with state machine. (Binglin Chang via jianhe)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -47,6 +47,8 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Appli
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@ -314,7 +316,7 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override @Override
public synchronized void storeApplicationStateInternal(ApplicationId appId, public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception { ApplicationStateData appStateDataPB) throws Exception {
String appIdStr = appId.toString(); String appIdStr = appId.toString();
Path appDirPath = getAppDir(rmAppRoot, appIdStr); Path appDirPath = getAppDir(rmAppRoot, appIdStr);
fs.mkdirs(appDirPath); fs.mkdirs(appDirPath);
@ -334,7 +336,7 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override @Override
public synchronized void updateApplicationStateInternal(ApplicationId appId, public synchronized void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception { ApplicationStateData appStateDataPB) throws Exception {
String appIdStr = appId.toString(); String appIdStr = appId.toString();
Path appDirPath = getAppDir(rmAppRoot, appIdStr); Path appDirPath = getAppDir(rmAppRoot, appIdStr);
Path nodeCreatePath = getNodePath(appDirPath, appIdStr); Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
@ -354,7 +356,7 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override @Override
public synchronized void storeApplicationAttemptStateInternal( public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId, ApplicationAttemptId appAttemptId,
ApplicationAttemptStateDataPBImpl attemptStateDataPB) ApplicationAttemptStateData attemptStateDataPB)
throws Exception { throws Exception {
Path appDirPath = Path appDirPath =
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
@ -375,7 +377,7 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override @Override
public synchronized void updateApplicationAttemptStateInternal( public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId, ApplicationAttemptId appAttemptId,
ApplicationAttemptStateDataPBImpl attemptStateDataPB) ApplicationAttemptStateData attemptStateDataPB)
throws Exception { throws Exception {
Path appDirPath = Path appDirPath =
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());

View File

@ -32,9 +32,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -80,7 +80,7 @@ public class MemoryRMStateStore extends RMStateStore {
@Override @Override
public void storeApplicationStateInternal(ApplicationId appId, public void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) ApplicationStateData appStateData)
throws Exception { throws Exception {
ApplicationState appState = ApplicationState appState =
new ApplicationState(appStateData.getSubmitTime(), new ApplicationState(appStateData.getSubmitTime(),
@ -92,7 +92,7 @@ public class MemoryRMStateStore extends RMStateStore {
@Override @Override
public void updateApplicationStateInternal(ApplicationId appId, public void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception { ApplicationStateData appStateData) throws Exception {
ApplicationState updatedAppState = ApplicationState updatedAppState =
new ApplicationState(appStateData.getSubmitTime(), new ApplicationState(appStateData.getSubmitTime(),
appStateData.getStartTime(), appStateData.getStartTime(),
@ -112,7 +112,7 @@ public class MemoryRMStateStore extends RMStateStore {
@Override @Override
public synchronized void storeApplicationAttemptStateInternal( public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId, ApplicationAttemptId appAttemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) ApplicationAttemptStateData attemptStateData)
throws Exception { throws Exception {
Credentials credentials = null; Credentials credentials = null;
if(attemptStateData.getAppAttemptTokens() != null){ if(attemptStateData.getAppAttemptTokens() != null){
@ -137,7 +137,7 @@ public class MemoryRMStateStore extends RMStateStore {
@Override @Override
public synchronized void updateApplicationAttemptStateInternal( public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId, ApplicationAttemptId appAttemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) ApplicationAttemptStateData attemptStateData)
throws Exception { throws Exception {
Credentials credentials = null; Credentials credentials = null;
if (attemptStateData.getAppAttemptTokens() != null) { if (attemptStateData.getAppAttemptTokens() != null) {

View File

@ -25,9 +25,9 @@ import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@Unstable @Unstable
public class NullRMStateStore extends RMStateStore { public class NullRMStateStore extends RMStateStore {
@ -54,13 +54,13 @@ public class NullRMStateStore extends RMStateStore {
@Override @Override
protected void storeApplicationStateInternal(ApplicationId appId, protected void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception { ApplicationStateData appStateData) throws Exception {
// Do nothing // Do nothing
} }
@Override @Override
protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId, protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { ApplicationAttemptStateData attemptStateData) throws Exception {
// Do nothing // Do nothing
} }
@ -102,13 +102,13 @@ public class NullRMStateStore extends RMStateStore {
@Override @Override
protected void updateApplicationStateInternal(ApplicationId appId, protected void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception { ApplicationStateData appStateData) throws Exception {
// Do nothing // Do nothing
} }
@Override @Override
protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId, protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { ApplicationAttemptStateData attemptStateData) throws Exception {
} }
@Override @Override

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery; package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.nio.ByteBuffer;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
@ -31,7 +30,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
@ -50,6 +48,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@ -61,6 +61,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
@Private @Private
@Unstable @Unstable
@ -83,8 +87,163 @@ public abstract class RMStateStore extends AbstractService {
public static final Log LOG = LogFactory.getLog(RMStateStore.class); public static final Log LOG = LogFactory.getLog(RMStateStore.class);
private enum RMStateStoreState {
DEFAULT
};
private static final StateMachineFactory<RMStateStore,
RMStateStoreState,
RMStateStoreEventType,
RMStateStoreEvent>
stateMachineFactory = new StateMachineFactory<RMStateStore,
RMStateStoreState,
RMStateStoreEventType,
RMStateStoreEvent>(
RMStateStoreState.DEFAULT)
.addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
RMStateStoreEventType.STORE_APP, new StoreAppTransition())
.addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
.addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())
.addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition())
.addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition());
private final StateMachine<RMStateStoreState,
RMStateStoreEventType,
RMStateStoreEvent> stateMachine;
private static class StoreAppTransition
implements SingleArcTransition<RMStateStore, RMStateStoreEvent> {
@Override
public void transition(RMStateStore store, RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreAppEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return;
}
ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState();
ApplicationId appId = appState.getAppId();
ApplicationStateData appStateData = ApplicationStateData
.newInstance(appState);
LOG.info("Storing info for app: " + appId);
try {
store.storeApplicationStateInternal(appId, appStateData);
store.notifyDoneStoringApplication(appId, null);
} catch (Exception e) {
LOG.error("Error storing app: " + appId, e);
store.notifyStoreOperationFailed(e);
}
};
}
private static class UpdateAppTransition implements
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
@Override
public void transition(RMStateStore store, RMStateStoreEvent event) {
if (!(event instanceof RMStateUpdateAppEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return;
}
ApplicationState appState = ((RMStateUpdateAppEvent) event).getAppState();
ApplicationId appId = appState.getAppId();
ApplicationStateData appStateData = ApplicationStateData
.newInstance(appState);
LOG.info("Updating info for app: " + appId);
try {
store.updateApplicationStateInternal(appId, appStateData);
store.notifyDoneUpdatingApplication(appId, null);
} catch (Exception e) {
LOG.error("Error updating app: " + appId, e);
store.notifyStoreOperationFailed(e);
}
};
}
private static class RemoveAppTransition implements
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
@Override
public void transition(RMStateStore store, RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreRemoveAppEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return;
}
ApplicationState appState = ((RMStateStoreRemoveAppEvent) event)
.getAppState();
ApplicationId appId = appState.getAppId();
LOG.info("Removing info for app: " + appId);
try {
store.removeApplicationStateInternal(appState);
} catch (Exception e) {
LOG.error("Error removing app: " + appId, e);
store.notifyStoreOperationFailed(e);
}
};
}
private static class StoreAppAttemptTransition implements
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
@Override
public void transition(RMStateStore store, RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreAppAttemptEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return;
}
ApplicationAttemptState attemptState =
((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
try {
ApplicationAttemptStateData attemptStateData =
ApplicationAttemptStateData.newInstance(attemptState);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
}
store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
attemptStateData);
store.notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
null);
} catch (Exception e) {
LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
store.notifyStoreOperationFailed(e);
}
};
}
private static class UpdateAppAttemptTransition implements
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
@Override
public void transition(RMStateStore store, RMStateStoreEvent event) {
if (!(event instanceof RMStateUpdateAppAttemptEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return;
}
ApplicationAttemptState attemptState =
((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
try {
ApplicationAttemptStateData attemptStateData = ApplicationAttemptStateData
.newInstance(attemptState);
if (LOG.isDebugEnabled()) {
LOG.debug("Updating info for attempt: " + attemptState.getAttemptId());
}
store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
attemptStateData);
store.notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
null);
} catch (Exception e) {
LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
store.notifyStoreOperationFailed(e);
}
};
}
public RMStateStore() { public RMStateStore() {
super(RMStateStore.class.getName()); super(RMStateStore.class.getName());
stateMachine = stateMachineFactory.make(this);
} }
/** /**
@ -390,10 +549,10 @@ public abstract class RMStateStore extends AbstractService {
* application. * application.
*/ */
protected abstract void storeApplicationStateInternal(ApplicationId appId, protected abstract void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception; ApplicationStateData appStateData) throws Exception;
protected abstract void updateApplicationStateInternal(ApplicationId appId, protected abstract void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception; ApplicationStateData appStateData) throws Exception;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
/** /**
@ -428,11 +587,11 @@ public abstract class RMStateStore extends AbstractService {
*/ */
protected abstract void storeApplicationAttemptStateInternal( protected abstract void storeApplicationAttemptStateInternal(
ApplicationAttemptId attemptId, ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; ApplicationAttemptStateData attemptStateData) throws Exception;
protected abstract void updateApplicationAttemptStateInternal( protected abstract void updateApplicationAttemptStateInternal(
ApplicationAttemptId attemptId, ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; ApplicationAttemptStateData attemptStateData) throws Exception;
/** /**
* RMDTSecretManager call this to store the state of a delegation token * RMDTSecretManager call this to store the state of a delegation token
@ -596,105 +755,10 @@ public abstract class RMStateStore extends AbstractService {
// Dispatcher related code // Dispatcher related code
protected void handleStoreEvent(RMStateStoreEvent event) { protected void handleStoreEvent(RMStateStoreEvent event) {
if (event.getType().equals(RMStateStoreEventType.STORE_APP)
|| event.getType().equals(RMStateStoreEventType.UPDATE_APP)) {
ApplicationState appState = null;
if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
appState = ((RMStateStoreAppEvent) event).getAppState();
} else {
assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
appState = ((RMStateUpdateAppEvent) event).getAppState();
}
Exception storedException = null;
ApplicationStateDataPBImpl appStateData =
(ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
.newApplicationStateData(appState.getSubmitTime(),
appState.getStartTime(), appState.getUser(),
appState.getApplicationSubmissionContext(), appState.getState(),
appState.getDiagnostics(), appState.getFinishTime());
ApplicationId appId =
appState.getApplicationSubmissionContext().getApplicationId();
LOG.info("Storing info for app: " + appId);
try { try {
if (event.getType().equals(RMStateStoreEventType.STORE_APP)) { this.stateMachine.doTransition(event.getType(), event);
storeApplicationStateInternal(appId, appStateData); } catch (InvalidStateTransitonException e) {
notifyDoneStoringApplication(appId, storedException); LOG.error("Can't handle this event at current state", e);
} else {
assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
updateApplicationStateInternal(appId, appStateData);
notifyDoneUpdatingApplication(appId, storedException);
}
} catch (Exception e) {
LOG.error("Error storing/updating app: " + appId, e);
notifyStoreOperationFailed(e);
}
} else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)
|| event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT)) {
ApplicationAttemptState attemptState = null;
if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
attemptState =
((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
} else {
assert event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT);
attemptState =
((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
}
Exception storedException = null;
Credentials credentials = attemptState.getAppAttemptCredentials();
ByteBuffer appAttemptTokens = null;
try {
if (credentials != null) {
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
}
ApplicationAttemptStateDataPBImpl attemptStateData =
(ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
.newApplicationAttemptStateData(attemptState.getAttemptId(),
attemptState.getMasterContainer(), appAttemptTokens,
attemptState.getStartTime(), attemptState.getState(),
attemptState.getFinalTrackingUrl(),
attemptState.getDiagnostics(),
attemptState.getFinalApplicationStatus());
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
}
if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
attemptStateData);
notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
storedException);
} else {
assert event.getType().equals(
RMStateStoreEventType.UPDATE_APP_ATTEMPT);
updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
attemptStateData);
notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
storedException);
}
} catch (Exception e) {
LOG.error(
"Error storing/updating appAttempt: " + attemptState.getAttemptId(), e);
notifyStoreOperationFailed(e);
}
} else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) {
ApplicationState appState =
((RMStateStoreRemoveAppEvent) event).getAppState();
ApplicationId appId = appState.getAppId();
LOG.info("Removing info for app: " + appId);
try {
removeApplicationStateInternal(appState);
} catch (Exception e) {
LOG.error("Error removing app: " + appId, e);
notifyStoreOperationFailed(e);
}
} else {
LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
} }
} }

View File

@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Appli
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils; import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@ -551,7 +553,7 @@ public class ZKRMStateStore extends RMStateStore {
@Override @Override
public synchronized void storeApplicationStateInternal(ApplicationId appId, public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception { ApplicationStateData appStateDataPB) throws Exception {
String nodeCreatePath = getNodePath(rmAppRoot, appId.toString()); String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -565,7 +567,7 @@ public class ZKRMStateStore extends RMStateStore {
@Override @Override
public synchronized void updateApplicationStateInternal(ApplicationId appId, public synchronized void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception { ApplicationStateData appStateDataPB) throws Exception {
String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString()); String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -587,7 +589,7 @@ public class ZKRMStateStore extends RMStateStore {
@Override @Override
public synchronized void storeApplicationAttemptStateInternal( public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId, ApplicationAttemptId appAttemptId,
ApplicationAttemptStateDataPBImpl attemptStateDataPB) ApplicationAttemptStateData attemptStateDataPB)
throws Exception { throws Exception {
String appDirPath = getNodePath(rmAppRoot, String appDirPath = getNodePath(rmAppRoot,
appAttemptId.getApplicationId().toString()); appAttemptId.getApplicationId().toString());
@ -605,7 +607,7 @@ public class ZKRMStateStore extends RMStateStore {
@Override @Override
public synchronized void updateApplicationAttemptStateInternal( public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId, ApplicationAttemptId appAttemptId,
ApplicationAttemptStateDataPBImpl attemptStateDataPB) ApplicationAttemptStateData attemptStateDataPB)
throws Exception { throws Exception {
String appIdStr = appAttemptId.getApplicationId().toString(); String appIdStr = appAttemptId.getApplicationId().toString();
String appAttemptIdStr = appAttemptId.toString(); String appAttemptIdStr = appAttemptId.toString();

View File

@ -18,21 +18,63 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records;
/* /*
* Contains the state data that needs to be persisted for an ApplicationAttempt * Contains the state data that needs to be persisted for an ApplicationAttempt
*/ */
@Public @Public
@Unstable @Unstable
public interface ApplicationAttemptStateData { public abstract class ApplicationAttemptStateData {
public static ApplicationAttemptStateData newInstance(
ApplicationAttemptId attemptId, Container container,
ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
String finalTrackingUrl, String diagnostics,
FinalApplicationStatus amUnregisteredFinalStatus) {
ApplicationAttemptStateData attemptStateData =
Records.newRecord(ApplicationAttemptStateData.class);
attemptStateData.setAttemptId(attemptId);
attemptStateData.setMasterContainer(container);
attemptStateData.setAppAttemptTokens(attemptTokens);
attemptStateData.setState(finalState);
attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
attemptStateData.setDiagnostics(diagnostics);
attemptStateData.setStartTime(startTime);
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
return attemptStateData;
}
public static ApplicationAttemptStateData newInstance(
ApplicationAttemptState attemptState) throws IOException {
Credentials credentials = attemptState.getAppAttemptCredentials();
ByteBuffer appAttemptTokens = null;
if (credentials != null) {
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
}
return newInstance(attemptState.getAttemptId(),
attemptState.getMasterContainer(), appAttemptTokens,
attemptState.getStartTime(), attemptState.getState(),
attemptState.getFinalTrackingUrl(),
attemptState.getDiagnostics(),
attemptState.getFinalApplicationStatus());
}
public abstract ApplicationAttemptStateDataProto getProto();
/** /**
* The ApplicationAttemptId for the application attempt * The ApplicationAttemptId for the application attempt
@ -40,9 +82,9 @@ public interface ApplicationAttemptStateData {
*/ */
@Public @Public
@Unstable @Unstable
public ApplicationAttemptId getAttemptId(); public abstract ApplicationAttemptId getAttemptId();
public void setAttemptId(ApplicationAttemptId attemptId); public abstract void setAttemptId(ApplicationAttemptId attemptId);
/* /*
* The master container running the application attempt * The master container running the application attempt
@ -50,9 +92,9 @@ public interface ApplicationAttemptStateData {
*/ */
@Public @Public
@Unstable @Unstable
public Container getMasterContainer(); public abstract Container getMasterContainer();
public void setMasterContainer(Container container); public abstract void setMasterContainer(Container container);
/** /**
* The application attempt tokens that belong to this attempt * The application attempt tokens that belong to this attempt
@ -60,17 +102,17 @@ public interface ApplicationAttemptStateData {
*/ */
@Public @Public
@Unstable @Unstable
public ByteBuffer getAppAttemptTokens(); public abstract ByteBuffer getAppAttemptTokens();
public void setAppAttemptTokens(ByteBuffer attemptTokens); public abstract void setAppAttemptTokens(ByteBuffer attemptTokens);
/** /**
* Get the final state of the application attempt. * Get the final state of the application attempt.
* @return the final state of the application attempt. * @return the final state of the application attempt.
*/ */
public RMAppAttemptState getState(); public abstract RMAppAttemptState getState();
public void setState(RMAppAttemptState state); public abstract void setState(RMAppAttemptState state);
/** /**
* Get the original not-proxied <em>final tracking url</em> for the * Get the original not-proxied <em>final tracking url</em> for the
@ -79,34 +121,34 @@ public interface ApplicationAttemptStateData {
* @return the original not-proxied <em>final tracking url</em> for the * @return the original not-proxied <em>final tracking url</em> for the
* application * application
*/ */
public String getFinalTrackingUrl(); public abstract String getFinalTrackingUrl();
/** /**
* Set the final tracking Url of the AM. * Set the final tracking Url of the AM.
* @param url * @param url
*/ */
public void setFinalTrackingUrl(String url); public abstract void setFinalTrackingUrl(String url);
/** /**
* Get the <em>diagnositic information</em> of the attempt * Get the <em>diagnositic information</em> of the attempt
* @return <em>diagnositic information</em> of the attempt * @return <em>diagnositic information</em> of the attempt
*/ */
public String getDiagnostics(); public abstract String getDiagnostics();
public void setDiagnostics(String diagnostics); public abstract void setDiagnostics(String diagnostics);
/** /**
* Get the <em>start time</em> of the application. * Get the <em>start time</em> of the application.
* @return <em>start time</em> of the application * @return <em>start time</em> of the application
*/ */
public long getStartTime(); public abstract long getStartTime();
public void setStartTime(long startTime); public abstract void setStartTime(long startTime);
/** /**
* Get the <em>final finish status</em> of the application. * Get the <em>final finish status</em> of the application.
* @return <em>final finish status</em> of the application * @return <em>final finish status</em> of the application
*/ */
public FinalApplicationStatus getFinalApplicationStatus(); public abstract FinalApplicationStatus getFinalApplicationStatus();
public void setFinalApplicationStatus(FinalApplicationStatus finishState); public abstract void setFinalApplicationStatus(FinalApplicationStatus finishState);
} }

View File

@ -24,7 +24,10 @@ import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.Records;
/** /**
* Contains all the state data that needs to be stored persistently * Contains all the state data that needs to be stored persistently
@ -32,7 +35,31 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
*/ */
@Public @Public
@Unstable @Unstable
public interface ApplicationStateData { public abstract class ApplicationStateData {
public static ApplicationStateData newInstance(long submitTime,
long startTime, String user,
ApplicationSubmissionContext submissionContext,
RMAppState state, String diagnostics, long finishTime) {
ApplicationStateData appState = Records.newRecord(ApplicationStateData.class);
appState.setSubmitTime(submitTime);
appState.setStartTime(startTime);
appState.setUser(user);
appState.setApplicationSubmissionContext(submissionContext);
appState.setState(state);
appState.setDiagnostics(diagnostics);
appState.setFinishTime(finishTime);
return appState;
}
public static ApplicationStateData newInstance(
ApplicationState appState) {
return newInstance(appState.getSubmitTime(), appState.getStartTime(),
appState.getUser(), appState.getApplicationSubmissionContext(),
appState.getState(), appState.getDiagnostics(),
appState.getFinishTime());
}
public abstract ApplicationStateDataProto getProto();
/** /**
* The time at which the application was received by the Resource Manager * The time at which the application was received by the Resource Manager
@ -40,11 +67,11 @@ public interface ApplicationStateData {
*/ */
@Public @Public
@Unstable @Unstable
public long getSubmitTime(); public abstract long getSubmitTime();
@Public @Public
@Unstable @Unstable
public void setSubmitTime(long submitTime); public abstract void setSubmitTime(long submitTime);
/** /**
* Get the <em>start time</em> of the application. * Get the <em>start time</em> of the application.
@ -63,11 +90,11 @@ public interface ApplicationStateData {
*/ */
@Public @Public
@Unstable @Unstable
public void setUser(String user); public abstract void setUser(String user);
@Public @Public
@Unstable @Unstable
public String getUser(); public abstract String getUser();
/** /**
* The {@link ApplicationSubmissionContext} for the application * The {@link ApplicationSubmissionContext} for the application
@ -76,34 +103,34 @@ public interface ApplicationStateData {
*/ */
@Public @Public
@Unstable @Unstable
public ApplicationSubmissionContext getApplicationSubmissionContext(); public abstract ApplicationSubmissionContext getApplicationSubmissionContext();
@Public @Public
@Unstable @Unstable
public void setApplicationSubmissionContext( public abstract void setApplicationSubmissionContext(
ApplicationSubmissionContext context); ApplicationSubmissionContext context);
/** /**
* Get the final state of the application. * Get the final state of the application.
* @return the final state of the application. * @return the final state of the application.
*/ */
public RMAppState getState(); public abstract RMAppState getState();
public void setState(RMAppState state); public abstract void setState(RMAppState state);
/** /**
* Get the diagnostics information for the application master. * Get the diagnostics information for the application master.
* @return the diagnostics information for the application master. * @return the diagnostics information for the application master.
*/ */
public String getDiagnostics(); public abstract String getDiagnostics();
public void setDiagnostics(String diagnostics); public abstract void setDiagnostics(String diagnostics);
/** /**
* The finish time of the application. * The finish time of the application.
* @return the finish time of the application., * @return the finish time of the application.,
*/ */
public long getFinishTime(); public abstract long getFinishTime();
public void setFinishTime(long finishTime); public abstract void setFinishTime(long finishTime);
} }

View File

@ -25,10 +25,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder;
@ -36,12 +33,10 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMApp
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
public class ApplicationAttemptStateDataPBImpl import com.google.protobuf.TextFormat;
extends ProtoBase<ApplicationAttemptStateDataProto>
implements ApplicationAttemptStateData {
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
public class ApplicationAttemptStateDataPBImpl extends
ApplicationAttemptStateData {
ApplicationAttemptStateDataProto proto = ApplicationAttemptStateDataProto proto =
ApplicationAttemptStateDataProto.getDefaultInstance(); ApplicationAttemptStateDataProto.getDefaultInstance();
ApplicationAttemptStateDataProto.Builder builder = null; ApplicationAttemptStateDataProto.Builder builder = null;
@ -61,6 +56,7 @@ implements ApplicationAttemptStateData {
viaProto = true; viaProto = true;
} }
@Override
public ApplicationAttemptStateDataProto getProto() { public ApplicationAttemptStateDataProto getProto() {
mergeLocalToProto(); mergeLocalToProto();
proto = viaProto ? proto : builder.build(); proto = viaProto ? proto : builder.build();
@ -76,7 +72,8 @@ implements ApplicationAttemptStateData {
builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto()); builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto());
} }
if(this.appAttemptTokens != null) { if(this.appAttemptTokens != null) {
builder.setAppAttemptTokens(convertToProtoFormat(this.appAttemptTokens)); builder.setAppAttemptTokens(ProtoUtils.convertToProtoFormat(
this.appAttemptTokens));
} }
} }
@ -148,7 +145,8 @@ implements ApplicationAttemptStateData {
if(!p.hasAppAttemptTokens()) { if(!p.hasAppAttemptTokens()) {
return null; return null;
} }
this.appAttemptTokens = convertFromProtoFormat(p.getAppAttemptTokens()); this.appAttemptTokens = ProtoUtils.convertFromProtoFormat(
p.getAppAttemptTokens());
return appAttemptTokens; return appAttemptTokens;
} }
@ -249,22 +247,24 @@ implements ApplicationAttemptStateData {
builder.setFinalApplicationStatus(convertToProtoFormat(finishState)); builder.setFinalApplicationStatus(convertToProtoFormat(finishState));
} }
public static ApplicationAttemptStateData newApplicationAttemptStateData( @Override
ApplicationAttemptId attemptId, Container container, public int hashCode() {
ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, return getProto().hashCode();
String finalTrackingUrl, String diagnostics, }
FinalApplicationStatus amUnregisteredFinalStatus) {
ApplicationAttemptStateData attemptStateData = @Override
recordFactory.newRecordInstance(ApplicationAttemptStateData.class); public boolean equals(Object other) {
attemptStateData.setAttemptId(attemptId); if (other == null)
attemptStateData.setMasterContainer(container); return false;
attemptStateData.setAppAttemptTokens(attemptTokens); if (other.getClass().isAssignableFrom(this.getClass())) {
attemptStateData.setState(finalState); return this.getProto().equals(this.getClass().cast(other).getProto());
attemptStateData.setFinalTrackingUrl(finalTrackingUrl); }
attemptStateData.setDiagnostics(diagnostics); return false;
attemptStateData.setStartTime(startTime); }
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
return attemptStateData; @Override
public String toString() {
return TextFormat.shortDebugString(getProto());
} }
private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_"; private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_";

View File

@ -20,21 +20,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
public class ApplicationStateDataPBImpl import com.google.protobuf.TextFormat;
extends ProtoBase<ApplicationStateDataProto>
implements ApplicationStateData {
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
public class ApplicationStateDataPBImpl extends ApplicationStateData {
ApplicationStateDataProto proto = ApplicationStateDataProto proto =
ApplicationStateDataProto.getDefaultInstance(); ApplicationStateDataProto.getDefaultInstance();
ApplicationStateDataProto.Builder builder = null; ApplicationStateDataProto.Builder builder = null;
@ -52,6 +46,7 @@ implements ApplicationStateData {
viaProto = true; viaProto = true;
} }
@Override
public ApplicationStateDataProto getProto() { public ApplicationStateDataProto getProto() {
mergeLocalToProto(); mergeLocalToProto();
proto = viaProto ? proto : builder.build(); proto = viaProto ? proto : builder.build();
@ -200,21 +195,24 @@ implements ApplicationStateData {
builder.setFinishTime(finishTime); builder.setFinishTime(finishTime);
} }
public static ApplicationStateData newApplicationStateData(long submitTime, @Override
long startTime, String user, public int hashCode() {
ApplicationSubmissionContext submissionContext, RMAppState state, return getProto().hashCode();
String diagnostics, long finishTime) { }
ApplicationStateData appState = @Override
recordFactory.newRecordInstance(ApplicationStateData.class); public boolean equals(Object other) {
appState.setSubmitTime(submitTime); if (other == null)
appState.setStartTime(startTime); return false;
appState.setUser(user); if (other.getClass().isAssignableFrom(this.getClass())) {
appState.setApplicationSubmissionContext(submissionContext); return this.getProto().equals(this.getClass().cast(other).getProto());
appState.setState(state); }
appState.setDiagnostics(diagnostics); return false;
appState.setFinishTime(finishTime); }
return appState;
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
} }
private static String RM_APP_PREFIX = "RMAPP_"; private static String RM_APP_PREFIX = "RMAPP_";

View File

@ -84,8 +84,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.Appli
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -612,7 +612,7 @@ public class TestRMRestart {
@Override @Override
public void updateApplicationStateInternal(ApplicationId appId, public void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception { ApplicationStateData appStateData) throws Exception {
if (count == 0) { if (count == 0) {
// do nothing; simulate app final state is not saved. // do nothing; simulate app final state is not saved.
LOG.info(appId + " final state is not saved."); LOG.info(appId + " final state is not saved.");
@ -760,14 +760,14 @@ public class TestRMRestart {
@Override @Override
public synchronized void storeApplicationAttemptStateInternal( public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId attemptId, ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { ApplicationAttemptStateData attemptStateData) throws Exception {
// ignore attempt saving request. // ignore attempt saving request.
} }
@Override @Override
public synchronized void updateApplicationAttemptStateInternal( public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId attemptId, ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { ApplicationAttemptStateData attemptStateData) throws Exception {
// ignore attempt saving request. // ignore attempt saving request.
} }
}; };
@ -1862,7 +1862,7 @@ public class TestRMRestart {
@Override @Override
public void updateApplicationStateInternal(ApplicationId appId, public void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception { ApplicationStateData appStateData) throws Exception {
updateApp = ++count; updateApp = ++count;
super.updateApplicationStateInternal(appId, appStateData); super.updateApplicationStateInternal(appId, appStateData);
} }
@ -1871,7 +1871,7 @@ public class TestRMRestart {
public synchronized void public synchronized void
updateApplicationAttemptStateInternal( updateApplicationAttemptStateInternal(
ApplicationAttemptId attemptId, ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) ApplicationAttemptStateData attemptStateData)
throws Exception { throws Exception {
updateAttempt = ++count; updateAttempt = ++count;
super.updateApplicationAttemptStateInternal(attemptId, super.updateApplicationAttemptStateInternal(attemptId,

View File

@ -24,7 +24,6 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert; import org.junit.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -37,6 +36,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
@ -213,8 +213,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
try { try {
store.storeApplicationStateInternal( store.storeApplicationStateInternal(
ApplicationId.newInstance(100L, 1), ApplicationId.newInstance(100L, 1),
(ApplicationStateDataPBImpl) ApplicationStateDataPBImpl ApplicationStateData.newInstance(111, 111, "user", null,
.newApplicationStateData(111, 111, "user", null,
RMAppState.ACCEPTED, "diagnostics", 333)); RMAppState.ACCEPTED, "diagnostics", 333));
} catch (Exception e) { } catch (Exception e) {
// TODO 0 datanode exception will not be retried by dfs client, fix // TODO 0 datanode exception will not be retried by dfs client, fix