diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index df63230dcba..e11cd50797d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -17,6 +17,9 @@ Release 2.6.0 - UNRELEASED YARN-2181. Added preemption info to logs and RM web UI. (Wangda Tan via jianhe) + YARN-1354. Recover applications upon nodemanager restart. (Jason Lowe via + junping_du) + IMPROVEMENTS YARN-2242. Improve exception information on AM launch crashes. (Li Lu diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 7922fe2ce06..aee4e9a78a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; import static org.apache.hadoop.service.Service.STATE.STARTED; +import java.io.DataInputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URISyntaxException; @@ -42,6 +43,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; @@ -63,6 +65,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -71,6 +74,8 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -81,6 +86,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; @@ -119,11 +126,13 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; public class ContainerManagerImpl extends CompositeService implements ServiceStateChangeListener, ContainerManagementProtocol, @@ -224,14 +233,49 @@ public class ContainerManagerImpl extends CompositeService implements recover(); } + @SuppressWarnings("unchecked") private void recover() throws IOException, URISyntaxException { NMStateStoreService stateStore = context.getNMStateStore(); if (stateStore.canRecover()) { rsrcLocalizationSrvc.recoverLocalizedResources( stateStore.loadLocalizationState()); + + RecoveredApplicationsState appsState = stateStore.loadApplicationsState(); + for (ContainerManagerApplicationProto proto : + appsState.getApplications()) { + recoverApplication(proto); + } + + String diagnostic = "Application marked finished during recovery"; + for (ApplicationId appId : appsState.getFinishedApplications()) { + dispatcher.getEventHandler().handle( + new ApplicationFinishEvent(appId, diagnostic)); + } } } + private void recoverApplication(ContainerManagerApplicationProto p) + throws IOException { + ApplicationId appId = new ApplicationIdPBImpl(p.getId()); + Credentials creds = new Credentials(); + creds.readTokenStorageStream( + new DataInputStream(p.getCredentials().newInput())); + + List aclProtoList = p.getAclsList(); + Map acls = + new HashMap(aclProtoList.size()); + for (ApplicationACLMapProto aclProto : aclProtoList) { + acls.put(ProtoUtils.convertFromProtoFormat(aclProto.getAccessType()), + aclProto.getAcl()); + } + + LOG.info("Recovering application " + appId); + ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId, + creds, context); + context.getApplications().put(appId, app); + app.handle(new ApplicationInitEvent(appId, acls)); + } + protected LogHandler createLogHandler(Configuration conf, Context context, DeletionService deletionService) { if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, @@ -358,6 +402,12 @@ public class ContainerManagerImpl extends CompositeService implements } LOG.info("Applications still running : " + applications.keySet()); + if (this.context.getNMStateStore().canRecover() + && !this.context.getDecommissioned()) { + // do not cleanup apps as they can be recovered on restart + return; + } + List appIds = new ArrayList(applications.keySet()); this.handle( @@ -567,6 +617,41 @@ public class ContainerManagerImpl extends CompositeService implements succeededContainers, failedContainers); } + private ContainerManagerApplicationProto buildAppProto(ApplicationId appId, + String user, Credentials credentials, + Map appAcls) { + + ContainerManagerApplicationProto.Builder builder = + ContainerManagerApplicationProto.newBuilder(); + builder.setId(((ApplicationIdPBImpl) appId).getProto()); + builder.setUser(user); + + builder.clearCredentials(); + if (credentials != null) { + DataOutputBuffer dob = new DataOutputBuffer(); + try { + credentials.writeTokenStorageToStream(dob); + builder.setCredentials(ByteString.copyFrom(dob.getData())); + } catch (IOException e) { + // should not occur + LOG.error("Cannot serialize credentials", e); + } + } + + builder.clearAcls(); + if (appAcls != null) { + for (Map.Entry acl : appAcls.entrySet()) { + ApplicationACLMapProto p = ApplicationACLMapProto.newBuilder() + .setAccessType(ProtoUtils.convertToProtoFormat(acl.getKey())) + .setAcl(acl.getValue()) + .build(); + builder.addAcls(p); + } + } + + return builder.build(); + } + @SuppressWarnings("unchecked") private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, ContainerTokenIdentifier containerTokenIdentifier, @@ -640,10 +725,12 @@ public class ContainerManagerImpl extends CompositeService implements if (null == context.getApplications().putIfAbsent(applicationID, application)) { LOG.info("Creating a new application reference for app " + applicationID); - + Map appAcls = + container.getLaunchContext().getApplicationACLs(); + context.getNMStateStore().storeApplication(applicationID, + buildAppProto(applicationID, user, credentials, appAcls)); dispatcher.getEventHandler().handle( - new ApplicationInitEvent(applicationID, container.getLaunchContext() - .getApplicationACLs())); + new ApplicationInitEvent(applicationID, appAcls)); } dispatcher.getEventHandler().handle( @@ -894,6 +981,11 @@ public class ContainerManagerImpl extends CompositeService implements } else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) { diagnostic = "Application killed by ResourceManager"; } + try { + this.context.getNMStateStore().storeFinishedApplication(appID); + } catch (IOException e) { + LOG.error("Unable to update application state in store", e); + } this.dispatcher.getEventHandler().handle( new ApplicationFinishEvent(appID, diagnostic)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index a206591d80a..cc5544cc47a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; +import java.io.IOException; import java.util.EnumSet; import java.util.HashMap; import java.util.Map; @@ -428,6 +429,11 @@ public class ApplicationImpl implements Application { ApplicationId appId = event.getApplicationID(); app.context.getApplications().remove(appId); app.aclsManager.removeApplication(appId); + try { + app.context.getNMStateStore().removeApplication(appId); + } catch (IOException e) { + LOG.error("Unable to remove application from state store", e); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index cc7656c0826..c3fc272d7f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; @@ -74,6 +75,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String DELETION_TASK_KEY_PREFIX = "DeletionService/deltask_"; + private static final String APPLICATIONS_KEY_PREFIX = + "ContainerManager/applications/"; + private static final String FINISHED_APPS_KEY_PREFIX = + "ContainerManager/finishedApps/"; + private static final String LOCALIZATION_KEY_PREFIX = "Localization/"; private static final String LOCALIZATION_PUBLIC_KEY_PREFIX = LOCALIZATION_KEY_PREFIX + "public/"; @@ -116,6 +122,92 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } + @Override + public RecoveredApplicationsState loadApplicationsState() + throws IOException { + RecoveredApplicationsState state = new RecoveredApplicationsState(); + state.applications = new ArrayList(); + String keyPrefix = APPLICATIONS_KEY_PREFIX; + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(keyPrefix)); + while (iter.hasNext()) { + Entry entry = iter.next(); + String key = asString(entry.getKey()); + if (!key.startsWith(keyPrefix)) { + break; + } + state.applications.add( + ContainerManagerApplicationProto.parseFrom(entry.getValue())); + } + + state.finishedApplications = new ArrayList(); + keyPrefix = FINISHED_APPS_KEY_PREFIX; + iter.seek(bytes(keyPrefix)); + while (iter.hasNext()) { + Entry entry = iter.next(); + String key = asString(entry.getKey()); + if (!key.startsWith(keyPrefix)) { + break; + } + ApplicationId appId = + ConverterUtils.toApplicationId(key.substring(keyPrefix.length())); + state.finishedApplications.add(appId); + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + + return state; + } + + @Override + public void storeApplication(ApplicationId appId, + ContainerManagerApplicationProto p) throws IOException { + String key = APPLICATIONS_KEY_PREFIX + appId; + try { + db.put(bytes(key), p.toByteArray()); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void storeFinishedApplication(ApplicationId appId) + throws IOException { + String key = FINISHED_APPS_KEY_PREFIX + appId; + try { + db.put(bytes(key), new byte[0]); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void removeApplication(ApplicationId appId) + throws IOException { + try { + WriteBatch batch = db.createWriteBatch(); + try { + String key = APPLICATIONS_KEY_PREFIX + appId; + batch.delete(bytes(key)); + key = FINISHED_APPS_KEY_PREFIX + appId; + batch.delete(bytes(key)); + db.write(batch); + } finally { + batch.close(); + } + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override public RecoveredLocalizationState loadLocalizationState() throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 89205b1f637..3bb1e2189fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; @@ -42,6 +43,25 @@ public class NMNullStateStoreService extends NMStateStoreService { return false; } + @Override + public RecoveredApplicationsState loadApplicationsState() throws IOException { + throw new UnsupportedOperationException( + "Recovery not supported by this state store"); + } + + @Override + public void storeApplication(ApplicationId appId, + ContainerManagerApplicationProto p) throws IOException { + } + + @Override + public void storeFinishedApplication(ApplicationId appId) { + } + + @Override + public void removeApplication(ApplicationId appId) throws IOException { + } + @Override public RecoveredLocalizationState loadLocalizationState() throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 87c438b59bd..f0988e36172 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; @@ -45,6 +46,19 @@ public abstract class NMStateStoreService extends AbstractService { super(name); } + public static class RecoveredApplicationsState { + List applications; + List finishedApplications; + + public List getApplications() { + return applications; + } + + public List getFinishedApplications() { + return finishedApplications; + } + } + public static class LocalResourceTrackerState { List localizedResources = new ArrayList(); @@ -162,6 +176,19 @@ public abstract class NMStateStoreService extends AbstractService { } + public abstract RecoveredApplicationsState loadApplicationsState() + throws IOException; + + public abstract void storeApplication(ApplicationId appId, + ContainerManagerApplicationProto p) throws IOException; + + public abstract void storeFinishedApplication(ApplicationId appId) + throws IOException; + + public abstract void removeApplication(ApplicationId appId) + throws IOException; + + /** * Load the state of localized resources * @return recovered localized resource state diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto index 460c4932342..e6f39f6c5f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto @@ -24,6 +24,13 @@ package hadoop.yarn; import "yarn_protos.proto"; +message ContainerManagerApplicationProto { + optional ApplicationIdProto id = 1; + optional string user = 2; + optional bytes credentials = 3; + repeated ApplicationACLMapProto acls = 4; +} + message DeletionServiceDeleteTaskProto { optional int32 id = 1; optional string user = 2; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 772bc05cb9f..fecc837cfab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -82,6 +82,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -91,8 +93,6 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; -import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @SuppressWarnings("rawtypes") public class TestNodeStatusUpdater { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java new file mode 100644 index 00000000000..00eb2fe288b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.junit.Test; + +public class TestContainerManagerRecovery { + + private NodeManagerMetrics metrics = NodeManagerMetrics.create(); + + @Test + public void testApplicationRecovery() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); + conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user"); + NMStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + Context context = new NMContext(new NMContainerTokenSecretManager( + conf), new NMTokenSecretManagerInNM(), null, + new ApplicationACLsManager(conf), stateStore); + ContainerManagerImpl cm = createContainerManager(context); + cm.init(conf); + cm.start(); + + // simulate registration with RM + MasterKey masterKey = new MasterKeyPBImpl(); + masterKey.setKeyId(123); + masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123) + .byteValue() })); + context.getContainerTokenSecretManager().setMasterKey(masterKey); + context.getNMTokenSecretManager().setMasterKey(masterKey); + + // add an application by starting a container + String appUser = "app_user1"; + String modUser = "modify_user1"; + String viewUser = "view_user1"; + String enemyUser = "enemy_user"; + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cid = ContainerId.newInstance(attemptId, 1); + Map localResources = Collections.emptyMap(); + Map containerEnv = Collections.emptyMap(); + List containerCmds = Collections.emptyList(); + Map serviceData = Collections.emptyMap(); + Credentials containerCreds = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + containerCreds.writeTokenStorageToStream(dob); + ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0, + dob.getLength()); + Map acls = + new HashMap(); + acls.put(ApplicationAccessType.MODIFY_APP, modUser); + acls.put(ApplicationAccessType.VIEW_APP, viewUser); + ContainerLaunchContext clc = ContainerLaunchContext.newInstance( + localResources, containerEnv, containerCmds, serviceData, + containerTokens, acls); + StartContainersResponse startResponse = startContainer(context, cm, cid, + clc); + assertTrue(startResponse.getFailedRequests().isEmpty()); + assertEquals(1, context.getApplications().size()); + Application app = context.getApplications().get(appId); + assertNotNull(app); + waitForAppState(app, ApplicationState.INITING); + assertTrue(context.getApplicationACLsManager().checkAccess( + UserGroupInformation.createRemoteUser(modUser), + ApplicationAccessType.MODIFY_APP, appUser, appId)); + assertFalse(context.getApplicationACLsManager().checkAccess( + UserGroupInformation.createRemoteUser(viewUser), + ApplicationAccessType.MODIFY_APP, appUser, appId)); + assertTrue(context.getApplicationACLsManager().checkAccess( + UserGroupInformation.createRemoteUser(viewUser), + ApplicationAccessType.VIEW_APP, appUser, appId)); + assertFalse(context.getApplicationACLsManager().checkAccess( + UserGroupInformation.createRemoteUser(enemyUser), + ApplicationAccessType.VIEW_APP, appUser, appId)); + + // reset container manager and verify app recovered with proper acls + cm.stop(); + context = new NMContext(new NMContainerTokenSecretManager( + conf), new NMTokenSecretManagerInNM(), null, + new ApplicationACLsManager(conf), stateStore); + cm = createContainerManager(context); + cm.init(conf); + cm.start(); + assertEquals(1, context.getApplications().size()); + app = context.getApplications().get(appId); + assertNotNull(app); + waitForAppState(app, ApplicationState.INITING); + assertTrue(context.getApplicationACLsManager().checkAccess( + UserGroupInformation.createRemoteUser(modUser), + ApplicationAccessType.MODIFY_APP, appUser, appId)); + assertFalse(context.getApplicationACLsManager().checkAccess( + UserGroupInformation.createRemoteUser(viewUser), + ApplicationAccessType.MODIFY_APP, appUser, appId)); + assertTrue(context.getApplicationACLsManager().checkAccess( + UserGroupInformation.createRemoteUser(viewUser), + ApplicationAccessType.VIEW_APP, appUser, appId)); + assertFalse(context.getApplicationACLsManager().checkAccess( + UserGroupInformation.createRemoteUser(enemyUser), + ApplicationAccessType.VIEW_APP, appUser, appId)); + + // simulate application completion + List finishedApps = new ArrayList(); + finishedApps.add(appId); + cm.handle(new CMgrCompletedAppsEvent(finishedApps, + CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); + waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP); + + // restart and verify app is marked for finishing + cm.stop(); + context = new NMContext(new NMContainerTokenSecretManager( + conf), new NMTokenSecretManagerInNM(), null, + new ApplicationACLsManager(conf), stateStore); + cm = createContainerManager(context); + cm.init(conf); + cm.start(); + assertEquals(1, context.getApplications().size()); + app = context.getApplications().get(appId); + assertNotNull(app); + waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP); + assertTrue(context.getApplicationACLsManager().checkAccess( + UserGroupInformation.createRemoteUser(modUser), + ApplicationAccessType.MODIFY_APP, appUser, appId)); + assertFalse(context.getApplicationACLsManager().checkAccess( + UserGroupInformation.createRemoteUser(viewUser), + ApplicationAccessType.MODIFY_APP, appUser, appId)); + assertTrue(context.getApplicationACLsManager().checkAccess( + UserGroupInformation.createRemoteUser(viewUser), + ApplicationAccessType.VIEW_APP, appUser, appId)); + assertFalse(context.getApplicationACLsManager().checkAccess( + UserGroupInformation.createRemoteUser(enemyUser), + ApplicationAccessType.VIEW_APP, appUser, appId)); + + // simulate log aggregation completion + app.handle(new ApplicationEvent(app.getAppId(), + ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP)); + assertEquals(app.getApplicationState(), ApplicationState.FINISHED); + app.handle(new ApplicationEvent(app.getAppId(), + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)); + + // restart and verify app is no longer present after recovery + cm.stop(); + context = new NMContext(new NMContainerTokenSecretManager( + conf), new NMTokenSecretManagerInNM(), null, + new ApplicationACLsManager(conf), stateStore); + cm = createContainerManager(context); + cm.init(conf); + cm.start(); + assertTrue(context.getApplications().isEmpty()); + cm.stop(); + } + + private StartContainersResponse startContainer(Context context, + final ContainerManagerImpl cm, ContainerId cid, + ContainerLaunchContext clc) throws Exception { + UserGroupInformation user = UserGroupInformation.createRemoteUser( + cid.getApplicationAttemptId().toString()); + StartContainerRequest scReq = StartContainerRequest.newInstance( + clc, TestContainerManager.createContainerToken(cid, 0, + context.getNodeId(), user.getShortUserName(), + context.getContainerTokenSecretManager())); + final List scReqList = + new ArrayList(); + scReqList.add(scReq); + NMTokenIdentifier nmToken = new NMTokenIdentifier( + cid.getApplicationAttemptId(), context.getNodeId(), + user.getShortUserName(), + context.getNMTokenSecretManager().getCurrentKey().getKeyId()); + user.addTokenIdentifier(nmToken); + return user.doAs(new PrivilegedExceptionAction() { + @Override + public StartContainersResponse run() throws Exception { + return cm.startContainers( + StartContainersRequest.newInstance(scReqList)); + } + }); + } + + private void waitForAppState(Application app, ApplicationState state) + throws Exception { + final int msecPerSleep = 10; + int msecLeft = 5000; + while (app.getApplicationState() != state && msecLeft > 0) { + Thread.sleep(msecPerSleep); + msecLeft -= msecPerSleep; + } + assertEquals(state, app.getApplicationState()); + } + + private ContainerManagerImpl createContainerManager(Context context) { + final LogHandler logHandler = mock(LogHandler.class); + final ResourceLocalizationService rsrcSrv = + new ResourceLocalizationService(null, null, null, null, + context.getNMStateStore()) { + @Override + public void serviceInit(Configuration conf) throws Exception { + } + + @Override + public void serviceStart() throws Exception { + // do nothing + } + + @Override + public void serviceStop() throws Exception { + // do nothing + } + + @Override + public void handle(LocalizationEvent event) { + // do nothing + } + }; + + final ContainersLauncher launcher = new ContainersLauncher(context, null, + null, null, null) { + @Override + public void handle(ContainersLauncherEvent event) { + // do nothing + } + }; + + return new ContainerManagerImpl(context, + mock(ContainerExecutor.class), mock(DeletionService.class), + mock(NodeStatusUpdater.class), metrics, + context.getApplicationACLsManager(), null) { + @Override + protected LogHandler createLogHandler(Configuration conf, + Context context, DeletionService deletionService) { + return logHandler; + } + + @Override + protected ResourceLocalizationService createResourceLocalizationService( + ContainerExecutor exec, DeletionService deletionContext) { + return rsrcSrv; + } + + @Override + protected ContainersLauncher createContainersLauncher( + Context context, ContainerExecutor exec) { + return launcher; + } + + @Override + public void setBlockNewContainerRequests( + boolean blockNewContainerRequests) { + // do nothing + } + }; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index c43471fa2ee..e0239928221 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Test; @@ -722,6 +723,8 @@ public class TestContainer { Context context = mock(Context.class); when(context.getApplications()).thenReturn( new ConcurrentHashMap()); + NMNullStateStoreService stateStore = new NMNullStateStoreService(); + when(context.getNMStateStore()).thenReturn(stateStore); ContainerExecutor executor = mock(ContainerExecutor.class); launcher = new ContainersLauncher(context, dispatcher, executor, null, null); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index fef2b12221f..5b5e1ef5bb8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -29,12 +31,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; public class NMMemoryStateStoreService extends NMStateStoreService { + private Map apps; + private Set finishedApps; private Map trackerStates; private Map deleteTasks; private RecoveredNMTokensState nmTokenState; @@ -44,6 +49,58 @@ public class NMMemoryStateStoreService extends NMStateStoreService { super(NMMemoryStateStoreService.class.getName()); } + @Override + protected void initStorage(Configuration conf) { + apps = new HashMap(); + finishedApps = new HashSet(); + nmTokenState = new RecoveredNMTokensState(); + nmTokenState.applicationMasterKeys = + new HashMap(); + containerTokenState = new RecoveredContainerTokensState(); + containerTokenState.activeTokens = new HashMap(); + trackerStates = new HashMap(); + deleteTasks = new HashMap(); + } + + @Override + protected void startStorage() { + } + + @Override + protected void closeStorage() { + } + + + @Override + public RecoveredApplicationsState loadApplicationsState() + throws IOException { + RecoveredApplicationsState state = new RecoveredApplicationsState(); + state.applications = new ArrayList( + apps.values()); + state.finishedApplications = new ArrayList(finishedApps); + return state; + } + + @Override + public void storeApplication(ApplicationId appId, + ContainerManagerApplicationProto proto) throws IOException { + ContainerManagerApplicationProto protoCopy = + ContainerManagerApplicationProto.parseFrom(proto.toByteString()); + apps.put(appId, protoCopy); + } + + @Override + public void storeFinishedApplication(ApplicationId appId) { + finishedApps.add(appId); + } + + @Override + public void removeApplication(ApplicationId appId) throws IOException { + apps.remove(appId); + finishedApps.remove(appId); + } + + private LocalResourceTrackerState loadTrackerState(TrackerState ts) { LocalResourceTrackerState result = new LocalResourceTrackerState(); result.localizedResources.addAll(ts.localizedResources.values()); @@ -117,25 +174,6 @@ public class NMMemoryStateStoreService extends NMStateStoreService { } } - @Override - protected void initStorage(Configuration conf) { - nmTokenState = new RecoveredNMTokensState(); - nmTokenState.applicationMasterKeys = - new HashMap(); - containerTokenState = new RecoveredContainerTokensState(); - containerTokenState.activeTokens = new HashMap(); - trackerStates = new HashMap(); - deleteTasks = new HashMap(); - } - - @Override - protected void startStorage() { - } - - @Override - protected void closeStorage() { - } - @Override public RecoveredDeletionServiceState loadDeletionServiceState() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 294f424b0f8..84a69069a30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -37,13 +37,16 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; @@ -141,6 +144,54 @@ public class TestNMLeveldbStateStoreService { } } + @Test + public void testApplicationStorage() throws IOException { + // test empty when no state + RecoveredApplicationsState state = stateStore.loadApplicationsState(); + assertTrue(state.getApplications().isEmpty()); + assertTrue(state.getFinishedApplications().isEmpty()); + + // store an application and verify recovered + final ApplicationId appId1 = ApplicationId.newInstance(1234, 1); + ContainerManagerApplicationProto.Builder builder = + ContainerManagerApplicationProto.newBuilder(); + builder.setId(((ApplicationIdPBImpl) appId1).getProto()); + builder.setUser("user1"); + ContainerManagerApplicationProto appProto1 = builder.build(); + stateStore.storeApplication(appId1, appProto1); + restartStateStore(); + state = stateStore.loadApplicationsState(); + assertEquals(1, state.getApplications().size()); + assertEquals(appProto1, state.getApplications().get(0)); + assertTrue(state.getFinishedApplications().isEmpty()); + + // finish an application and add a new one + stateStore.storeFinishedApplication(appId1); + final ApplicationId appId2 = ApplicationId.newInstance(1234, 2); + builder = ContainerManagerApplicationProto.newBuilder(); + builder.setId(((ApplicationIdPBImpl) appId2).getProto()); + builder.setUser("user2"); + ContainerManagerApplicationProto appProto2 = builder.build(); + stateStore.storeApplication(appId2, appProto2); + restartStateStore(); + state = stateStore.loadApplicationsState(); + assertEquals(2, state.getApplications().size()); + assertTrue(state.getApplications().contains(appProto1)); + assertTrue(state.getApplications().contains(appProto2)); + assertEquals(1, state.getFinishedApplications().size()); + assertEquals(appId1, state.getFinishedApplications().get(0)); + + // test removing an application + stateStore.storeFinishedApplication(appId2); + stateStore.removeApplication(appId2); + restartStateStore(); + state = stateStore.loadApplicationsState(); + assertEquals(1, state.getApplications().size()); + assertEquals(appProto1, state.getApplications().get(0)); + assertEquals(1, state.getFinishedApplications().size()); + assertEquals(appId1, state.getFinishedApplications().get(0)); + } + @Test public void testStartResourceLocalization() throws IOException { String user = "somebody"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 6eebb4d7bff..b532dd56309 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -312,7 +311,8 @@ public class ResourceTrackerService extends AbstractService implements LOG.info("Reconnect from the node at: " + host); this.nmLivelinessMonitor.unregister(nodeId); this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeReconnectEvent(nodeId, rmNode)); + new RMNodeReconnectEvent(nodeId, rmNode, + request.getRunningApplications())); } // On every node manager register we will be clearing NMToken keys if // present for any running application. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index efa1ee72808..7c7b7541b3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1191,6 +1191,9 @@ public class RMAppImpl implements RMApp, Recoverable { public static boolean isAppInFinalState(RMApp rmApp) { RMAppState appState = ((RMAppImpl) rmApp).getRecoveredFinalState(); + if (appState == null) { + appState = rmApp.getState(); + } return appState == RMAppState.FAILED || appState == RMAppState.FINISHED || appState == RMAppState.KILLED; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index e20adc5c4d4..cf81d727138 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -456,6 +456,24 @@ public class RMNodeImpl implements RMNode, EventHandler { } } + private static void handleRunningAppOnNode(RMNodeImpl rmNode, + RMContext context, ApplicationId appId, NodeId nodeId) { + RMApp app = context.getRMApps().get(appId); + + // if we failed getting app by appId, maybe something wrong happened, just + // add the app to the finishedApplications list so that the app can be + // cleaned up on the NM + if (null == app) { + LOG.warn("Cannot get RMApp by appId=" + appId + + ", just added it to finishedApplications list for cleanup"); + rmNode.finishedApplications.add(appId); + return; + } + + context.getDispatcher().getEventHandler() + .handle(new RMAppRunningOnNodeEvent(appId, nodeId)); + } + public static class AddNodeTransition implements SingleArcTransition { @@ -496,24 +514,6 @@ public class RMNodeImpl implements RMNode, EventHandler { new NodesListManagerEvent( NodesListManagerEventType.NODE_USABLE, rmNode)); } - - void handleRunningAppOnNode(RMNodeImpl rmNode, RMContext context, - ApplicationId appId, NodeId nodeId) { - RMApp app = context.getRMApps().get(appId); - - // if we failed getting app by appId, maybe something wrong happened, just - // add the app to the finishedApplications list so that the app can be - // cleaned up on the NM - if (null == app) { - LOG.warn("Cannot get RMApp by appId=" + appId - + ", just added it to finishedApplications list for cleanup"); - rmNode.finishedApplications.add(appId); - return; - } - - context.getDispatcher().getEventHandler() - .handle(new RMAppRunningOnNodeEvent(appId, nodeId)); - } } public static class ReconnectNodeTransition implements @@ -526,7 +526,8 @@ public class RMNodeImpl implements RMNode, EventHandler { rmNode.context.getDispatcher().getEventHandler().handle( new NodeRemovedSchedulerEvent(rmNode)); - RMNode newNode = ((RMNodeReconnectEvent)event).getReconnectedNode(); + RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event; + RMNode newNode = reconnectEvent.getReconnectedNode(); rmNode.nodeManagerVersion = newNode.getNodeManagerVersion(); if (rmNode.getTotalCapability().equals(newNode.getTotalCapability()) && rmNode.getHttpPort() == newNode.getHttpPort()) { @@ -551,6 +552,13 @@ public class RMNodeImpl implements RMNode, EventHandler { rmNode.context.getDispatcher().getEventHandler().handle( new RMNodeStartedEvent(newNode.getNodeID(), null, null)); } + + if (null != reconnectEvent.getRunningApplications()) { + for (ApplicationId appId : reconnectEvent.getRunningApplications()) { + handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); + } + } + rmNode.context.getDispatcher().getEventHandler().handle( new NodesListManagerEvent( NodesListManagerEventType.NODE_USABLE, rmNode)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java index b1fa0ad8c0c..ebbac9ab156 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java @@ -18,17 +18,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; public class RMNodeReconnectEvent extends RMNodeEvent { private RMNode reconnectedNode; + private List runningApplications; - public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode) { + public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode, + List runningApps) { super(nodeId, RMNodeEventType.RECONNECTED); reconnectedNode = newNode; + runningApplications = runningApps; } public RMNode getReconnectedNode() { return reconnectedNode; } + + public List getRunningApplications() { + return runningApplications; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index e88ebd24f3b..c07882da0ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -449,6 +449,35 @@ public class TestApplicationCleanup { rm2.stop(); } + @Test (timeout = 60000) + public void testAppCleanupWhenNMReconnects() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED); + + // wait for application cleanup message received + waitForAppCleanupMessageRecved(nm1, app0.getApplicationId()); + + // reconnect NM with application still active + nm1.registerNode(Arrays.asList(app0.getApplicationId())); + waitForAppCleanupMessageRecved(nm1, app0.getApplicationId()); + + rm1.stop(); + } + public static void main(String[] args) throws Exception { TestApplicationCleanup t = new TestApplicationCleanup(); t.testAppCleanup(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 46693be36ca..aa2cfc2eba1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -520,7 +520,7 @@ public class TestRMNodeTransitions { int initialUnhealthy = cm.getUnhealthyNMs(); int initialDecommissioned = cm.getNumDecommisionedNMs(); int initialRebooted = cm.getNumRebootedNMs(); - node.handle(new RMNodeReconnectEvent(node.getNodeID(), node)); + node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null)); Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs()); Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); Assert.assertEquals("Unhealthy Nodes", @@ -542,7 +542,8 @@ public class TestRMNodeTransitions { RMNodeImpl node = getRunningNode(nmVersion1); Assert.assertEquals(nmVersion1, node.getNodeManagerVersion()); RMNodeImpl reconnectingNode = getRunningNode(nmVersion2); - node.handle(new RMNodeReconnectEvent(node.getNodeID(), reconnectingNode)); + node.handle(new RMNodeReconnectEvent(node.getNodeID(), reconnectingNode, + null)); Assert.assertEquals(nmVersion2, node.getNodeManagerVersion()); } }