Merge r1615550 from trunk: YARN-1354. Recover applications upon nodemanager restart. (Contributed by Jason Lowe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1615554 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8e8d4717e9
commit
fc5bb235f2
|
@ -17,6 +17,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2181. Added preemption info to logs and RM web UI. (Wangda Tan via
|
YARN-2181. Added preemption info to logs and RM web UI. (Wangda Tan via
|
||||||
jianhe)
|
jianhe)
|
||||||
|
|
||||||
|
YARN-1354. Recover applications upon nodemanager restart. (Jason Lowe via
|
||||||
|
junping_du)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
YARN-2242. Improve exception information on AM launch crashes. (Li Lu
|
YARN-2242. Improve exception information on AM launch crashes. (Li Lu
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
||||||
|
|
||||||
import static org.apache.hadoop.service.Service.STATE.STARTED;
|
import static org.apache.hadoop.service.Service.STATE.STARTED;
|
||||||
|
|
||||||
|
import java.io.DataInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
@ -42,6 +43,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||||
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
@ -63,6 +65,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
@ -71,6 +74,8 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.SerializedException;
|
import org.apache.hadoop.yarn.api.records.SerializedException;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
@ -81,6 +86,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||||
|
@ -119,11 +126,13 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
|
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
|
||||||
public class ContainerManagerImpl extends CompositeService implements
|
public class ContainerManagerImpl extends CompositeService implements
|
||||||
ServiceStateChangeListener, ContainerManagementProtocol,
|
ServiceStateChangeListener, ContainerManagementProtocol,
|
||||||
|
@ -224,14 +233,49 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
recover();
|
recover();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
private void recover() throws IOException, URISyntaxException {
|
private void recover() throws IOException, URISyntaxException {
|
||||||
NMStateStoreService stateStore = context.getNMStateStore();
|
NMStateStoreService stateStore = context.getNMStateStore();
|
||||||
if (stateStore.canRecover()) {
|
if (stateStore.canRecover()) {
|
||||||
rsrcLocalizationSrvc.recoverLocalizedResources(
|
rsrcLocalizationSrvc.recoverLocalizedResources(
|
||||||
stateStore.loadLocalizationState());
|
stateStore.loadLocalizationState());
|
||||||
|
|
||||||
|
RecoveredApplicationsState appsState = stateStore.loadApplicationsState();
|
||||||
|
for (ContainerManagerApplicationProto proto :
|
||||||
|
appsState.getApplications()) {
|
||||||
|
recoverApplication(proto);
|
||||||
|
}
|
||||||
|
|
||||||
|
String diagnostic = "Application marked finished during recovery";
|
||||||
|
for (ApplicationId appId : appsState.getFinishedApplications()) {
|
||||||
|
dispatcher.getEventHandler().handle(
|
||||||
|
new ApplicationFinishEvent(appId, diagnostic));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void recoverApplication(ContainerManagerApplicationProto p)
|
||||||
|
throws IOException {
|
||||||
|
ApplicationId appId = new ApplicationIdPBImpl(p.getId());
|
||||||
|
Credentials creds = new Credentials();
|
||||||
|
creds.readTokenStorageStream(
|
||||||
|
new DataInputStream(p.getCredentials().newInput()));
|
||||||
|
|
||||||
|
List<ApplicationACLMapProto> aclProtoList = p.getAclsList();
|
||||||
|
Map<ApplicationAccessType, String> acls =
|
||||||
|
new HashMap<ApplicationAccessType, String>(aclProtoList.size());
|
||||||
|
for (ApplicationACLMapProto aclProto : aclProtoList) {
|
||||||
|
acls.put(ProtoUtils.convertFromProtoFormat(aclProto.getAccessType()),
|
||||||
|
aclProto.getAcl());
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Recovering application " + appId);
|
||||||
|
ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
|
||||||
|
creds, context);
|
||||||
|
context.getApplications().put(appId, app);
|
||||||
|
app.handle(new ApplicationInitEvent(appId, acls));
|
||||||
|
}
|
||||||
|
|
||||||
protected LogHandler createLogHandler(Configuration conf, Context context,
|
protected LogHandler createLogHandler(Configuration conf, Context context,
|
||||||
DeletionService deletionService) {
|
DeletionService deletionService) {
|
||||||
if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
||||||
|
@ -358,6 +402,12 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
}
|
}
|
||||||
LOG.info("Applications still running : " + applications.keySet());
|
LOG.info("Applications still running : " + applications.keySet());
|
||||||
|
|
||||||
|
if (this.context.getNMStateStore().canRecover()
|
||||||
|
&& !this.context.getDecommissioned()) {
|
||||||
|
// do not cleanup apps as they can be recovered on restart
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
List<ApplicationId> appIds =
|
List<ApplicationId> appIds =
|
||||||
new ArrayList<ApplicationId>(applications.keySet());
|
new ArrayList<ApplicationId>(applications.keySet());
|
||||||
this.handle(
|
this.handle(
|
||||||
|
@ -567,6 +617,41 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
succeededContainers, failedContainers);
|
succeededContainers, failedContainers);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
|
||||||
|
String user, Credentials credentials,
|
||||||
|
Map<ApplicationAccessType, String> appAcls) {
|
||||||
|
|
||||||
|
ContainerManagerApplicationProto.Builder builder =
|
||||||
|
ContainerManagerApplicationProto.newBuilder();
|
||||||
|
builder.setId(((ApplicationIdPBImpl) appId).getProto());
|
||||||
|
builder.setUser(user);
|
||||||
|
|
||||||
|
builder.clearCredentials();
|
||||||
|
if (credentials != null) {
|
||||||
|
DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
|
try {
|
||||||
|
credentials.writeTokenStorageToStream(dob);
|
||||||
|
builder.setCredentials(ByteString.copyFrom(dob.getData()));
|
||||||
|
} catch (IOException e) {
|
||||||
|
// should not occur
|
||||||
|
LOG.error("Cannot serialize credentials", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
builder.clearAcls();
|
||||||
|
if (appAcls != null) {
|
||||||
|
for (Map.Entry<ApplicationAccessType, String> acl : appAcls.entrySet()) {
|
||||||
|
ApplicationACLMapProto p = ApplicationACLMapProto.newBuilder()
|
||||||
|
.setAccessType(ProtoUtils.convertToProtoFormat(acl.getKey()))
|
||||||
|
.setAcl(acl.getValue())
|
||||||
|
.build();
|
||||||
|
builder.addAcls(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
|
private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
|
||||||
ContainerTokenIdentifier containerTokenIdentifier,
|
ContainerTokenIdentifier containerTokenIdentifier,
|
||||||
|
@ -640,10 +725,12 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
if (null == context.getApplications().putIfAbsent(applicationID,
|
if (null == context.getApplications().putIfAbsent(applicationID,
|
||||||
application)) {
|
application)) {
|
||||||
LOG.info("Creating a new application reference for app " + applicationID);
|
LOG.info("Creating a new application reference for app " + applicationID);
|
||||||
|
Map<ApplicationAccessType, String> appAcls =
|
||||||
|
container.getLaunchContext().getApplicationACLs();
|
||||||
|
context.getNMStateStore().storeApplication(applicationID,
|
||||||
|
buildAppProto(applicationID, user, credentials, appAcls));
|
||||||
dispatcher.getEventHandler().handle(
|
dispatcher.getEventHandler().handle(
|
||||||
new ApplicationInitEvent(applicationID, container.getLaunchContext()
|
new ApplicationInitEvent(applicationID, appAcls));
|
||||||
.getApplicationACLs()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dispatcher.getEventHandler().handle(
|
dispatcher.getEventHandler().handle(
|
||||||
|
@ -894,6 +981,11 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
} else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
|
} else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
|
||||||
diagnostic = "Application killed by ResourceManager";
|
diagnostic = "Application killed by ResourceManager";
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
|
this.context.getNMStateStore().storeFinishedApplication(appID);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Unable to update application state in store", e);
|
||||||
|
}
|
||||||
this.dispatcher.getEventHandler().handle(
|
this.dispatcher.getEventHandler().handle(
|
||||||
new ApplicationFinishEvent(appID,
|
new ApplicationFinishEvent(appID,
|
||||||
diagnostic));
|
diagnostic));
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -428,6 +429,11 @@ public class ApplicationImpl implements Application {
|
||||||
ApplicationId appId = event.getApplicationID();
|
ApplicationId appId = event.getApplicationID();
|
||||||
app.context.getApplications().remove(appId);
|
app.context.getApplications().remove(appId);
|
||||||
app.aclsManager.removeApplication(appId);
|
app.aclsManager.removeApplication(appId);
|
||||||
|
try {
|
||||||
|
app.context.getNMStateStore().removeApplication(appId);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Unable to remove application from state store", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
|
@ -74,6 +75,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
private static final String DELETION_TASK_KEY_PREFIX =
|
private static final String DELETION_TASK_KEY_PREFIX =
|
||||||
"DeletionService/deltask_";
|
"DeletionService/deltask_";
|
||||||
|
|
||||||
|
private static final String APPLICATIONS_KEY_PREFIX =
|
||||||
|
"ContainerManager/applications/";
|
||||||
|
private static final String FINISHED_APPS_KEY_PREFIX =
|
||||||
|
"ContainerManager/finishedApps/";
|
||||||
|
|
||||||
private static final String LOCALIZATION_KEY_PREFIX = "Localization/";
|
private static final String LOCALIZATION_KEY_PREFIX = "Localization/";
|
||||||
private static final String LOCALIZATION_PUBLIC_KEY_PREFIX =
|
private static final String LOCALIZATION_PUBLIC_KEY_PREFIX =
|
||||||
LOCALIZATION_KEY_PREFIX + "public/";
|
LOCALIZATION_KEY_PREFIX + "public/";
|
||||||
|
@ -116,6 +122,92 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RecoveredApplicationsState loadApplicationsState()
|
||||||
|
throws IOException {
|
||||||
|
RecoveredApplicationsState state = new RecoveredApplicationsState();
|
||||||
|
state.applications = new ArrayList<ContainerManagerApplicationProto>();
|
||||||
|
String keyPrefix = APPLICATIONS_KEY_PREFIX;
|
||||||
|
LeveldbIterator iter = null;
|
||||||
|
try {
|
||||||
|
iter = new LeveldbIterator(db);
|
||||||
|
iter.seek(bytes(keyPrefix));
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
Entry<byte[], byte[]> entry = iter.next();
|
||||||
|
String key = asString(entry.getKey());
|
||||||
|
if (!key.startsWith(keyPrefix)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
state.applications.add(
|
||||||
|
ContainerManagerApplicationProto.parseFrom(entry.getValue()));
|
||||||
|
}
|
||||||
|
|
||||||
|
state.finishedApplications = new ArrayList<ApplicationId>();
|
||||||
|
keyPrefix = FINISHED_APPS_KEY_PREFIX;
|
||||||
|
iter.seek(bytes(keyPrefix));
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
Entry<byte[], byte[]> entry = iter.next();
|
||||||
|
String key = asString(entry.getKey());
|
||||||
|
if (!key.startsWith(keyPrefix)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
ApplicationId appId =
|
||||||
|
ConverterUtils.toApplicationId(key.substring(keyPrefix.length()));
|
||||||
|
state.finishedApplications.add(appId);
|
||||||
|
}
|
||||||
|
} catch (DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
} finally {
|
||||||
|
if (iter != null) {
|
||||||
|
iter.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return state;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeApplication(ApplicationId appId,
|
||||||
|
ContainerManagerApplicationProto p) throws IOException {
|
||||||
|
String key = APPLICATIONS_KEY_PREFIX + appId;
|
||||||
|
try {
|
||||||
|
db.put(bytes(key), p.toByteArray());
|
||||||
|
} catch (DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeFinishedApplication(ApplicationId appId)
|
||||||
|
throws IOException {
|
||||||
|
String key = FINISHED_APPS_KEY_PREFIX + appId;
|
||||||
|
try {
|
||||||
|
db.put(bytes(key), new byte[0]);
|
||||||
|
} catch (DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeApplication(ApplicationId appId)
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
WriteBatch batch = db.createWriteBatch();
|
||||||
|
try {
|
||||||
|
String key = APPLICATIONS_KEY_PREFIX + appId;
|
||||||
|
batch.delete(bytes(key));
|
||||||
|
key = FINISHED_APPS_KEY_PREFIX + appId;
|
||||||
|
batch.delete(bytes(key));
|
||||||
|
db.write(batch);
|
||||||
|
} finally {
|
||||||
|
batch.close();
|
||||||
|
}
|
||||||
|
} catch (DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RecoveredLocalizationState loadLocalizationState()
|
public RecoveredLocalizationState loadLocalizationState()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
|
@ -42,6 +43,25 @@ public class NMNullStateStoreService extends NMStateStoreService {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RecoveredApplicationsState loadApplicationsState() throws IOException {
|
||||||
|
throw new UnsupportedOperationException(
|
||||||
|
"Recovery not supported by this state store");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeApplication(ApplicationId appId,
|
||||||
|
ContainerManagerApplicationProto p) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeFinishedApplication(ApplicationId appId) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeApplication(ApplicationId appId) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RecoveredLocalizationState loadLocalizationState()
|
public RecoveredLocalizationState loadLocalizationState()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
|
@ -45,6 +46,19 @@ public abstract class NMStateStoreService extends AbstractService {
|
||||||
super(name);
|
super(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class RecoveredApplicationsState {
|
||||||
|
List<ContainerManagerApplicationProto> applications;
|
||||||
|
List<ApplicationId> finishedApplications;
|
||||||
|
|
||||||
|
public List<ContainerManagerApplicationProto> getApplications() {
|
||||||
|
return applications;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<ApplicationId> getFinishedApplications() {
|
||||||
|
return finishedApplications;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static class LocalResourceTrackerState {
|
public static class LocalResourceTrackerState {
|
||||||
List<LocalizedResourceProto> localizedResources =
|
List<LocalizedResourceProto> localizedResources =
|
||||||
new ArrayList<LocalizedResourceProto>();
|
new ArrayList<LocalizedResourceProto>();
|
||||||
|
@ -162,6 +176,19 @@ public abstract class NMStateStoreService extends AbstractService {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public abstract RecoveredApplicationsState loadApplicationsState()
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
public abstract void storeApplication(ApplicationId appId,
|
||||||
|
ContainerManagerApplicationProto p) throws IOException;
|
||||||
|
|
||||||
|
public abstract void storeFinishedApplication(ApplicationId appId)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
public abstract void removeApplication(ApplicationId appId)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Load the state of localized resources
|
* Load the state of localized resources
|
||||||
* @return recovered localized resource state
|
* @return recovered localized resource state
|
||||||
|
|
|
@ -24,6 +24,13 @@ package hadoop.yarn;
|
||||||
|
|
||||||
import "yarn_protos.proto";
|
import "yarn_protos.proto";
|
||||||
|
|
||||||
|
// Per-application record the nodemanager persists so that applications can
// be recovered after a restart.
message ContainerManagerApplicationProto {
  // Application identifier.
  optional ApplicationIdProto id = 1;
  // User the application runs as.
  optional string user = 2;
  // Application credentials in serialized token-storage form.
  optional bytes credentials = 3;
  // Application ACL entries (access type -> ACL string).
  repeated ApplicationACLMapProto acls = 4;
}
|
||||||
|
|
||||||
message DeletionServiceDeleteTaskProto {
|
message DeletionServiceDeleteTaskProto {
|
||||||
optional int32 id = 1;
|
optional int32 id = 1;
|
||||||
optional string user = 2;
|
optional string user = 2;
|
||||||
|
|
|
@ -82,6 +82,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
|
@ -91,8 +93,6 @@ import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
|
||||||
|
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
public class TestNodeStatusUpdater {
|
public class TestNodeStatusUpdater {
|
||||||
|
|
|
@ -0,0 +1,323 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
||||||
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestContainerManagerRecovery {
|
||||||
|
|
||||||
|
private NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testApplicationRecovery() throws Exception {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
|
||||||
|
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||||
|
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user");
|
||||||
|
NMStateStoreService stateStore = new NMMemoryStateStoreService();
|
||||||
|
stateStore.init(conf);
|
||||||
|
stateStore.start();
|
||||||
|
Context context = new NMContext(new NMContainerTokenSecretManager(
|
||||||
|
conf), new NMTokenSecretManagerInNM(), null,
|
||||||
|
new ApplicationACLsManager(conf), stateStore);
|
||||||
|
ContainerManagerImpl cm = createContainerManager(context);
|
||||||
|
cm.init(conf);
|
||||||
|
cm.start();
|
||||||
|
|
||||||
|
// simulate registration with RM
|
||||||
|
MasterKey masterKey = new MasterKeyPBImpl();
|
||||||
|
masterKey.setKeyId(123);
|
||||||
|
masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
|
||||||
|
.byteValue() }));
|
||||||
|
context.getContainerTokenSecretManager().setMasterKey(masterKey);
|
||||||
|
context.getNMTokenSecretManager().setMasterKey(masterKey);
|
||||||
|
|
||||||
|
// add an application by starting a container
|
||||||
|
String appUser = "app_user1";
|
||||||
|
String modUser = "modify_user1";
|
||||||
|
String viewUser = "view_user1";
|
||||||
|
String enemyUser = "enemy_user";
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||||
|
ApplicationAttemptId attemptId =
|
||||||
|
ApplicationAttemptId.newInstance(appId, 1);
|
||||||
|
ContainerId cid = ContainerId.newInstance(attemptId, 1);
|
||||||
|
Map<String, LocalResource> localResources = Collections.emptyMap();
|
||||||
|
Map<String, String> containerEnv = Collections.emptyMap();
|
||||||
|
List<String> containerCmds = Collections.emptyList();
|
||||||
|
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
|
||||||
|
Credentials containerCreds = new Credentials();
|
||||||
|
DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
|
containerCreds.writeTokenStorageToStream(dob);
|
||||||
|
ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0,
|
||||||
|
dob.getLength());
|
||||||
|
Map<ApplicationAccessType, String> acls =
|
||||||
|
new HashMap<ApplicationAccessType, String>();
|
||||||
|
acls.put(ApplicationAccessType.MODIFY_APP, modUser);
|
||||||
|
acls.put(ApplicationAccessType.VIEW_APP, viewUser);
|
||||||
|
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
|
||||||
|
localResources, containerEnv, containerCmds, serviceData,
|
||||||
|
containerTokens, acls);
|
||||||
|
StartContainersResponse startResponse = startContainer(context, cm, cid,
|
||||||
|
clc);
|
||||||
|
assertTrue(startResponse.getFailedRequests().isEmpty());
|
||||||
|
assertEquals(1, context.getApplications().size());
|
||||||
|
Application app = context.getApplications().get(appId);
|
||||||
|
assertNotNull(app);
|
||||||
|
waitForAppState(app, ApplicationState.INITING);
|
||||||
|
assertTrue(context.getApplicationACLsManager().checkAccess(
|
||||||
|
UserGroupInformation.createRemoteUser(modUser),
|
||||||
|
ApplicationAccessType.MODIFY_APP, appUser, appId));
|
||||||
|
assertFalse(context.getApplicationACLsManager().checkAccess(
|
||||||
|
UserGroupInformation.createRemoteUser(viewUser),
|
||||||
|
ApplicationAccessType.MODIFY_APP, appUser, appId));
|
||||||
|
assertTrue(context.getApplicationACLsManager().checkAccess(
|
||||||
|
UserGroupInformation.createRemoteUser(viewUser),
|
||||||
|
ApplicationAccessType.VIEW_APP, appUser, appId));
|
||||||
|
assertFalse(context.getApplicationACLsManager().checkAccess(
|
||||||
|
UserGroupInformation.createRemoteUser(enemyUser),
|
||||||
|
ApplicationAccessType.VIEW_APP, appUser, appId));
|
||||||
|
|
||||||
|
// reset container manager and verify app recovered with proper acls
|
||||||
|
cm.stop();
|
||||||
|
context = new NMContext(new NMContainerTokenSecretManager(
|
||||||
|
conf), new NMTokenSecretManagerInNM(), null,
|
||||||
|
new ApplicationACLsManager(conf), stateStore);
|
||||||
|
cm = createContainerManager(context);
|
||||||
|
cm.init(conf);
|
||||||
|
cm.start();
|
||||||
|
assertEquals(1, context.getApplications().size());
|
||||||
|
app = context.getApplications().get(appId);
|
||||||
|
assertNotNull(app);
|
||||||
|
waitForAppState(app, ApplicationState.INITING);
|
||||||
|
assertTrue(context.getApplicationACLsManager().checkAccess(
|
||||||
|
UserGroupInformation.createRemoteUser(modUser),
|
||||||
|
ApplicationAccessType.MODIFY_APP, appUser, appId));
|
||||||
|
assertFalse(context.getApplicationACLsManager().checkAccess(
|
||||||
|
UserGroupInformation.createRemoteUser(viewUser),
|
||||||
|
ApplicationAccessType.MODIFY_APP, appUser, appId));
|
||||||
|
assertTrue(context.getApplicationACLsManager().checkAccess(
|
||||||
|
UserGroupInformation.createRemoteUser(viewUser),
|
||||||
|
ApplicationAccessType.VIEW_APP, appUser, appId));
|
||||||
|
assertFalse(context.getApplicationACLsManager().checkAccess(
|
||||||
|
UserGroupInformation.createRemoteUser(enemyUser),
|
||||||
|
ApplicationAccessType.VIEW_APP, appUser, appId));
|
||||||
|
|
||||||
|
// simulate application completion
|
||||||
|
List<ApplicationId> finishedApps = new ArrayList<ApplicationId>();
|
||||||
|
finishedApps.add(appId);
|
||||||
|
cm.handle(new CMgrCompletedAppsEvent(finishedApps,
|
||||||
|
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
|
||||||
|
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
|
||||||
|
|
||||||
|
// restart and verify app is marked for finishing
|
||||||
|
cm.stop();
|
||||||
|
context = new NMContext(new NMContainerTokenSecretManager(
|
||||||
|
conf), new NMTokenSecretManagerInNM(), null,
|
||||||
|
new ApplicationACLsManager(conf), stateStore);
|
||||||
|
cm = createContainerManager(context);
|
||||||
|
cm.init(conf);
|
||||||
|
cm.start();
|
||||||
|
assertEquals(1, context.getApplications().size());
|
||||||
|
app = context.getApplications().get(appId);
|
||||||
|
assertNotNull(app);
|
||||||
|
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
|
||||||
|
assertTrue(context.getApplicationACLsManager().checkAccess(
|
||||||
|
UserGroupInformation.createRemoteUser(modUser),
|
||||||
|
ApplicationAccessType.MODIFY_APP, appUser, appId));
|
||||||
|
assertFalse(context.getApplicationACLsManager().checkAccess(
|
||||||
|
UserGroupInformation.createRemoteUser(viewUser),
|
||||||
|
ApplicationAccessType.MODIFY_APP, appUser, appId));
|
||||||
|
assertTrue(context.getApplicationACLsManager().checkAccess(
|
||||||
|
UserGroupInformation.createRemoteUser(viewUser),
|
||||||
|
ApplicationAccessType.VIEW_APP, appUser, appId));
|
||||||
|
assertFalse(context.getApplicationACLsManager().checkAccess(
|
||||||
|
UserGroupInformation.createRemoteUser(enemyUser),
|
||||||
|
ApplicationAccessType.VIEW_APP, appUser, appId));
|
||||||
|
|
||||||
|
// simulate log aggregation completion
|
||||||
|
app.handle(new ApplicationEvent(app.getAppId(),
|
||||||
|
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
|
||||||
|
assertEquals(app.getApplicationState(), ApplicationState.FINISHED);
|
||||||
|
app.handle(new ApplicationEvent(app.getAppId(),
|
||||||
|
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
|
||||||
|
|
||||||
|
// restart and verify app is no longer present after recovery
|
||||||
|
cm.stop();
|
||||||
|
context = new NMContext(new NMContainerTokenSecretManager(
|
||||||
|
conf), new NMTokenSecretManagerInNM(), null,
|
||||||
|
new ApplicationACLsManager(conf), stateStore);
|
||||||
|
cm = createContainerManager(context);
|
||||||
|
cm.init(conf);
|
||||||
|
cm.start();
|
||||||
|
assertTrue(context.getApplications().isEmpty());
|
||||||
|
cm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private StartContainersResponse startContainer(Context context,
|
||||||
|
final ContainerManagerImpl cm, ContainerId cid,
|
||||||
|
ContainerLaunchContext clc) throws Exception {
|
||||||
|
UserGroupInformation user = UserGroupInformation.createRemoteUser(
|
||||||
|
cid.getApplicationAttemptId().toString());
|
||||||
|
StartContainerRequest scReq = StartContainerRequest.newInstance(
|
||||||
|
clc, TestContainerManager.createContainerToken(cid, 0,
|
||||||
|
context.getNodeId(), user.getShortUserName(),
|
||||||
|
context.getContainerTokenSecretManager()));
|
||||||
|
final List<StartContainerRequest> scReqList =
|
||||||
|
new ArrayList<StartContainerRequest>();
|
||||||
|
scReqList.add(scReq);
|
||||||
|
NMTokenIdentifier nmToken = new NMTokenIdentifier(
|
||||||
|
cid.getApplicationAttemptId(), context.getNodeId(),
|
||||||
|
user.getShortUserName(),
|
||||||
|
context.getNMTokenSecretManager().getCurrentKey().getKeyId());
|
||||||
|
user.addTokenIdentifier(nmToken);
|
||||||
|
return user.doAs(new PrivilegedExceptionAction<StartContainersResponse>() {
|
||||||
|
@Override
|
||||||
|
public StartContainersResponse run() throws Exception {
|
||||||
|
return cm.startContainers(
|
||||||
|
StartContainersRequest.newInstance(scReqList));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void waitForAppState(Application app, ApplicationState state)
|
||||||
|
throws Exception {
|
||||||
|
final int msecPerSleep = 10;
|
||||||
|
int msecLeft = 5000;
|
||||||
|
while (app.getApplicationState() != state && msecLeft > 0) {
|
||||||
|
Thread.sleep(msecPerSleep);
|
||||||
|
msecLeft -= msecPerSleep;
|
||||||
|
}
|
||||||
|
assertEquals(state, app.getApplicationState());
|
||||||
|
}
|
||||||
|
|
||||||
|
private ContainerManagerImpl createContainerManager(Context context) {
|
||||||
|
final LogHandler logHandler = mock(LogHandler.class);
|
||||||
|
final ResourceLocalizationService rsrcSrv =
|
||||||
|
new ResourceLocalizationService(null, null, null, null,
|
||||||
|
context.getNMStateStore()) {
|
||||||
|
@Override
|
||||||
|
public void serviceInit(Configuration conf) throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void serviceStart() throws Exception {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void serviceStop() throws Exception {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(LocalizationEvent event) {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
final ContainersLauncher launcher = new ContainersLauncher(context, null,
|
||||||
|
null, null, null) {
|
||||||
|
@Override
|
||||||
|
public void handle(ContainersLauncherEvent event) {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
return new ContainerManagerImpl(context,
|
||||||
|
mock(ContainerExecutor.class), mock(DeletionService.class),
|
||||||
|
mock(NodeStatusUpdater.class), metrics,
|
||||||
|
context.getApplicationACLsManager(), null) {
|
||||||
|
@Override
|
||||||
|
protected LogHandler createLogHandler(Configuration conf,
|
||||||
|
Context context, DeletionService deletionService) {
|
||||||
|
return logHandler;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ResourceLocalizationService createResourceLocalizationService(
|
||||||
|
ContainerExecutor exec, DeletionService deletionContext) {
|
||||||
|
return rsrcSrv;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ContainersLauncher createContainersLauncher(
|
||||||
|
Context context, ContainerExecutor exec) {
|
||||||
|
return launcher;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setBlockNewContainerRequests(
|
||||||
|
boolean blockNewContainerRequests) {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
|
@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -722,6 +723,8 @@ public class TestContainer {
|
||||||
Context context = mock(Context.class);
|
Context context = mock(Context.class);
|
||||||
when(context.getApplications()).thenReturn(
|
when(context.getApplications()).thenReturn(
|
||||||
new ConcurrentHashMap<ApplicationId, Application>());
|
new ConcurrentHashMap<ApplicationId, Application>());
|
||||||
|
NMNullStateStoreService stateStore = new NMNullStateStoreService();
|
||||||
|
when(context.getNMStateStore()).thenReturn(stateStore);
|
||||||
ContainerExecutor executor = mock(ContainerExecutor.class);
|
ContainerExecutor executor = mock(ContainerExecutor.class);
|
||||||
launcher =
|
launcher =
|
||||||
new ContainersLauncher(context, dispatcher, executor, null, null);
|
new ContainersLauncher(context, dispatcher, executor, null, null);
|
||||||
|
|
|
@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -29,12 +31,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||||
|
|
||||||
public class NMMemoryStateStoreService extends NMStateStoreService {
|
public class NMMemoryStateStoreService extends NMStateStoreService {
|
||||||
|
private Map<ApplicationId, ContainerManagerApplicationProto> apps;
|
||||||
|
private Set<ApplicationId> finishedApps;
|
||||||
private Map<TrackerKey, TrackerState> trackerStates;
|
private Map<TrackerKey, TrackerState> trackerStates;
|
||||||
private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
|
private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
|
||||||
private RecoveredNMTokensState nmTokenState;
|
private RecoveredNMTokensState nmTokenState;
|
||||||
|
@ -44,6 +49,58 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
||||||
super(NMMemoryStateStoreService.class.getName());
|
super(NMMemoryStateStoreService.class.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void initStorage(Configuration conf) {
|
||||||
|
apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>();
|
||||||
|
finishedApps = new HashSet<ApplicationId>();
|
||||||
|
nmTokenState = new RecoveredNMTokensState();
|
||||||
|
nmTokenState.applicationMasterKeys =
|
||||||
|
new HashMap<ApplicationAttemptId, MasterKey>();
|
||||||
|
containerTokenState = new RecoveredContainerTokensState();
|
||||||
|
containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
|
||||||
|
trackerStates = new HashMap<TrackerKey, TrackerState>();
|
||||||
|
deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void startStorage() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void closeStorage() {
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RecoveredApplicationsState loadApplicationsState()
|
||||||
|
throws IOException {
|
||||||
|
RecoveredApplicationsState state = new RecoveredApplicationsState();
|
||||||
|
state.applications = new ArrayList<ContainerManagerApplicationProto>(
|
||||||
|
apps.values());
|
||||||
|
state.finishedApplications = new ArrayList<ApplicationId>(finishedApps);
|
||||||
|
return state;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeApplication(ApplicationId appId,
|
||||||
|
ContainerManagerApplicationProto proto) throws IOException {
|
||||||
|
ContainerManagerApplicationProto protoCopy =
|
||||||
|
ContainerManagerApplicationProto.parseFrom(proto.toByteString());
|
||||||
|
apps.put(appId, protoCopy);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeFinishedApplication(ApplicationId appId) {
|
||||||
|
finishedApps.add(appId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeApplication(ApplicationId appId) throws IOException {
|
||||||
|
apps.remove(appId);
|
||||||
|
finishedApps.remove(appId);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private LocalResourceTrackerState loadTrackerState(TrackerState ts) {
|
private LocalResourceTrackerState loadTrackerState(TrackerState ts) {
|
||||||
LocalResourceTrackerState result = new LocalResourceTrackerState();
|
LocalResourceTrackerState result = new LocalResourceTrackerState();
|
||||||
result.localizedResources.addAll(ts.localizedResources.values());
|
result.localizedResources.addAll(ts.localizedResources.values());
|
||||||
|
@ -117,25 +174,6 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void initStorage(Configuration conf) {
|
|
||||||
nmTokenState = new RecoveredNMTokensState();
|
|
||||||
nmTokenState.applicationMasterKeys =
|
|
||||||
new HashMap<ApplicationAttemptId, MasterKey>();
|
|
||||||
containerTokenState = new RecoveredContainerTokensState();
|
|
||||||
containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
|
|
||||||
trackerStates = new HashMap<TrackerKey, TrackerState>();
|
|
||||||
deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void startStorage() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void closeStorage() {
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RecoveredDeletionServiceState loadDeletionServiceState()
|
public RecoveredDeletionServiceState loadDeletionServiceState()
|
||||||
|
|
|
@ -37,13 +37,16 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
|
||||||
|
@ -141,6 +144,54 @@ public class TestNMLeveldbStateStoreService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testApplicationStorage() throws IOException {
|
||||||
|
// test empty when no state
|
||||||
|
RecoveredApplicationsState state = stateStore.loadApplicationsState();
|
||||||
|
assertTrue(state.getApplications().isEmpty());
|
||||||
|
assertTrue(state.getFinishedApplications().isEmpty());
|
||||||
|
|
||||||
|
// store an application and verify recovered
|
||||||
|
final ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
|
||||||
|
ContainerManagerApplicationProto.Builder builder =
|
||||||
|
ContainerManagerApplicationProto.newBuilder();
|
||||||
|
builder.setId(((ApplicationIdPBImpl) appId1).getProto());
|
||||||
|
builder.setUser("user1");
|
||||||
|
ContainerManagerApplicationProto appProto1 = builder.build();
|
||||||
|
stateStore.storeApplication(appId1, appProto1);
|
||||||
|
restartStateStore();
|
||||||
|
state = stateStore.loadApplicationsState();
|
||||||
|
assertEquals(1, state.getApplications().size());
|
||||||
|
assertEquals(appProto1, state.getApplications().get(0));
|
||||||
|
assertTrue(state.getFinishedApplications().isEmpty());
|
||||||
|
|
||||||
|
// finish an application and add a new one
|
||||||
|
stateStore.storeFinishedApplication(appId1);
|
||||||
|
final ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
|
||||||
|
builder = ContainerManagerApplicationProto.newBuilder();
|
||||||
|
builder.setId(((ApplicationIdPBImpl) appId2).getProto());
|
||||||
|
builder.setUser("user2");
|
||||||
|
ContainerManagerApplicationProto appProto2 = builder.build();
|
||||||
|
stateStore.storeApplication(appId2, appProto2);
|
||||||
|
restartStateStore();
|
||||||
|
state = stateStore.loadApplicationsState();
|
||||||
|
assertEquals(2, state.getApplications().size());
|
||||||
|
assertTrue(state.getApplications().contains(appProto1));
|
||||||
|
assertTrue(state.getApplications().contains(appProto2));
|
||||||
|
assertEquals(1, state.getFinishedApplications().size());
|
||||||
|
assertEquals(appId1, state.getFinishedApplications().get(0));
|
||||||
|
|
||||||
|
// test removing an application
|
||||||
|
stateStore.storeFinishedApplication(appId2);
|
||||||
|
stateStore.removeApplication(appId2);
|
||||||
|
restartStateStore();
|
||||||
|
state = stateStore.loadApplicationsState();
|
||||||
|
assertEquals(1, state.getApplications().size());
|
||||||
|
assertEquals(appProto1, state.getApplications().get(0));
|
||||||
|
assertEquals(1, state.getFinishedApplications().size());
|
||||||
|
assertEquals(appId1, state.getFinishedApplications().get(0));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStartResourceLocalization() throws IOException {
|
public void testStartResourceLocalization() throws IOException {
|
||||||
String user = "somebody";
|
String user = "somebody";
|
||||||
|
|
|
@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
|
@ -312,7 +311,8 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
LOG.info("Reconnect from the node at: " + host);
|
LOG.info("Reconnect from the node at: " + host);
|
||||||
this.nmLivelinessMonitor.unregister(nodeId);
|
this.nmLivelinessMonitor.unregister(nodeId);
|
||||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
new RMNodeReconnectEvent(nodeId, rmNode));
|
new RMNodeReconnectEvent(nodeId, rmNode,
|
||||||
|
request.getRunningApplications()));
|
||||||
}
|
}
|
||||||
// On every node manager register we will be clearing NMToken keys if
|
// On every node manager register we will be clearing NMToken keys if
|
||||||
// present for any running application.
|
// present for any running application.
|
||||||
|
|
|
@ -1191,6 +1191,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
|
|
||||||
public static boolean isAppInFinalState(RMApp rmApp) {
|
public static boolean isAppInFinalState(RMApp rmApp) {
|
||||||
RMAppState appState = ((RMAppImpl) rmApp).getRecoveredFinalState();
|
RMAppState appState = ((RMAppImpl) rmApp).getRecoveredFinalState();
|
||||||
|
if (appState == null) {
|
||||||
|
appState = rmApp.getState();
|
||||||
|
}
|
||||||
return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
|
return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
|
||||||
|| appState == RMAppState.KILLED;
|
|| appState == RMAppState.KILLED;
|
||||||
}
|
}
|
||||||
|
|
|
@ -456,6 +456,24 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void handleRunningAppOnNode(RMNodeImpl rmNode,
|
||||||
|
RMContext context, ApplicationId appId, NodeId nodeId) {
|
||||||
|
RMApp app = context.getRMApps().get(appId);
|
||||||
|
|
||||||
|
// if we failed getting app by appId, maybe something wrong happened, just
|
||||||
|
// add the app to the finishedApplications list so that the app can be
|
||||||
|
// cleaned up on the NM
|
||||||
|
if (null == app) {
|
||||||
|
LOG.warn("Cannot get RMApp by appId=" + appId
|
||||||
|
+ ", just added it to finishedApplications list for cleanup");
|
||||||
|
rmNode.finishedApplications.add(appId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
context.getDispatcher().getEventHandler()
|
||||||
|
.handle(new RMAppRunningOnNodeEvent(appId, nodeId));
|
||||||
|
}
|
||||||
|
|
||||||
public static class AddNodeTransition implements
|
public static class AddNodeTransition implements
|
||||||
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
|
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
|
||||||
|
|
||||||
|
@ -496,24 +514,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
new NodesListManagerEvent(
|
new NodesListManagerEvent(
|
||||||
NodesListManagerEventType.NODE_USABLE, rmNode));
|
NodesListManagerEventType.NODE_USABLE, rmNode));
|
||||||
}
|
}
|
||||||
|
|
||||||
void handleRunningAppOnNode(RMNodeImpl rmNode, RMContext context,
|
|
||||||
ApplicationId appId, NodeId nodeId) {
|
|
||||||
RMApp app = context.getRMApps().get(appId);
|
|
||||||
|
|
||||||
// if we failed getting app by appId, maybe something wrong happened, just
|
|
||||||
// add the app to the finishedApplications list so that the app can be
|
|
||||||
// cleaned up on the NM
|
|
||||||
if (null == app) {
|
|
||||||
LOG.warn("Cannot get RMApp by appId=" + appId
|
|
||||||
+ ", just added it to finishedApplications list for cleanup");
|
|
||||||
rmNode.finishedApplications.add(appId);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
context.getDispatcher().getEventHandler()
|
|
||||||
.handle(new RMAppRunningOnNodeEvent(appId, nodeId));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class ReconnectNodeTransition implements
|
public static class ReconnectNodeTransition implements
|
||||||
|
@ -526,7 +526,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||||
new NodeRemovedSchedulerEvent(rmNode));
|
new NodeRemovedSchedulerEvent(rmNode));
|
||||||
|
|
||||||
RMNode newNode = ((RMNodeReconnectEvent)event).getReconnectedNode();
|
RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
|
||||||
|
RMNode newNode = reconnectEvent.getReconnectedNode();
|
||||||
rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
|
rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
|
||||||
if (rmNode.getTotalCapability().equals(newNode.getTotalCapability())
|
if (rmNode.getTotalCapability().equals(newNode.getTotalCapability())
|
||||||
&& rmNode.getHttpPort() == newNode.getHttpPort()) {
|
&& rmNode.getHttpPort() == newNode.getHttpPort()) {
|
||||||
|
@ -551,6 +552,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||||
new RMNodeStartedEvent(newNode.getNodeID(), null, null));
|
new RMNodeStartedEvent(newNode.getNodeID(), null, null));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (null != reconnectEvent.getRunningApplications()) {
|
||||||
|
for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
|
||||||
|
handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||||
new NodesListManagerEvent(
|
new NodesListManagerEvent(
|
||||||
NodesListManagerEventType.NODE_USABLE, rmNode));
|
NodesListManagerEventType.NODE_USABLE, rmNode));
|
||||||
|
|
|
@ -18,17 +18,27 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
|
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
|
||||||
public class RMNodeReconnectEvent extends RMNodeEvent {
|
public class RMNodeReconnectEvent extends RMNodeEvent {
|
||||||
private RMNode reconnectedNode;
|
private RMNode reconnectedNode;
|
||||||
|
private List<ApplicationId> runningApplications;
|
||||||
|
|
||||||
public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode) {
|
public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode,
|
||||||
|
List<ApplicationId> runningApps) {
|
||||||
super(nodeId, RMNodeEventType.RECONNECTED);
|
super(nodeId, RMNodeEventType.RECONNECTED);
|
||||||
reconnectedNode = newNode;
|
reconnectedNode = newNode;
|
||||||
|
runningApplications = runningApps;
|
||||||
}
|
}
|
||||||
|
|
||||||
public RMNode getReconnectedNode() {
|
public RMNode getReconnectedNode() {
|
||||||
return reconnectedNode;
|
return reconnectedNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public List<ApplicationId> getRunningApplications() {
|
||||||
|
return runningApplications;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -449,6 +449,35 @@ public class TestApplicationCleanup {
|
||||||
rm2.stop();
|
rm2.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 60000)
|
||||||
|
public void testAppCleanupWhenNMReconnects() throws Exception {
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(conf);
|
||||||
|
|
||||||
|
// start RM
|
||||||
|
MockRM rm1 = new MockRM(conf, memStore);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
|
||||||
|
// create app and launch the AM
|
||||||
|
RMApp app0 = rm1.submitApp(200);
|
||||||
|
MockAM am0 = launchAM(app0, rm1, nm1);
|
||||||
|
nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
|
rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
|
||||||
|
|
||||||
|
// wait for application cleanup message received
|
||||||
|
waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
|
||||||
|
|
||||||
|
// reconnect NM with application still active
|
||||||
|
nm1.registerNode(Arrays.asList(app0.getApplicationId()));
|
||||||
|
waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
|
||||||
|
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
TestApplicationCleanup t = new TestApplicationCleanup();
|
TestApplicationCleanup t = new TestApplicationCleanup();
|
||||||
t.testAppCleanup();
|
t.testAppCleanup();
|
||||||
|
|
|
@ -520,7 +520,7 @@ public class TestRMNodeTransitions {
|
||||||
int initialUnhealthy = cm.getUnhealthyNMs();
|
int initialUnhealthy = cm.getUnhealthyNMs();
|
||||||
int initialDecommissioned = cm.getNumDecommisionedNMs();
|
int initialDecommissioned = cm.getNumDecommisionedNMs();
|
||||||
int initialRebooted = cm.getNumRebootedNMs();
|
int initialRebooted = cm.getNumRebootedNMs();
|
||||||
node.handle(new RMNodeReconnectEvent(node.getNodeID(), node));
|
node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null));
|
||||||
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
|
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
|
||||||
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
|
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
|
||||||
Assert.assertEquals("Unhealthy Nodes",
|
Assert.assertEquals("Unhealthy Nodes",
|
||||||
|
@ -542,7 +542,8 @@ public class TestRMNodeTransitions {
|
||||||
RMNodeImpl node = getRunningNode(nmVersion1);
|
RMNodeImpl node = getRunningNode(nmVersion1);
|
||||||
Assert.assertEquals(nmVersion1, node.getNodeManagerVersion());
|
Assert.assertEquals(nmVersion1, node.getNodeManagerVersion());
|
||||||
RMNodeImpl reconnectingNode = getRunningNode(nmVersion2);
|
RMNodeImpl reconnectingNode = getRunningNode(nmVersion2);
|
||||||
node.handle(new RMNodeReconnectEvent(node.getNodeID(), reconnectingNode));
|
node.handle(new RMNodeReconnectEvent(node.getNodeID(), reconnectingNode,
|
||||||
|
null));
|
||||||
Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
|
Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue