svn merge -c 1333579 FIXES: MAPREDUCE-4163. consistently set the bind address (Daryn Sharp via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1333580 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cd9063fa57
commit
c9aac0a1d1
|
@ -338,6 +338,8 @@ Release 0.23.3 - UNRELEASED
|
||||||
MAPREDUCE-4211. Error conditions (missing appid, appid not found) are
|
MAPREDUCE-4211. Error conditions (missing appid, appid not found) are
|
||||||
masked in the RM app page (Jonathan Eagles via bobby)
|
masked in the RM app page (Jonathan Eagles via bobby)
|
||||||
|
|
||||||
|
MAPREDUCE-4163. consistently set the bind address (Daryn Sharp via bobby)
|
||||||
|
|
||||||
Release 0.23.2 - UNRELEASED
|
Release 0.23.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
package org.apache.hadoop.mapred;
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -127,10 +126,7 @@ public class TaskAttemptListenerImpl extends CompositeService
|
||||||
}
|
}
|
||||||
|
|
||||||
server.start();
|
server.start();
|
||||||
InetSocketAddress listenerAddress = server.getListenerAddress();
|
this.address = NetUtils.getConnectAddress(server);
|
||||||
listenerAddress.getAddress();
|
|
||||||
this.address = NetUtils.createSocketAddr(InetAddress.getLocalHost()
|
|
||||||
.getCanonicalHostName() + ":" + listenerAddress.getPort());
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new YarnException(e);
|
throw new YarnException(e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,9 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.mapreduce.v2.app.client;
|
package org.apache.hadoop.mapreduce.v2.app.client;
|
||||||
|
|
||||||
import java.net.InetAddress;
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.UnknownHostException;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
|
@ -78,7 +76,6 @@ import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
|
@ -116,13 +113,7 @@ public class MRClientService extends AbstractService
|
||||||
public void start() {
|
public void start() {
|
||||||
Configuration conf = getConfig();
|
Configuration conf = getConfig();
|
||||||
YarnRPC rpc = YarnRPC.create(conf);
|
YarnRPC rpc = YarnRPC.create(conf);
|
||||||
InetSocketAddress address = NetUtils.createSocketAddr("0.0.0.0:0");
|
InetSocketAddress address = new InetSocketAddress(0);
|
||||||
InetAddress hostNameResolved = null;
|
|
||||||
try {
|
|
||||||
hostNameResolved = InetAddress.getLocalHost();
|
|
||||||
} catch (UnknownHostException e) {
|
|
||||||
throw new YarnException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
ClientToAMSecretManager secretManager = null;
|
ClientToAMSecretManager secretManager = null;
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
@ -150,9 +141,7 @@ public class MRClientService extends AbstractService
|
||||||
}
|
}
|
||||||
|
|
||||||
server.start();
|
server.start();
|
||||||
this.bindAddress =
|
this.bindAddress = NetUtils.getConnectAddress(server);
|
||||||
NetUtils.createSocketAddr(hostNameResolved.getHostAddress()
|
|
||||||
+ ":" + server.getPort());
|
|
||||||
LOG.info("Instantiated MRClientService at " + this.bindAddress);
|
LOG.info("Instantiated MRClientService at " + this.bindAddress);
|
||||||
try {
|
try {
|
||||||
webApp = WebApps.$for("mapreduce", AppContext.class, appContext, "ws").with(conf).
|
webApp = WebApps.$for("mapreduce", AppContext.class, appContext, "ws").with(conf).
|
||||||
|
|
|
@ -356,7 +356,7 @@ public class TestContainerLauncher {
|
||||||
// make proxy connect to our local containerManager server
|
// make proxy connect to our local containerManager server
|
||||||
ContainerManager proxy = (ContainerManager) rpc.getProxy(
|
ContainerManager proxy = (ContainerManager) rpc.getProxy(
|
||||||
ContainerManager.class,
|
ContainerManager.class,
|
||||||
NetUtils.createSocketAddr("localhost:" + server.getPort()), conf);
|
NetUtils.getConnectAddress(server), conf);
|
||||||
return proxy;
|
return proxy;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -19,9 +19,7 @@
|
||||||
package org.apache.hadoop.mapreduce.v2.hs;
|
package org.apache.hadoop.mapreduce.v2.hs;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.UnknownHostException;
|
|
||||||
import java.security.AccessControlException;
|
import java.security.AccessControlException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -76,7 +74,6 @@ import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
|
||||||
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
|
@ -117,17 +114,10 @@ public class HistoryClientService extends AbstractService {
|
||||||
Configuration conf = getConfig();
|
Configuration conf = getConfig();
|
||||||
YarnRPC rpc = YarnRPC.create(conf);
|
YarnRPC rpc = YarnRPC.create(conf);
|
||||||
initializeWebApp(conf);
|
initializeWebApp(conf);
|
||||||
String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS,
|
InetSocketAddress address = conf.getSocketAddr(
|
||||||
JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS);
|
JHAdminConfig.MR_HISTORY_ADDRESS,
|
||||||
InetSocketAddress address = NetUtils.createSocketAddr(serviceAddr,
|
JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS,
|
||||||
JHAdminConfig.DEFAULT_MR_HISTORY_PORT,
|
JHAdminConfig.DEFAULT_MR_HISTORY_PORT);
|
||||||
JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS);
|
|
||||||
InetAddress hostNameResolved = null;
|
|
||||||
try {
|
|
||||||
hostNameResolved = InetAddress.getLocalHost();
|
|
||||||
} catch (UnknownHostException e) {
|
|
||||||
throw new YarnException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
server =
|
server =
|
||||||
rpc.getServer(HSClientProtocol.class, protocolHandler, address,
|
rpc.getServer(HSClientProtocol.class, protocolHandler, address,
|
||||||
|
@ -143,31 +133,24 @@ public class HistoryClientService extends AbstractService {
|
||||||
}
|
}
|
||||||
|
|
||||||
server.start();
|
server.start();
|
||||||
this.bindAddress =
|
this.bindAddress = conf.updateConnectAddr(JHAdminConfig.MR_HISTORY_ADDRESS,
|
||||||
NetUtils.createSocketAddr(hostNameResolved.getHostAddress()
|
server.getListenerAddress());
|
||||||
+ ":" + server.getPort());
|
|
||||||
LOG.info("Instantiated MRClientService at " + this.bindAddress);
|
LOG.info("Instantiated MRClientService at " + this.bindAddress);
|
||||||
|
|
||||||
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
|
|
||||||
String resolvedAddress = bindAddress.getHostName() + ":" + bindAddress.getPort();
|
|
||||||
conf.set(JHAdminConfig.MR_HISTORY_ADDRESS, resolvedAddress);
|
|
||||||
|
|
||||||
String hostname = getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
|
|
||||||
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS);
|
|
||||||
hostname = (hostname.contains(":")) ? hostname.substring(0, hostname.indexOf(":")) : hostname;
|
|
||||||
int port = webApp.port();
|
|
||||||
resolvedAddress = hostname + ":" + port;
|
|
||||||
conf.set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS, resolvedAddress);
|
|
||||||
}
|
|
||||||
|
|
||||||
super.start();
|
super.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initializeWebApp(Configuration conf) {
|
private void initializeWebApp(Configuration conf) {
|
||||||
webApp = new HsWebApp(history);
|
webApp = new HsWebApp(history);
|
||||||
String bindAddress = conf.get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
|
InetSocketAddress bindAddress = conf.getSocketAddr(
|
||||||
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS);
|
JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
|
||||||
WebApps.$for("jobhistory", HistoryClientService.class, this, "ws").with(conf).at(bindAddress).start(webApp);
|
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS,
|
||||||
|
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_PORT);
|
||||||
|
// NOTE: there should be a .at(InetSocketAddress)
|
||||||
|
WebApps.$for("jobhistory", HistoryClientService.class, this, "ws")
|
||||||
|
.with(conf).at(NetUtils.getHostPortString(bindAddress)).start(webApp);
|
||||||
|
conf.updateConnectAddr(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
|
||||||
|
webApp.getListenerAddress());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -390,9 +390,7 @@ public class TestClientRedirect {
|
||||||
rpc.getServer(protocol, this, address,
|
rpc.getServer(protocol, this, address,
|
||||||
conf, null, 1);
|
conf, null, 1);
|
||||||
server.start();
|
server.start();
|
||||||
this.bindAddress =
|
this.bindAddress = NetUtils.getConnectAddress(server);
|
||||||
NetUtils.createSocketAddr(hostNameResolved.getHostAddress()
|
|
||||||
+ ":" + server.getPort());
|
|
||||||
super.start();
|
super.start();
|
||||||
amRunning = true;
|
amRunning = true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,7 +85,7 @@ public class TestContainerLaunchRPC {
|
||||||
|
|
||||||
ContainerManager proxy = (ContainerManager) rpc.getProxy(
|
ContainerManager proxy = (ContainerManager) rpc.getProxy(
|
||||||
ContainerManager.class,
|
ContainerManager.class,
|
||||||
NetUtils.createSocketAddr("localhost:" + server.getPort()), conf);
|
server.getListenerAddress(), conf);
|
||||||
ContainerLaunchContext containerLaunchContext = recordFactory
|
ContainerLaunchContext containerLaunchContext = recordFactory
|
||||||
.newRecordInstance(ContainerLaunchContext.class);
|
.newRecordInstance(ContainerLaunchContext.class);
|
||||||
containerLaunchContext.setUser("dummy-user");
|
containerLaunchContext.setUser("dummy-user");
|
||||||
|
|
|
@ -75,8 +75,7 @@ public class TestRPC {
|
||||||
|
|
||||||
// Any unrelated protocol would do
|
// Any unrelated protocol would do
|
||||||
ClientRMProtocol proxy = (ClientRMProtocol) rpc.getProxy(
|
ClientRMProtocol proxy = (ClientRMProtocol) rpc.getProxy(
|
||||||
ClientRMProtocol.class, NetUtils.createSocketAddr("localhost:"
|
ClientRMProtocol.class, NetUtils.getConnectAddress(server), conf);
|
||||||
+ server.getPort()), conf);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
proxy.getNewApplication(Records
|
proxy.getNewApplication(Records
|
||||||
|
@ -109,7 +108,7 @@ public class TestRPC {
|
||||||
RPC.setProtocolEngine(conf, ContainerManagerPB.class, ProtobufRpcEngine.class);
|
RPC.setProtocolEngine(conf, ContainerManagerPB.class, ProtobufRpcEngine.class);
|
||||||
ContainerManager proxy = (ContainerManager)
|
ContainerManager proxy = (ContainerManager)
|
||||||
rpc.getProxy(ContainerManager.class,
|
rpc.getProxy(ContainerManager.class,
|
||||||
NetUtils.createSocketAddr("localhost:" + server.getPort()), conf);
|
NetUtils.getConnectAddress(server), conf);
|
||||||
ContainerLaunchContext containerLaunchContext =
|
ContainerLaunchContext containerLaunchContext =
|
||||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||||
containerLaunchContext.setUser("dummy-user");
|
containerLaunchContext.setUser("dummy-user");
|
||||||
|
|
|
@ -23,7 +23,6 @@ import static org.apache.hadoop.yarn.service.Service.STATE.STARTED;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.UnknownHostException;
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -33,13 +32,13 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
|
||||||
import org.apache.hadoop.yarn.api.ContainerManager;
|
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
|
||||||
|
@ -244,15 +243,10 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
}
|
}
|
||||||
|
|
||||||
server.start();
|
server.start();
|
||||||
try {
|
InetSocketAddress connectAddress = NetUtils.getConnectAddress(server);
|
||||||
resolvedAddress = InetAddress.getLocalHost();
|
this.context.getNodeId().setHost(connectAddress.getHostName());
|
||||||
} catch (UnknownHostException e) {
|
this.context.getNodeId().setPort(connectAddress.getPort());
|
||||||
throw new YarnException(e);
|
LOG.info("ContainerManager started at " + connectAddress);
|
||||||
}
|
|
||||||
this.context.getNodeId().setHost(resolvedAddress.getCanonicalHostName());
|
|
||||||
this.context.getNodeId().setPort(server.getPort());
|
|
||||||
LOG.info("ContainerManager started at "
|
|
||||||
+ this.context.getNodeId().toString());
|
|
||||||
super.start();
|
super.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -69,7 +69,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
@ -228,14 +227,9 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
|
cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
|
||||||
server = createServer();
|
server = createServer();
|
||||||
server.start();
|
server.start();
|
||||||
String host = getConfig().get(YarnConfiguration.NM_LOCALIZER_ADDRESS)
|
localizationServerAddress =
|
||||||
.split(":")[0];
|
getConfig().updateConnectAddr(YarnConfiguration.NM_LOCALIZER_ADDRESS,
|
||||||
getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, host + ":"
|
server.getListenerAddress());
|
||||||
+ server.getPort());
|
|
||||||
localizationServerAddress = getConfig().getSocketAddr(
|
|
||||||
YarnConfiguration.NM_LOCALIZER_ADDRESS,
|
|
||||||
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
|
|
||||||
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
|
|
||||||
LOG.info("Localizer started on port " + server.getPort());
|
LOG.info("Localizer started on port " + server.getPort());
|
||||||
super.start();
|
super.start();
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,8 +21,6 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
|
||||||
import java.net.UnknownHostException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -88,7 +86,7 @@ public class TestNodeStatusUpdater {
|
||||||
.getRecordFactory(null);
|
.getRecordFactory(null);
|
||||||
|
|
||||||
int heartBeatID = 0;
|
int heartBeatID = 0;
|
||||||
volatile Error nmStartError = null;
|
volatile Throwable nmStartError = null;
|
||||||
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
|
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
|
||||||
private final Configuration conf = new YarnConfiguration();
|
private final Configuration conf = new YarnConfiguration();
|
||||||
private NodeManager nm;
|
private NodeManager nm;
|
||||||
|
@ -118,12 +116,8 @@ public class TestNodeStatusUpdater {
|
||||||
NodeId nodeId = request.getNodeId();
|
NodeId nodeId = request.getNodeId();
|
||||||
Resource resource = request.getResource();
|
Resource resource = request.getResource();
|
||||||
LOG.info("Registering " + nodeId.toString());
|
LOG.info("Registering " + nodeId.toString());
|
||||||
try {
|
// NOTE: this really should be checking against the config value
|
||||||
Assert.assertEquals(InetAddress.getLocalHost().getCanonicalHostName()
|
Assert.assertEquals("localhost:12345", nodeId.toString());
|
||||||
+ ":12345", nodeId.toString());
|
|
||||||
} catch (UnknownHostException e) {
|
|
||||||
Assert.fail(e.getMessage());
|
|
||||||
}
|
|
||||||
Assert.assertEquals(5 * 1024, resource.getMemory());
|
Assert.assertEquals(5 * 1024, resource.getMemory());
|
||||||
registeredNodes.add(nodeId);
|
registeredNodes.add(nodeId);
|
||||||
RegistrationResponse regResponse = recordFactory
|
RegistrationResponse regResponse = recordFactory
|
||||||
|
@ -421,8 +415,9 @@ public class TestNodeStatusUpdater {
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
nm.start();
|
nm.start();
|
||||||
} catch (Error e) {
|
} catch (Throwable e) {
|
||||||
TestNodeStatusUpdater.this.nmStartError = e;
|
TestNodeStatusUpdater.this.nmStartError = e;
|
||||||
|
throw new YarnException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}.start();
|
}.start();
|
||||||
|
@ -433,11 +428,11 @@ public class TestNodeStatusUpdater {
|
||||||
int waitCount = 0;
|
int waitCount = 0;
|
||||||
while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) {
|
while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) {
|
||||||
LOG.info("Waiting for NM to start..");
|
LOG.info("Waiting for NM to start..");
|
||||||
|
if (nmStartError != null) {
|
||||||
|
Assert.fail(nmStartError.getCause().getMessage());
|
||||||
|
}
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
}
|
}
|
||||||
if (nmStartError != null) {
|
|
||||||
throw nmStartError;
|
|
||||||
}
|
|
||||||
if (nm.getServiceState() != STATE.STARTED) {
|
if (nm.getServiceState() != STATE.STARTED) {
|
||||||
// NM could have failed.
|
// NM could have failed.
|
||||||
Assert.fail("NodeManager failed to start");
|
Assert.fail("NodeManager failed to start");
|
||||||
|
|
|
@ -95,6 +95,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.ArgumentMatcher;
|
import org.mockito.ArgumentMatcher;
|
||||||
|
@ -103,6 +104,13 @@ public class TestResourceLocalizationService {
|
||||||
|
|
||||||
static final Path basedir =
|
static final Path basedir =
|
||||||
new Path("target", TestResourceLocalizationService.class.getName());
|
new Path("target", TestResourceLocalizationService.class.getName());
|
||||||
|
static Server mockServer;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setup() {
|
||||||
|
mockServer = mock(Server.class);
|
||||||
|
doReturn(new InetSocketAddress(123)).when(mockServer).getListenerAddress();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLocalizationInit() throws Exception {
|
public void testLocalizationInit() throws Exception {
|
||||||
|
@ -178,7 +186,6 @@ public class TestResourceLocalizationService {
|
||||||
}
|
}
|
||||||
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
|
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
|
||||||
|
|
||||||
Server ignore = mock(Server.class);
|
|
||||||
LocalizerTracker mockLocallilzerTracker = mock(LocalizerTracker.class);
|
LocalizerTracker mockLocallilzerTracker = mock(LocalizerTracker.class);
|
||||||
DrainDispatcher dispatcher = new DrainDispatcher();
|
DrainDispatcher dispatcher = new DrainDispatcher();
|
||||||
dispatcher.init(conf);
|
dispatcher.init(conf);
|
||||||
|
@ -203,7 +210,7 @@ public class TestResourceLocalizationService {
|
||||||
new ResourceLocalizationService(dispatcher, exec, delService,
|
new ResourceLocalizationService(dispatcher, exec, delService,
|
||||||
dirsHandler);
|
dirsHandler);
|
||||||
ResourceLocalizationService spyService = spy(rawService);
|
ResourceLocalizationService spyService = spy(rawService);
|
||||||
doReturn(ignore).when(spyService).createServer();
|
doReturn(mockServer).when(spyService).createServer();
|
||||||
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
|
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
|
||||||
isA(Configuration.class));
|
isA(Configuration.class));
|
||||||
doReturn(lfs).when(spyService)
|
doReturn(lfs).when(spyService)
|
||||||
|
@ -366,7 +373,6 @@ public class TestResourceLocalizationService {
|
||||||
}
|
}
|
||||||
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
|
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
|
||||||
|
|
||||||
Server ignore = mock(Server.class);
|
|
||||||
DrainDispatcher dispatcher = new DrainDispatcher();
|
DrainDispatcher dispatcher = new DrainDispatcher();
|
||||||
dispatcher.init(conf);
|
dispatcher.init(conf);
|
||||||
dispatcher.start();
|
dispatcher.start();
|
||||||
|
@ -388,7 +394,7 @@ public class TestResourceLocalizationService {
|
||||||
new ResourceLocalizationService(dispatcher, exec, delService,
|
new ResourceLocalizationService(dispatcher, exec, delService,
|
||||||
dirsHandler);
|
dirsHandler);
|
||||||
ResourceLocalizationService spyService = spy(rawService);
|
ResourceLocalizationService spyService = spy(rawService);
|
||||||
doReturn(ignore).when(spyService).createServer();
|
doReturn(mockServer).when(spyService).createServer();
|
||||||
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
|
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
|
||||||
try {
|
try {
|
||||||
spyService.init(conf);
|
spyService.init(conf);
|
||||||
|
|
|
@ -120,11 +120,8 @@ public class AdminService extends AbstractService implements RMAdminProtocol {
|
||||||
}
|
}
|
||||||
|
|
||||||
this.server.start();
|
this.server.start();
|
||||||
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
|
conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
|
||||||
String resolvedAddress =
|
server.getListenerAddress());
|
||||||
this.server.getListenerAddress().getHostName() + ":" + this.server.getListenerAddress().getPort();
|
|
||||||
conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, resolvedAddress);
|
|
||||||
}
|
|
||||||
super.start();
|
super.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
@ -120,13 +119,8 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
|
|
||||||
this.server.start();
|
this.server.start();
|
||||||
this.bindAddress =
|
this.bindAddress =
|
||||||
NetUtils.createSocketAddr(masterServiceAddress.getHostName(),
|
conf.updateConnectAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||||
this.server.getPort());
|
server.getListenerAddress());
|
||||||
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
|
|
||||||
String resolvedAddress =
|
|
||||||
this.server.getListenerAddress().getHostName() + ":" + this.server.getListenerAddress().getPort();
|
|
||||||
conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, resolvedAddress);
|
|
||||||
}
|
|
||||||
super.start();
|
super.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -150,11 +150,8 @@ public class ClientRMService extends AbstractService implements
|
||||||
}
|
}
|
||||||
|
|
||||||
this.server.start();
|
this.server.start();
|
||||||
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
|
clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
|
||||||
String resolvedAddress =
|
server.getListenerAddress());
|
||||||
this.server.getListenerAddress().getHostName() + ":" + this.server.getListenerAddress().getPort();
|
|
||||||
conf.set(YarnConfiguration.RM_ADDRESS, resolvedAddress);
|
|
||||||
}
|
|
||||||
super.start();
|
super.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -133,11 +133,8 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
}
|
}
|
||||||
|
|
||||||
this.server.start();
|
this.server.start();
|
||||||
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
|
conf.updateConnectAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
|
||||||
String resolvedAddress =
|
server.getListenerAddress());
|
||||||
server.getListenerAddress().getHostName() + ":" + server.getListenerAddress().getPort();
|
|
||||||
conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, resolvedAddress);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -399,8 +399,7 @@ public class TestContainerManagerSecurity {
|
||||||
Token<ApplicationTokenIdentifier> appToken =
|
Token<ApplicationTokenIdentifier> appToken =
|
||||||
new Token<ApplicationTokenIdentifier>(appTokenIdentifier,
|
new Token<ApplicationTokenIdentifier>(appTokenIdentifier,
|
||||||
appTokenSecretManager);
|
appTokenSecretManager);
|
||||||
appToken.setService(new Text(schedulerAddr.getHostName() + ":"
|
SecurityUtil.setTokenService(appToken, schedulerAddr);
|
||||||
+ schedulerAddr.getPort()));
|
|
||||||
currentUser.addToken(appToken);
|
currentUser.addToken(appToken);
|
||||||
SecurityUtil.setTokenService(appToken, schedulerAddr);
|
SecurityUtil.setTokenService(appToken, schedulerAddr);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue