MAPREDUCE-4161. create sockets consistently (Daryn Sharp via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1327621 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4e4ba4cd90
commit
32d511065a
|
@ -274,6 +274,8 @@ Release 0.23.3 - UNRELEASED
|
||||||
MAPREDUCE-3972. Fix locking and exception issues in JobHistory server.
|
MAPREDUCE-3972. Fix locking and exception issues in JobHistory server.
|
||||||
(Robert Joseph Evans via sseth)
|
(Robert Joseph Evans via sseth)
|
||||||
|
|
||||||
|
MAPREDUCE-4161. create sockets consistently (Daryn Sharp via bobby)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.mapreduce.v2.app.rm;
|
package org.apache.hadoop.mapreduce.v2.app.rm;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -35,7 +36,6 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
@ -245,11 +245,12 @@ public abstract class RMCommunicator extends AbstractService {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected AMRMProtocol createSchedulerProxy() {
|
protected AMRMProtocol createSchedulerProxy() {
|
||||||
final YarnRPC rpc = YarnRPC.create(getConfig());
|
|
||||||
final Configuration conf = getConfig();
|
final Configuration conf = getConfig();
|
||||||
final String serviceAddr = conf.get(
|
final YarnRPC rpc = YarnRPC.create(conf);
|
||||||
|
final InetSocketAddress serviceAddr = conf.getSocketAddr(
|
||||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
||||||
|
|
||||||
UserGroupInformation currentUser;
|
UserGroupInformation currentUser;
|
||||||
try {
|
try {
|
||||||
|
@ -279,7 +280,7 @@ public abstract class RMCommunicator extends AbstractService {
|
||||||
@Override
|
@Override
|
||||||
public AMRMProtocol run() {
|
public AMRMProtocol run() {
|
||||||
return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
|
return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
|
||||||
NetUtils.createSocketAddr(serviceAddr), conf);
|
serviceAddr, conf);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,10 +55,10 @@ public class Master {
|
||||||
return NetUtils.createSocketAddr(masterAddress, 8012, MRConfig.MASTER_ADDRESS);
|
return NetUtils.createSocketAddr(masterAddress, 8012, MRConfig.MASTER_ADDRESS);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
masterAddress = conf.get(YarnConfiguration.RM_ADDRESS,
|
return conf.getSocketAddr(
|
||||||
YarnConfiguration.DEFAULT_RM_ADDRESS);
|
YarnConfiguration.RM_ADDRESS,
|
||||||
return NetUtils.createSocketAddr(masterAddress, YarnConfiguration.DEFAULT_RM_PORT,
|
YarnConfiguration.DEFAULT_RM_ADDRESS,
|
||||||
YarnConfiguration.RM_ADDRESS);
|
YarnConfiguration.DEFAULT_RM_PORT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,7 +40,6 @@ import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIden
|
||||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
|
||||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||||
|
@ -88,12 +87,10 @@ public class ResourceMgrDelegate {
|
||||||
public ResourceMgrDelegate(YarnConfiguration conf) {
|
public ResourceMgrDelegate(YarnConfiguration conf) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
YarnRPC rpc = YarnRPC.create(this.conf);
|
YarnRPC rpc = YarnRPC.create(this.conf);
|
||||||
InetSocketAddress rmAddress =
|
InetSocketAddress rmAddress = conf.getSocketAddr(
|
||||||
NetUtils.createSocketAddr(this.conf.get(
|
|
||||||
YarnConfiguration.RM_ADDRESS,
|
YarnConfiguration.RM_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_ADDRESS),
|
YarnConfiguration.DEFAULT_RM_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_PORT,
|
YarnConfiguration.DEFAULT_RM_PORT);
|
||||||
YarnConfiguration.RM_ADDRESS);
|
|
||||||
this.rmAddress = rmAddress.toString();
|
this.rmAddress = rmAddress.toString();
|
||||||
LOG.debug("Connecting to ResourceManager at " + rmAddress);
|
LOG.debug("Connecting to ResourceManager at " + rmAddress);
|
||||||
applicationsManager =
|
applicationsManager =
|
||||||
|
|
|
@ -742,9 +742,10 @@ public class ApplicationMaster {
|
||||||
*/
|
*/
|
||||||
private AMRMProtocol connectToRM() {
|
private AMRMProtocol connectToRM() {
|
||||||
YarnConfiguration yarnConf = new YarnConfiguration(conf);
|
YarnConfiguration yarnConf = new YarnConfiguration(conf);
|
||||||
InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
|
InetSocketAddress rmAddress = yarnConf.getSocketAddr(
|
||||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
||||||
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
||||||
return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));
|
return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,8 +42,6 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.security.SecurityInfo;
|
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
|
@ -723,9 +721,10 @@ public class Client {
|
||||||
});
|
});
|
||||||
*/
|
*/
|
||||||
YarnConfiguration yarnConf = new YarnConfiguration(conf);
|
YarnConfiguration yarnConf = new YarnConfiguration(conf);
|
||||||
InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
|
InetSocketAddress rmAddress = yarnConf.getSocketAddr(
|
||||||
YarnConfiguration.RM_ADDRESS,
|
YarnConfiguration.RM_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_ADDRESS));
|
YarnConfiguration.DEFAULT_RM_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_PORT);
|
||||||
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
||||||
applicationsManager = ((ClientRMProtocol) rpc.getProxy(
|
applicationsManager = ((ClientRMProtocol) rpc.getProxy(
|
||||||
ClientRMProtocol.class, rmAddress, conf));
|
ClientRMProtocol.class, rmAddress, conf));
|
||||||
|
|
|
@ -558,17 +558,16 @@ public class YarnConfiguration extends Configuration {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String getRMWebAppHostAndPort(Configuration conf) {
|
public static String getRMWebAppHostAndPort(Configuration conf) {
|
||||||
String addr = conf.get(YarnConfiguration.RM_WEBAPP_ADDRESS,
|
int port = conf.getSocketAddr(
|
||||||
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS);
|
YarnConfiguration.RM_WEBAPP_ADDRESS,
|
||||||
Iterator<String> it = ADDR_SPLITTER.split(addr).iterator();
|
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS,
|
||||||
it.next(); // ignore the bind host
|
YarnConfiguration.DEFAULT_RM_WEBAPP_PORT).getPort();
|
||||||
String port = it.next();
|
|
||||||
// Use apps manager address to figure out the host for webapp
|
// Use apps manager address to figure out the host for webapp
|
||||||
addr = conf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS);
|
String host = conf.getSocketAddr(
|
||||||
String host = ADDR_SPLITTER.split(addr).iterator().next();
|
YarnConfiguration.RM_ADDRESS,
|
||||||
String rmAddress = JOINER.join(host, ":", port);
|
YarnConfiguration.DEFAULT_RM_ADDRESS,
|
||||||
InetSocketAddress address = NetUtils.createSocketAddr(
|
YarnConfiguration.DEFAULT_RM_PORT).getHostName();
|
||||||
rmAddress, DEFAULT_RM_WEBAPP_PORT, RM_WEBAPP_ADDRESS);
|
InetSocketAddress address = NetUtils.createSocketAddrForHost(host, port);
|
||||||
StringBuffer sb = new StringBuffer();
|
StringBuffer sb = new StringBuffer();
|
||||||
InetAddress resolved = address.getAddress();
|
InetAddress resolved = address.getAddress();
|
||||||
if (resolved == null || resolved.isAnyLocalAddress() ||
|
if (resolved == null || resolved.isAnyLocalAddress() ||
|
||||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.avro.AvroRuntimeException;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -75,7 +74,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
private ContainerTokenSecretManager containerTokenSecretManager;
|
private ContainerTokenSecretManager containerTokenSecretManager;
|
||||||
private long heartBeatInterval;
|
private long heartBeatInterval;
|
||||||
private ResourceTracker resourceTracker;
|
private ResourceTracker resourceTracker;
|
||||||
private String rmAddress;
|
private InetSocketAddress rmAddress;
|
||||||
private Resource totalResource;
|
private Resource totalResource;
|
||||||
private int httpPort;
|
private int httpPort;
|
||||||
private byte[] secretKeyBytes = new byte[0];
|
private byte[] secretKeyBytes = new byte[0];
|
||||||
|
@ -106,9 +105,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void init(Configuration conf) {
|
public synchronized void init(Configuration conf) {
|
||||||
this.rmAddress =
|
this.rmAddress = conf.getSocketAddr(
|
||||||
conf.get(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
|
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS);
|
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
|
||||||
this.heartBeatInterval =
|
this.heartBeatInterval =
|
||||||
conf.getLong(YarnConfiguration.NM_TO_RM_HEARTBEAT_INTERVAL_MS,
|
conf.getLong(YarnConfiguration.NM_TO_RM_HEARTBEAT_INTERVAL_MS,
|
||||||
YarnConfiguration.DEFAULT_NM_TO_RM_HEARTBEAT_INTERVAL_MS);
|
YarnConfiguration.DEFAULT_NM_TO_RM_HEARTBEAT_INTERVAL_MS);
|
||||||
|
@ -132,13 +132,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
// NodeManager is the last service to start, so NodeId is available.
|
// NodeManager is the last service to start, so NodeId is available.
|
||||||
this.nodeId = this.context.getNodeId();
|
this.nodeId = this.context.getNodeId();
|
||||||
|
|
||||||
String httpBindAddressStr =
|
InetSocketAddress httpBindAddress = getConfig().getSocketAddr(
|
||||||
getConfig().get(YarnConfiguration.NM_WEBAPP_ADDRESS,
|
YarnConfiguration.NM_WEBAPP_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS);
|
YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS,
|
||||||
InetSocketAddress httpBindAddress =
|
YarnConfiguration.DEFAULT_NM_WEBAPP_PORT);
|
||||||
NetUtils.createSocketAddr(httpBindAddressStr,
|
|
||||||
YarnConfiguration.DEFAULT_NM_WEBAPP_PORT,
|
|
||||||
YarnConfiguration.NM_WEBAPP_ADDRESS);
|
|
||||||
try {
|
try {
|
||||||
// this.hostName = InetAddress.getLocalHost().getCanonicalHostName();
|
// this.hostName = InetAddress.getLocalHost().getCanonicalHostName();
|
||||||
this.httpPort = httpBindAddress.getPort();
|
this.httpPort = httpBindAddress.getPort();
|
||||||
|
@ -178,9 +175,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
protected ResourceTracker getRMClient() {
|
protected ResourceTracker getRMClient() {
|
||||||
Configuration conf = getConfig();
|
Configuration conf = getConfig();
|
||||||
YarnRPC rpc = YarnRPC.create(conf);
|
YarnRPC rpc = YarnRPC.create(conf);
|
||||||
InetSocketAddress rmAddress = NetUtils.createSocketAddr(this.rmAddress,
|
|
||||||
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT,
|
|
||||||
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS);
|
|
||||||
return (ResourceTracker) rpc.getProxy(ResourceTracker.class, rmAddress,
|
return (ResourceTracker) rpc.getProxy(ResourceTracker.class, rmAddress,
|
||||||
conf);
|
conf);
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||||
|
@ -226,10 +225,10 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
Configuration conf = getConfig();
|
Configuration conf = getConfig();
|
||||||
YarnRPC rpc = YarnRPC.create(conf);
|
YarnRPC rpc = YarnRPC.create(conf);
|
||||||
|
|
||||||
InetSocketAddress initialAddress = NetUtils.createSocketAddr(conf.get(
|
InetSocketAddress initialAddress = conf.getSocketAddr(
|
||||||
YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS),
|
YarnConfiguration.NM_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_NM_PORT,
|
YarnConfiguration.DEFAULT_NM_ADDRESS,
|
||||||
YarnConfiguration.NM_ADDRESS);
|
YarnConfiguration.DEFAULT_NM_PORT);
|
||||||
|
|
||||||
server =
|
server =
|
||||||
rpc.getServer(ContainerManager.class, this, initialAddress, conf,
|
rpc.getServer(ContainerManager.class, this, initialAddress, conf,
|
||||||
|
|
|
@ -207,10 +207,10 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_TARGET_SIZE_MB, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB) << 20;
|
conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_TARGET_SIZE_MB, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB) << 20;
|
||||||
cacheCleanupPeriod =
|
cacheCleanupPeriod =
|
||||||
conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
|
conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
|
||||||
localizationServerAddress = NetUtils.createSocketAddr(
|
localizationServerAddress = conf.getSocketAddr(
|
||||||
conf.get(YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS),
|
YarnConfiguration.NM_LOCALIZER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT,
|
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
|
||||||
YarnConfiguration.NM_LOCALIZER_ADDRESS);
|
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
|
||||||
localizerTracker = createLocalizerTracker(conf);
|
localizerTracker = createLocalizerTracker(conf);
|
||||||
addService(localizerTracker);
|
addService(localizerTracker);
|
||||||
dispatcher.register(LocalizerEventType.class, localizerTracker);
|
dispatcher.register(LocalizerEventType.class, localizerTracker);
|
||||||
|
@ -232,9 +232,10 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
.split(":")[0];
|
.split(":")[0];
|
||||||
getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, host + ":"
|
getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, host + ":"
|
||||||
+ server.getPort());
|
+ server.getPort());
|
||||||
localizationServerAddress = NetUtils.createSocketAddr(
|
localizationServerAddress = getConfig().getSocketAddr(
|
||||||
getConfig().get(YarnConfiguration.NM_LOCALIZER_ADDRESS,
|
YarnConfiguration.NM_LOCALIZER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
|
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
|
||||||
LOG.info("Localizer started on port " + server.getPort());
|
LOG.info("Localizer started on port " + server.getPort());
|
||||||
super.start();
|
super.start();
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.Groups;
|
import org.apache.hadoop.security.Groups;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
@ -95,12 +94,10 @@ public class AdminService extends AbstractService implements RMAdminProtocol {
|
||||||
@Override
|
@Override
|
||||||
public void init(Configuration conf) {
|
public void init(Configuration conf) {
|
||||||
super.init(conf);
|
super.init(conf);
|
||||||
String bindAddress =
|
masterServiceAddress = conf.getSocketAddr(
|
||||||
conf.get(YarnConfiguration.RM_ADMIN_ADDRESS,
|
YarnConfiguration.RM_ADMIN_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS);
|
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
|
||||||
masterServiceAddress = NetUtils.createSocketAddr(bindAddress,
|
YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
|
||||||
YarnConfiguration.DEFAULT_RM_ADMIN_PORT,
|
|
||||||
YarnConfiguration.RM_ADMIN_ADDRESS);
|
|
||||||
adminAcl = new AccessControlList(conf.get(
|
adminAcl = new AccessControlList(conf.get(
|
||||||
YarnConfiguration.YARN_ADMIN_ACL,
|
YarnConfiguration.YARN_ADMIN_ACL,
|
||||||
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
|
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
|
||||||
|
|
|
@ -100,13 +100,10 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
Configuration conf = getConfig();
|
Configuration conf = getConfig();
|
||||||
YarnRPC rpc = YarnRPC.create(conf);
|
YarnRPC rpc = YarnRPC.create(conf);
|
||||||
|
|
||||||
String bindAddressStr =
|
InetSocketAddress masterServiceAddress = conf.getSocketAddr(
|
||||||
conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||||
InetSocketAddress masterServiceAddress =
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
||||||
NetUtils.createSocketAddr(bindAddressStr,
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT,
|
|
||||||
YarnConfiguration.RM_SCHEDULER_ADDRESS);
|
|
||||||
|
|
||||||
this.server =
|
this.server =
|
||||||
rpc.getServer(AMRMProtocol.class, this, masterServiceAddress,
|
rpc.getServer(AMRMProtocol.class, this, masterServiceAddress,
|
||||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||||
|
@ -104,7 +103,6 @@ public class ClientRMService extends AbstractService implements
|
||||||
final private RMContext rmContext;
|
final private RMContext rmContext;
|
||||||
private final RMAppManager rmAppManager;
|
private final RMAppManager rmAppManager;
|
||||||
|
|
||||||
private String clientServiceBindAddress;
|
|
||||||
private Server server;
|
private Server server;
|
||||||
private RMDelegationTokenSecretManager rmDTSecretManager;
|
private RMDelegationTokenSecretManager rmDTSecretManager;
|
||||||
|
|
||||||
|
@ -126,13 +124,10 @@ public class ClientRMService extends AbstractService implements
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Configuration conf) {
|
public void init(Configuration conf) {
|
||||||
clientServiceBindAddress =
|
clientBindAddress = conf.getSocketAddr(
|
||||||
conf.get(YarnConfiguration.RM_ADDRESS,
|
YarnConfiguration.RM_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_ADDRESS);
|
YarnConfiguration.DEFAULT_RM_ADDRESS,
|
||||||
clientBindAddress =
|
YarnConfiguration.DEFAULT_RM_PORT);
|
||||||
NetUtils.createSocketAddr(clientServiceBindAddress,
|
|
||||||
YarnConfiguration.DEFAULT_RM_PORT,
|
|
||||||
YarnConfiguration.RM_ADDRESS);
|
|
||||||
super.init(conf);
|
super.init(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
@ -104,13 +103,10 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void init(Configuration conf) {
|
public synchronized void init(Configuration conf) {
|
||||||
String resourceTrackerBindAddress =
|
resourceTrackerAddress = conf.getSocketAddr(
|
||||||
conf.get(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
|
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS);
|
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
|
||||||
resourceTrackerAddress = NetUtils.createSocketAddr(
|
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
|
||||||
resourceTrackerBindAddress,
|
|
||||||
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT,
|
|
||||||
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS);
|
|
||||||
|
|
||||||
RackResolver.init(conf);
|
RackResolver.init(conf);
|
||||||
super.init(conf);
|
super.init(conf);
|
||||||
|
|
|
@ -218,11 +218,10 @@ public class AMLauncher implements Runnable {
|
||||||
Token<ApplicationTokenIdentifier> token =
|
Token<ApplicationTokenIdentifier> token =
|
||||||
new Token<ApplicationTokenIdentifier>(id,
|
new Token<ApplicationTokenIdentifier>(id,
|
||||||
this.rmContext.getApplicationTokenSecretManager());
|
this.rmContext.getApplicationTokenSecretManager());
|
||||||
String schedulerAddressStr =
|
InetSocketAddress unresolvedAddr = conf.getSocketAddr(
|
||||||
this.conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||||
InetSocketAddress unresolvedAddr =
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
||||||
NetUtils.createSocketAddr(schedulerAddressStr);
|
|
||||||
String resolvedAddr =
|
String resolvedAddr =
|
||||||
unresolvedAddr.getAddress().getHostAddress() + ":"
|
unresolvedAddr.getAddress().getHostAddress() + ":"
|
||||||
+ unresolvedAddr.getPort();
|
+ unresolvedAddr.getPort();
|
||||||
|
|
|
@ -25,7 +25,6 @@ import java.security.PrivilegedAction;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.Configured;
|
import org.apache.hadoop.conf.Configured;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.Tool;
|
import org.apache.hadoop.util.Tool;
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
@ -158,13 +157,10 @@ public class RMAdmin extends Configured implements Tool {
|
||||||
final YarnConfiguration conf = new YarnConfiguration(getConf());
|
final YarnConfiguration conf = new YarnConfiguration(getConf());
|
||||||
|
|
||||||
// Create the client
|
// Create the client
|
||||||
final String adminAddress =
|
final InetSocketAddress addr = conf.getSocketAddr(
|
||||||
conf.get(YarnConfiguration.RM_ADMIN_ADDRESS,
|
YarnConfiguration.RM_ADMIN_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS);
|
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
|
||||||
final InetSocketAddress addr =
|
YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
|
||||||
NetUtils.createSocketAddr(adminAddress,
|
|
||||||
YarnConfiguration.DEFAULT_RM_ADMIN_PORT,
|
|
||||||
YarnConfiguration.RM_ADMIN_ADDRESS);
|
|
||||||
final YarnRPC rpc = YarnRPC.create(conf);
|
final YarnRPC rpc = YarnRPC.create(conf);
|
||||||
|
|
||||||
RMAdminProtocol adminProtocol =
|
RMAdminProtocol adminProtocol =
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -27,7 +28,6 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
@ -196,9 +196,10 @@ public class TestAMAuthorization {
|
||||||
// Create a client to the RM.
|
// Create a client to the RM.
|
||||||
final Configuration conf = rm.getConfig();
|
final Configuration conf = rm.getConfig();
|
||||||
final YarnRPC rpc = YarnRPC.create(conf);
|
final YarnRPC rpc = YarnRPC.create(conf);
|
||||||
final String serviceAddr = conf.get(
|
final InetSocketAddress serviceAddr = conf.getSocketAddr(
|
||||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
||||||
|
|
||||||
UserGroupInformation currentUser = UserGroupInformation
|
UserGroupInformation currentUser = UserGroupInformation
|
||||||
.createRemoteUser(applicationAttemptId.toString());
|
.createRemoteUser(applicationAttemptId.toString());
|
||||||
|
@ -213,8 +214,8 @@ public class TestAMAuthorization {
|
||||||
.doAs(new PrivilegedAction<AMRMProtocol>() {
|
.doAs(new PrivilegedAction<AMRMProtocol>() {
|
||||||
@Override
|
@Override
|
||||||
public AMRMProtocol run() {
|
public AMRMProtocol run() {
|
||||||
return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, NetUtils
|
return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
|
||||||
.createSocketAddr(serviceAddr), conf);
|
serviceAddr, conf);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,6 @@ import junit.framework.Assert;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||||
|
@ -75,9 +74,10 @@ public class TestApplicationACLs {
|
||||||
static MockRM resourceManager;
|
static MockRM resourceManager;
|
||||||
static Configuration conf = new YarnConfiguration();
|
static Configuration conf = new YarnConfiguration();
|
||||||
final static YarnRPC rpc = YarnRPC.create(conf);
|
final static YarnRPC rpc = YarnRPC.create(conf);
|
||||||
final static InetSocketAddress rmAddress = NetUtils
|
final static InetSocketAddress rmAddress = conf.getSocketAddr(
|
||||||
.createSocketAddr(conf.get(YarnConfiguration.RM_ADDRESS,
|
YarnConfiguration.RM_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_ADDRESS));
|
YarnConfiguration.DEFAULT_RM_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_PORT);
|
||||||
private static ClientRMProtocol rmClient;
|
private static ClientRMProtocol rmClient;
|
||||||
|
|
||||||
private static RecordFactory recordFactory = RecordFactoryProvider
|
private static RecordFactory recordFactory = RecordFactoryProvider
|
||||||
|
|
|
@ -23,7 +23,6 @@ import java.net.InetSocketAddress;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||||
|
@ -51,10 +50,10 @@ public class AppReportFetcher {
|
||||||
public AppReportFetcher(Configuration conf) {
|
public AppReportFetcher(Configuration conf) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
YarnRPC rpc = YarnRPC.create(this.conf);
|
YarnRPC rpc = YarnRPC.create(this.conf);
|
||||||
InetSocketAddress rmAddress =
|
InetSocketAddress rmAddress = conf.getSocketAddr(
|
||||||
NetUtils.createSocketAddr(this.conf.get(
|
|
||||||
YarnConfiguration.RM_ADDRESS,
|
YarnConfiguration.RM_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_ADDRESS));
|
YarnConfiguration.DEFAULT_RM_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_PORT);
|
||||||
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
||||||
applicationsManager =
|
applicationsManager =
|
||||||
(ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
|
(ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
|
||||||
|
|
Loading…
Reference in New Issue