From 9efc5089de9b774287fd2a68aab81a1dd6563cf6 Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Mon, 3 Oct 2011 23:23:04 +0000 Subject: [PATCH] Merge -r 1178630:1178631 from trunk to branch-0.23 to fix MAPREDUCE-2792. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1178633 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../hadoop/mapreduce/v2/app/MRAppMaster.java | 1 - .../mapreduce/v2/app/rm/RMCommunicator.java | 6 +-- .../hadoop/mapred/ClientServiceDelegate.java | 8 +++- .../security/ContainerTokenIdentifier.java | 3 +- .../yarn/server/nodemanager/Context.java | 8 ++++ .../yarn/server/nodemanager/NodeManager.java | 11 ++++++ .../server/nodemanager/NodeStatusUpdater.java | 3 -- .../nodemanager/NodeStatusUpdaterImpl.java | 39 +++++-------------- .../ContainerManagerImpl.java | 39 ++++++++++++------- .../logaggregation/LogAggregationService.java | 21 ++++------ .../nodemanager/DummyContainerManager.java | 4 +- .../nodemanager/TestNodeStatusUpdater.java | 2 +- .../BaseContainerManagerTest.java | 3 +- .../TestLogAggregationService.java | 6 +-- .../scheduler/capacity/LeafQueue.java | 15 +++++-- .../scheduler/fifo/FifoScheduler.java | 13 +++++-- 17 files changed, 103 insertions(+), 82 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 2c99c72c7de..a0fedb9a27b 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1472,6 +1472,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3137. Fix broken merge of MAPREDUCE-2179. (Hitesh Shah via acmurthy) + MAPREDUCE-2792. Replace usage of node ip-addresses with hostnames. + (vinodkv via acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 8b7d578fc9b..a2dadedaf16 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -84,7 +84,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.Service; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index cee37f98147..6655581178d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -46,8 +46,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -147,8 +147,8 @@ public abstract class RMCommunicator extends AbstractService { protected void register() { //Register - String host = - clientService.getBindAddress().getAddress().getHostAddress(); + String host = clientService.getBindAddress().getAddress() + .getCanonicalHostName(); try { RegisterApplicationMasterRequest request = recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java index 335d44b0826..4b848b5cd3a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java @@ -21,6 +21,7 @@ package org.apache.hadoop.mapred; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.util.HashMap; import java.util.List; @@ -156,8 +157,11 @@ public class ClientServiceDelegate { Token clientToken = new Token(); clientToken.decodeFromUrlString(clientTokenEncoded); - clientToken.setService(new Text(application.getHost() + ":" - + application.getRpcPort())); + // RPC layer client expects ip:port as service for tokens + InetSocketAddress addr = NetUtils.createSocketAddr(application + .getHost(), application.getRpcPort()); + clientToken.setService(new Text(addr.getAddress().getHostAddress() + + ":" + addr.getPort())); UserGroupInformation.getCurrentUser().addToken(clientToken); } LOG.info("Tracking Url of JOB is " + application.getTrackingUrl()); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java index c2510bca3df..313e8333b78 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java @@ -44,7 +44,8 @@ public class ContainerTokenIdentifier extends TokenIdentifier { private String nmHostName; private Resource resource; - public ContainerTokenIdentifier(ContainerId containerID, String hostName, Resource r) { + public ContainerTokenIdentifier(ContainerId containerID, String hostName, + Resource r) { this.containerId = containerID; this.nmHostName = hostName; this.resource = r; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index b869729337c..38967132953 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -32,6 +33,13 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont */ public interface Context { + /** + * Return the nodeId. Usable only when the ContainerManager is started. + * + * @return the NodeId + */ + NodeId getNodeId(); + ConcurrentMap getApplications(); ConcurrentMap getContainers(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 8dc16e97df8..068d6f5f8aa 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer; import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.Service; +import org.apache.hadoop.yarn.util.Records; public class NodeManager extends CompositeService { private static final Log LOG = LogFactory.getLog(NodeManager.class); @@ -161,6 +163,7 @@ public class NodeManager extends CompositeService { public static class NMContext implements Context { + private final NodeId nodeId = Records.newRecord(NodeId.class); private final ConcurrentMap applications = new ConcurrentHashMap(); private final ConcurrentMap containers = @@ -175,6 +178,14 @@ public class NodeManager extends CompositeService { this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis()); } + /** + * Usable only after ContainerManager is started. + */ + @Override + public NodeId getNodeId() { + return this.nodeId; + } + @Override public ConcurrentMap getApplications() { return this.applications; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java index caf3a729191..2b10c9717a5 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java @@ -24,8 +24,5 @@ public interface NodeStatusUpdater extends Service { byte[] getRMNMSharedSecret(); - String getContainerManagerBindAddress(); - void sendOutofBandHeartBeat(); - } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 1b1fd46b9e7..e69ca7ba31d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.nodemanager; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Iterator; @@ -57,7 +56,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.service.AbstractService; -import org.apache.hadoop.yarn.util.Records; public class NodeStatusUpdaterImpl extends AbstractService implements NodeStatusUpdater { @@ -69,16 +67,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private final Context context; private final Dispatcher dispatcher; + private NodeId nodeId; private ContainerTokenSecretManager containerTokenSecretManager; private long heartBeatInterval; private ResourceTracker resourceTracker; private String rmAddress; private Resource totalResource; - private String containerManagerBindAddress; - private String hostName; - private int containerManagerPort; private int httpPort; - private NodeId nodeId; private byte[] secretKeyBytes = new byte[0]; private boolean isStopped; private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -114,24 +109,18 @@ public class NodeStatusUpdaterImpl extends AbstractService implements @Override public void start() { - String cmBindAddressStr = - getConfig().get(YarnConfiguration.NM_ADDRESS, - YarnConfiguration.DEFAULT_NM_ADDRESS); - InetSocketAddress cmBindAddress = - NetUtils.createSocketAddr(cmBindAddressStr); + + // NodeManager is the last service to start, so NodeId is available. + this.nodeId = this.context.getNodeId(); + String httpBindAddressStr = getConfig().get(YarnConfiguration.NM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS); InetSocketAddress httpBindAddress = NetUtils.createSocketAddr(httpBindAddressStr); try { - this.hostName = InetAddress.getLocalHost().getHostAddress(); - this.containerManagerPort = cmBindAddress.getPort(); + // this.hostName = InetAddress.getLocalHost().getCanonicalHostName(); this.httpPort = httpBindAddress.getPort(); - this.containerManagerBindAddress = - this.hostName + ":" + this.containerManagerPort; - LOG.info("Configured ContainerManager Address is " - + this.containerManagerBindAddress); // Registration has to be in start so that ContainerManager can get the // perNM tokens needed to authenticate ContainerTokens. registerWithRM(); @@ -165,9 +154,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements LOG.info("Connected to ResourceManager at " + this.rmAddress); RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); - this.nodeId = Records.newRecord(NodeId.class); - this.nodeId.setHost(this.hostName); - this.nodeId.setPort(this.containerManagerPort); request.setHttpPort(this.httpPort); request.setResource(this.totalResource); request.setNodeId(this.nodeId); @@ -183,19 +169,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // It is expected that status updater is started by this point and // RM gives the shared secret in registration during StatusUpdater#start(). this.containerTokenSecretManager.setSecretKey( - this.getContainerManagerBindAddress(), + this.nodeId.toString(), this.getRMNMSharedSecret()); } - LOG.info("Registered with ResourceManager as " + this.containerManagerBindAddress + LOG.info("Registered with ResourceManager as " + this.nodeId + " with total resource of " + this.totalResource); } - @Override - public String getContainerManagerBindAddress() { - return this.containerManagerBindAddress; - } - @Override public byte[] getRMNMSharedSecret() { return this.secretKeyBytes.clone(); @@ -230,8 +211,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } nodeStatus.setContainersStatuses(containersStatuses); - LOG.debug(this.containerManagerBindAddress + " sending out status for " + numActiveContainers - + " containers"); + LOG.debug(this.nodeId + " sending out status for " + + numActiveContainers + " containers"); NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus(); if (this.healthChecker != null) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 8e90552d86e..7c79bd982fe 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; import static org.apache.hadoop.yarn.service.Service.STATE.STARTED; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Map; @@ -36,6 +38,7 @@ import org.apache.hadoop.security.SecurityInfo; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; @@ -99,7 +102,6 @@ public class ContainerManagerImpl extends CompositeService implements final Context context; private final ContainersMonitor containersMonitor; private Server server; - private InetSocketAddress cmBindAddressStr; private final ResourceLocalizationService rsrcLocalizationSrvc; private final ContainersLauncher containersLauncher; private final AuxServices auxiluaryServices; @@ -144,7 +146,7 @@ public class ContainerManagerImpl extends CompositeService implements addService(this.containersMonitor); LogAggregationService logAggregationService = - createLogAggregationService(this.deletionService); + createLogAggregationService(this.context, this.deletionService); addService(logAggregationService); dispatcher.register(ContainerEventType.class, @@ -159,9 +161,9 @@ public class ContainerManagerImpl extends CompositeService implements addService(dispatcher); } - protected LogAggregationService createLogAggregationService( + protected LogAggregationService createLogAggregationService(Context context, DeletionService deletionService) { - return new LogAggregationService(deletionService); + return new LogAggregationService(context, deletionService); } public ContainersMonitor getContainersMonitor() { @@ -179,29 +181,36 @@ public class ContainerManagerImpl extends CompositeService implements return new ContainersLauncher(context, this.dispatcher, exec); } - @Override - public void init(Configuration conf) { - cmBindAddressStr = NetUtils.createSocketAddr( - conf.get(YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS)); - super.init(conf); - } - @Override public void start() { // Enqueue user dirs in deletion context - YarnRPC rpc = YarnRPC.create(getConfig()); - Configuration cmConf = new Configuration(getConfig()); + Configuration conf = getConfig(); + YarnRPC rpc = YarnRPC.create(conf); + + InetSocketAddress initialAddress = NetUtils.createSocketAddr(conf.get( + YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS)); + + Configuration cmConf = new Configuration(conf); cmConf.setClass(YarnConfiguration.YARN_SECURITY_INFO, ContainerManagerSecurityInfo.class, SecurityInfo.class); server = - rpc.getServer(ContainerManager.class, this, cmBindAddressStr, cmConf, + rpc.getServer(ContainerManager.class, this, initialAddress, cmConf, this.containerTokenSecretManager, cmConf.getInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT)); - LOG.info("ContainerManager started at " + cmBindAddressStr); server.start(); + InetAddress hostNameResolved = null; + try { + hostNameResolved = InetAddress.getLocalHost(); + } catch (UnknownHostException e) { + throw new YarnException(e); + } + this.context.getNodeId().setHost(hostNameResolved.getCanonicalHostName()); + this.context.getNodeId().setPort(server.getPort()); + LOG.info("ContainerManager started at " + + this.context.getNodeId().toString()); super.start(); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 974455c4602..538bc4607ff 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.security.PrivilegedExceptionAction; import java.util.concurrent.ConcurrentHashMap; @@ -32,7 +31,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -42,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent; import org.apache.hadoop.yarn.service.AbstractService; @@ -53,6 +52,7 @@ public class LogAggregationService extends AbstractService implements private static final Log LOG = LogFactory .getLog(LogAggregationService.class); + private final Context context; private final DeletionService deletionService; private String[] localRootLogDirs; @@ -63,8 +63,10 @@ public class LogAggregationService extends AbstractService implements private final ExecutorService threadPool; - public LogAggregationService(DeletionService deletionService) { + public LogAggregationService(Context context, + DeletionService deletionService) { super(LogAggregationService.class.getName()); + this.context = context; this.deletionService = deletionService; this.appLogAggregators = new ConcurrentHashMap(); @@ -82,16 +84,9 @@ public class LogAggregationService extends AbstractService implements @Override public synchronized void start() { - String address = - getConfig().get(YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS); - InetSocketAddress cmBindAddress = NetUtils.createSocketAddr(address); - try { - this.nodeFile = - InetAddress.getLocalHost().getHostAddress() + "_" - + cmBindAddress.getPort(); - } catch (UnknownHostException e) { - throw new YarnException(e); - } + // NodeId is only available during start, the following cannot be moved + // anywhere else. + this.nodeFile = this.context.getNodeId().toString(); super.start(); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java index c4db3b6af93..bdf84517611 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java @@ -144,9 +144,9 @@ public class DummyContainerManager extends ContainerManagerImpl { } @Override - protected LogAggregationService createLogAggregationService( + protected LogAggregationService createLogAggregationService(Context context, DeletionService deletionService) { - return new LogAggregationService(deletionService) { + return new LogAggregationService(context, deletionService) { @Override public void handle(LogAggregatorEvent event) { switch (event.getType()) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 2f0e3f54d37..63d1ade7c4f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -104,7 +104,7 @@ public class TestNodeStatusUpdater { Resource resource = request.getResource(); LOG.info("Registering " + nodeId.toString()); try { - Assert.assertEquals(InetAddress.getLocalHost().getHostAddress() + Assert.assertEquals(InetAddress.getLocalHost().getCanonicalHostName() + ":12345", nodeId.toString()); } catch (UnknownHostException e) { Assert.fail(e.getMessage()); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 2a366ec2fc2..0eae0aab031 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -41,7 +40,6 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -54,6 +52,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.service.Service.STATE; import org.junit.After; import org.junit.Before; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 164039e0373..449757f9ce6 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -98,7 +98,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); LogAggregationService logAggregationService = - new LogAggregationService(this.delSrvc); + new LogAggregationService(this.context, this.delSrvc); logAggregationService.init(this.conf); logAggregationService.start(); @@ -146,7 +146,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); LogAggregationService logAggregationService = - new LogAggregationService(this.delSrvc); + new LogAggregationService(this.context, this.delSrvc); logAggregationService.init(this.conf); logAggregationService.start(); @@ -179,7 +179,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); LogAggregationService logAggregationService = - new LogAggregationService(this.delSrvc); + new LogAggregationService(this.context, this.delSrvc); logAggregationService.init(this.conf); logAggregationService.start(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index eb42c0e34a6..24754e4e759 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -34,6 +35,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; @@ -41,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerToken; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -1066,9 +1069,9 @@ public class LeafQueue implements CSQueue { if (UserGroupInformation.isSecurityEnabled()) { ContainerToken containerToken = this.recordFactory.newRecordInstance(ContainerToken.class); - ContainerTokenIdentifier tokenidentifier = - new ContainerTokenIdentifier(container.getId(), - container.getNodeId().toString(), container.getResource()); + NodeId nodeId = container.getNodeId(); + ContainerTokenIdentifier tokenidentifier = new ContainerTokenIdentifier( + container.getId(), nodeId.toString(), container.getResource()); containerToken.setIdentifier( ByteBuffer.wrap(tokenidentifier.getBytes())); containerToken.setKind(ContainerTokenIdentifier.KIND.toString()); @@ -1076,7 +1079,11 @@ public class LeafQueue implements CSQueue { ByteBuffer.wrap( containerTokenSecretManager.createPassword(tokenidentifier)) ); - containerToken.setService(container.getNodeId().toString()); + // RPC layer client expects ip:port as service for tokens + InetSocketAddress addr = NetUtils.createSocketAddr(nodeId.getHost(), + nodeId.getPort()); + containerToken.setService(addr.getAddress().getHostAddress() + ":" + + addr.getPort()); container.setContainerToken(containerToken); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 1457aadfe3f..b031504eeff 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -35,6 +36,7 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.Lock; @@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -83,7 +86,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.util.BuilderUtils; -import org.apache.hadoop.yarn.api.records.QueueState; @LimitedPrivate("yarn") @Evolving @@ -543,16 +545,21 @@ public class FifoScheduler implements ResourceScheduler { if (UserGroupInformation.isSecurityEnabled()) { ContainerToken containerToken = recordFactory.newRecordInstance(ContainerToken.class); + NodeId nodeId = container.getNodeId(); ContainerTokenIdentifier tokenidentifier = new ContainerTokenIdentifier(container.getId(), - container.getNodeId().toString(), container.getResource()); + nodeId.toString(), container.getResource()); containerToken.setIdentifier( ByteBuffer.wrap(tokenidentifier.getBytes())); containerToken.setKind(ContainerTokenIdentifier.KIND.toString()); containerToken.setPassword( ByteBuffer.wrap(containerTokenSecretManager .createPassword(tokenidentifier))); - containerToken.setService(container.getNodeId().toString()); + // RPC layer client expects ip:port as service for tokens + InetSocketAddress addr = NetUtils.createSocketAddr( + nodeId.getHost(), nodeId.getPort()); + containerToken.setService(addr.getAddress().getHostAddress() + ":" + + addr.getPort()); container.setContainerToken(containerToken); }