YARN-1588. Enhanced RM and the scheduling protocol to also send NMTokens of transferred containers from previous app-attempts to new AMs after YARN-1490. Contributed by Jian He.

svn merge --ignore-ancestry -c 1572230 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1572232 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-02-26 20:20:39 +00:00
parent b1d64419ac
commit 3406b14755
12 changed files with 324 additions and 43 deletions

View File

@ -131,6 +131,10 @@ Release 2.4.0 - UNRELEASED
YARN-1497. Command line additions for moving apps between queues (Sandy YARN-1497. Command line additions for moving apps between queues (Sandy
Ryza) Ryza)
YARN-1588. Enhanced RM and the scheduling protocol to also send NMTokens of
transferred containers from previous app-attempts to new AMs after YARN-1490.
(Jian He via vinodkv)
IMPROVEMENTS IMPROVEMENTS
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -55,13 +56,15 @@ public abstract class RegisterApplicationMasterResponse {
public static RegisterApplicationMasterResponse newInstance( public static RegisterApplicationMasterResponse newInstance(
Resource minCapability, Resource maxCapability, Resource minCapability, Resource maxCapability,
Map<ApplicationAccessType, String> acls, ByteBuffer key, Map<ApplicationAccessType, String> acls, ByteBuffer key,
List<Container> containersFromPreviousAttempt, String queue) { List<Container> containersFromPreviousAttempt, String queue,
List<NMToken> nmTokensFromPreviousAttempts) {
RegisterApplicationMasterResponse response = RegisterApplicationMasterResponse response =
Records.newRecord(RegisterApplicationMasterResponse.class); Records.newRecord(RegisterApplicationMasterResponse.class);
response.setMaximumResourceCapability(maxCapability); response.setMaximumResourceCapability(maxCapability);
response.setApplicationACLs(acls); response.setApplicationACLs(acls);
response.setClientToAMTokenMasterKey(key); response.setClientToAMTokenMasterKey(key);
response.setContainersFromPreviousAttempt(containersFromPreviousAttempt); response.setContainersFromPreviousAttempts(containersFromPreviousAttempt);
response.setNMTokensFromPreviousAttempts(nmTokensFromPreviousAttempts);
response.setQueue(queue); response.setQueue(queue);
return response; return response;
} }
@ -129,26 +132,52 @@ public abstract class RegisterApplicationMasterResponse {
/** /**
* <p> * <p>
* Get the list of running containers as viewed by * Get the list of running containers as viewed by
* <code>ResourceManager</code> from previous application attempt. * <code>ResourceManager</code> from previous application attempts.
* </p> * </p>
* *
* @return the list of running containers as viewed by * @return the list of running containers as viewed by
* <code>ResourceManager</code> from previous application attempt * <code>ResourceManager</code> from previous application attempts
* @see RegisterApplicationMasterResponse#getNMTokensFromPreviousAttempts()
*/ */
@Public @Public
@Unstable @Unstable
public abstract List<Container> getContainersFromPreviousAttempt(); public abstract List<Container> getContainersFromPreviousAttempts();
/** /**
* Set the list of running containers as viewed by * Set the list of running containers as viewed by
* <code>ResourceManager</code> from previous application attempt. * <code>ResourceManager</code> from previous application attempts.
* *
* @param containersFromPreviousAttempt * @param containersFromPreviousAttempt
* the list of running containers as viewed by * the list of running containers as viewed by
* <code>ResourceManager</code> from previous application attempt. * <code>ResourceManager</code> from previous application attempts.
*/ */
@Private @Private
@Unstable @Unstable
public abstract void setContainersFromPreviousAttempt( public abstract void setContainersFromPreviousAttempts(
List<Container> containersFromPreviousAttempt); List<Container> containersFromPreviousAttempt);
/**
* Get the list of NMTokens for communicating with the NMs where the
* containers of previous application attempts are running.
*
* @return the list of NMTokens for communicating with the NMs where the
* containers of previous application attempts are running.
*
* @see RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()
*/
@Public
@Stable
public abstract List<NMToken> getNMTokensFromPreviousAttempts();
/**
* Set the list of NMTokens for communicating with the NMs where the the
* containers of previous application attempts are running.
*
* @param nmTokens
* the list of NMTokens for communicating with the NMs where the
* containers of previous application attempts are running.
*/
@Private
@Unstable
public abstract void setNMTokensFromPreviousAttempts(List<NMToken> nmTokens);
} }

