From fbb55784d93e1a819daf55d936e864d344579cbf Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Fri, 26 Apr 2013 03:50:20 +0000 Subject: [PATCH] YARN-562. Modified NM to reject any containers allocated by a previous ResourceManager. Contributed by Jian He. MAPREDUCE-5167. Update MR App after YARN-562 to use the new builder API for the container. Contributed by Jian He. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1476034 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/job/impl/TaskAttemptImpl.java | 8 +- .../apache/hadoop/mapreduce/v2/app/MRApp.java | 2 +- .../mapreduce/v2/app/MRAppBenchmark.java | 2 +- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/api/records/Container.java | 12 +++ .../api/records/impl/pb/ContainerPBImpl.java | 12 +++ .../src/main/proto/yarn_protos.proto | 1 + .../yarn/client/TestAMRMClientAsync.java | 2 +- .../apache/hadoop/yarn/util/BuilderUtils.java | 7 +- .../hadoop/yarn/TestContainerLaunchRPC.java | 2 +- .../java/org/apache/hadoop/yarn/TestRPC.java | 2 +- .../RegisterNodeManagerResponse.java | 3 + .../pb/RegisterNodeManagerResponsePBImpl.java | 12 +++ .../yarn_server_common_service_protos.proto | 1 + .../yarn/server/nodemanager/Context.java | 3 + .../yarn/server/nodemanager/NodeManager.java | 20 ++++- .../server/nodemanager/NodeStatusUpdater.java | 3 + .../nodemanager/NodeStatusUpdaterImpl.java | 17 +++- .../ContainerManagerImpl.java | 30 ++++++- .../nodemanager/DummyContainerManager.java | 5 ++ .../TestContainerManagerWithLCE.java | 11 +++ .../nodemanager/TestNodeManagerShutdown.java | 88 +++--------------- .../nodemanager/TestNodeStatusUpdater.java | 1 + .../BaseContainerManagerTest.java | 8 +- .../TestContainerManager.java | 90 ++++++++++++++++++- .../ResourceTrackerService.java | 1 + .../scheduler/capacity/LeafQueue.java | 3 +- .../scheduler/fair/AppSchedulable.java | 5 +- .../scheduler/fifo/FifoScheduler.java | 3 +- .../server/resourcemanager/NodeManager.java | 1 + .../TestResourceTrackerService.java | 16 ++++ .../rmcontainer/TestRMContainerImpl.java | 4 +- .../server/TestContainerManagerSecurity.java | 6 +- 34 files changed, 278 insertions(+), 109 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index e18cd9fb5f9..99582138758 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -356,6 +356,9 @@ Release 2.0.5-beta - UNRELEASED MAPREDUCE-5178. Update MR App to set progress in ApplicationReport after YARN-577. (Hitesh Shah via vinodkv) + MAPREDUCE-5167. Update MR App after YARN-562 to use the new builder API + for the container. (Jian He via vinodkv) + Release 2.0.4-alpha - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index fae70742107..60facec1284 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1094,12 +1094,12 @@ public abstract class TaskAttemptImpl implements + taInfo.getPort()); String nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":" + taInfo.getHttpPort()); - // Resource/Priority/Tokens are only needed while launching the - // container on an NM, these are already completed tasks, so setting them to - // null + // Resource/Priority/Tokens and RMIdentifier are only needed while + // launching the container on an NM, these are already completed tasks, so + // setting them to null and RMIdentifier as 0 container = BuilderUtils.newContainer(containerId, containerNodeId, - nodeHttpAddress, null, null, null); + nodeHttpAddress, null, null, null, 0); computeRackAndLocality(); launchTime = taInfo.getStartTime(); finishTime = (taInfo.getFinishTime() != -1) ? diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 4ef4d8d9f4b..c187bc8d0b7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -519,7 +519,7 @@ public class MRApp extends MRAppMaster { cId.setId(containerCount++); NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT); Container container = BuilderUtils.newContainer(cId, nodeId, - NM_HOST + ":" + NM_HTTP_PORT, null, null, null); + NM_HOST + ":" + NM_HTTP_PORT, null, null, null, 0); JobID id = TypeConverter.fromYarn(applicationId); JobId jobId = TypeConverter.toYarn(id); getContext().getEventHandler().handle(new JobHistoryEvent(jobId, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java index efb8b7a134b..380df645ea7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java @@ -243,7 +243,7 @@ public class MRAppBenchmark { .newContainer(containerId, BuilderUtils.newNodeId("host" + containerId.getId(), 2345), "host" + containerId.getId() + ":5678", req - .getCapability(), req.getPriority(), null)); + .getCapability(), req.getPriority(), null, 0)); } } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 1e260185247..9a1824e8b84 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -187,6 +187,9 @@ Release 2.0.5-beta - UNRELEASED YARN-595. Refactor fair scheduler to use common Resources. (Sandy Ryza via tomwhite) + YARN-562. Modified NM to reject any containers allocated by a previous + ResourceManager. (Jian He via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java index 9478d341216..b0860e5aa19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java @@ -135,4 +135,16 @@ public interface Container extends Comparable { @Private @Unstable void setContainerToken(ContainerToken containerToken); + + /** + * Get the RMIdentifier of RM in which containers are allocated + * @return RMIdentifier + */ + @Private + @Unstable + long getRMIdentifer(); + + @Private + @Unstable + void setRMIdentifier(long rmIdentifier); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java index dd6941ff79f..68bb0839a9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java @@ -230,6 +230,18 @@ public class ContainerPBImpl extends ProtoBase implements Contai this.containerToken = containerToken; } + @Override + public long getRMIdentifer() { + ContainerProtoOrBuilder p = viaProto ? proto : builder; + return p.getRmIdentifier(); + } + + @Override + public void setRMIdentifier(long rmIdentifier) { + maybeInitBuilder(); + builder.setRmIdentifier((rmIdentifier)); + } + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { return new ContainerIdPBImpl(p); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index a84bf37e39e..f7591a77294 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -68,6 +68,7 @@ message ContainerProto { optional ResourceProto resource = 4; optional PriorityProto priority = 5; optional hadoop.common.TokenProto container_token = 6; + optional int64 rm_identifier = 7; } enum YarnApplicationStateProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java index bbb8ad1463f..d95ce64f630 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java @@ -56,7 +56,7 @@ public class TestAMRMClientAsync { BuilderUtils.newContainerId(0, 0, 0, 0), ContainerState.COMPLETE, "", 0)); List allocated1 = Arrays.asList( - BuilderUtils.newContainer(null, null, null, null, null, null)); + BuilderUtils.newContainer(null, null, null, null, null, null, 0)); final AllocateResponse response1 = createAllocateResponse( new ArrayList(), allocated1); final AllocateResponse response2 = createAllocateResponse(completed1, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java index 7dc25de8208..f09046e3712 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java @@ -237,9 +237,9 @@ public class BuilderUtils { return containerStatus; } - public static Container newContainer(ContainerId containerId, - NodeId nodeId, String nodeHttpAddress, - Resource resource, Priority priority, ContainerToken containerToken) { + public static Container newContainer(ContainerId containerId, NodeId nodeId, + String nodeHttpAddress, Resource resource, Priority priority, + ContainerToken containerToken, long rmIdentifier) { Container container = recordFactory.newRecordInstance(Container.class); container.setId(containerId); container.setNodeId(nodeId); @@ -247,6 +247,7 @@ public class BuilderUtils { container.setResource(resource); container.setPriority(priority); container.setContainerToken(containerToken); + container.setRMIdentifier(rmIdentifier); return container; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java index 295a38cee80..7454955bbc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java @@ -105,7 +105,7 @@ public class TestContainerLaunchRPC { containerId.setId(100); Container container = BuilderUtils.newContainer(containerId, null, null, recordFactory - .newRecordInstance(Resource.class), null, null); + .newRecordInstance(Resource.class), null, null, 0); StartContainerRequest scRequest = recordFactory .newRecordInstance(StartContainerRequest.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index 7d941e92a23..92bbb8dc4bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -128,7 +128,7 @@ public class TestRPC { containerId.setId(100); Container mockContainer = BuilderUtils.newContainer(containerId, null, null, recordFactory - .newRecordInstance(Resource.class), null, null); + .newRecordInstance(Resource.class), null, null, 0); // containerLaunchContext.env = new HashMap(); // containerLaunchContext.command = new ArrayList(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java index 1c1a9dd4a6a..11b02115c0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java @@ -30,4 +30,7 @@ public interface RegisterNodeManagerResponse { void setNodeAction(NodeAction nodeAction); + long getRMIdentifier(); + + void setRMIdentifier(long rmIdentifier); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java index c28d4c96103..43451dc6793 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java @@ -121,6 +121,18 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase(); private final NMContainerTokenSecretManager containerTokenSecretManager; - + private ContainerManager containerManager; private final NodeHealthStatus nodeHealthStatus = RecordFactoryProvider .getRecordFactory(null).newRecordInstance(NodeHealthStatus.class); @@ -333,6 +338,15 @@ public class NodeManager extends CompositeService public NodeHealthStatus getNodeHealthStatus() { return this.nodeHealthStatus; } + + @Override + public ContainerManager getContainerManager() { + return this.containerManager; + } + + public void setContainerManager(ContainerManager containerManager) { + this.containerManager = containerManager; + } } @@ -376,7 +390,7 @@ public class NodeManager extends CompositeService stop(); break; case RESYNC: - cleanupContainersOnResync(); + resyncWithRM(); break; default: LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java index 41949e7baab..c9577714e27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java @@ -24,5 +24,8 @@ import org.apache.hadoop.yarn.service.Service; public interface NodeStatusUpdater extends Service { void sendOutofBandHeartBeat(); + NodeStatus getNodeStatusAndUpdateContainersInContext(); + + long getRMIdentifier(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index e9583c2a2e9..284cd946005 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.service.AbstractService; @@ -95,6 +97,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private Runnable statusUpdaterRunnable; private Thread statusUpdater; + private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER; public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { @@ -267,6 +270,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements this.resourceTracker = getRMClient(); regNMResponse = this.resourceTracker.registerNodeManager(request); + this.rmIdentifier = regNMResponse.getRMIdentifier(); break; } catch(Throwable e) { LOG.warn("Trying to connect to ResourceManager, " + @@ -308,7 +312,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements LOG.info("Registered with ResourceManager as " + this.nodeId + " with total resource of " + this.totalResource); - + LOG.info("Notifying ContainerManager to unblock new container-requests"); + ((ContainerManagerImpl) this.context.getContainerManager()) + .setBlockNewContainerRequests(false); } private List createKeepAliveApplicationList() { @@ -334,6 +340,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements return appList; } + @Override public NodeStatus getNodeStatusAndUpdateContainersInContext() { NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); @@ -407,6 +414,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } } + @Override + public long getRMIdentifier() { + return this.rmIdentifier; + } + protected void startStatusUpdater() { statusUpdaterRunnable = new Runnable() { @@ -478,6 +490,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements if (response.getNodeAction() == NodeAction.RESYNC) { LOG.info("Node is out of sync with ResourceManager," + " hence rebooting."); + // Invalidate the RMIdentifier while resync + NodeStatusUpdaterImpl.this.rmIdentifier = + ResourceManagerConstants.RM_INVALID_IDENTIFIER; dispatcher.getEventHandler().handle( new NodeManagerEvent(NodeManagerEventType.RESYNC)); break; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index e4ec8fe55e6..c79d7c94df6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -23,10 +23,9 @@ import static org.apache.hadoop.yarn.service.Service.STATE.STARTED; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -125,6 +124,7 @@ public class ContainerManagerImpl extends CompositeService implements private final ApplicationACLsManager aclsManager; private final DeletionService deletionService; + private AtomicBoolean blockNewContainerRequests = new AtomicBoolean(false); public ContainerManagerImpl(Context context, ContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, @@ -239,7 +239,10 @@ public class ContainerManagerImpl extends CompositeService implements false)) { refreshServiceAcls(conf, new NMPolicyProvider()); } - + + LOG.info("Blocking new container-requests as container manager rpc" + + " server is still starting."); + this.setBlockNewContainerRequests(true); server.start(); InetSocketAddress connectAddress = NetUtils.getConnectAddress(server); this.context.getNodeId().setHost(connectAddress.getHostName()); @@ -393,6 +396,13 @@ public class ContainerManagerImpl extends CompositeService implements @Override public StartContainerResponse startContainer(StartContainerRequest request) throws YarnRemoteException { + + if (blockNewContainerRequests.get()) { + throw RPCUtil.getRemoteException(new NMNotYetReadyException( + "Rejecting new containers as NodeManager has not" + + " yet connected with ResourceManager")); + } + ContainerLaunchContext launchContext = request.getContainerLaunchContext(); org.apache.hadoop.yarn.api.records.Container lauchContainer = request.getContainer(); @@ -402,6 +412,16 @@ public class ContainerManagerImpl extends CompositeService implements UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr); authorizeRequest(containerIDStr, launchContext, lauchContainer, remoteUgi); + // Is the container coming from unknown RM + if (lauchContainer.getRMIdentifer() != nodeStatusUpdater + .getRMIdentifier()) { + String msg = "\nContainer "+ containerIDStr + + " rejected as it is allocated by a previous RM"; + LOG.error(msg); + throw RPCUtil + .getRemoteException(new InvalidContainerException(msg)); + } + LOG.info("Start request for " + containerIDStr + " by user " + launchContext.getUser()); @@ -615,6 +635,10 @@ public class ContainerManagerImpl extends CompositeService implements } } + public void setBlockNewContainerRequests(boolean blockNewContainerRequests) { + this.blockNewContainerRequests.set(blockNewContainerRequests); + } + @Override public void stateChanged(Service service) { // TODO Auto-generated method stub diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java index 3284634c62c..3f74c29e18c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java @@ -168,4 +168,9 @@ public class DummyContainerManager extends ContainerManagerImpl { } }; } + + @Override + public void setBlockNewContainerRequests(boolean blockNewContainerRequests) { + // do nothing + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index 44328dbe0aa..a23f125a569 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -142,6 +142,17 @@ public class TestContainerManagerWithLCE extends TestContainerManager { super.testLocalFilesCleanup(); } + @Override + public void testContainerLaunchFromPreviousRM() throws InterruptedException, + IOException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testContainerLaunchFromPreviousRM"); + super.testContainerLaunchFromPreviousRM(); + } private boolean shouldRunTest() { return System .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java index 1792988bef9..ab634c4c251 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java @@ -24,17 +24,12 @@ import static org.mockito.Mockito.when; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; -import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CyclicBarrier; import junit.framework.Assert; @@ -59,12 +54,9 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; -import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; @@ -86,7 +78,6 @@ public class TestNodeManagerShutdown { static final String user = "nobody"; private FileContext localFS; private ContainerId cId; - private CyclicBarrier syncBarrier = new CyclicBarrier(2); @Before public void setup() throws UnsupportedFileSystemException { @@ -110,7 +101,7 @@ public class TestNodeManagerShutdown { NodeManager nm = getNodeManager(); nm.init(createNMConfig()); nm.start(); - startContainers(nm); + startContainer(nm, cId, localFS, tmpDir, processStartFile); final int MAX_TRIES=20; int numTries = 0; @@ -150,29 +141,13 @@ public class TestNodeManagerShutdown { reader.close(); } } - - @SuppressWarnings("unchecked") - @Test - public void testKillContainersOnResync() throws IOException, InterruptedException { - NodeManager nm = new TestNodeManager(); - YarnConfiguration conf = createNMConfig(); - nm.init(conf); - nm.start(); - startContainers(nm); - assert ((TestNodeManager) nm).getNMRegistrationCount() == 1; - nm.getNMDispatcher().getEventHandler(). - handle( new NodeManagerEvent(NodeManagerEventType.RESYNC)); - try { - syncBarrier.await(); - } catch (BrokenBarrierException e) { - } - assert ((TestNodeManager) nm).getNMRegistrationCount() == 2; - } - - private void startContainers(NodeManager nm) throws IOException { + public static void startContainer(NodeManager nm, ContainerId cId, + FileContext localFS, File scriptFileDir, File processStartFile) + throws IOException { ContainerManagerImpl containerManager = nm.getContainerManager(); - File scriptFile = createUnhaltingScriptFile(); + File scriptFile = + createUnhaltingScriptFile(cId, scriptFileDir, processStartFile); ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class); @@ -218,7 +193,7 @@ public class TestNodeManagerShutdown { Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState()); } - private ContainerId createContainerId() { + public static ContainerId createContainerId() { ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class); appId.setClusterTimestamp(0); appId.setId(0); @@ -247,8 +222,9 @@ public class TestNodeManagerShutdown { * Creates a script to run a container that will run forever unless * stopped by external means. */ - private File createUnhaltingScriptFile() throws IOException { - File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); + private static File createUnhaltingScriptFile(ContainerId cId, + File scriptFileDir, File processStartFile) throws IOException { + File scriptFile = Shell.appendScriptExtension(scriptFileDir, "scriptFile"); PrintWriter fileWriter = new PrintWriter(scriptFile); if (Shell.WINDOWS) { fileWriter.println("@echo \"Running testscript for delayed kill\""); @@ -282,48 +258,4 @@ public class TestNodeManagerShutdown { } }; } - - class TestNodeManager extends NodeManager { - - private int registrationCount = 0; - - @Override - protected NodeStatusUpdater createNodeStatusUpdater(Context context, - Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { - return new TestNodeStatusUpdaterImpl(context, dispatcher, - healthChecker, metrics); - } - - public int getNMRegistrationCount() { - return registrationCount; - } - - class TestNodeStatusUpdaterImpl extends MockNodeStatusUpdater { - - public TestNodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { - super(context, dispatcher, healthChecker, metrics); - } - - @Override - protected void registerWithRM() throws YarnRemoteException { - super.registerWithRM(); - registrationCount++; - } - - @Override - protected void rebootNodeStatusUpdater() { - ConcurrentMap containers = - getNMContext().getContainers(); - // ensure that containers are empty before restart nodeStatusUpdater - Assert.assertTrue(containers.isEmpty()); - super.rebootNodeStatusUpdater(); - try { - syncBarrier.await(); - } catch (InterruptedException e) { - } catch (BrokenBarrierException e) { - } - } - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 29d6a4c3a84..10dd155da1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 55e92a440ff..5fd11d58e75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -156,7 +156,13 @@ public abstract class BaseContainerManagerTest { dirsHandler = nodeHealthChecker.getDiskHandler(); containerManager = new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, - metrics, new ApplicationACLsManager(conf), dirsHandler); + metrics, new ApplicationACLsManager(conf), dirsHandler) { + @Override + public void setBlockNewContainerRequests( + boolean blockNewContainerRequests) { + // do nothing + } + }; containerManager.init(conf); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 981ab39acbd..df3d9173c93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.BufferedReader; import java.io.File; import java.io.FileReader; @@ -49,13 +52,18 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.LocalRMInterface; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; @@ -63,7 +71,6 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; -import static org.mockito.Mockito.*; public class TestContainerManager extends BaseContainerManagerTest { @@ -411,7 +418,13 @@ public class TestContainerManager extends BaseContainerManagerTest { containerManager = new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, - metrics, new ApplicationACLsManager(conf), dirsHandler); + metrics, new ApplicationACLsManager(conf), dirsHandler) { + @Override + public void setBlockNewContainerRequests( + boolean blockNewContainerRequests) { + // do nothing + } + }; containerManager.init(conf); containerManager.start(); @@ -524,4 +537,77 @@ public class TestContainerManager extends BaseContainerManagerTest { Assert.assertFalse(targetFile.getAbsolutePath() + " exists!!", targetFile.exists()); } + + @Test + public void testContainerLaunchFromPreviousRM() throws IOException, + InterruptedException { + // There is no real RM registration, simulate and set RMIdentifier + NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class); + when(nodeStatusUpdater.getRMIdentifier()).thenReturn((long) 1234); + containerManager = + new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, + metrics, new ApplicationACLsManager(conf), dirsHandler) { + @Override + public void setBlockNewContainerRequests( + boolean blockNewContainerRequests) { + // do nothing + } + }; + containerManager.init(conf); + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + ContainerId cId1 = createContainerId(); + ContainerId cId2 = createContainerId(); + containerLaunchContext.setUser(user); + containerLaunchContext + .setLocalResources(new HashMap()); + containerLaunchContext.setUser(containerLaunchContext.getUser()); + Resource mockResource = mock(Resource.class); + + Container mockContainer1 = mock(Container.class); + when(mockContainer1.getId()).thenReturn(cId1); + // Construct the Container with Invalid RMIdentifier + when(mockContainer1.getRMIdentifer()).thenReturn( + (long) ResourceManagerConstants.RM_INVALID_IDENTIFIER); + StartContainerRequest startRequest1 = + recordFactory.newRecordInstance(StartContainerRequest.class); + startRequest1.setContainerLaunchContext(containerLaunchContext); + startRequest1.setContainer(mockContainer1); + boolean catchException = false; + try { + containerManager.startContainer(startRequest1); + } catch (YarnRemoteException e) { + catchException = true; + Assert.assertTrue(e.getMessage().contains( + "Container " + cId1 + " rejected as it is allocated by a previous RM")); + // TO DO: This should be replaced to explicitly check exception + // class name after YARN-142 + Assert.assertTrue(e.getRemoteTrace().contains( + InvalidContainerException.class.getName())); + } + + // Verify that startContainer fail because of invalid container request + Assert.assertTrue(catchException); + + // Construct the Container with a RMIdentifier within current RM + Container mockContainer2 = mock(Container.class); + when(mockContainer2.getId()).thenReturn(cId2); + when(mockContainer2.getRMIdentifer()).thenReturn((long) 1234); + when(mockContainer2.getResource()).thenReturn(mockResource); + StartContainerRequest startRequest2 = + recordFactory.newRecordInstance(StartContainerRequest.class); + startRequest2.setContainerLaunchContext(containerLaunchContext); + startRequest2.setContainer(mockContainer2); + boolean noException = true; + try { + containerManager.startContainer(startRequest2); + } catch (YarnRemoteException e) { + noException = false; + } + // Verify that startContainer get no YarnRemoteException + Assert.assertTrue(noException); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 258c7dc0e47..1ee355268a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -196,6 +196,7 @@ public class ResourceTrackerService extends AbstractService implements + capability + ", assigned nodeId " + nodeId); response.setNodeAction(NodeAction.NORMAL); + response.setRMIdentifier(ResourceManager.clusterTimeStamp); return response; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 58dcb73767d..64f711434b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -1243,7 +1244,7 @@ public class LeafQueue implements CSQueue { // Create the container Container container = BuilderUtils.newContainer(containerId, nodeId, node.getRMNode().getHttpAddress(), capability, priority, - null); + null, ResourceManager.clusterTimeStamp); return container; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index a8e71735efc..4bd6e2b54b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; @@ -173,7 +174,7 @@ public class AppSchedulable extends Schedulable { // Create the container Container container = BuilderUtils.newContainer(containerId, nodeId, node.getRMNode().getHttpAddress(), capability, priority, - containerToken); + containerToken, ResourceManager.clusterTimeStamp); return container; } @@ -371,4 +372,4 @@ public class AppSchedulable extends Schedulable { Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null, request.getCapability(), node.getRMNode().getTotalCapability()); } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index d5a542700f4..2024e746dea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator; @@ -565,7 +566,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable { // Create the container Container container = BuilderUtils.newContainer(containerId, nodeId, node.getRMNode().getHttpAddress(), capability, priority, - containerToken); + containerToken, ResourceManager.clusterTimeStamp); // Allocate! diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index dba5acdd82d..46683dcb619 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -188,6 +188,7 @@ public class NodeManager implements ContainerManager { this.nodeId, nodeHttpAddress, requestContainer.getResource(), null, null // DKDC - Doesn't matter + , 0 ); ContainerStatus containerStatus = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index af9d5d2c0bf..9641adbce02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; @@ -267,6 +268,21 @@ public class TestResourceTrackerService { Assert.assertEquals(NodeAction.SHUTDOWN,response.getNodeAction()); } + @Test + public void testSetRMIdentifierInRegistration() throws Exception { + + Configuration conf = new Configuration(); + rm = new MockRM(conf); + rm.start(); + + MockNM nm = new MockNM("host1:1234", 5120, rm.getResourceTrackerService()); + RegisterNodeManagerResponse response = nm.registerNode(); + + // Verify the RMIdentifier is correctly set in RegisterNodeManagerResponse + Assert.assertEquals(ResourceManager.clusterTimeStamp, + response.getRMIdentifier()); + } + @Test public void testReboot() throws Exception { Configuration conf = new Configuration(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index 6e2b834f69a..abceecf6211 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -69,7 +69,7 @@ public class TestRMContainerImpl { Priority priority = BuilderUtils.newPriority(5); Container container = BuilderUtils.newContainer(containerId, nodeId, - "host:3465", resource, priority, null); + "host:3465", resource, priority, null, 0); RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, nodeId, eventHandler, expirer); @@ -139,7 +139,7 @@ public class TestRMContainerImpl { Priority priority = BuilderUtils.newPriority(5); Container container = BuilderUtils.newContainer(containerId, nodeId, - "host:3465", resource, priority, null); + "host:3465", resource, priority, null, 0); RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, nodeId, eventHandler, expirer); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java index 0432444168a..46fffb47d55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java @@ -360,7 +360,7 @@ public class TestContainerManagerSecurity { Container container = BuilderUtils.newContainer(newTokenId.getContainerID(), null, null, BuilderUtils.newResource(newTokenId.getResource().getMemory(), - newTokenId.getResource().getVirtualCores()), null, null); + newTokenId.getResource().getVirtualCores()), null, null, 0); StartContainerRequest request = Records.newRecord(StartContainerRequest.class); request.setContainerLaunchContext(context); request.setContainer(container); @@ -547,7 +547,7 @@ public class TestContainerManagerSecurity { createContainerLaunchContextForTest(tokenId); Container container = BuilderUtils.newContainer(tokenId.getContainerID(), null, null, - BuilderUtils.newResource(2048, 1), null, null); + BuilderUtils.newResource(2048, 1), null, null, 0); request.setContainerLaunchContext(context); request.setContainer(container); try { @@ -575,7 +575,7 @@ public class TestContainerManagerSecurity { Container container = BuilderUtils.newContainer(tokenId.getContainerID(), null, null, BuilderUtils.newResource(tokenId.getResource().getMemory(), tokenId - .getResource().getVirtualCores()), null, null); + .getResource().getVirtualCores()), null, null, 0); request.setContainerLaunchContext(context); request.setContainer(container); try {