MAPREDUCE-2792. Replace usage of node ip-addresses with hostnames. Contributed by Vinod K V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1178631 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 2011-10-03 23:21:20 +00:00
parent 93e0265af0
commit 12743d2169
17 changed files with 103 additions and 82 deletions

View File

@@ -1501,6 +1501,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3113. Ensure bin/yarn and bin/yarn-daemon.sh identify the root
     of the install properly. (Xie Xianshan via acmurthy)
 
+    MAPREDUCE-2792. Replace usage of node ip-addresses with hostnames.
+    (vinodkv via acmurthy)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

View File

@@ -84,7 +84,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;

View File

@@ -46,8 +46,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;

@@ -147,8 +147,8 @@ public abstract class RMCommunicator extends AbstractService {
   protected void register() {
     //Register
-    String host =
-      clientService.getBindAddress().getAddress().getHostAddress();
+    String host = clientService.getBindAddress().getAddress()
+        .getCanonicalHostName();
     try {
       RegisterApplicationMasterRequest request =
           recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);

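The change above swaps the value the MR ApplicationMaster registers with, from the raw IP of its client-service bind address to the canonical host name of that address. A minimal, self-contained JDK sketch of that difference follows; the class and variable names are illustrative and not part of the patch.

    import java.net.InetAddress;
    import java.net.InetSocketAddress;

    // Plain-JDK sketch, not the patched Hadoop classes: contrast the IP string
    // of a bound address with its canonical (reverse-resolved) host name.
    public class HostNameProbe {
      public static void main(String[] args) throws Exception {
        // Stand-in for clientService.getBindAddress(); any bound address works.
        InetSocketAddress bindAddress =
            new InetSocketAddress(InetAddress.getLocalHost(), 0);

        String ip = bindAddress.getAddress().getHostAddress();          // e.g. "192.0.2.10"
        String host = bindAddress.getAddress().getCanonicalHostName();  // e.g. "node1.example.com";
                                                                        // falls back to the IP
                                                                        // if reverse DNS fails
        System.out.println("ip=" + ip + " host=" + host);
      }
    }

Because getCanonicalHostName() falls back to the textual IP when reverse DNS is not configured, the rest of the patch still resolves names back to ip:port wherever the RPC layer needs a token service key.
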
View File

@@ -21,6 +21,7 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.util.HashMap;
 import java.util.List;

@@ -156,8 +157,11 @@ public class ClientServiceDelegate {
       Token<ApplicationTokenIdentifier> clientToken =
           new Token<ApplicationTokenIdentifier>();
       clientToken.decodeFromUrlString(clientTokenEncoded);
-      clientToken.setService(new Text(application.getHost() + ":"
-          + application.getRpcPort()));
+      // RPC layer client expects ip:port as service for tokens
+      InetSocketAddress addr = NetUtils.createSocketAddr(application
+          .getHost(), application.getRpcPort());
+      clientToken.setService(new Text(addr.getAddress().getHostAddress()
+          + ":" + addr.getPort()));
       UserGroupInformation.getCurrentUser().addToken(clientToken);
     }
     LOG.info("Tracking Url of JOB is " + application.getTrackingUrl());

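The new comment in this hunk ("RPC layer client expects ip:port as service for tokens") captures the convention the patch follows everywhere a token is set up: hosts are now reported as names, but the token service string is still the resolved ip:port. A small sketch of that resolution using only the JDK; buildTokenService is an illustrative helper, not a Hadoop API.

    import java.net.InetSocketAddress;

    // Resolve a host name to an "ip:port" token service string.
    // NetUtils.createSocketAddr in the patch does the equivalent with extra validation.
    public final class TokenServiceExample {
      static String buildTokenService(String host, int rpcPort) {
        InetSocketAddress addr = new InetSocketAddress(host, rpcPort); // resolves the name
        if (addr.isUnresolved()) {
          throw new IllegalArgumentException("Cannot resolve " + host);
        }
        return addr.getAddress().getHostAddress() + ":" + addr.getPort();
      }

      public static void main(String[] args) {
        // Typically prints "127.0.0.1:10020" on a default /etc/hosts setup.
        System.out.println(buildTokenService("localhost", 10020));
      }
    }
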
View File

@@ -44,7 +44,8 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
   private String nmHostName;
   private Resource resource;
 
-  public ContainerTokenIdentifier(ContainerId containerID, String hostName, Resource r) {
+  public ContainerTokenIdentifier(ContainerId containerID, String hostName,
+      Resource r) {
     this.containerId = containerID;
     this.nmHostName = hostName;
     this.resource = r;

View File

@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;

@@ -32,6 +33,13 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
  */
 public interface Context {
 
+  /**
+   * Return the nodeId. Usable only when the ContainerManager is started.
+   *
+   * @return the NodeId
+   */
+  NodeId getNodeId();
+
   ConcurrentMap<ApplicationId, Application> getApplications();
 
   ConcurrentMap<ContainerId, Container> getContainers();

View File

@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;

@@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.util.Records;
 
 public class NodeManager extends CompositeService {
   private static final Log LOG = LogFactory.getLog(NodeManager.class);

@@ -161,6 +163,7 @@ public class NodeManager extends CompositeService {
 
   public static class NMContext implements Context {
 
+    private final NodeId nodeId = Records.newRecord(NodeId.class);
     private final ConcurrentMap<ApplicationId, Application> applications =
         new ConcurrentHashMap<ApplicationId, Application>();
     private final ConcurrentMap<ContainerId, Container> containers =

@@ -175,6 +178,14 @@ public class NodeManager extends CompositeService {
       this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
     }
 
+    /**
+     * Usable only after ContainerManager is started.
+     */
+    @Override
+    public NodeId getNodeId() {
+      return this.nodeId;
+    }
+
     @Override
     public ConcurrentMap<ApplicationId, Application> getApplications() {
       return this.applications;

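NMContext now creates an empty NodeId record up front and exposes it through Context.getNodeId(); the ContainerManager fills in the host and port later, once they are actually known. The sketch below shows the same pattern with a hypothetical MutableNodeId stand-in rather than the YARN record classes.

    // Illustrative only: a mutable holder shared early and populated at start time.
    public class NodeIdHolderExample {
      static final class MutableNodeId {
        private String host;
        private int port;
        void setHost(String host) { this.host = host; }
        void setPort(int port) { this.port = port; }
        @Override public String toString() { return host + ":" + port; }
      }

      public static void main(String[] args) {
        MutableNodeId nodeId = new MutableNodeId();   // handed out while still empty
        // ... services are constructed holding this reference ...
        nodeId.setHost("node1.example.com");          // filled in once the server is up
        nodeId.setPort(45454);
        System.out.println("Registered with ResourceManager as " + nodeId);
      }
    }
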
View File

@@ -24,8 +24,5 @@ public interface NodeStatusUpdater extends Service {
 
   byte[] getRMNMSharedSecret();
 
-  String getContainerManagerBindAddress();
-
   void sendOutofBandHeartBeat();
 }

View File

@@ -18,7 +18,6 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Iterator;

@@ -57,7 +56,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.Records;
 
 public class NodeStatusUpdaterImpl extends AbstractService implements
     NodeStatusUpdater {

@@ -69,16 +67,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   private final Context context;
   private final Dispatcher dispatcher;
 
-  private NodeId nodeId;
   private ContainerTokenSecretManager containerTokenSecretManager;
   private long heartBeatInterval;
   private ResourceTracker resourceTracker;
   private String rmAddress;
   private Resource totalResource;
-  private String containerManagerBindAddress;
-  private String hostName;
-  private int containerManagerPort;
   private int httpPort;
+  private NodeId nodeId;
   private byte[] secretKeyBytes = new byte[0];
   private boolean isStopped;
   private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

@@ -114,24 +109,18 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
   @Override
   public void start() {
-    String cmBindAddressStr =
-        getConfig().get(YarnConfiguration.NM_ADDRESS,
-            YarnConfiguration.DEFAULT_NM_ADDRESS);
-    InetSocketAddress cmBindAddress =
-        NetUtils.createSocketAddr(cmBindAddressStr);
+    // NodeManager is the last service to start, so NodeId is available.
+    this.nodeId = this.context.getNodeId();
     String httpBindAddressStr =
       getConfig().get(YarnConfiguration.NM_WEBAPP_ADDRESS,
          YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS);
     InetSocketAddress httpBindAddress =
       NetUtils.createSocketAddr(httpBindAddressStr);
     try {
-      this.hostName = InetAddress.getLocalHost().getHostAddress();
-      this.containerManagerPort = cmBindAddress.getPort();
+      // this.hostName = InetAddress.getLocalHost().getCanonicalHostName();
       this.httpPort = httpBindAddress.getPort();
-      this.containerManagerBindAddress =
-          this.hostName + ":" + this.containerManagerPort;
-      LOG.info("Configured ContainerManager Address is "
-          + this.containerManagerBindAddress);
       // Registration has to be in start so that ContainerManager can get the
       // perNM tokens needed to authenticate ContainerTokens.
       registerWithRM();

@@ -165,9 +154,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     LOG.info("Connected to ResourceManager at " + this.rmAddress);
     RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
-    this.nodeId = Records.newRecord(NodeId.class);
-    this.nodeId.setHost(this.hostName);
-    this.nodeId.setPort(this.containerManagerPort);
     request.setHttpPort(this.httpPort);
     request.setResource(this.totalResource);
     request.setNodeId(this.nodeId);

@@ -183,19 +169,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       // It is expected that status updater is started by this point and
       // RM gives the shared secret in registration during StatusUpdater#start().
       this.containerTokenSecretManager.setSecretKey(
-          this.getContainerManagerBindAddress(),
+          this.nodeId.toString(),
           this.getRMNMSharedSecret());
     }
 
-    LOG.info("Registered with ResourceManager as " + this.containerManagerBindAddress
-        + " with total resource of " + this.totalResource);
+    LOG.info("Registered with ResourceManager as " + this.nodeId
+        + " with total resource of " + this.totalResource);
   }
 
-  @Override
-  public String getContainerManagerBindAddress() {
-    return this.containerManagerBindAddress;
-  }
-
   @Override
   public byte[] getRMNMSharedSecret() {
     return this.secretKeyBytes.clone();

@@ -230,8 +211,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     }
     nodeStatus.setContainersStatuses(containersStatuses);
 
-    LOG.debug(this.containerManagerBindAddress + " sending out status for " + numActiveContainers
-        + " containers");
+    LOG.debug(this.nodeId + " sending out status for "
+        + numActiveContainers + " containers");
 
     NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
     if (this.healthChecker != null) {

View File

@@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager;
 import static org.apache.hadoop.yarn.service.Service.STATE.STARTED;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.Map;

@@ -36,6 +38,7 @@ import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;

@@ -99,7 +102,6 @@ public class ContainerManagerImpl extends CompositeService implements
   final Context context;
   private final ContainersMonitor containersMonitor;
   private Server server;
-  private InetSocketAddress cmBindAddressStr;
   private final ResourceLocalizationService rsrcLocalizationSrvc;
   private final ContainersLauncher containersLauncher;
   private final AuxServices auxiluaryServices;

@@ -144,7 +146,7 @@ public class ContainerManagerImpl extends CompositeService implements
     addService(this.containersMonitor);
 
     LogAggregationService logAggregationService =
-        createLogAggregationService(this.deletionService);
+        createLogAggregationService(this.context, this.deletionService);
     addService(logAggregationService);
 
     dispatcher.register(ContainerEventType.class,

@@ -159,9 +161,9 @@ public class ContainerManagerImpl extends CompositeService implements
     addService(dispatcher);
   }
 
-  protected LogAggregationService createLogAggregationService(
+  protected LogAggregationService createLogAggregationService(Context context,
       DeletionService deletionService) {
-    return new LogAggregationService(deletionService);
+    return new LogAggregationService(context, deletionService);
   }
 
   public ContainersMonitor getContainersMonitor() {

@@ -179,29 +181,36 @@ public class ContainerManagerImpl extends CompositeService implements
     return new ContainersLauncher(context, this.dispatcher, exec);
   }
 
-  @Override
-  public void init(Configuration conf) {
-    cmBindAddressStr = NetUtils.createSocketAddr(
-        conf.get(YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS));
-    super.init(conf);
-  }
-
   @Override
   public void start() {
     // Enqueue user dirs in deletion context
-    YarnRPC rpc = YarnRPC.create(getConfig());
-    Configuration cmConf = new Configuration(getConfig());
+    Configuration conf = getConfig();
+    YarnRPC rpc = YarnRPC.create(conf);
+    InetSocketAddress initialAddress = NetUtils.createSocketAddr(conf.get(
+        YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS));
+    Configuration cmConf = new Configuration(conf);
     cmConf.setClass(YarnConfiguration.YARN_SECURITY_INFO,
         ContainerManagerSecurityInfo.class, SecurityInfo.class);
     server =
-        rpc.getServer(ContainerManager.class, this, cmBindAddressStr, cmConf,
+        rpc.getServer(ContainerManager.class, this, initialAddress, cmConf,
            this.containerTokenSecretManager,
            cmConf.getInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT,
                YarnConfiguration.DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT));
-    LOG.info("ContainerManager started at " + cmBindAddressStr);
     server.start();
+    InetAddress hostNameResolved = null;
+    try {
+      hostNameResolved = InetAddress.getLocalHost();
+    } catch (UnknownHostException e) {
+      throw new YarnException(e);
+    }
+    this.context.getNodeId().setHost(hostNameResolved.getCanonicalHostName());
+    this.context.getNodeId().setPort(server.getPort());
+    LOG.info("ContainerManager started at "
+        + this.context.getNodeId().toString());
     super.start();
   }

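The start() change above is where the shared NodeId finally gets its values: the RPC server is started first, then the local canonical host name and the server's bound port are written into the context. A rough, JDK-only sketch of that ordering, with ServerSocket standing in for the YARN RPC server:

    import java.net.InetAddress;
    import java.net.ServerSocket;
    import java.net.UnknownHostException;

    // Illustrative only: the node's identity (host:port) can only be filled in
    // after the server has actually bound, since the port may be chosen at start.
    public class NodeIdAtStartup {
      public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(0)) {   // 0 = pick any free port
          String host;
          try {
            host = InetAddress.getLocalHost().getCanonicalHostName();
          } catch (UnknownHostException e) {
            throw new RuntimeException("Failed to resolve local host name", e);
          }
          // In the patch this pair is written into context.getNodeId() and then
          // reused by the status updater and the log aggregation service.
          String nodeId = host + ":" + server.getLocalPort();
          System.out.println("ContainerManager started at " + nodeId);
        }
      }
    }
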
View File

@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
 
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.ConcurrentHashMap;

@@ -32,7 +31,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;

@@ -42,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
 import org.apache.hadoop.yarn.service.AbstractService;

@@ -53,6 +52,7 @@ public class LogAggregationService extends AbstractService implements
   private static final Log LOG = LogFactory
       .getLog(LogAggregationService.class);
 
+  private final Context context;
   private final DeletionService deletionService;
 
   private String[] localRootLogDirs;

@@ -63,8 +63,10 @@ public class LogAggregationService extends AbstractService implements
 
   private final ExecutorService threadPool;
 
-  public LogAggregationService(DeletionService deletionService) {
+  public LogAggregationService(Context context,
+      DeletionService deletionService) {
     super(LogAggregationService.class.getName());
+    this.context = context;
     this.deletionService = deletionService;
     this.appLogAggregators =
         new ConcurrentHashMap<ApplicationId, AppLogAggregator>();

@@ -82,16 +84,9 @@ public class LogAggregationService extends AbstractService implements
   @Override
   public synchronized void start() {
-    String address =
-        getConfig().get(YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS);
-    InetSocketAddress cmBindAddress = NetUtils.createSocketAddr(address);
-    try {
-      this.nodeFile =
-          InetAddress.getLocalHost().getHostAddress() + "_"
-              + cmBindAddress.getPort();
-    } catch (UnknownHostException e) {
-      throw new YarnException(e);
-    }
+    // NodeId is only available during start, the following cannot be moved
+    // anywhere else.
+    this.nodeFile = this.context.getNodeId().toString();
     super.start();
   }

View File

@@ -144,9 +144,9 @@ public class DummyContainerManager extends ContainerManagerImpl {
   }
 
   @Override
-  protected LogAggregationService createLogAggregationService(
+  protected LogAggregationService createLogAggregationService(Context context,
       DeletionService deletionService) {
-    return new LogAggregationService(deletionService) {
+    return new LogAggregationService(context, deletionService) {
       @Override
       public void handle(LogAggregatorEvent event) {
         switch (event.getType()) {

View File

@@ -104,7 +104,7 @@ public class TestNodeStatusUpdater {
       Resource resource = request.getResource();
       LOG.info("Registering " + nodeId.toString());
       try {
-        Assert.assertEquals(InetAddress.getLocalHost().getHostAddress()
+        Assert.assertEquals(InetAddress.getLocalHost().getCanonicalHostName()
            + ":12345", nodeId.toString());
       } catch (UnknownHostException e) {
         Assert.fail(e.getMessage());

View File

@@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;

@@ -41,7 +40,6 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;

@@ -54,6 +52,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.Service.STATE;
 import org.junit.After;
 import org.junit.Before;

View File

@@ -98,7 +98,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
 
     LogAggregationService logAggregationService =
-        new LogAggregationService(this.delSrvc);
+        new LogAggregationService(this.context, this.delSrvc);
     logAggregationService.init(this.conf);
     logAggregationService.start();

@@ -146,7 +146,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
 
     LogAggregationService logAggregationService =
-        new LogAggregationService(this.delSrvc);
+        new LogAggregationService(this.context, this.delSrvc);
     logAggregationService.init(this.conf);
     logAggregationService.start();

@@ -179,7 +179,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
 
     LogAggregationService logAggregationService =
-        new LogAggregationService(this.delSrvc);
+        new LogAggregationService(this.context, this.delSrvc);
     logAggregationService.init(this.conf);
     logAggregationService.start();

View File

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;

@@ -34,6 +35,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;

@@ -41,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;

@@ -1066,9 +1069,9 @@ public class LeafQueue implements CSQueue {
     if (UserGroupInformation.isSecurityEnabled()) {
       ContainerToken containerToken =
           this.recordFactory.newRecordInstance(ContainerToken.class);
-      ContainerTokenIdentifier tokenidentifier =
-          new ContainerTokenIdentifier(container.getId(),
-              container.getNodeId().toString(), container.getResource());
+      NodeId nodeId = container.getNodeId();
+      ContainerTokenIdentifier tokenidentifier = new ContainerTokenIdentifier(
+          container.getId(), nodeId.toString(), container.getResource());
       containerToken.setIdentifier(
           ByteBuffer.wrap(tokenidentifier.getBytes()));
       containerToken.setKind(ContainerTokenIdentifier.KIND.toString());

@@ -1076,7 +1079,11 @@ public class LeafQueue implements CSQueue {
           ByteBuffer.wrap(
               containerTokenSecretManager.createPassword(tokenidentifier))
           );
-      containerToken.setService(container.getNodeId().toString());
+      // RPC layer client expects ip:port as service for tokens
+      InetSocketAddress addr = NetUtils.createSocketAddr(nodeId.getHost(),
+          nodeId.getPort());
+      containerToken.setService(addr.getAddress().getHostAddress() + ":"
+          + addr.getPort());
       container.setContainerToken(containerToken);
     }

View File

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;

@@ -35,6 +36,7 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.Lock;

@@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;

@@ -83,7 +86,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.api.records.QueueState;
 
 @LimitedPrivate("yarn")
 @Evolving

@@ -543,16 +545,21 @@ public class FifoScheduler implements ResourceScheduler {
       if (UserGroupInformation.isSecurityEnabled()) {
         ContainerToken containerToken =
             recordFactory.newRecordInstance(ContainerToken.class);
+        NodeId nodeId = container.getNodeId();
         ContainerTokenIdentifier tokenidentifier =
             new ContainerTokenIdentifier(container.getId(),
-                container.getNodeId().toString(), container.getResource());
+                nodeId.toString(), container.getResource());
         containerToken.setIdentifier(
             ByteBuffer.wrap(tokenidentifier.getBytes()));
         containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
         containerToken.setPassword(
             ByteBuffer.wrap(containerTokenSecretManager
                 .createPassword(tokenidentifier)));
-        containerToken.setService(container.getNodeId().toString());
+        // RPC layer client expects ip:port as service for tokens
+        InetSocketAddress addr = NetUtils.createSocketAddr(
+            nodeId.getHost(), nodeId.getPort());
+        containerToken.setService(addr.getAddress().getHostAddress() + ":"
+            + addr.getPort());
         container.setContainerToken(containerToken);
       }