MAPREDUCE-5910. Make MR AM resync with RM in case of work-preserving RM-restart. Contributed by Rohith
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1611434 13f79535-47bb-0310-9956-ffa450edef68
commit 875592220f
parent ef9e24f826
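For orientation, the resync handling that this patch wires into the MR ApplicationMaster's allocators follows the pattern sketched below. This is an illustrative condensation of the hunks that follow, not a verbatim excerpt; the fields and helpers it assumes (LOG, lastResponseID, register(), addOutstandingRequestOnResync()) are the ones the diff itself introduces or reuses.

    // Illustrative sketch only: how the allocator reacts to AM_RESYNC after a
    // work-preserving RM restart (condensed from the RMContainerAllocator hunk below).
    if (response.getAMCommand() == AMCommand.AM_RESYNC) {
      LOG.info("ApplicationMaster is out of sync with ResourceManager,"
          + " hence resyncing.");
      // Restart the allocate sequence numbering expected by the restarted RM.
      lastResponseID = 0;
      // Re-register so the restarted RM discovers an active AM for this application.
      register();
      // Replay outstanding asks, pending releases and blacklisted nodes.
      addOutstandingRequestOnResync();
    }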
@@ -17,6 +17,9 @@ Trunk (Unreleased)
     MAPREDUCE-5232. Add a configuration to be able to log classpath and other
     system properties on mapreduce JVMs startup. (Sangjin Lee via vinodkv)
 
+    MAPREDUCE-5910. Make MR AM resync with RM in case of work-preserving
+    RM-restart. (Rohith via jianhe)
+
   IMPROVEMENTS
 
     MAPREDUCE-3481. [Gridmix] Improve Gridmix STRESS mode. (amarrk)
@@ -64,6 +64,7 @@ public class LocalContainerAllocator extends RMCommunicator
   private int nmPort;
   private int nmHttpPort;
   private ContainerId containerId;
+  protected int lastResponseID;
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -119,6 +120,11 @@ public class LocalContainerAllocator extends RMCommunicator
     if (allocateResponse.getAMCommand() != null) {
       switch(allocateResponse.getAMCommand()) {
       case AM_RESYNC:
+        LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+            + " hence resyncing.");
+        this.lastResponseID = 0;
+        register();
+        break;
       case AM_SHUTDOWN:
         LOG.info("Event from RM: shutting down Application Master");
         // This can happen if the RM has been restarted. If it is in that state,
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -216,6 +217,7 @@ public abstract class RMCommunicator extends AbstractService
     FinishApplicationMasterRequest request =
         FinishApplicationMasterRequest.newInstance(finishState,
           sb.toString(), historyUrl);
+    try {
       while (true) {
         FinishApplicationMasterResponse response =
             scheduler.finishApplicationMaster(request);
@@ -231,6 +233,12 @@ public abstract class RMCommunicator extends AbstractService
         LOG.info("Waiting for application to be successfully unregistered.");
         Thread.sleep(rmPollInterval);
       }
+    } catch (ApplicationMasterNotRegisteredException e) {
+      // RM might have restarted or failed over and so lost the fact that AM had
+      // registered before.
+      register();
+      doUnregistration();
+    }
   }
 
   protected Resource getMaxContainerCapability() {
@@ -389,6 +389,7 @@ public class RMContainerAllocator extends RMContainerRequestor
         removed = true;
         assignedRequests.remove(aId);
         containersReleased++;
+        pendingRelease.add(containerId);
         release(containerId);
       }
     }
@@ -641,6 +642,15 @@ public class RMContainerAllocator extends RMContainerRequestor
     if (response.getAMCommand() != null) {
       switch(response.getAMCommand()) {
       case AM_RESYNC:
+        LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+            + " hence resyncing.");
+        lastResponseID = 0;
+
+        // Registering to allow RM to discover an active AM for this
+        // application
+        register();
+        addOutstandingRequestOnResync();
+        break;
       case AM_SHUTDOWN:
         // This can happen if the RM has been restarted. If it is in that state,
         // this application must clean itself up.
@@ -700,6 +710,7 @@ public class RMContainerAllocator extends RMContainerRequestor
         LOG.error("Container complete event for unknown container id "
             + cont.getContainerId());
       } else {
+        pendingRelease.remove(cont.getContainerId());
         assignedRequests.remove(attemptID);
 
         // send the container completed event to Task attempt
@@ -991,6 +1002,7 @@ public class RMContainerAllocator extends RMContainerRequestor
 
   private void containerNotAssigned(Container allocated) {
     containersReleased++;
+    pendingRelease.add(allocated.getId());
     release(allocated.getId());
   }
 
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -58,7 +59,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
 
   private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
 
-  private int lastResponseID;
+  protected int lastResponseID;
   private Resource availableResources;
 
   private final RecordFactory recordFactory =
@@ -78,7 +79,10 @@ public abstract class RMContainerRequestor extends RMCommunicator {
   private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
       new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
   private final Set<ContainerId> release = new TreeSet<ContainerId>();
-
+  // pendingRelease holds history or release requests.request is removed only if
+  // RM sends completedContainer.
+  // How it different from release? --> release is for per allocate() request.
+  protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
   private boolean nodeBlacklistingEnabled;
   private int blacklistDisablePercent;
   private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
@@ -186,6 +190,10 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     } catch (YarnException e) {
       throw new IOException(e);
     }
+
+    if (isResyncCommand(allocateResponse)) {
+      return allocateResponse;
+    }
     lastResponseID = allocateResponse.getResponseId();
     availableResources = allocateResponse.getAvailableResources();
     lastClusterNmCount = clusterNmCount;
@@ -214,6 +222,28 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     return allocateResponse;
   }
 
+  protected boolean isResyncCommand(AllocateResponse allocateResponse) {
+    return allocateResponse.getAMCommand() != null
+        && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC;
+  }
+
+  protected void addOutstandingRequestOnResync() {
+    for (Map<String, Map<Resource, ResourceRequest>> rr : remoteRequestsTable
+        .values()) {
+      for (Map<Resource, ResourceRequest> capabalities : rr.values()) {
+        for (ResourceRequest request : capabalities.values()) {
+          addResourceRequestToAsk(request);
+        }
+      }
+    }
+    if (!ignoreBlacklisting.get()) {
+      blacklistAdditions.addAll(blacklistedNodes);
+    }
+    if (!pendingRelease.isEmpty()) {
+      release.addAll(pendingRelease);
+    }
+  }
+
   // May be incorrect if there's multiple NodeManagers running on a single host.
   // knownNodeCount is based on node managers, not hosts. blacklisting is
   // currently based on hosts.
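The pendingRelease comment in the hunk above can be summarized with the self-contained sketch below. It is illustrative only: the class name ReleaseTracker and its method names are made up for the example, and it merely condenses how the RMContainerAllocator and RMContainerRequestor changes in this commit use the two sets.

    import java.util.Set;
    import java.util.TreeSet;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.ContainerStatus;

    // Illustrative only: condenses the release/pendingRelease bookkeeping above.
    class ReleaseTracker {
      // Sent with the next allocate() call, then cleared by the requestor.
      private final Set<ContainerId> release = new TreeSet<ContainerId>();
      // Remembered until the RM reports the container as completed.
      private final Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();

      void releaseContainer(ContainerId id) {
        pendingRelease.add(id);
        release.add(id);
      }

      void onContainerCompleted(ContainerStatus status) {
        // The RM has acted on the release; no need to replay it after a resync.
        pendingRelease.remove(status.getContainerId());
      }

      void onResync() {
        // A restarted RM lost the per-allocate() release set, so replay every
        // release it never confirmed.
        release.addAll(pendingRelease);
      }
    }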
@@ -78,6 +78,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -87,6 +88,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
@@ -95,9 +97,13 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -618,6 +624,10 @@ public class TestRMContainerAllocator {
       super(conf);
     }
 
+    public MyResourceManager(Configuration conf, RMStateStore store) {
+      super(conf, store);
+    }
+
     @Override
     public void serviceStart() throws Exception {
       super.serviceStart();
@@ -1426,6 +1436,13 @@ public class TestRMContainerAllocator {
         rm.getMyFifoScheduler().lastBlacklistRemovals.size());
   }
 
+  private static void assertAsksAndReleases(int expectedAsk,
+      int expectedRelease, MyResourceManager rm) {
+    Assert.assertEquals(expectedAsk, rm.getMyFifoScheduler().lastAsk.size());
+    Assert.assertEquals(expectedRelease,
+        rm.getMyFifoScheduler().lastRelease.size());
+  }
+
   private static class MyFifoScheduler extends FifoScheduler {
 
     public MyFifoScheduler(RMContext rmContext) {
@@ -1440,6 +1457,7 @@ public class TestRMContainerAllocator {
     }
 
     List<ResourceRequest> lastAsk = null;
+    List<ContainerId> lastRelease = null;
    List<String> lastBlacklistAdditions;
    List<String> lastBlacklistRemovals;
 
@@ -1458,6 +1476,7 @@ public class TestRMContainerAllocator {
         askCopy.add(reqCopy);
       }
       lastAsk = ask;
+      lastRelease = release;
       lastBlacklistAdditions = blacklistAdditions;
       lastBlacklistRemovals = blacklistRemovals;
       return super.allocate(
|
||||||
return new ContainerFailedEvent(attemptId, host);
|
return new ContainerFailedEvent(attemptId, host);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ContainerAllocatorEvent createDeallocateEvent(JobId jobId,
|
||||||
|
int taskAttemptId, boolean reduce) {
|
||||||
|
TaskId taskId;
|
||||||
|
if (reduce) {
|
||||||
|
taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
|
||||||
|
} else {
|
||||||
|
taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
|
||||||
|
}
|
||||||
|
TaskAttemptId attemptId =
|
||||||
|
MRBuilderUtils.newTaskAttemptId(taskId, taskAttemptId);
|
||||||
|
return new ContainerAllocatorEvent(attemptId,
|
||||||
|
ContainerAllocator.EventType.CONTAINER_DEALLOCATE);
|
||||||
|
}
|
||||||
|
|
||||||
private void checkAssignments(ContainerRequestEvent[] requests,
|
private void checkAssignments(ContainerRequestEvent[] requests,
|
||||||
List<TaskAttemptContainerAssignedEvent> assignments,
|
List<TaskAttemptContainerAssignedEvent> assignments,
|
||||||
boolean checkHostMatch) {
|
boolean checkHostMatch) {
|
||||||
|
@@ -1557,6 +1590,7 @@ public class TestRMContainerAllocator {
         = new ArrayList<JobUpdatedNodesEvent>();
     private MyResourceManager rm;
     private boolean isUnregistered = false;
+    private AllocateResponse allocateResponse;
     private static AppContext createAppContext(
         ApplicationAttemptId appAttemptId, Job job) {
       AppContext context = mock(AppContext.class);
@@ -1668,6 +1702,10 @@ public class TestRMContainerAllocator {
       super.handleEvent(f);
     }
 
+    public void sendDeallocate(ContainerAllocatorEvent f) {
+      super.handleEvent(f);
+    }
+
     // API to be used by tests
     public List<TaskAttemptContainerAssignedEvent> schedule()
         throws Exception {
@@ -1713,6 +1751,20 @@ public class TestRMContainerAllocator {
     public boolean isUnregistered() {
       return isUnregistered;
     }
+
+    public void updateSchedulerProxy(MyResourceManager rm) {
+      scheduler = rm.getApplicationMasterService();
+    }
+
+    @Override
+    protected AllocateResponse makeRemoteRequest() throws IOException {
+      allocateResponse = super.makeRemoteRequest();
+      return allocateResponse;
+    }
+
+    public boolean isResyncCommand() {
+      return super.isResyncCommand(allocateResponse);
+    }
   }
 
   @Test
@@ -2022,6 +2074,198 @@ public class TestRMContainerAllocator {
     Assert.assertTrue(allocator.isUnregistered());
   }
 
+  // Step-1 : AM send allocate request for 2 ContainerRequests and 1
+  // blackListeNode
+  // Step-2 : 2 containers are allocated by RM.
+  // Step-3 : AM Send 1 containerRequest(event3) and 1 releaseRequests to
+  // RM
+  // Step-4 : On RM restart, AM(does not know RM is restarted) sends
+  // additional containerRequest(event4) and blacklisted nodes.
+  // Intern RM send resync command
+  // Step-5 : On Resync,AM sends all outstanding
+  // asks,release,blacklistAaddition
+  // and another containerRequest(event5)
+  // Step-6 : RM allocates containers i.e event3,event4 and cRequest5
+  @Test
+  public void testRMContainerAllocatorResendsRequestsOnRMRestart()
+      throws Exception {
+
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+    conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+    conf.setInt(
+        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    MyResourceManager rm1 = new MyResourceManager(conf, memStore);
+    rm1.start();
+    DrainDispatcher dispatcher =
+        (DrainDispatcher) rm1.getRMContext().getDispatcher();
+
+    // Submit the application
+    RMApp app = rm1.submitApp(1024);
+    dispatcher.await();
+
+    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    nm1.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId =
+        app.getCurrentAppAttempt().getAppAttemptId();
+    rm1.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    MyContainerAllocator allocator =
+        new MyContainerAllocator(rm1, conf, appAttemptId, mockJob);
+
+    // Step-1 : AM send allocate request for 2 ContainerRequests and 1
+    // blackListeNode
+    // create the container request
+    // send MAP request
+    ContainerRequestEvent event1 =
+        createReq(jobId, 1, 1024, new String[] { "h1" });
+    allocator.sendRequest(event1);
+
+    ContainerRequestEvent event2 =
+        createReq(jobId, 2, 2048, new String[] { "h1", "h2" });
+    allocator.sendRequest(event2);
+
+    // Send events to blacklist h2
+    ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h2", false);
+    allocator.sendFailure(f1);
+
+    // send allocate request and 1 blacklisted nodes
+    List<TaskAttemptContainerAssignedEvent> assignedContainers =
+        allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0,
+        assignedContainers.size());
+    // Why ask is 3, not 4? --> ask from blacklisted node h2 is removed
+    assertAsksAndReleases(3, 0, rm1);
+    assertBlacklistAdditionsAndRemovals(1, 0, rm1);
+
+    nm1.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    // Step-2 : 2 containers are allocated by RM.
+    assignedContainers = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 2", 2,
+        assignedContainers.size());
+    assertAsksAndReleases(0, 0, rm1);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+    assignedContainers = allocator.schedule();
+    Assert.assertEquals("No of assignments must be 0", 0,
+        assignedContainers.size());
+    assertAsksAndReleases(3, 0, rm1);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+    // Step-3 : AM Send 1 containerRequest(event3) and 1 releaseRequests to
+    // RM
+    // send container request
+    ContainerRequestEvent event3 =
+        createReq(jobId, 3, 1000, new String[] { "h1" });
+    allocator.sendRequest(event3);
+
+    // send deallocate request
+    ContainerAllocatorEvent deallocate1 =
+        createDeallocateEvent(jobId, 1, false);
+    allocator.sendDeallocate(deallocate1);
+
+    assignedContainers = allocator.schedule();
+    Assert.assertEquals("No of assignments must be 0", 0,
+        assignedContainers.size());
+    assertAsksAndReleases(3, 1, rm1);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+    // Phase-2 start 2nd RM is up
+    MyResourceManager rm2 = new MyResourceManager(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    allocator.updateSchedulerProxy(rm2);
+    dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
+
+    // NM should be rebooted on heartbeat, even first heartbeat for nm2
+    NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
+
+    // new NM to represent NM re-register
+    nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService());
+    nm1.registerNode();
+    nm1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    // Step-4 : On RM restart, AM(does not know RM is restarted) sends
+    // additional containerRequest(event4) and blacklisted nodes.
+    // Intern RM send resync command
+
+    // send deallocate request, release=1
+    ContainerAllocatorEvent deallocate2 =
+        createDeallocateEvent(jobId, 2, false);
+    allocator.sendDeallocate(deallocate2);
+
+    // Send events to blacklist nodes h3
+    ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h3", false);
+    allocator.sendFailure(f2);
+
+    ContainerRequestEvent event4 =
+        createReq(jobId, 4, 2000, new String[] { "h1", "h2" });
+    allocator.sendRequest(event4);
+
+    // send allocate request to 2nd RM and get resync command
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertTrue("Last allocate response is not RESYNC",
+        allocator.isResyncCommand());
+
+    // Step-5 : On Resync,AM sends all outstanding
+    // asks,release,blacklistAaddition
+    // and another containerRequest(event5)
+    ContainerRequestEvent event5 =
+        createReq(jobId, 5, 3000, new String[] { "h1", "h2", "h3" });
+    allocator.sendRequest(event5);
+
+    // send all outstanding request again.
+    assignedContainers = allocator.schedule();
+    dispatcher.await();
+    assertAsksAndReleases(3, 2, rm2);
+    assertBlacklistAdditionsAndRemovals(2, 0, rm2);
+
+    nm1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    // Step-6 : RM allocates containers i.e event3,event4 and cRequest5
+    assignedContainers = allocator.schedule();
+    dispatcher.await();
+
+    Assert.assertEquals("Number of container should be 3", 3,
+        assignedContainers.size());
+
+    for (TaskAttemptContainerAssignedEvent assig : assignedContainers) {
+      Assert.assertTrue("Assigned count not correct",
+          "h1".equals(assig.getContainer().getNodeId().getHost()));
+    }
+
+    rm1.stop();
+    rm2.stop();
+
+  }
+
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();