MAPREDUCE-4163. consistently set the bind address (Daryn Sharp via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1333579 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-05-03 18:35:21 +00:00
parent e3242b95b3
commit 48414b0827
17 changed files with 61 additions and 121 deletions

View File

@ -447,6 +447,8 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4211. Error conditions (missing appid, appid not found) are
masked in the RM app page (Jonathan Eagles via bobby)
MAPREDUCE-4163. consistently set the bind address (Daryn Sharp via bobby)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
@ -127,10 +126,7 @@ public class TaskAttemptListenerImpl extends CompositeService
}
server.start();
InetSocketAddress listenerAddress = server.getListenerAddress();
listenerAddress.getAddress();
this.address = NetUtils.createSocketAddr(InetAddress.getLocalHost()
.getCanonicalHostName() + ":" + listenerAddress.getPort());
this.address = NetUtils.getConnectAddress(server);
} catch (IOException e) {
throw new YarnException(e);
}

View File

@ -18,9 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app.client;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collection;
@ -78,7 +76,6 @@ import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -116,13 +113,7 @@ public class MRClientService extends AbstractService
public void start() {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress address = NetUtils.createSocketAddr("0.0.0.0:0");
InetAddress hostNameResolved = null;
try {
hostNameResolved = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
throw new YarnException(e);
}
InetSocketAddress address = new InetSocketAddress(0);
ClientToAMSecretManager secretManager = null;
if (UserGroupInformation.isSecurityEnabled()) {
@ -150,9 +141,7 @@ public class MRClientService extends AbstractService
}
server.start();
this.bindAddress =
NetUtils.createSocketAddr(hostNameResolved.getHostAddress()
+ ":" + server.getPort());
this.bindAddress = NetUtils.getConnectAddress(server);
LOG.info("Instantiated MRClientService at " + this.bindAddress);
try {
webApp = WebApps.$for("mapreduce", AppContext.class, appContext, "ws").with(conf).

View File

@ -356,7 +356,7 @@ public class TestContainerLauncher {
// make proxy connect to our local containerManager server
ContainerManager proxy = (ContainerManager) rpc.getProxy(
ContainerManager.class,
NetUtils.createSocketAddr("localhost:" + server.getPort()), conf);
NetUtils.getConnectAddress(server), conf);
return proxy;
}
};

View File

@ -19,9 +19,7 @@
package org.apache.hadoop.mapreduce.v2.hs;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.security.AccessControlException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
@ -76,7 +74,6 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@ -117,17 +114,10 @@ public class HistoryClientService extends AbstractService {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
initializeWebApp(conf);
String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS);
InetSocketAddress address = NetUtils.createSocketAddr(serviceAddr,
JHAdminConfig.DEFAULT_MR_HISTORY_PORT,
JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS);
InetAddress hostNameResolved = null;
try {
hostNameResolved = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
throw new YarnException(e);
}
InetSocketAddress address = conf.getSocketAddr(
JHAdminConfig.MR_HISTORY_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_PORT);
server =
rpc.getServer(HSClientProtocol.class, protocolHandler, address,
@ -143,31 +133,24 @@ public class HistoryClientService extends AbstractService {
}
server.start();
this.bindAddress =
NetUtils.createSocketAddr(hostNameResolved.getHostAddress()
+ ":" + server.getPort());
this.bindAddress = conf.updateConnectAddr(JHAdminConfig.MR_HISTORY_ADDRESS,
server.getListenerAddress());
LOG.info("Instantiated MRClientService at " + this.bindAddress);
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String resolvedAddress = bindAddress.getHostName() + ":" + bindAddress.getPort();
conf.set(JHAdminConfig.MR_HISTORY_ADDRESS, resolvedAddress);
String hostname = getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS);
hostname = (hostname.contains(":")) ? hostname.substring(0, hostname.indexOf(":")) : hostname;
int port = webApp.port();
resolvedAddress = hostname + ":" + port;
conf.set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS, resolvedAddress);
}
super.start();
}
private void initializeWebApp(Configuration conf) {
webApp = new HsWebApp(history);
String bindAddress = conf.get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS);
WebApps.$for("jobhistory", HistoryClientService.class, this, "ws").with(conf).at(bindAddress).start(webApp);
InetSocketAddress bindAddress = conf.getSocketAddr(
JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_PORT);
// NOTE: there should be a .at(InetSocketAddress)
WebApps.$for("jobhistory", HistoryClientService.class, this, "ws")
.with(conf).at(NetUtils.getHostPortString(bindAddress)).start(webApp);
conf.updateConnectAddr(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
webApp.getListenerAddress());
}
@Override

