YARN-1365. Changed ApplicationMasterService to allow an app to re-register after RM restart. Contributed by Anubhav Dhoot
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1605263 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
988bc17bc3
commit
c3f1c30e65
|
@ -42,6 +42,9 @@ Release 2.5.0 - UNRELEASED
|
|||
YARN-1339. Recover DeletionService state upon nodemanager restart. (Jason Lowe
|
||||
via junping_du)
|
||||
|
||||
YARN-1365. Changed ApplicationMasterService to allow an app to re-register
|
||||
after RM restart. (Anubhav Dhoot via jianhe)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.exceptions;
|
||||
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
|
||||
/**
|
||||
* This exception is thrown when an Application Master tries to unregister by calling
|
||||
* {@link ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)}
|
||||
* API without first registering by calling
|
||||
* {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}
|
||||
* or after an RM restart. The ApplicationMaster is expected to call
|
||||
* {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}
|
||||
* and retry.
|
||||
*/
|
||||
|
||||
public class ApplicationMasterNotRegisteredException extends YarnException {
|
||||
|
||||
private static final long serialVersionUID = 13498238L;
|
||||
|
||||
public ApplicationMasterNotRegisteredException(Throwable cause) { super(cause);}
|
||||
|
||||
public ApplicationMasterNotRegisteredException(String message) { super(message); }
|
||||
|
||||
public ApplicationMasterNotRegisteredException(String message, Throwable
|
||||
cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
|
@ -24,10 +24,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
|
|||
|
||||
/**
|
||||
* This exception is thrown when an ApplicationMaster asks for resources by
|
||||
* calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)} or tries
|
||||
* to unregister by calling
|
||||
* {@link ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)}
|
||||
* API without first registering by calling
|
||||
* calling {@link ApplicationMasterProtocol#allocate(AllocateRequest)}
|
||||
* without first registering by calling
|
||||
* {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}
|
||||
* or if it tries to register more than once.
|
||||
*/
|
||||
|
|
|
@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
|||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
|
||||
|
@ -107,12 +108,15 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
|
||||
private final AllocateResponse resync =
|
||||
recordFactory.newRecordInstance(AllocateResponse.class);
|
||||
private final AllocateResponse shutdown =
|
||||
recordFactory.newRecordInstance(AllocateResponse.class);
|
||||
private final RMContext rmContext;
|
||||
|
||||
public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
|
||||
super(ApplicationMasterService.class.getName());
|
||||
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
|
||||
this.rScheduler = scheduler;
|
||||
this.shutdown.setAMCommand(AMCommand.AM_SHUTDOWN);
|
||||
this.resync.setAMCommand(AMCommand.AM_RESYNC);
|
||||
this.rmContext = rmContext;
|
||||
}
|
||||
|
@ -346,9 +350,9 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService",
|
||||
message, applicationAttemptId.getApplicationId(),
|
||||
applicationAttemptId);
|
||||
throw new InvalidApplicationMasterRequestException(message);
|
||||
throw new ApplicationMasterNotRegisteredException(message);
|
||||
}
|
||||
|
||||
|
||||
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
|
||||
|
||||
RMApp rmApp =
|
||||
|
@ -409,22 +413,23 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
AllocateResponseLock lock = responseMap.get(appAttemptId);
|
||||
if (lock == null) {
|
||||
LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
|
||||
return resync;
|
||||
return shutdown;
|
||||
}
|
||||
synchronized (lock) {
|
||||
AllocateResponse lastResponse = lock.getAllocateResponse();
|
||||
if (!hasApplicationMasterRegistered(appAttemptId)) {
|
||||
String message =
|
||||
"Application Master is trying to allocate before registering for: "
|
||||
+ appAttemptId.getApplicationId();
|
||||
LOG.error(message);
|
||||
"Application Master is not registered for known application: "
|
||||
+ appAttemptId.getApplicationId()
|
||||
+ ". Let AM resync.";
|
||||
LOG.info(message);
|
||||
RMAuditLogger.logFailure(
|
||||
this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
|
||||
.getUser(), AuditConstants.REGISTER_AM, "",
|
||||
"ApplicationMasterService", message,
|
||||
appAttemptId.getApplicationId(),
|
||||
appAttemptId);
|
||||
throw new InvalidApplicationMasterRequestException(message);
|
||||
return resync;
|
||||
}
|
||||
|
||||
if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
|
||||
|
|
|
@ -899,8 +899,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
} else {
|
||||
// Add the current attempt to the scheduler.
|
||||
if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) {
|
||||
// Need to register an app attempt before AM can register
|
||||
appAttempt.masterService
|
||||
.registerAppAttempt(appAttempt.applicationAttemptId);
|
||||
|
||||
appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
|
||||
appAttempt.getAppAttemptId(), false));
|
||||
appAttempt.getAppAttemptId(), false, false));
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -557,7 +557,8 @@ public class CapacityScheduler extends
|
|||
|
||||
private synchronized void addApplicationAttempt(
|
||||
ApplicationAttemptId applicationAttemptId,
|
||||
boolean transferStateFromPreviousAttempt) {
|
||||
boolean transferStateFromPreviousAttempt,
|
||||
boolean shouldNotifyAttemptAdded) {
|
||||
SchedulerApplication<FiCaSchedulerApp> application =
|
||||
applications.get(applicationAttemptId.getApplicationId());
|
||||
CSQueue queue = (CSQueue) application.getQueue();
|
||||
|
@ -575,9 +576,15 @@ public class CapacityScheduler extends
|
|||
LOG.info("Added Application Attempt " + applicationAttemptId
|
||||
+ " to scheduler from user " + application.getUser() + " in queue "
|
||||
+ queue.getQueueName());
|
||||
rmContext.getDispatcher().getEventHandler() .handle(
|
||||
new RMAppAttemptEvent(applicationAttemptId,
|
||||
RMAppAttemptEventType.ATTEMPT_ADDED));
|
||||
if (shouldNotifyAttemptAdded) {
|
||||
rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppAttemptEvent(applicationAttemptId,
|
||||
RMAppAttemptEventType.ATTEMPT_ADDED));
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Skipping notifying ATTEMPT_ADDED");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void doneApplication(ApplicationId applicationId,
|
||||
|
@ -911,7 +918,8 @@ public class CapacityScheduler extends
|
|||
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
|
||||
(AppAttemptAddedSchedulerEvent) event;
|
||||
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
|
||||
appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
|
||||
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
|
||||
appAttemptAddedEvent.getShouldNotifyAttemptAdded());
|
||||
}
|
||||
break;
|
||||
case APP_ATTEMPT_REMOVED:
|
||||
|
|
|
@ -24,13 +24,22 @@ public class AppAttemptAddedSchedulerEvent extends SchedulerEvent {
|
|||
|
||||
private final ApplicationAttemptId applicationAttemptId;
|
||||
private final boolean transferStateFromPreviousAttempt;
|
||||
private final boolean shouldNotifyAttemptAdded;
|
||||
|
||||
public AppAttemptAddedSchedulerEvent(
|
||||
ApplicationAttemptId applicationAttemptId,
|
||||
boolean transferStateFromPreviousAttempt) {
|
||||
this(applicationAttemptId, transferStateFromPreviousAttempt, true);
|
||||
}
|
||||
|
||||
public AppAttemptAddedSchedulerEvent(
|
||||
ApplicationAttemptId applicationAttemptId,
|
||||
boolean transferStateFromPreviousAttempt,
|
||||
boolean shouldNotifyAttemptAdded) {
|
||||
super(SchedulerEventType.APP_ATTEMPT_ADDED);
|
||||
this.applicationAttemptId = applicationAttemptId;
|
||||
this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
|
||||
this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded;
|
||||
}
|
||||
|
||||
public ApplicationAttemptId getApplicationAttemptId() {
|
||||
|
@ -40,4 +49,8 @@ public class AppAttemptAddedSchedulerEvent extends SchedulerEvent {
|
|||
public boolean getTransferStateFromPreviousAttempt() {
|
||||
return transferStateFromPreviousAttempt;
|
||||
}
|
||||
|
||||
public boolean getShouldNotifyAttemptAdded() {
|
||||
return shouldNotifyAttemptAdded;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -597,7 +597,8 @@ public class FairScheduler extends
|
|||
*/
|
||||
protected synchronized void addApplicationAttempt(
|
||||
ApplicationAttemptId applicationAttemptId,
|
||||
boolean transferStateFromPreviousAttempt) {
|
||||
boolean transferStateFromPreviousAttempt,
|
||||
boolean shouldNotifyAttemptAdded) {
|
||||
SchedulerApplication<FSSchedulerApp> application =
|
||||
applications.get(applicationAttemptId.getApplicationId());
|
||||
String user = application.getUser();
|
||||
|
@ -625,9 +626,16 @@ public class FairScheduler extends
|
|||
|
||||
LOG.info("Added Application Attempt " + applicationAttemptId
|
||||
+ " to scheduler from user: " + user);
|
||||
rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppAttemptEvent(applicationAttemptId,
|
||||
RMAppAttemptEventType.ATTEMPT_ADDED));
|
||||
|
||||
if (shouldNotifyAttemptAdded) {
|
||||
rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppAttemptEvent(applicationAttemptId,
|
||||
RMAppAttemptEventType.ATTEMPT_ADDED));
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Skipping notifying ATTEMPT_ADDED");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1130,7 +1138,8 @@ public class FairScheduler extends
|
|||
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
|
||||
(AppAttemptAddedSchedulerEvent) event;
|
||||
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
|
||||
appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
|
||||
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
|
||||
appAttemptAddedEvent.getShouldNotifyAttemptAdded());
|
||||
break;
|
||||
case APP_ATTEMPT_REMOVED:
|
||||
if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
|
||||
|
|
|
@ -370,7 +370,8 @@ public class FifoScheduler extends
|
|||
@VisibleForTesting
|
||||
public synchronized void
|
||||
addApplicationAttempt(ApplicationAttemptId appAttemptId,
|
||||
boolean transferStateFromPreviousAttempt) {
|
||||
boolean transferStateFromPreviousAttempt,
|
||||
boolean shouldNotifyAttemptAdded) {
|
||||
SchedulerApplication<FiCaSchedulerApp> application =
|
||||
applications.get(appAttemptId.getApplicationId());
|
||||
String user = application.getUser();
|
||||
|
@ -388,9 +389,15 @@ public class FifoScheduler extends
|
|||
metrics.submitAppAttempt(user);
|
||||
LOG.info("Added Application Attempt " + appAttemptId
|
||||
+ " to scheduler from user " + application.getUser());
|
||||
rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppAttemptEvent(appAttemptId,
|
||||
RMAppAttemptEventType.ATTEMPT_ADDED));
|
||||
if (shouldNotifyAttemptAdded) {
|
||||
rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppAttemptEvent(appAttemptId,
|
||||
RMAppAttemptEventType.ATTEMPT_ADDED));
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Skipping notifying ATTEMPT_ADDED");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void doneApplication(ApplicationId applicationId,
|
||||
|
@ -780,7 +787,8 @@ public class FifoScheduler extends
|
|||
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
|
||||
(AppAttemptAddedSchedulerEvent) event;
|
||||
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
|
||||
appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
|
||||
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
|
||||
appAttemptAddedEvent.getShouldNotifyAttemptAdded());
|
||||
}
|
||||
break;
|
||||
case APP_ATTEMPT_REMOVED:
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
|
@ -194,28 +195,17 @@ public class TestApplicationMasterLauncher {
|
|||
|
||||
// request for containers
|
||||
int request = 2;
|
||||
try {
|
||||
AllocateResponse ar =
|
||||
am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
|
||||
} catch (Exception e) {
|
||||
Assert.assertEquals("Application Master is trying to allocate before "
|
||||
+ "registering for: " + attempt.getAppAttemptId().getApplicationId(),
|
||||
e.getMessage());
|
||||
thrown = true;
|
||||
}
|
||||
AllocateResponse ar =
|
||||
am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
|
||||
Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC);
|
||||
|
||||
// kick the scheduler
|
||||
nm1.nodeHeartbeat(true);
|
||||
try {
|
||||
AllocateResponse amrs =
|
||||
am.allocate(new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>());
|
||||
} catch (Exception e) {
|
||||
Assert.assertEquals("Application Master is trying to allocate before "
|
||||
+ "registering for: " + attempt.getAppAttemptId().getApplicationId(),
|
||||
e.getMessage());
|
||||
thrown = true;
|
||||
}
|
||||
Assert.assertTrue(thrown);
|
||||
AllocateResponse amrs =
|
||||
am.allocate(new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>());
|
||||
Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC);
|
||||
|
||||
am.registerAppAttempt();
|
||||
thrown = false;
|
||||
try {
|
||||
|
@ -228,5 +218,17 @@ public class TestApplicationMasterLauncher {
|
|||
thrown = true;
|
||||
}
|
||||
Assert.assertTrue(thrown);
|
||||
|
||||
// Simulate an AM that was disconnected and app attempt was removed
|
||||
// (responseMap does not contain attemptid)
|
||||
am.unregisterAppAttempt();
|
||||
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1,
|
||||
ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
|
||||
AllocateResponse amrs2 =
|
||||
am.allocate(new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>());
|
||||
Assert.assertTrue(amrs2.getAMCommand() == AMCommand.AM_SHUTDOWN);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,60 +18,33 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.*;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.event.InlineDispatcher;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyList;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class TestApplicationMasterService {
|
||||
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
|
||||
|
@ -270,13 +243,17 @@ public class TestApplicationMasterService {
|
|||
}
|
||||
Assert.assertNotNull(cause);
|
||||
Assert
|
||||
.assertTrue(cause instanceof InvalidApplicationMasterRequestException);
|
||||
.assertTrue(cause instanceof ApplicationMasterNotRegisteredException);
|
||||
Assert.assertNotNull(cause.getMessage());
|
||||
Assert
|
||||
.assertTrue(cause
|
||||
.getMessage()
|
||||
.contains(
|
||||
"Application Master is trying to unregister before registering for:"));
|
||||
|
||||
am1.registerAppAttempt();
|
||||
|
||||
am1.unregisterAppAttempt(req, false);
|
||||
} finally {
|
||||
if (rm != null) {
|
||||
rm.stop();
|
||||
|
|
|
@ -238,7 +238,7 @@ public class TestFifoScheduler {
|
|||
}
|
||||
|
||||
ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1);
|
||||
scheduler.addApplicationAttempt(attId, false);
|
||||
scheduler.addApplicationAttempt(attId, false, true);
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
|
|
|
@ -293,7 +293,7 @@ public class TestRMRestart {
|
|||
AllocateResponse allocResponse = am1.allocate(
|
||||
new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>());
|
||||
Assert.assertTrue(allocResponse.getAMCommand() == AMCommand.AM_RESYNC);
|
||||
Assert.assertEquals(AMCommand.AM_SHUTDOWN, allocResponse.getAMCommand());
|
||||
|
||||
// NM should be rebooted on heartbeat, even first heartbeat for nm2
|
||||
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
|
||||
|
@ -1648,7 +1648,7 @@ public class TestRMRestart {
|
|||
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
|
||||
MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId());
|
||||
am1.registerAppAttempt();
|
||||
am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
|
||||
am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());
|
||||
nm1.nodeHeartbeat(true);
|
||||
List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>()).getAllocatedContainers();
|
||||
|
|
|
@ -535,6 +535,36 @@ public class TestWorkPreservingRMRestart {
|
|||
assertNull(scheduler.getRMContainer(completedContainer.getContainerId()));
|
||||
}
|
||||
|
||||
@Test (timeout = 600000)
|
||||
public void testAppReregisterOnRMWorkPreservingRestart() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
// start RM
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
|
||||
// create app and launch the AM
|
||||
RMApp app0 = rm1.submitApp(200);
|
||||
MockAM am0 = MockRM.launchAM(app0, rm1, nm1);
|
||||
|
||||
// start new RM
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
rm2.start();
|
||||
rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
|
||||
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
|
||||
|
||||
am0.setAMRMProtocol(rm2.getApplicationMasterService());
|
||||
am0.registerAppAttempt(false);
|
||||
|
||||
rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
|
||||
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
|
||||
}
|
||||
|
||||
private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
|
||||
int appsPending, int appsRunning, int appsCompleted,
|
||||
int allocatedContainers, int availableMB, int availableVirtualCores,
|
||||
|
|
|
@ -146,7 +146,7 @@ public class FairSchedulerTestBase {
|
|||
// This conditional is for testAclSubmitApplication where app is rejected
|
||||
// and no app is added.
|
||||
if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
|
||||
scheduler.addApplicationAttempt(id, false);
|
||||
scheduler.addApplicationAttempt(id, false, true);
|
||||
}
|
||||
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||
ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
|
||||
|
|
|
@ -787,13 +787,13 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
|
||||
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
|
||||
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1");
|
||||
scheduler.addApplicationAttempt(id11, false);
|
||||
scheduler.addApplicationAttempt(id11, false, true);
|
||||
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
|
||||
scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1");
|
||||
scheduler.addApplicationAttempt(id21, false);
|
||||
scheduler.addApplicationAttempt(id21, false, true);
|
||||
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
|
||||
scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1");
|
||||
scheduler.addApplicationAttempt(id22, false);
|
||||
scheduler.addApplicationAttempt(id22, false, true);
|
||||
|
||||
int minReqSize =
|
||||
FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB;
|
||||
|
@ -1555,7 +1555,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
|
||||
ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||
scheduler.addApplication(appId.getApplicationId(), "queue1", "user1");
|
||||
scheduler.addApplicationAttempt(appId, false);
|
||||
scheduler.addApplicationAttempt(appId, false, true);
|
||||
|
||||
// 1 request with 2 nodes on the same rack. another request with 1 node on
|
||||
// a different rack
|
||||
|
@ -2714,7 +2714,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
ApplicationAttemptId appAttemptId =
|
||||
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||
fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11");
|
||||
fs.addApplicationAttempt(appAttemptId, false);
|
||||
fs.addApplicationAttempt(appAttemptId, false, true);
|
||||
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||
ResourceRequest request =
|
||||
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
|
||||
|
|
Loading…
Reference in New Issue