View File

@ -72,4 +72,37 @@ public abstract class NMToken {
@Stable @Stable
public abstract void setToken(Token token); public abstract void setToken(Token token);
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result =
prime * result + ((getNodeId() == null) ? 0 : getNodeId().hashCode());
result =
prime * result + ((getToken() == null) ? 0 : getToken().hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
NMToken other = (NMToken) obj;
if (getNodeId() == null) {
if (other.getNodeId() != null)
return false;
} else if (!getNodeId().equals(other.getNodeId()))
return false;
if (getToken() == null) {
if (other.getToken() != null)
return false;
} else if (!getToken().equals(other.getToken()))
return false;
return true;
}
} }

View File

@ -44,8 +44,9 @@ message RegisterApplicationMasterResponseProto {
optional ResourceProto maximumCapability = 1; optional ResourceProto maximumCapability = 1;
optional bytes client_to_am_token_master_key = 2; optional bytes client_to_am_token_master_key = 2;
repeated ApplicationACLMapProto application_ACLs = 3; repeated ApplicationACLMapProto application_ACLs = 3;
repeated ContainerProto containers_from_previous_attempt = 4; repeated ContainerProto containers_from_previous_attempts = 4;
optional string queue = 5; optional string queue = 5;
repeated NMTokenProto nm_tokens_from_previous_attempts = 6;
} }
message FinishApplicationMasterRequestProto { message FinishApplicationMasterRequestProto {

View File

@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -542,7 +543,7 @@ public class ApplicationMaster {
} }
List<Container> previousAMRunningContainers = List<Container> previousAMRunningContainers =
response.getContainersFromPreviousAttempt(); response.getContainersFromPreviousAttempts();
LOG.info("Received " + previousAMRunningContainers.size() LOG.info("Received " + previousAMRunningContainers.size()
+ " previous AM's running containers on AM registration."); + " previous AM's running containers on AM registration.");
numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); numAllocatedContainers.addAndGet(previousAMRunningContainers.size());

View File

