YARN-2249. Avoided AM release requests being lost on work preserving RM restart. Contributed by Jian He.
svn merge --ignore-ancestry -c 1618972 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1618973 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
96455a564a
commit
400f6131d4
|
@ -36,8 +36,8 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
|
@ -934,5 +935,11 @@ public class ResourceSchedulerWrapper
|
|||
return new HashMap<ApplicationId,
|
||||
SchedulerApplication<SchedulerApplicationAttempt>>();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void completedContainer(RMContainer rmContainer,
|
||||
ContainerStatus containerStatus, RMContainerEventType event) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -196,6 +196,9 @@ Release 2.6.0 - UNRELEASED
|
|||
YARN-2409. RM ActiveToStandBy transition missing stoping previous rmDispatcher.
|
||||
(Rohith via jianhe)
|
||||
|
||||
YARN-2249. Avoided AM release requests being lost on work preserving RM
|
||||
restart. (Jian He via zjshen)
|
||||
|
||||
Release 2.5.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -23,10 +23,14 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -34,18 +38,25 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
|||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
|
@ -54,6 +65,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
|||
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public abstract class AbstractYarnScheduler
|
||||
<T extends SchedulerApplicationAttempt, N extends SchedulerNode>
|
||||
|
@ -72,6 +84,7 @@ public abstract class AbstractYarnScheduler
|
|||
|
||||
protected RMContext rmContext;
|
||||
protected Map<ApplicationId, SchedulerApplication<T>> applications;
|
||||
protected int nmExpireInterval;
|
||||
|
||||
protected final static List<Container> EMPTY_CONTAINER_LIST =
|
||||
new ArrayList<Container>();
|
||||
|
@ -87,6 +100,15 @@ public abstract class AbstractYarnScheduler
|
|||
super(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) throws Exception {
|
||||
nmExpireInterval =
|
||||
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
|
||||
createReleaseCache();
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
public synchronized List<Container> getTransferredContainers(
|
||||
ApplicationAttemptId currentAttempt) {
|
||||
ApplicationId appId = currentAttempt.getApplicationId();
|
||||
|
@ -281,6 +303,19 @@ public abstract class AbstractYarnScheduler
|
|||
((RMContainerImpl)rmContainer).setAMContainer(true);
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (schedulerAttempt) {
|
||||
Set<ContainerId> releases = schedulerAttempt.getPendingRelease();
|
||||
if (releases.contains(container.getContainerId())) {
|
||||
// release the container
|
||||
rmContainer.handle(new RMContainerFinishedEvent(container
|
||||
.getContainerId(), SchedulerUtils.createAbnormalContainerStatus(
|
||||
container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER),
|
||||
RMContainerEventType.RELEASED));
|
||||
releases.remove(container.getContainerId());
|
||||
LOG.info(container.getContainerId() + " is released by application.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -320,6 +355,62 @@ public abstract class AbstractYarnScheduler
|
|||
}
|
||||
}
|
||||
|
||||
protected void createReleaseCache() {
|
||||
// Cleanup the cache after nm expire interval.
|
||||
new Timer().schedule(new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
for (SchedulerApplication<T> app : applications.values()) {
|
||||
|
||||
T attempt = app.getCurrentAppAttempt();
|
||||
synchronized (attempt) {
|
||||
for (ContainerId containerId : attempt.getPendingRelease()) {
|
||||
RMAuditLogger.logFailure(
|
||||
app.getUser(),
|
||||
AuditConstants.RELEASE_CONTAINER,
|
||||
"Unauthorized access or invalid container",
|
||||
"Scheduler",
|
||||
"Trying to release container not owned by app or with invalid id.",
|
||||
attempt.getApplicationId(), containerId);
|
||||
}
|
||||
attempt.getPendingRelease().clear();
|
||||
}
|
||||
}
|
||||
LOG.info("Release request cache is cleaned up");
|
||||
}
|
||||
}, nmExpireInterval);
|
||||
}
|
||||
|
||||
// clean up a completed container
|
||||
protected abstract void completedContainer(RMContainer rmContainer,
|
||||
ContainerStatus containerStatus, RMContainerEventType event);
|
||||
|
||||
protected void releaseContainers(List<ContainerId> containers,
|
||||
SchedulerApplicationAttempt attempt) {
|
||||
for (ContainerId containerId : containers) {
|
||||
RMContainer rmContainer = getRMContainer(containerId);
|
||||
if (rmContainer == null) {
|
||||
if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
|
||||
< nmExpireInterval) {
|
||||
LOG.info(containerId + " doesn't exist. Add the container"
|
||||
+ " to the release request cache as it maybe on recovery.");
|
||||
synchronized (attempt) {
|
||||
attempt.getPendingRelease().add(containerId);
|
||||
}
|
||||
} else {
|
||||
RMAuditLogger.logFailure(attempt.getUser(),
|
||||
AuditConstants.RELEASE_CONTAINER,
|
||||
"Unauthorized access or invalid container", "Scheduler",
|
||||
"Trying to release container not owned by app or with invalid id.",
|
||||
attempt.getApplicationId(), containerId);
|
||||
}
|
||||
}
|
||||
completedContainer(rmContainer,
|
||||
SchedulerUtils.createAbnormalContainerStatus(containerId,
|
||||
SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
|
||||
}
|
||||
}
|
||||
|
||||
public SchedulerNode getSchedulerNode(NodeId nodeId) {
|
||||
return nodes.get(nodeId);
|
||||
}
|
||||
|
|
|
@ -17,13 +17,14 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -41,7 +42,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
|
@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.HashMultiset;
|
||||
import com.google.common.collect.Multiset;
|
||||
|
||||
|
@ -87,6 +88,13 @@ public class SchedulerApplicationAttempt {
|
|||
protected List<RMContainer> newlyAllocatedContainers =
|
||||
new ArrayList<RMContainer>();
|
||||
|
||||
// This pendingRelease is used in work-preserving recovery scenario to keep
|
||||
// track of the AM's outstanding release requests. RM on recovery could
|
||||
// receive the release request form AM before it receives the container status
|
||||
// from NM for recovery. In this case, the to-be-recovered containers reported
|
||||
// by NM should not be recovered.
|
||||
private Set<ContainerId> pendingRelease = null;
|
||||
|
||||
/**
|
||||
* Count how many times the application has been given an opportunity
|
||||
* to schedule a task at each priority. Each time the scheduler
|
||||
|
@ -114,7 +122,7 @@ public class SchedulerApplicationAttempt {
|
|||
new AppSchedulingInfo(applicationAttemptId, user, queue,
|
||||
activeUsersManager, rmContext.getEpoch());
|
||||
this.queue = queue;
|
||||
|
||||
this.pendingRelease = new HashSet<ContainerId>();
|
||||
if (rmContext.getRMApps() != null &&
|
||||
rmContext.getRMApps()
|
||||
.containsKey(applicationAttemptId.getApplicationId())) {
|
||||
|
@ -163,6 +171,10 @@ public class SchedulerApplicationAttempt {
|
|||
return appSchedulingInfo.getResourceRequests(priority);
|
||||
}
|
||||
|
||||
public Set<ContainerId> getPendingRelease() {
|
||||
return this.pendingRelease;
|
||||
}
|
||||
|
||||
public int getNewContainerId() {
|
||||
return appSchedulingInfo.getNewContainerId();
|
||||
}
|
||||
|
|
|
@ -54,8 +54,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
|
||||
|
@ -199,7 +197,7 @@ public class CapacityScheduler extends
|
|||
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
|
||||
|
||||
private boolean overrideWithQueueMappings = false;
|
||||
private List<QueueMapping> mappings = new ArrayList<QueueMapping>();
|
||||
private List<QueueMapping> mappings = null;
|
||||
private Groups groups;
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -789,21 +787,7 @@ public class CapacityScheduler extends
|
|||
getMinimumResourceCapability(), maximumAllocation);
|
||||
|
||||
// Release containers
|
||||
for (ContainerId releasedContainerId : release) {
|
||||
RMContainer rmContainer = getRMContainer(releasedContainerId);
|
||||
if (rmContainer == null) {
|
||||
RMAuditLogger.logFailure(application.getUser(),
|
||||
AuditConstants.RELEASE_CONTAINER,
|
||||
"Unauthorized access or invalid container", "CapacityScheduler",
|
||||
"Trying to release container not owned by app or with invalid id",
|
||||
application.getApplicationId(), releasedContainerId);
|
||||
}
|
||||
completedContainer(rmContainer,
|
||||
SchedulerUtils.createAbnormalContainerStatus(
|
||||
releasedContainerId,
|
||||
SchedulerUtils.RELEASED_CONTAINER),
|
||||
RMContainerEventType.RELEASED);
|
||||
}
|
||||
releaseContainers(release, application);
|
||||
|
||||
synchronized (application) {
|
||||
|
||||
|
@ -1098,7 +1082,8 @@ public class CapacityScheduler extends
|
|||
}
|
||||
|
||||
@Lock(CapacityScheduler.class)
|
||||
private synchronized void completedContainer(RMContainer rmContainer,
|
||||
@Override
|
||||
protected synchronized void completedContainer(RMContainer rmContainer,
|
||||
ContainerStatus containerStatus, RMContainerEventType event) {
|
||||
if (rmContainer == null) {
|
||||
LOG.info("Null container completed...");
|
||||
|
|
|
@ -49,8 +49,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
|
@ -810,7 +808,8 @@ public class FairScheduler extends
|
|||
/**
|
||||
* Clean up a completed container.
|
||||
*/
|
||||
private synchronized void completedContainer(RMContainer rmContainer,
|
||||
@Override
|
||||
protected synchronized void completedContainer(RMContainer rmContainer,
|
||||
ContainerStatus containerStatus, RMContainerEventType event) {
|
||||
if (rmContainer == null) {
|
||||
LOG.info("Null container completed...");
|
||||
|
@ -913,21 +912,7 @@ public class FairScheduler extends
|
|||
}
|
||||
|
||||
// Release containers
|
||||
for (ContainerId releasedContainerId : release) {
|
||||
RMContainer rmContainer = getRMContainer(releasedContainerId);
|
||||
if (rmContainer == null) {
|
||||
RMAuditLogger.logFailure(application.getUser(),
|
||||
AuditConstants.RELEASE_CONTAINER,
|
||||
"Unauthorized access or invalid container", "FairScheduler",
|
||||
"Trying to release container not owned by app or with invalid id",
|
||||
application.getApplicationId(), releasedContainerId);
|
||||
}
|
||||
completedContainer(rmContainer,
|
||||
SchedulerUtils.createAbnormalContainerStatus(
|
||||
releasedContainerId,
|
||||
SchedulerUtils.RELEASED_CONTAINER),
|
||||
RMContainerEventType.RELEASED);
|
||||
}
|
||||
releaseContainers(release, application);
|
||||
|
||||
synchronized (application) {
|
||||
if (!ask.isEmpty()) {
|
||||
|
|
|
@ -52,8 +52,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
|
@ -89,7 +87,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.Lock;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
|
@ -295,21 +292,7 @@ public class FifoScheduler extends
|
|||
clusterResource, minimumAllocation, maximumAllocation);
|
||||
|
||||
// Release containers
|
||||
for (ContainerId releasedContainer : release) {
|
||||
RMContainer rmContainer = getRMContainer(releasedContainer);
|
||||
if (rmContainer == null) {
|
||||
RMAuditLogger.logFailure(application.getUser(),
|
||||
AuditConstants.RELEASE_CONTAINER,
|
||||
"Unauthorized access or invalid container", "FifoScheduler",
|
||||
"Trying to release container not owned by app or with invalid id",
|
||||
application.getApplicationId(), releasedContainer);
|
||||
}
|
||||
containerCompleted(rmContainer,
|
||||
SchedulerUtils.createAbnormalContainerStatus(
|
||||
releasedContainer,
|
||||
SchedulerUtils.RELEASED_CONTAINER),
|
||||
RMContainerEventType.RELEASED);
|
||||
}
|
||||
releaseContainers(release, application);
|
||||
|
||||
synchronized (application) {
|
||||
|
||||
|
@ -443,7 +426,7 @@ public class FifoScheduler extends
|
|||
LOG.info("Skip killing " + container.getContainerId());
|
||||
continue;
|
||||
}
|
||||
containerCompleted(container,
|
||||
completedContainer(container,
|
||||
SchedulerUtils.createAbnormalContainerStatus(
|
||||
container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
|
||||
RMContainerEventType.KILL);
|
||||
|
@ -717,7 +700,7 @@ public class FifoScheduler extends
|
|||
for (ContainerStatus completedContainer : completedContainers) {
|
||||
ContainerId containerId = completedContainer.getContainerId();
|
||||
LOG.debug("Container FINISHED: " + containerId);
|
||||
containerCompleted(getRMContainer(containerId),
|
||||
completedContainer(getRMContainer(containerId),
|
||||
completedContainer, RMContainerEventType.FINISHED);
|
||||
}
|
||||
|
||||
|
@ -818,7 +801,7 @@ public class FifoScheduler extends
|
|||
ContainerExpiredSchedulerEvent containerExpiredEvent =
|
||||
(ContainerExpiredSchedulerEvent) event;
|
||||
ContainerId containerid = containerExpiredEvent.getContainerId();
|
||||
containerCompleted(getRMContainer(containerid),
|
||||
completedContainer(getRMContainer(containerid),
|
||||
SchedulerUtils.createAbnormalContainerStatus(
|
||||
containerid,
|
||||
SchedulerUtils.EXPIRED_CONTAINER),
|
||||
|
@ -831,7 +814,8 @@ public class FifoScheduler extends
|
|||
}
|
||||
|
||||
@Lock(FifoScheduler.class)
|
||||
private synchronized void containerCompleted(RMContainer rmContainer,
|
||||
@Override
|
||||
protected synchronized void completedContainer(RMContainer rmContainer,
|
||||
ContainerStatus containerStatus, RMContainerEventType event) {
|
||||
if (rmContainer == null) {
|
||||
LOG.info("Null container completed...");
|
||||
|
@ -881,7 +865,7 @@ public class FifoScheduler extends
|
|||
}
|
||||
// Kill running containers
|
||||
for(RMContainer container : node.getRunningContainers()) {
|
||||
containerCompleted(container,
|
||||
completedContainer(container,
|
||||
SchedulerUtils.createAbnormalContainerStatus(
|
||||
container.getContainerId(),
|
||||
SchedulerUtils.LOST_CONTAINER),
|
||||
|
|
|
@ -49,7 +49,7 @@ public class MockAM {
|
|||
|
||||
private volatile int responseId = 0;
|
||||
private final ApplicationAttemptId attemptId;
|
||||
private final RMContext context;
|
||||
private RMContext context;
|
||||
private ApplicationMasterProtocol amRMProtocol;
|
||||
|
||||
private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
|
||||
|
@ -61,8 +61,10 @@ public class MockAM {
|
|||
this.amRMProtocol = amRMProtocol;
|
||||
this.attemptId = attemptId;
|
||||
}
|
||||
|
||||
void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol) {
|
||||
|
||||
public void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol,
|
||||
RMContext context) {
|
||||
this.context = context;
|
||||
this.amRMProtocol = amRMProtocol;
|
||||
}
|
||||
|
||||
|
|
|
@ -171,7 +171,6 @@ public class TestApplicationMasterService {
|
|||
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||
am1.registerAppAttempt();
|
||||
am1.setAMRMProtocol(rm.getApplicationMasterService());
|
||||
|
||||
AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
|
||||
List<ContainerId> release = new ArrayList<ContainerId>();
|
||||
|
|
|
@ -289,7 +289,7 @@ public class TestRMRestart {
|
|||
|
||||
// verify old AM is not accepted
|
||||
// change running AM to talk to new RM
|
||||
am1.setAMRMProtocol(rm2.getApplicationMasterService());
|
||||
am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
|
||||
AllocateResponse allocResponse = am1.allocate(
|
||||
new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>());
|
||||
|
@ -1663,7 +1663,7 @@ public class TestRMRestart {
|
|||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
// recover app
|
||||
RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
|
||||
am1.setAMRMProtocol(rm2.getApplicationMasterService());
|
||||
am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
|
||||
am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
|
||||
nm1.nodeHeartbeat(true);
|
||||
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
|
||||
|
|
|
@ -33,10 +33,13 @@ import java.util.Set;
|
|||
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
|
@ -72,6 +75,9 @@ import org.junit.Test;
|
|||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class TestWorkPreservingRMRestart {
|
||||
|
@ -572,8 +578,8 @@ public class TestWorkPreservingRMRestart {
|
|||
rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
|
||||
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
|
||||
|
||||
am0.setAMRMProtocol(rm2.getApplicationMasterService());
|
||||
am0.registerAppAttempt(false);
|
||||
am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
|
||||
am0.registerAppAttempt(true);
|
||||
|
||||
rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
|
||||
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
|
||||
|
@ -646,6 +652,69 @@ public class TestWorkPreservingRMRestart {
|
|||
waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId());
|
||||
}
|
||||
|
||||
// Test if RM on recovery receives the container release request from AM
|
||||
// before it receives the container status reported by NM for recovery. this
|
||||
// container should not be recovered.
|
||||
@Test (timeout = 30000)
|
||||
public void testReleasedContainerNotRecovered() throws Exception {
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
rm1.start();
|
||||
|
||||
RMApp app1 = rm1.submitApp(1024);
|
||||
final MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
|
||||
// Re-start RM
|
||||
conf.setInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 8000);
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
|
||||
am1.registerAppAttempt(true);
|
||||
|
||||
// try to release a container before the container is actually recovered.
|
||||
final ContainerId runningContainer =
|
||||
ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
|
||||
am1.allocate(null, Arrays.asList(runningContainer));
|
||||
|
||||
// send container statuses to recover the containers
|
||||
List<NMContainerStatus> containerStatuses =
|
||||
createNMContainerStatusForApp(am1);
|
||||
nm1.registerNode(containerStatuses, null);
|
||||
|
||||
// only the am container should be recovered.
|
||||
waitForNumContainersToRecover(1, rm2, am1.getApplicationAttemptId());
|
||||
|
||||
final AbstractYarnScheduler scheduler =
|
||||
(AbstractYarnScheduler) rm2.getResourceScheduler();
|
||||
// cached release request is cleaned.
|
||||
// assertFalse(scheduler.getPendingRelease().contains(runningContainer));
|
||||
|
||||
AllocateResponse response = am1.allocate(null, null);
|
||||
// AM gets notified of the completed container.
|
||||
boolean receivedCompletedContainer = false;
|
||||
for (ContainerStatus status : response.getCompletedContainersStatuses()) {
|
||||
if (status.getContainerId().equals(runningContainer)) {
|
||||
receivedCompletedContainer = true;
|
||||
}
|
||||
}
|
||||
assertTrue(receivedCompletedContainer);
|
||||
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
public Boolean get() {
|
||||
// release cache is cleaned up and previous running container is not
|
||||
// recovered
|
||||
return scheduler.getApplicationAttempt(am1.getApplicationAttemptId())
|
||||
.getPendingRelease().isEmpty()
|
||||
&& scheduler.getRMContainer(runningContainer) == null;
|
||||
}
|
||||
}, 1000, 20000);
|
||||
}
|
||||
|
||||
private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
|
||||
int appsPending, int appsRunning, int appsCompleted,
|
||||
int allocatedContainers, int availableMB, int availableVirtualCores,
|
||||
|
@ -661,7 +730,7 @@ public class TestWorkPreservingRMRestart {
|
|||
assertEquals(allocatedVirtualCores, qm.getAllocatedVirtualCores());
|
||||
}
|
||||
|
||||
private void waitForNumContainersToRecover(int num, MockRM rm,
|
||||
public static void waitForNumContainersToRecover(int num, MockRM rm,
|
||||
ApplicationAttemptId attemptId) throws Exception {
|
||||
AbstractYarnScheduler scheduler =
|
||||
(AbstractYarnScheduler) rm.getResourceScheduler();
|
||||
|
@ -674,7 +743,9 @@ public class TestWorkPreservingRMRestart {
|
|||
attempt = scheduler.getApplicationAttempt(attemptId);
|
||||
}
|
||||
while (attempt.getLiveContainers().size() < num) {
|
||||
System.out.println("Wait for " + num + " containers to recover.");
|
||||
System.out.println("Wait for " + num
|
||||
+ " containers to recover. currently: "
|
||||
+ attempt.getLiveContainers().size());
|
||||
Thread.sleep(200);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue