MAPREDUCE-2986. Fixed MiniYARNCluster to support multiple NodeManagers. Contributed by Anupam Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1189721 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-27 12:42:12 +00:00
parent 2fe343f963
commit f114dad5ef
9 changed files with 147 additions and 98 deletions

View File

@ -436,6 +436,9 @@ Release 0.23.0 - Unreleased
virtual, allowing for a ratio between the two to be configurable. (todd
via acmurthy)
MAPREDUCE-2986. Fixed MiniYARNCluster to support multiple NodeManagers.
(Anupam Seth via vinodkv)
OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and

View File

@ -58,7 +58,11 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
private JobHistoryServerWrapper historyServerWrapper;
public MiniMRYarnCluster(String testName) {
super(testName);
this(testName, 1);
}
public MiniMRYarnCluster(String testName, int noOfNMs) {
super(testName, noOfNMs);
//TODO: add the history server
historyServerWrapper = new JobHistoryServerWrapper();
addService(historyServerWrapper);
@ -80,7 +84,7 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
Service.class);
// Non-standard shuffle port
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 8083);
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
DefaultContainerExecutor.class, ContainerExecutor.class);

View File

@ -102,7 +102,7 @@ public class TestMRJobs {
}
if (mrCluster == null) {
mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName());
mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName(), 3);
Configuration conf = new Configuration();
mrCluster.init(conf);
mrCluster.start();
@ -322,7 +322,7 @@ public class TestMRJobs {
return job;
}
//@Test
//@Test
public void testSleepJobWithSecurityOn() throws IOException,
InterruptedException, ClassNotFoundException {

View File

@ -249,9 +249,14 @@ public class ShuffleHandler extends AbstractService
public synchronized void start() {
Configuration conf = getConfig();
ServerBootstrap bootstrap = new ServerBootstrap(selector);
bootstrap.setPipelineFactory(new HttpPipelineFactory(conf));
HttpPipelineFactory pipelineFact = new HttpPipelineFactory(conf);
bootstrap.setPipelineFactory(pipelineFact);
port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
accepted.add(bootstrap.bind(new InetSocketAddress(port)));
Channel ch = bootstrap.bind(new InetSocketAddress(port));
accepted.add(ch);
port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
pipelineFact.SHUFFLE.setPort(port);
LOG.info(getName() + " listening on port " + port);
super.start();
}
@ -304,7 +309,7 @@ public class ShuffleHandler extends AbstractService
private final IndexCache indexCache;
private final LocalDirAllocator lDirAlloc =
new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
private final int port;
private int port;
public Shuffle(Configuration conf) {
this.conf = conf;
@ -312,6 +317,10 @@ public class ShuffleHandler extends AbstractService
this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
}
public void setPort(int port) {
this.port = port;
}
private List<String> splitMaps(List<String> mapq) {
if (null == mapq) {
return null;

View File

@ -89,8 +89,9 @@ public class DefaultContainerExecutor extends ContainerExecutor {
String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
Path tokenDst = new Path(appStorageDir, tokenFn);
lfs.util().copy(nmPrivateContainerTokensPath, tokenDst);
LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
lfs.setWorkingDirectory(appStorageDir);
LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
// TODO: DO it over RPC for maintaining similarity?
localizer.runLocalization(nmAddr);
}

View File

@ -235,8 +235,15 @@ public class ResourceLocalizationService extends CompositeService
cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
server = createServer();
LOG.info("Localizer started on port " + server.getPort());
server.start();
String host = getConfig().get(YarnConfiguration.NM_LOCALIZER_ADDRESS)
.split(":")[0];
getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, host + ":"
+ server.getPort());
localizationServerAddress = NetUtils.createSocketAddr(
getConfig().get(YarnConfiguration.NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
LOG.info("Localizer started on port " + server.getPort());
super.start();
}

View File

@ -147,7 +147,7 @@ public class TestResourceLocalizationService {
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testResourceRelease() throws Exception {
Configuration conf = new Configuration();
Configuration conf = new YarnConfiguration();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf);
@ -331,7 +331,7 @@ public class TestResourceLocalizationService {
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testLocalizationHeartbeat() throws Exception {
Configuration conf = new Configuration();
Configuration conf = new YarnConfiguration();
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf);

View File

@ -292,6 +292,7 @@ public class ApplicationMasterService extends AbstractService implements
public void registerAppAttempt(ApplicationAttemptId attemptId) {
AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
response.setResponseId(0);
LOG.info("Registering " + attemptId);
responseMap.put(attemptId, response);
}

View File

@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service.STATE;
public class MiniYARNCluster extends CompositeService {
@ -60,15 +62,19 @@ public class MiniYARNCluster extends CompositeService {
DefaultMetricsSystem.setMiniClusterMode(true);
}
private NodeManager nodeManager;
private NodeManager[] nodeManagers;
private ResourceManager resourceManager;
private ResourceManagerWrapper resourceManagerWrapper;
private NodeManagerWrapper nodeManagerWrapper;
private File testWorkDir;
public MiniYARNCluster(String testName) {
//default number of nodeManagers = 1
this(testName, 1);
}
public MiniYARNCluster(String testName, int noOfNodeManagers) {
super(testName);
this.testWorkDir = new File("target", testName);
try {
@ -80,8 +86,11 @@ public class MiniYARNCluster extends CompositeService {
}
resourceManagerWrapper = new ResourceManagerWrapper();
addService(resourceManagerWrapper);
nodeManagerWrapper = new NodeManagerWrapper();
addService(nodeManagerWrapper);
nodeManagers = new CustomNodeManager[noOfNodeManagers];
for(int index = 0; index < noOfNodeManagers; index++) {
addService(new NodeManagerWrapper(index));
nodeManagers[index] = new CustomNodeManager();
}
}
public File getTestWorkDir() {
@ -92,8 +101,8 @@ public class MiniYARNCluster extends CompositeService {
return this.resourceManager;
}
public NodeManager getNodeManager() {
return this.nodeManager;
public NodeManager getNodeManager(int i) {
return this.nodeManagers[i];
}
private class ResourceManagerWrapper extends AbstractService {
@ -145,106 +154,60 @@ public class MiniYARNCluster extends CompositeService {
}
private class NodeManagerWrapper extends AbstractService {
public NodeManagerWrapper() {
super(NodeManagerWrapper.class.getName());
int index = 0;
public NodeManagerWrapper(int i) {
super(NodeManagerWrapper.class.getName() + "_" + i);
index = i;
}
public synchronized void init(Configuration conf) {
Configuration config = new Configuration(conf);
super.init(config);
}
public synchronized void start() {
try {
File localDir =
new File(testWorkDir, MiniYARNCluster.this.getName() + "-localDir");
File localDir = new File(testWorkDir, MiniYARNCluster.this.getName()
+ "-localDir-nm-" + index);
localDir.mkdir();
LOG.info("Created localDir in " + localDir.getAbsolutePath());
getConfig().set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath());
getConfig().set(YarnConfiguration.NM_LOCAL_DIRS,
localDir.getAbsolutePath());
File logDir =
new File(testWorkDir, MiniYARNCluster.this.getName()
+ "-logDir");
+ "-logDir-nm-" + index);
File remoteLogDir =
new File(testWorkDir, MiniYARNCluster.this.getName()
+ "-remoteLogDir");
new File(testWorkDir, MiniYARNCluster.this.getName()
+ "-remoteLogDir-nm-" + index);
logDir.mkdir();
remoteLogDir.mkdir();
LOG.info("Created logDir in " + logDir.getAbsolutePath());
getConfig().set(YarnConfiguration.NM_LOG_DIRS, logDir.getAbsolutePath());
getConfig().set(YarnConfiguration.NM_LOG_DIRS,
logDir.getAbsolutePath());
getConfig().set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
remoteLogDir.getAbsolutePath());
getConfig().setInt(YarnConfiguration.NM_PMEM_MB, 4*1024); // By default AM + 2 containers
nodeManager = new NodeManager() {
@Override
protected void doSecureLogin() throws IOException {
// Don't try to login using keytab in the testcase.
};
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
ContainerTokenSecretManager containerTokenSecretManager) {
return new NodeStatusUpdaterImpl(context, dispatcher,
healthChecker, metrics, containerTokenSecretManager) {
@Override
protected ResourceTracker getRMClient() {
final ResourceTrackerService rt = resourceManager
.getResourceTrackerService();
final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
// For in-process communication without RPC
return new ResourceTracker() {
@Override
public NodeHeartbeatResponse nodeHeartbeat(
NodeHeartbeatRequest request) throws YarnRemoteException {
NodeHeartbeatResponse response = recordFactory.newRecordInstance(
NodeHeartbeatResponse.class);
try {
response.setHeartbeatResponse(rt.nodeHeartbeat(request)
.getHeartbeatResponse());
} catch (IOException ioe) {
LOG.info("Exception in heartbeat from node " +
request.getNodeStatus().getNodeId(), ioe);
throw RPCUtil.getRemoteException(ioe);
}
return response;
}
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request)
throws YarnRemoteException {
RegisterNodeManagerResponse response = recordFactory.newRecordInstance(
RegisterNodeManagerResponse.class);
try {
response.setRegistrationResponse(rt
.registerNodeManager(request)
.getRegistrationResponse());
} catch (IOException ioe) {
LOG.info("Exception in node registration from "
+ request.getNodeId().toString(), ioe);
throw RPCUtil.getRemoteException(ioe);
}
return response;
}
};
};
};
};
};
nodeManager.init(getConfig());
remoteLogDir.getAbsolutePath());
// By default AM + 2 containers
getConfig().setInt(YarnConfiguration.NM_PMEM_MB, 4*1024);
getConfig().set(YarnConfiguration.NM_ADDRESS, "0.0.0.0:0");
getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:0");
getConfig().set(YarnConfiguration.NM_WEBAPP_ADDRESS, "0.0.0.0:0");
LOG.info("Starting NM: " + index);
nodeManagers[index].init(getConfig());
new Thread() {
public void run() {
nodeManager.start();
nodeManagers[index].start();
};
}.start();
int waitCount = 0;
while (nodeManager.getServiceState() == STATE.INITED
while (nodeManagers[index].getServiceState() == STATE.INITED
&& waitCount++ < 60) {
LOG.info("Waiting for NM to start...");
LOG.info("Waiting for NM " + index + " to start...");
Thread.sleep(1000);
}
if (nodeManager.getServiceState() != STATE.STARTED) {
if (nodeManagers[index].getServiceState() != STATE.STARTED) {
// RM could have failed.
throw new IOException("NodeManager failed to start");
throw new IOException("NodeManager " + index + " failed to start");
}
super.start();
} catch (Throwable t) {
@ -254,10 +217,71 @@ public class MiniYARNCluster extends CompositeService {
@Override
public synchronized void stop() {
if (nodeManager != null) {
nodeManager.stop();
if (nodeManagers[index] != null) {
nodeManagers[index].stop();
}
super.stop();
}
}
private class CustomNodeManager extends NodeManager {
@Override
protected void doSecureLogin() throws IOException {
// Don't try to login using keytab in the testcase.
};
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
ContainerTokenSecretManager containerTokenSecretManager) {
return new NodeStatusUpdaterImpl(context, dispatcher,
healthChecker, metrics, containerTokenSecretManager) {
@Override
protected ResourceTracker getRMClient() {
final ResourceTrackerService rt = resourceManager
.getResourceTrackerService();
final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
// For in-process communication without RPC
return new ResourceTracker() {
@Override
public NodeHeartbeatResponse nodeHeartbeat(
NodeHeartbeatRequest request) throws YarnRemoteException {
NodeHeartbeatResponse response = recordFactory.newRecordInstance(
NodeHeartbeatResponse.class);
try {
response.setHeartbeatResponse(rt.nodeHeartbeat(request)
.getHeartbeatResponse());
} catch (IOException ioe) {
LOG.info("Exception in heartbeat from node " +
request.getNodeStatus().getNodeId(), ioe);
throw RPCUtil.getRemoteException(ioe);
}
return response;
}
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request)
throws YarnRemoteException {
RegisterNodeManagerResponse response = recordFactory.
newRecordInstance(RegisterNodeManagerResponse.class);
try {
response.setRegistrationResponse(rt
.registerNodeManager(request)
.getRegistrationResponse());
} catch (IOException ioe) {
LOG.info("Exception in node registration from "
+ request.getNodeId().toString(), ioe);
throw RPCUtil.getRemoteException(ioe);
}
return response;
}
};
};
};
};
}
}