MAPREDUCE-2986. Fixed MiniYARNCluster to support multiple NodeManagers. Contributed by Anupam Seth.

svn merge -c r1189721 --ignore-ancestry ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1189723 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-27 12:44:57 +00:00
parent c5028eaa38
commit 28676b5345
9 changed files with 147 additions and 98 deletions

View File

@ -383,6 +383,9 @@ Release 0.23.0 - Unreleased
virtual, allowing for a ratio between the two to be configurable. (todd virtual, allowing for a ratio between the two to be configurable. (todd
via acmurthy) via acmurthy)
MAPREDUCE-2986. Fixed MiniYARNCluster to support multiple NodeManagers.
(Anupam Seth via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and MAPREDUCE-2026. Make JobTracker.getJobCounters() and

View File

@ -58,7 +58,11 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
private JobHistoryServerWrapper historyServerWrapper; private JobHistoryServerWrapper historyServerWrapper;
public MiniMRYarnCluster(String testName) { public MiniMRYarnCluster(String testName) {
super(testName); this(testName, 1);
}
public MiniMRYarnCluster(String testName, int noOfNMs) {
super(testName, noOfNMs);
//TODO: add the history server //TODO: add the history server
historyServerWrapper = new JobHistoryServerWrapper(); historyServerWrapper = new JobHistoryServerWrapper();
addService(historyServerWrapper); addService(historyServerWrapper);
@ -80,7 +84,7 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
Service.class); Service.class);
// Non-standard shuffle port // Non-standard shuffle port
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 8083); conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR, conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
DefaultContainerExecutor.class, ContainerExecutor.class); DefaultContainerExecutor.class, ContainerExecutor.class);

View File

@ -102,7 +102,7 @@ public class TestMRJobs {
} }
if (mrCluster == null) { if (mrCluster == null) {
mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName()); mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName(), 3);
Configuration conf = new Configuration(); Configuration conf = new Configuration();
mrCluster.init(conf); mrCluster.init(conf);
mrCluster.start(); mrCluster.start();
@ -322,7 +322,7 @@ public class TestMRJobs {
return job; return job;
} }
//@Test //@Test
public void testSleepJobWithSecurityOn() throws IOException, public void testSleepJobWithSecurityOn() throws IOException,
InterruptedException, ClassNotFoundException { InterruptedException, ClassNotFoundException {

View File

@ -249,9 +249,14 @@ public class ShuffleHandler extends AbstractService
public synchronized void start() { public synchronized void start() {
Configuration conf = getConfig(); Configuration conf = getConfig();
ServerBootstrap bootstrap = new ServerBootstrap(selector); ServerBootstrap bootstrap = new ServerBootstrap(selector);
bootstrap.setPipelineFactory(new HttpPipelineFactory(conf)); HttpPipelineFactory pipelineFact = new HttpPipelineFactory(conf);
bootstrap.setPipelineFactory(pipelineFact);
port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT); port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
accepted.add(bootstrap.bind(new InetSocketAddress(port))); Channel ch = bootstrap.bind(new InetSocketAddress(port));
accepted.add(ch);
port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
pipelineFact.SHUFFLE.setPort(port);
LOG.info(getName() + " listening on port " + port); LOG.info(getName() + " listening on port " + port);
super.start(); super.start();
} }
@ -304,7 +309,7 @@ public class ShuffleHandler extends AbstractService
private final IndexCache indexCache; private final IndexCache indexCache;
private final LocalDirAllocator lDirAlloc = private final LocalDirAllocator lDirAlloc =
new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS); new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
private final int port; private int port;
public Shuffle(Configuration conf) { public Shuffle(Configuration conf) {
this.conf = conf; this.conf = conf;
@ -312,6 +317,10 @@ public class ShuffleHandler extends AbstractService
this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT); this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
} }
public void setPort(int port) {
this.port = port;
}
private List<String> splitMaps(List<String> mapq) { private List<String> splitMaps(List<String> mapq) {
if (null == mapq) { if (null == mapq) {
return null; return null;

View File

@ -89,8 +89,9 @@ public class DefaultContainerExecutor extends ContainerExecutor {
String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId); String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
Path tokenDst = new Path(appStorageDir, tokenFn); Path tokenDst = new Path(appStorageDir, tokenFn);
lfs.util().copy(nmPrivateContainerTokensPath, tokenDst); lfs.util().copy(nmPrivateContainerTokensPath, tokenDst);
LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
lfs.setWorkingDirectory(appStorageDir); lfs.setWorkingDirectory(appStorageDir);
LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
// TODO: DO it over RPC for maintaining similarity? // TODO: DO it over RPC for maintaining similarity?
localizer.runLocalization(nmAddr); localizer.runLocalization(nmAddr);
} }

View File

@ -235,8 +235,15 @@ public class ResourceLocalizationService extends CompositeService
cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher), cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS); cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
server = createServer(); server = createServer();
LOG.info("Localizer started on port " + server.getPort());
server.start(); server.start();
String host = getConfig().get(YarnConfiguration.NM_LOCALIZER_ADDRESS)
.split(":")[0];
getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, host + ":"
+ server.getPort());
localizationServerAddress = NetUtils.createSocketAddr(
getConfig().get(YarnConfiguration.NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
LOG.info("Localizer started on port " + server.getPort());
super.start(); super.start();
} }