@ -195,6 +195,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
appTrackingUrl); appTrackingUrl);
RegisterApplicationMasterResponse response = RegisterApplicationMasterResponse response =
rmClient.registerApplicationMaster(request); rmClient.registerApplicationMaster(request);
synchronized (this) {
if(!response.getNMTokensFromPreviousAttempts().isEmpty()) {
populateNMTokens(response.getNMTokensFromPreviousAttempts());
}
}
return response; return response;
} }
@ -250,7 +256,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
lastResponseId = allocateResponse.getResponseId(); lastResponseId = allocateResponse.getResponseId();
clusterAvailableResources = allocateResponse.getAvailableResources(); clusterAvailableResources = allocateResponse.getAvailableResources();
if (!allocateResponse.getNMTokens().isEmpty()) { if (!allocateResponse.getNMTokens().isEmpty()) {
populateNMTokens(allocateResponse); populateNMTokens(allocateResponse.getNMTokens());
} }
} }
} finally { } finally {
@ -284,13 +290,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
@Private @Private
@VisibleForTesting @VisibleForTesting
protected void populateNMTokens(AllocateResponse allocateResponse) { protected void populateNMTokens(List<NMToken> nmTokens) {
for (NMToken token : allocateResponse.getNMTokens()) { for (NMToken token : nmTokens) {
String nodeId = token.getNodeId().toString(); String nodeId = token.getNodeId().toString();
if (getNMTokenCache().containsToken(nodeId)) { if (getNMTokenCache().containsToken(nodeId)) {
LOG.debug("Replacing token for : " + nodeId); if (LOG.isDebugEnabled()) {
LOG.debug("Replacing token for : " + nodeId);
}
} else { } else {
LOG.debug("Received new token for : " + nodeId); if (LOG.isDebugEnabled()) {
LOG.debug("Received new token for : " + nodeId);
}
} }
getNMTokenCache().setToken(nodeId, token.getToken()); getNMTokenCache().setToken(nodeId, token.getToken());
} }

View File

@ -31,13 +31,16 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder;
@ -56,7 +59,8 @@ public class RegisterApplicationMasterResponsePBImpl extends
private Resource maximumResourceCapability; private Resource maximumResourceCapability;
private Map<ApplicationAccessType, String> applicationACLS = null; private Map<ApplicationAccessType, String> applicationACLS = null;
private List<Container> containersFromPreviousAttempt = null; private List<Container> containersFromPreviousAttempts = null;
private List<NMToken> nmTokens = null;
public RegisterApplicationMasterResponsePBImpl() { public RegisterApplicationMasterResponsePBImpl() {
builder = RegisterApplicationMasterResponseProto.newBuilder(); builder = RegisterApplicationMasterResponseProto.newBuilder();
@ -110,8 +114,13 @@ public class RegisterApplicationMasterResponsePBImpl extends
if (this.applicationACLS != null) { if (this.applicationACLS != null) {
addApplicationACLs(); addApplicationACLs();
} }
if (this.containersFromPreviousAttempt != null) { if (this.containersFromPreviousAttempts != null) {
addRunningContainersToProto(); addContainersFromPreviousAttemptToProto();
}
if (nmTokens != null) {
builder.clearNmTokensFromPreviousAttempts();
Iterable<NMTokenProto> iterable = getTokenProtoIterable(nmTokens);
builder.addAllNmTokensFromPreviousAttempts(iterable);
} }
} }
@ -236,21 +245,22 @@ public class RegisterApplicationMasterResponsePBImpl extends
} }
@Override @Override
public List<Container> getContainersFromPreviousAttempt() { public List<Container> getContainersFromPreviousAttempts() {
if (this.containersFromPreviousAttempt != null) { if (this.containersFromPreviousAttempts != null) {
return this.containersFromPreviousAttempt; return this.containersFromPreviousAttempts;
} }
initRunningContainersList(); initContainersPreviousAttemptList();
return this.containersFromPreviousAttempt; return this.containersFromPreviousAttempts;
} }
@Override @Override
public void setContainersFromPreviousAttempt(final List<Container> containers) { public void
setContainersFromPreviousAttempts(final List<Container> containers) {
if (containers == null) { if (containers == null) {
return; return;
} }
this.containersFromPreviousAttempt = new ArrayList<Container>(); this.containersFromPreviousAttempts = new ArrayList<Container>();
this.containersFromPreviousAttempt.addAll(containers); this.containersFromPreviousAttempts.addAll(containers);
} }
@Override @Override
@ -272,23 +282,86 @@ public class RegisterApplicationMasterResponsePBImpl extends
} }
} }
private void initRunningContainersList() {
RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder; private void initContainersPreviousAttemptList() {
List<ContainerProto> list = p.getContainersFromPreviousAttemptList(); RegisterApplicationMasterResponseProtoOrBuilder p =
containersFromPreviousAttempt = new ArrayList<Container>(); viaProto ? proto : builder;
List<ContainerProto> list = p.getContainersFromPreviousAttemptsList();
containersFromPreviousAttempts = new ArrayList<Container>();
for (ContainerProto c : list) { for (ContainerProto c : list) {
containersFromPreviousAttempt.add(convertFromProtoFormat(c)); containersFromPreviousAttempts.add(convertFromProtoFormat(c));
} }
} }
private void addRunningContainersToProto() { private void addContainersFromPreviousAttemptToProto() {
maybeInitBuilder(); maybeInitBuilder();
builder.clearContainersFromPreviousAttempt(); builder.clearContainersFromPreviousAttempts();
List<ContainerProto> list = new ArrayList<ContainerProto>(); List<ContainerProto> list = new ArrayList<ContainerProto>();
for (Container c : containersFromPreviousAttempt) { for (Container c : containersFromPreviousAttempts) {
list.add(convertToProtoFormat(c)); list.add(convertToProtoFormat(c));
} }
builder.addAllContainersFromPreviousAttempt(list); builder.addAllContainersFromPreviousAttempts(list);
}
@Override
public List<NMToken> getNMTokensFromPreviousAttempts() {
if (nmTokens != null) {
return nmTokens;
}
initLocalNewNMTokenList();
return nmTokens;
}
@Override
public void setNMTokensFromPreviousAttempts(final List<NMToken> nmTokens) {
if (nmTokens == null || nmTokens.isEmpty()) {
if (this.nmTokens != null) {
this.nmTokens.clear();
}
builder.clearNmTokensFromPreviousAttempts();
return;
}
this.nmTokens = new ArrayList<NMToken>();
this.nmTokens.addAll(nmTokens);
}
private synchronized void initLocalNewNMTokenList() {
RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
List<NMTokenProto> list = p.getNmTokensFromPreviousAttemptsList();
nmTokens = new ArrayList<NMToken>();
for (NMTokenProto t : list) {
nmTokens.add(convertFromProtoFormat(t));
}
}
private synchronized Iterable<NMTokenProto> getTokenProtoIterable(
final List<NMToken> nmTokenList) {
maybeInitBuilder();
return new Iterable<NMTokenProto>() {
@Override
public synchronized Iterator<NMTokenProto> iterator() {
return new Iterator<NMTokenProto>() {
Iterator<NMToken> iter = nmTokenList.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public NMTokenProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
} }
private Resource convertFromProtoFormat(ResourceProto resource) { private Resource convertFromProtoFormat(ResourceProto resource) {
@ -306,4 +379,12 @@ public class RegisterApplicationMasterResponsePBImpl extends
private ContainerProto convertToProtoFormat(Container t) { private ContainerProto convertToProtoFormat(Container t) {
return ((ContainerPBImpl) t).getProto(); return ((ContainerPBImpl) t).getProto();
} }
private NMTokenProto convertToProtoFormat(NMToken token) {
return ((NMTokenPBImpl) token).getProto();
}
private NMToken convertFromProtoFormat(NMTokenProto proto) {
return new NMTokenPBImpl(proto);
}
} }

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionContainer; import org.apache.hadoop.yarn.api.records.PreemptionContainer;
import org.apache.hadoop.yarn.api.records.PreemptionContract; import org.apache.hadoop.yarn.api.records.PreemptionContract;
@ -280,10 +282,32 @@ public class ApplicationMasterService extends AbstractService implements
.getMasterKey(applicationAttemptId).getEncoded())); .getMasterKey(applicationAttemptId).getEncoded()));
} }
List<Container> containerList = // For work-preserving AM restart, retrieve previous attempts' containers
// and corresponding NM tokens.
List<Container> transferredContainers =
((AbstractYarnScheduler) rScheduler) ((AbstractYarnScheduler) rScheduler)
.getTransferredContainers(applicationAttemptId); .getTransferredContainers(applicationAttemptId);
response.setContainersFromPreviousAttempt(containerList); if (!transferredContainers.isEmpty()) {
response.setContainersFromPreviousAttempts(transferredContainers);
List<NMToken> nmTokens = new ArrayList<NMToken>();
for (Container container : transferredContainers) {
try {
nmTokens.add(rmContext.getNMTokenSecretManager()
.createAndGetNMToken(app.getUser(), applicationAttemptId,
container));
} catch (IllegalArgumentException e) {
// if it's a DNS issue, throw UnknowHostException directly and that
// will be automatically retried by RMProxy in RPC layer.
if (e.getCause() instanceof UnknownHostException) {
throw (UnknownHostException) e.getCause();
}
}
}
response.setNMTokensFromPreviousAttempts(nmTokens);
LOG.info("Application " + appID + " retrieved "
+ transferredContainers.size() + " containers from previous"
+ " attempts and " + nmTokens.size() + " NM tokens.");
}
return response; return response;
} }
} }

View File

@ -385,9 +385,8 @@ public class SchedulerApplicationAttempt {
} }
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
// DNS might be down, skip returning this container. // DNS might be down, skip returning this container.
LOG.error( LOG.error("Error trying to assign container token and NM token to" +
"Error trying to assign container token to allocated container " " an allocated container " + container.getId(), e);
+ container.getId(), e);
continue; continue;
} }
returnContainerList.add(container); returnContainerList.add(container);

View File

@ -486,6 +486,7 @@ public class MockRM extends ResourceManager {
public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
throws Exception { throws Exception {
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
RMAppAttempt attempt = app.getCurrentAppAttempt(); RMAppAttempt attempt = app.getCurrentAppAttempt();
nm.nodeHeartbeat(true); nm.nodeHeartbeat(true);
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());

View File

@ -24,6 +24,7 @@ import java.util.List;
import junit.framework.Assert; import junit.framework.Assert;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@ -160,11 +162,11 @@ public class TestAMRestart {
am2.registerAppAttempt(); am2.registerAppAttempt();
// Assert two containers are running: container2 and container3; // Assert two containers are running: container2 and container3;
Assert.assertEquals(2, registerResponse.getContainersFromPreviousAttempt() Assert.assertEquals(2, registerResponse.getContainersFromPreviousAttempts()
.size()); .size());
boolean containerId2Exists = false, containerId3Exists = false; boolean containerId2Exists = false, containerId3Exists = false;
for (Container container : registerResponse for (Container container : registerResponse
.getContainersFromPreviousAttempt()) { .getContainersFromPreviousAttempts()) {
if (container.getId().equals(containerId2)) { if (container.getId().equals(containerId2)) {
containerId2Exists = true; containerId2Exists = true;
} }
@ -232,4 +234,100 @@ public class TestAMRestart {
rm1.stop(); rm1.stop();
} }
@Test
public void testNMTokensRebindOnAMRestart() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
MockRM rm1 = new MockRM(conf);
rm1.start();
RMApp app1 =
rm1.submitApp(200, "myname", "myuser",
new HashMap<ApplicationAccessType, String>(), false, "default", -1,
null, "MAPREDUCE", false, true);
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
nm1.registerNode();
MockNM nm2 =
new MockNM("127.1.1.1:4321", 8000, rm1.getResourceTrackerService());
nm2.registerNode();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
int NUM_CONTAINERS = 1;
List<Container> containers = new ArrayList<Container>();
// nmTokens keeps track of all the nmTokens issued in the allocate call.
List<NMToken> expectedNMTokens = new ArrayList<NMToken>();
// am1 allocate 1 container on nm1.
while (true) {
AllocateResponse response =
am1.allocate("127.0.0.1", 2000, NUM_CONTAINERS,
new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
containers.addAll(response.getAllocatedContainers());
expectedNMTokens.addAll(response.getNMTokens());
if (containers.size() == NUM_CONTAINERS) {
break;
}
Thread.sleep(200);
System.out.println("Waiting for container to be allocated.");
}
// launch the container
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
ContainerId containerId2 =
ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
// fail am1
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am1.waitForState(RMAppAttemptState.FAILED);
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
// restart the am
MockAM am2 = MockRM.launchAM(app1, rm1, nm1);
RegisterApplicationMasterResponse registerResponse =
am2.registerAppAttempt();
rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
// check am2 get the nm token from am1.
Assert.assertEquals(expectedNMTokens,
registerResponse.getNMTokensFromPreviousAttempts());
// am2 allocate 1 container on nm2
containers = new ArrayList<Container>();
while (true) {
AllocateResponse allocateResponse =
am2.allocate("127.1.1.1", 4000, NUM_CONTAINERS,
new ArrayList<ContainerId>());
nm2.nodeHeartbeat(true);
containers.addAll(allocateResponse.getAllocatedContainers());
expectedNMTokens.addAll(allocateResponse.getNMTokens());
if (containers.size() == NUM_CONTAINERS) {
break;
}
Thread.sleep(200);
System.out.println("Waiting for container to be allocated.");
}
nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 2, ContainerState.RUNNING);
ContainerId am2ContainerId2 =
ContainerId.newInstance(am2.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, am2ContainerId2, RMContainerState.RUNNING);
// fail am2.
nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am2.waitForState(RMAppAttemptState.FAILED);
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
// restart am
MockAM am3 = MockRM.launchAM(app1, rm1, nm1);
registerResponse = am3.registerAppAttempt();
rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
// check am3 get the NM token from both am1 and am2;
List<NMToken> transferredTokens = registerResponse.getNMTokensFromPreviousAttempts();
Assert.assertEquals(2, transferredTokens.size());
Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens));
rm1.stop();
}
} }