View File

@ -390,9 +390,7 @@ public class TestClientRedirect {
rpc.getServer(protocol, this, address,
conf, null, 1);
server.start();
this.bindAddress =
NetUtils.createSocketAddr(hostNameResolved.getHostAddress()
+ ":" + server.getPort());
this.bindAddress = NetUtils.getConnectAddress(server);
super.start();
amRunning = true;
}

View File

@ -85,7 +85,7 @@ public class TestContainerLaunchRPC {
ContainerManager proxy = (ContainerManager) rpc.getProxy(
ContainerManager.class,
NetUtils.createSocketAddr("localhost:" + server.getPort()), conf);
server.getListenerAddress(), conf);
ContainerLaunchContext containerLaunchContext = recordFactory
.newRecordInstance(ContainerLaunchContext.class);
containerLaunchContext.setUser("dummy-user");

View File

@ -75,8 +75,7 @@ public class TestRPC {
// Any unrelated protocol would do
ClientRMProtocol proxy = (ClientRMProtocol) rpc.getProxy(
ClientRMProtocol.class, NetUtils.createSocketAddr("localhost:"
+ server.getPort()), conf);
ClientRMProtocol.class, NetUtils.getConnectAddress(server), conf);
try {
proxy.getNewApplication(Records
@ -109,7 +108,7 @@ public class TestRPC {
RPC.setProtocolEngine(conf, ContainerManagerPB.class, ProtobufRpcEngine.class);
ContainerManager proxy = (ContainerManager)
rpc.getProxy(ContainerManager.class,
NetUtils.createSocketAddr("localhost:" + server.getPort()), conf);
NetUtils.getConnectAddress(server), conf);
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
containerLaunchContext.setUser("dummy-user");

View File

@ -23,7 +23,6 @@ import static org.apache.hadoop.yarn.service.Service.STATE.STARTED;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Map;
@ -33,13 +32,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
@ -244,15 +243,10 @@ public class ContainerManagerImpl extends CompositeService implements
}
server.start();
try {
resolvedAddress = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
throw new YarnException(e);
}
this.context.getNodeId().setHost(resolvedAddress.getCanonicalHostName());
this.context.getNodeId().setPort(server.getPort());
LOG.info("ContainerManager started at "
+ this.context.getNodeId().toString());
InetSocketAddress connectAddress = NetUtils.getConnectAddress(server);
this.context.getNodeId().setHost(connectAddress.getHostName());
this.context.getNodeId().setPort(connectAddress.getPort());
LOG.info("ContainerManager started at " + connectAddress);
super.start();
}

View File

@ -69,7 +69,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -228,14 +227,9 @@ public class ResourceLocalizationService extends CompositeService
cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
server = createServer();
server.start();
String host = getConfig().get(YarnConfiguration.NM_LOCALIZER_ADDRESS)
.split(":")[0];
getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, host + ":"
+ server.getPort());
localizationServerAddress = getConfig().getSocketAddr(
YarnConfiguration.NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
localizationServerAddress =
getConfig().updateConnectAddr(YarnConfiguration.NM_LOCALIZER_ADDRESS,
server.getListenerAddress());
LOG.info("Localizer started on port " + server.getPort());
super.start();
}

View File