View File

@ -147,7 +147,7 @@ public class TestResourceLocalizationService {
@Test @Test
@SuppressWarnings("unchecked") // mocked generics @SuppressWarnings("unchecked") // mocked generics
public void testResourceRelease() throws Exception { public void testResourceRelease() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new YarnConfiguration();
AbstractFileSystem spylfs = AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf); final FileContext lfs = FileContext.getFileContext(spylfs, conf);
@ -331,7 +331,7 @@ public class TestResourceLocalizationService {
@Test @Test
@SuppressWarnings("unchecked") // mocked generics @SuppressWarnings("unchecked") // mocked generics
public void testLocalizationHeartbeat() throws Exception { public void testLocalizationHeartbeat() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new YarnConfiguration();
AbstractFileSystem spylfs = AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
final FileContext lfs = FileContext.getFileContext(spylfs, conf); final FileContext lfs = FileContext.getFileContext(spylfs, conf);

View File

@ -292,6 +292,7 @@ public class ApplicationMasterService extends AbstractService implements
public void registerAppAttempt(ApplicationAttemptId attemptId) { public void registerAppAttempt(ApplicationAttemptId attemptId) {
AMResponse response = recordFactory.newRecordInstance(AMResponse.class); AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
response.setResponseId(0); response.setResponseId(0);
LOG.info("Registering " + attemptId);
responseMap.put(attemptId, response); responseMap.put(attemptId, response);
} }

View File

@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.NodeHealthCheckerService; import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service.STATE;
public class MiniYARNCluster extends CompositeService { public class MiniYARNCluster extends CompositeService {
@ -60,15 +62,19 @@ public class MiniYARNCluster extends CompositeService {
DefaultMetricsSystem.setMiniClusterMode(true); DefaultMetricsSystem.setMiniClusterMode(true);
} }
private NodeManager nodeManager; private NodeManager[] nodeManagers;
private ResourceManager resourceManager; private ResourceManager resourceManager;
private ResourceManagerWrapper resourceManagerWrapper; private ResourceManagerWrapper resourceManagerWrapper;
private NodeManagerWrapper nodeManagerWrapper;
private File testWorkDir; private File testWorkDir;
public MiniYARNCluster(String testName) { public MiniYARNCluster(String testName) {
//default number of nodeManagers = 1
this(testName, 1);
}
public MiniYARNCluster(String testName, int noOfNodeManagers) {
super(testName); super(testName);
this.testWorkDir = new File("target", testName); this.testWorkDir = new File("target", testName);
try { try {
@ -80,8 +86,11 @@ public class MiniYARNCluster extends CompositeService {
} }
resourceManagerWrapper = new ResourceManagerWrapper(); resourceManagerWrapper = new ResourceManagerWrapper();
addService(resourceManagerWrapper); addService(resourceManagerWrapper);
nodeManagerWrapper = new NodeManagerWrapper(); nodeManagers = new CustomNodeManager[noOfNodeManagers];
addService(nodeManagerWrapper); for(int index = 0; index < noOfNodeManagers; index++) {
addService(new NodeManagerWrapper(index));
nodeManagers[index] = new CustomNodeManager();
}
} }
public File getTestWorkDir() { public File getTestWorkDir() {
@ -92,8 +101,8 @@ public class MiniYARNCluster extends CompositeService {
return this.resourceManager; return this.resourceManager;
} }
public NodeManager getNodeManager() { public NodeManager getNodeManager(int i) {
return this.nodeManager; return this.nodeManagers[i];
} }
private class ResourceManagerWrapper extends AbstractService { private class ResourceManagerWrapper extends AbstractService {
@ -145,106 +154,60 @@ public class MiniYARNCluster extends CompositeService {
} }
private class NodeManagerWrapper extends AbstractService { private class NodeManagerWrapper extends AbstractService {
public NodeManagerWrapper() { int index = 0;
super(NodeManagerWrapper.class.getName());
public NodeManagerWrapper(int i) {
super(NodeManagerWrapper.class.getName() + "_" + i);
index = i;
}
public synchronized void init(Configuration conf) {
Configuration config = new Configuration(conf);
super.init(config);
} }
public synchronized void start() { public synchronized void start() {
try { try {
File localDir = File localDir = new File(testWorkDir, MiniYARNCluster.this.getName()
new File(testWorkDir, MiniYARNCluster.this.getName() + "-localDir"); + "-localDir-nm-" + index);
localDir.mkdir(); localDir.mkdir();
LOG.info("Created localDir in " + localDir.getAbsolutePath()); LOG.info("Created localDir in " + localDir.getAbsolutePath());
getConfig().set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath()); getConfig().set(YarnConfiguration.NM_LOCAL_DIRS,
localDir.getAbsolutePath());
File logDir = File logDir =
new File(testWorkDir, MiniYARNCluster.this.getName() new File(testWorkDir, MiniYARNCluster.this.getName()
+ "-logDir"); + "-logDir-nm-" + index);
File remoteLogDir = File remoteLogDir =
new File(testWorkDir, MiniYARNCluster.this.getName() new File(testWorkDir, MiniYARNCluster.this.getName()
+ "-remoteLogDir"); + "-remoteLogDir-nm-" + index);
logDir.mkdir(); logDir.mkdir();
remoteLogDir.mkdir(); remoteLogDir.mkdir();
LOG.info("Created logDir in " + logDir.getAbsolutePath()); LOG.info("Created logDir in " + logDir.getAbsolutePath());
getConfig().set(YarnConfiguration.NM_LOG_DIRS, logDir.getAbsolutePath()); getConfig().set(YarnConfiguration.NM_LOG_DIRS,
logDir.getAbsolutePath());
getConfig().set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, getConfig().set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
remoteLogDir.getAbsolutePath()); remoteLogDir.getAbsolutePath());
getConfig().setInt(YarnConfiguration.NM_PMEM_MB, 4*1024); // By default AM + 2 containers // By default AM + 2 containers
nodeManager = new NodeManager() { getConfig().setInt(YarnConfiguration.NM_PMEM_MB, 4*1024);
getConfig().set(YarnConfiguration.NM_ADDRESS, "0.0.0.0:0");
@Override getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:0");
protected void doSecureLogin() throws IOException { getConfig().set(YarnConfiguration.NM_WEBAPP_ADDRESS, "0.0.0.0:0");
// Don't try to login using keytab in the testcase. LOG.info("Starting NM: " + index);
}; nodeManagers[index].init(getConfig());
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
ContainerTokenSecretManager containerTokenSecretManager) {
return new NodeStatusUpdaterImpl(context, dispatcher,
healthChecker, metrics, containerTokenSecretManager) {
@Override
protected ResourceTracker getRMClient() {
final ResourceTrackerService rt = resourceManager
.getResourceTrackerService();
final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
// For in-process communication without RPC
return new ResourceTracker() {
@Override
public NodeHeartbeatResponse nodeHeartbeat(
NodeHeartbeatRequest request) throws YarnRemoteException {
NodeHeartbeatResponse response = recordFactory.newRecordInstance(
NodeHeartbeatResponse.class);
try {
response.setHeartbeatResponse(rt.nodeHeartbeat(request)
.getHeartbeatResponse());
} catch (IOException ioe) {
LOG.info("Exception in heartbeat from node " +
request.getNodeStatus().getNodeId(), ioe);
throw RPCUtil.getRemoteException(ioe);
}
return response;
}
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request)
throws YarnRemoteException {
RegisterNodeManagerResponse response = recordFactory.newRecordInstance(
RegisterNodeManagerResponse.class);
try {
response.setRegistrationResponse(rt
.registerNodeManager(request)
.getRegistrationResponse());
} catch (IOException ioe) {
LOG.info("Exception in node registration from "
+ request.getNodeId().toString(), ioe);
throw RPCUtil.getRemoteException(ioe);
}
return response;
}
};
};
};
};
};
nodeManager.init(getConfig());
new Thread() { new Thread() {
public void run() { public void run() {
nodeManager.start(); nodeManagers[index].start();
}; };
}.start(); }.start();
int waitCount = 0; int waitCount = 0;
while (nodeManager.getServiceState() == STATE.INITED while (nodeManagers[index].getServiceState() == STATE.INITED
&& waitCount++ < 60) { && waitCount++ < 60) {
LOG.info("Waiting for NM to start..."); LOG.info("Waiting for NM " + index + " to start...");
Thread.sleep(1000); Thread.sleep(1000);
} }
if (nodeManager.getServiceState() != STATE.STARTED) { if (nodeManagers[index].getServiceState() != STATE.STARTED) {
// RM could have failed. // RM could have failed.
throw new IOException("NodeManager failed to start"); throw new IOException("NodeManager " + index + " failed to start");
} }
super.start(); super.start();
} catch (Throwable t) { } catch (Throwable t) {
@ -254,10 +217,71 @@ public class MiniYARNCluster extends CompositeService {
@Override @Override
public synchronized void stop() { public synchronized void stop() {
if (nodeManager != null) { if (nodeManagers[index] != null) {
nodeManager.stop(); nodeManagers[index].stop();
} }
super.stop(); super.stop();
} }
} }
private class CustomNodeManager extends NodeManager {
@Override
protected void doSecureLogin() throws IOException {
// Don't try to login using keytab in the testcase.
};
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
ContainerTokenSecretManager containerTokenSecretManager) {
return new NodeStatusUpdaterImpl(context, dispatcher,
healthChecker, metrics, containerTokenSecretManager) {
@Override
protected ResourceTracker getRMClient() {
final ResourceTrackerService rt = resourceManager
.getResourceTrackerService();
final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
// For in-process communication without RPC
return new ResourceTracker() {
@Override
public NodeHeartbeatResponse nodeHeartbeat(
NodeHeartbeatRequest request) throws YarnRemoteException {
NodeHeartbeatResponse response = recordFactory.newRecordInstance(
NodeHeartbeatResponse.class);
try {
response.setHeartbeatResponse(rt.nodeHeartbeat(request)
.getHeartbeatResponse());
} catch (IOException ioe) {
LOG.info("Exception in heartbeat from node " +
request.getNodeStatus().getNodeId(), ioe);
throw RPCUtil.getRemoteException(ioe);
}
return response;
}
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request)
throws YarnRemoteException {
RegisterNodeManagerResponse response = recordFactory.
newRecordInstance(RegisterNodeManagerResponse.class);
try {
response.setRegistrationResponse(rt
.registerNodeManager(request)
.getRegistrationResponse());
} catch (IOException ioe) {
LOG.info("Exception in node registration from "
+ request.getNodeId().toString(), ioe);
throw RPCUtil.getRemoteException(ioe);
}
return response;
}
};
};
};
};
}
} }