From 63940d6e13fe95430db19f8ba815eb6ed827851c Mon Sep 17 00:00:00 2001 From: Jian He Date: Wed, 25 Jun 2014 04:45:50 +0000 Subject: [PATCH] Merge r1605263 from trunk. YARN-1365. Changed ApplicationMasterService to allow an app to re-register after RM restart. Contributed by Anubhav Dhoot git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1605264 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 ++ ...plicationMasterNotRegisteredException.java | 47 +++++++++++++++++++ ...alidApplicationMasterRequestException.java | 6 +-- .../ApplicationMasterService.java | 19 +++++--- .../rmapp/attempt/RMAppAttemptImpl.java | 6 ++- .../scheduler/capacity/CapacityScheduler.java | 18 +++++-- .../event/AppAttemptAddedSchedulerEvent.java | 13 +++++ .../scheduler/fair/FairScheduler.java | 19 ++++++-- .../scheduler/fifo/FifoScheduler.java | 18 +++++-- .../TestApplicationMasterLauncher.java | 42 +++++++++-------- .../TestApplicationMasterService.java | 37 +++------------ .../resourcemanager/TestFifoScheduler.java | 2 +- .../server/resourcemanager/TestRMRestart.java | 4 +- .../TestWorkPreservingRMRestart.java | 30 ++++++++++++ .../scheduler/fair/FairSchedulerTestBase.java | 2 +- .../scheduler/fair/TestFairScheduler.java | 10 ++-- 16 files changed, 190 insertions(+), 86 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationMasterNotRegisteredException.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 400837634b5..6d862442a46 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -27,6 +27,9 @@ Release 2.5.0 - UNRELEASED YARN-1339. Recover DeletionService state upon nodemanager restart. (Jason Lowe via junping_du) + YARN-1365. Changed ApplicationMasterService to allow an app to re-register + after RM restart. (Anubhav Dhoot via jianhe) + IMPROVEMENTS YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationMasterNotRegisteredException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationMasterNotRegisteredException.java new file mode 100644 index 00000000000..9a75b29dd6a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationMasterNotRegisteredException.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.exceptions; + +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; + +/** + * This exception is thrown when an Application Master tries to unregister by calling + * {@link ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)} + * API without first registering by calling + * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)} + * or after an RM restart. The ApplicationMaster is expected to call + * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)} + * and retry. + */ + +public class ApplicationMasterNotRegisteredException extends YarnException { + + private static final long serialVersionUID = 13498238L; + + public ApplicationMasterNotRegisteredException(Throwable cause) { super(cause);} + + public ApplicationMasterNotRegisteredException(String message) { super(message); } + + public ApplicationMasterNotRegisteredException(String message, Throwable + cause) { + super(message, cause); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java index 6b422b3bf0d..6fe6f275d3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidApplicationMasterRequestException.java @@ -24,10 +24,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque /** * This exception is thrown when an ApplicationMaster asks for resources by - * calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)} or tries - * to unregister by calling - * {@link ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)} - * API without first registering by calling + * calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)} + * without first registering by calling * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)} * or if it tries to register more than once. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 94dc47437fa..e60add44bb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; @@ -107,12 +108,15 @@ public class ApplicationMasterService extends AbstractService implements new ConcurrentHashMap(); private final AllocateResponse resync = recordFactory.newRecordInstance(AllocateResponse.class); + private final AllocateResponse shutdown = + recordFactory.newRecordInstance(AllocateResponse.class); private final RMContext rmContext; public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { super(ApplicationMasterService.class.getName()); this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.rScheduler = scheduler; + this.shutdown.setAMCommand(AMCommand.AM_SHUTDOWN); this.resync.setAMCommand(AMCommand.AM_RESYNC); this.rmContext = rmContext; } @@ -346,9 +350,9 @@ public class ApplicationMasterService extends AbstractService implements AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService", message, applicationAttemptId.getApplicationId(), applicationAttemptId); - throw new InvalidApplicationMasterRequestException(message); + throw new ApplicationMasterNotRegisteredException(message); } - + this.amLivelinessMonitor.receivedPing(applicationAttemptId); RMApp rmApp = @@ -409,22 +413,23 @@ public class ApplicationMasterService extends AbstractService implements AllocateResponseLock lock = responseMap.get(appAttemptId); if (lock == null) { LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); - return resync; + return shutdown; } synchronized (lock) { AllocateResponse lastResponse = lock.getAllocateResponse(); if (!hasApplicationMasterRegistered(appAttemptId)) { String message = - "Application Master is trying to allocate before registering for: " - + appAttemptId.getApplicationId(); - LOG.error(message); + "Application Master is not registered for known application: " + + appAttemptId.getApplicationId() + + ". Let AM resync."; + LOG.info(message); RMAuditLogger.logFailure( this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) .getUser(), AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message, appAttemptId.getApplicationId(), appAttemptId); - throw new InvalidApplicationMasterRequestException(message); + return resync; } if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 4ac64ef1257..1e7693f5b71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -899,8 +899,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { } else { // Add the current attempt to the scheduler. if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) { + // Need to register an app attempt before AM can register + appAttempt.masterService + .registerAppAttempt(appAttempt.applicationAttemptId); + appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent( - appAttempt.getAppAttemptId(), false)); + appAttempt.getAppAttemptId(), false, false)); } /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 74eb1964076..5b982ea23f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -557,7 +557,8 @@ public class CapacityScheduler extends private synchronized void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, - boolean transferStateFromPreviousAttempt) { + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); CSQueue queue = (CSQueue) application.getQueue(); @@ -575,9 +576,15 @@ public class CapacityScheduler extends LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user " + application.getUser() + " in queue " + queue.getQueueName()); - rmContext.getDispatcher().getEventHandler() .handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + if (shouldNotifyAttemptAdded) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping notifying ATTEMPT_ADDED"); + } + } } private synchronized void doneApplication(ApplicationId applicationId, @@ -911,7 +918,8 @@ public class CapacityScheduler extends AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), + appAttemptAddedEvent.getShouldNotifyAttemptAdded()); } break; case APP_ATTEMPT_REMOVED: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java index d31010d6755..64d308adea1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java @@ -24,13 +24,22 @@ public class AppAttemptAddedSchedulerEvent extends SchedulerEvent { private final ApplicationAttemptId applicationAttemptId; private final boolean transferStateFromPreviousAttempt; + private final boolean shouldNotifyAttemptAdded; public AppAttemptAddedSchedulerEvent( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt) { + this(applicationAttemptId, transferStateFromPreviousAttempt, true); + } + + public AppAttemptAddedSchedulerEvent( + ApplicationAttemptId applicationAttemptId, + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { super(SchedulerEventType.APP_ATTEMPT_ADDED); this.applicationAttemptId = applicationAttemptId; this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt; + this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded; } public ApplicationAttemptId getApplicationAttemptId() { @@ -40,4 +49,8 @@ public class AppAttemptAddedSchedulerEvent extends SchedulerEvent { public boolean getTransferStateFromPreviousAttempt() { return transferStateFromPreviousAttempt; } + + public boolean getShouldNotifyAttemptAdded() { + return shouldNotifyAttemptAdded; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index a042acd185f..0cbcaae1424 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -597,7 +597,8 @@ public class FairScheduler extends */ protected synchronized void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, - boolean transferStateFromPreviousAttempt) { + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); String user = application.getUser(); @@ -625,9 +626,16 @@ public class FairScheduler extends LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user: " + user); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + + if (shouldNotifyAttemptAdded) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping notifying ATTEMPT_ADDED"); + } + } } /** @@ -1130,7 +1138,8 @@ public class FairScheduler extends AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), + appAttemptAddedEvent.getShouldNotifyAttemptAdded()); break; case APP_ATTEMPT_REMOVED: if (!(event instanceof AppAttemptRemovedSchedulerEvent)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 4681516fce6..b017db7321e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -370,7 +370,8 @@ public class FifoScheduler extends @VisibleForTesting public synchronized void addApplicationAttempt(ApplicationAttemptId appAttemptId, - boolean transferStateFromPreviousAttempt) { + boolean transferStateFromPreviousAttempt, + boolean shouldNotifyAttemptAdded) { SchedulerApplication application = applications.get(appAttemptId.getApplicationId()); String user = application.getUser(); @@ -388,9 +389,15 @@ public class FifoScheduler extends metrics.submitAppAttempt(user); LOG.info("Added Application Attempt " + appAttemptId + " to scheduler from user " + application.getUser()); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(appAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + if (shouldNotifyAttemptAdded) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(appAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping notifying ATTEMPT_ADDED"); + } + } } private synchronized void doneApplication(ApplicationId applicationId, @@ -780,7 +787,8 @@ public class FifoScheduler extends AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), + appAttemptAddedEvent.getShouldNotifyAttemptAdded()); } break; case APP_ATTEMPT_REMOVED: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 64e5cc96192..36182f51669 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -194,28 +195,17 @@ public class TestApplicationMasterLauncher { // request for containers int request = 2; - try { - AllocateResponse ar = - am.allocate("h1", 1000, request, new ArrayList()); - } catch (Exception e) { - Assert.assertEquals("Application Master is trying to allocate before " - + "registering for: " + attempt.getAppAttemptId().getApplicationId(), - e.getMessage()); - thrown = true; - } + AllocateResponse ar = + am.allocate("h1", 1000, request, new ArrayList()); + Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC); + // kick the scheduler nm1.nodeHeartbeat(true); - try { - AllocateResponse amrs = - am.allocate(new ArrayList(), - new ArrayList()); - } catch (Exception e) { - Assert.assertEquals("Application Master is trying to allocate before " - + "registering for: " + attempt.getAppAttemptId().getApplicationId(), - e.getMessage()); - thrown = true; - } - Assert.assertTrue(thrown); + AllocateResponse amrs = + am.allocate(new ArrayList(), + new ArrayList()); + Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC); + am.registerAppAttempt(); thrown = false; try { @@ -228,5 +218,17 @@ public class TestApplicationMasterLauncher { thrown = true; } Assert.assertTrue(thrown); + + // Simulate an AM that was disconnected and app attempt was removed + // (responseMap does not contain attemptid) + am.unregisterAppAttempt(); + nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, + ContainerState.COMPLETE); + am.waitForState(RMAppAttemptState.FINISHED); + + AllocateResponse amrs2 = + am.allocate(new ArrayList(), + new ArrayList()); + Assert.assertTrue(amrs2.getAMCommand() == AMCommand.AM_SHUTDOWN); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index afe28aad210..671851c9529 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -18,60 +18,33 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import com.google.common.collect.Maps; import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; + import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.event.InlineDispatcher; -import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; -import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.ArgumentCaptor; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.concurrent.ConcurrentMap; import static java.lang.Thread.sleep; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyList; -import static org.mockito.Mockito.*; public class TestApplicationMasterService { private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); @@ -270,13 +243,17 @@ public class TestApplicationMasterService { } Assert.assertNotNull(cause); Assert - .assertTrue(cause instanceof InvalidApplicationMasterRequestException); + .assertTrue(cause instanceof ApplicationMasterNotRegisteredException); Assert.assertNotNull(cause.getMessage()); Assert .assertTrue(cause .getMessage() .contains( "Application Master is trying to unregister before registering for:")); + + am1.registerAppAttempt(); + + am1.unregisterAppAttempt(req, false); } finally { if (rm != null) { rm.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index b7b77a72aa5..aa7f63144eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -238,7 +238,7 @@ public class TestFifoScheduler { } ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1); - scheduler.addApplicationAttempt(attId, false); + scheduler.addApplicationAttempt(attId, false, true); rm.stop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 3b71b423c5f..dc3e9f18178 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -293,7 +293,7 @@ public class TestRMRestart { AllocateResponse allocResponse = am1.allocate( new ArrayList(), new ArrayList()); - Assert.assertTrue(allocResponse.getAMCommand() == AMCommand.AM_RESYNC); + Assert.assertEquals(AMCommand.AM_SHUTDOWN, allocResponse.getAMCommand()); // NM should be rebooted on heartbeat, even first heartbeat for nm2 NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); @@ -1648,7 +1648,7 @@ public class TestRMRestart { rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId()); am1.registerAppAttempt(); - am1.allocate("127.0.0.1" , 1000, 1, new ArrayList()); + am1.allocate("127.0.0.1" , 1000, 1, new ArrayList()); nm1.nodeHeartbeat(true); List conts = am1.allocate(new ArrayList(), new ArrayList()).getAllocatedContainers(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 6dd2992bbe3..89342afc976 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -535,6 +535,36 @@ public class TestWorkPreservingRMRestart { assertNull(scheduler.getRMContainer(completedContainer.getContainerId())); } + @Test (timeout = 600000) + public void testAppReregisterOnRMWorkPreservingRestart() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = MockRM.launchAM(app0, rm1, nm1); + + // start new RM + rm2 = new MockRM(conf, memStore); + rm2.start(); + rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); + rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED); + + am0.setAMRMProtocol(rm2.getApplicationMasterService()); + am0.registerAppAttempt(false); + + rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING); + rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING); + } + private void asserteMetrics(QueueMetrics qm, int appsSubmitted, int appsPending, int appsRunning, int appsCompleted, int allocatedContainers, int availableMB, int availableVirtualCores, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index fb864a2ac70..dd1e4bb6cde 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -146,7 +146,7 @@ public class FairSchedulerTestBase { // This conditional is for testAclSubmitApplication where app is rejected // and no app is added. if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { - scheduler.addApplicationAttempt(id, false); + scheduler.addApplicationAttempt(id, false, true); } List ask = new ArrayList(); ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 68e6f14c3d4..643631ea592 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -787,13 +787,13 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId id11 = createAppAttemptId(1, 1); scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1"); - scheduler.addApplicationAttempt(id11, false); + scheduler.addApplicationAttempt(id11, false, true); ApplicationAttemptId id21 = createAppAttemptId(2, 1); scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1"); - scheduler.addApplicationAttempt(id21, false); + scheduler.addApplicationAttempt(id21, false, true); ApplicationAttemptId id22 = createAppAttemptId(2, 2); scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1"); - scheduler.addApplicationAttempt(id22, false); + scheduler.addApplicationAttempt(id22, false, true); int minReqSize = FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB; @@ -1555,7 +1555,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); scheduler.addApplication(appId.getApplicationId(), "queue1", "user1"); - scheduler.addApplicationAttempt(appId, false); + scheduler.addApplicationAttempt(appId, false, true); // 1 request with 2 nodes on the same rack. another request with 1 node on // a different rack @@ -2714,7 +2714,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId appAttemptId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11"); - fs.addApplicationAttempt(appAttemptId, false); + fs.addApplicationAttempt(appAttemptId, false, true); List ask = new ArrayList(); ResourceRequest request = createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);