@ -21,8 +21,6 @@ package org.apache.hadoop.yarn.server.nodemanager;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -88,7 +86,7 @@ public class TestNodeStatusUpdater {
.getRecordFactory(null);
int heartBeatID = 0;
volatile Error nmStartError = null;
volatile Throwable nmStartError = null;
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
private final Configuration conf = new YarnConfiguration();
private NodeManager nm;
@ -118,12 +116,8 @@ public class TestNodeStatusUpdater {
NodeId nodeId = request.getNodeId();
Resource resource = request.getResource();
LOG.info("Registering " + nodeId.toString());
try {
Assert.assertEquals(InetAddress.getLocalHost().getCanonicalHostName()
+ ":12345", nodeId.toString());
} catch (UnknownHostException e) {
Assert.fail(e.getMessage());
}
// NOTE: this really should be checking against the config value
Assert.assertEquals("localhost:12345", nodeId.toString());
Assert.assertEquals(5 * 1024, resource.getMemory());
registeredNodes.add(nodeId);
RegistrationResponse regResponse = recordFactory
@ -421,8 +415,9 @@ public class TestNodeStatusUpdater {
public void run() {
try {
nm.start();
} catch (Error e) {
} catch (Throwable e) {
TestNodeStatusUpdater.this.nmStartError = e;
throw new YarnException(e);
}
}
}.start();
@ -433,11 +428,11 @@ public class TestNodeStatusUpdater {
int waitCount = 0;
while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) {
LOG.info("Waiting for NM to start..");
if (nmStartError != null) {
Assert.fail(nmStartError.getCause().getMessage());
}
Thread.sleep(1000);
}
if (nmStartError != null) {
throw nmStartError;
}
if (nm.getServiceState() != STATE.STARTED) {
// NM could have failed.
Assert.fail("NodeManager failed to start");

View File

@ -95,6 +95,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
@ -103,7 +104,14 @@ public class TestResourceLocalizationService {
static final Path basedir =
new Path("target", TestResourceLocalizationService.class.getName());
static Server mockServer;
@BeforeClass
public static void setup() {
mockServer = mock(Server.class);
doReturn(new InetSocketAddress(123)).when(mockServer).getListenerAddress();
}
@Test
public void testLocalizationInit() throws Exception {
final Configuration conf = new Configuration();
@ -178,7 +186,6 @@ public class TestResourceLocalizationService {
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
Server ignore = mock(Server.class);
LocalizerTracker mockLocallilzerTracker = mock(LocalizerTracker.class);
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(conf);
@ -203,7 +210,7 @@ public class TestResourceLocalizationService {
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler);
ResourceLocalizationService spyService = spy(rawService);
doReturn(ignore).when(spyService).createServer();
doReturn(mockServer).when(spyService).createServer();
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
isA(Configuration.class));
doReturn(lfs).when(spyService)
@ -366,7 +373,6 @@ public class TestResourceLocalizationService {
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
Server ignore = mock(Server.class);
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(conf);
dispatcher.start();
@ -388,7 +394,7 @@ public class TestResourceLocalizationService {
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler);
ResourceLocalizationService spyService = spy(rawService);
doReturn(ignore).when(spyService).createServer();
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
try {
spyService.init(conf);

View File

@ -120,11 +120,8 @@ public class AdminService extends AbstractService implements RMAdminProtocol {
}
this.server.start();
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String resolvedAddress =
this.server.getListenerAddress().getHostName() + ":" + this.server.getListenerAddress().getPort();
conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, resolvedAddress);
}
conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
server.getListenerAddress());
super.start();
}

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.util.StringUtils;
@ -120,13 +119,8 @@ public class ApplicationMasterService extends AbstractService implements
this.server.start();
this.bindAddress =
NetUtils.createSocketAddr(masterServiceAddress.getHostName(),
this.server.getPort());
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String resolvedAddress =
this.server.getListenerAddress().getHostName() + ":" + this.server.getListenerAddress().getPort();
conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, resolvedAddress);
}
conf.updateConnectAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
server.getListenerAddress());
super.start();
}

View File

@ -150,11 +150,8 @@ public class ClientRMService extends AbstractService implements
}
this.server.start();
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String resolvedAddress =
this.server.getListenerAddress().getHostName() + ":" + this.server.getListenerAddress().getPort();
conf.set(YarnConfiguration.RM_ADDRESS, resolvedAddress);
}
clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
server.getListenerAddress());
super.start();
}

View File

@ -133,11 +133,8 @@ public class ResourceTrackerService extends AbstractService implements
}
this.server.start();
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String resolvedAddress =
server.getListenerAddress().getHostName() + ":" + server.getListenerAddress().getPort();
conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, resolvedAddress);
}
conf.updateConnectAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
server.getListenerAddress());
}
@Override

View File

@ -399,8 +399,7 @@ public class TestContainerManagerSecurity {
Token<ApplicationTokenIdentifier> appToken =
new Token<ApplicationTokenIdentifier>(appTokenIdentifier,
appTokenSecretManager);
appToken.setService(new Text(schedulerAddr.getHostName() + ":"
+ schedulerAddr.getPort()));
SecurityUtil.setTokenService(appToken, schedulerAddr);
currentUser.addToken(appToken);
SecurityUtil.setTokenService(appToken, schedulerAddr);