Merge -r 1328108:1328109 from trunk to branch. FIXES: MAPREDUCE-3867

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1328110 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2012-04-19 20:34:49 +00:00
parent 8f4c3499f5
commit 479e5b0102
20 changed files with 213 additions and 62 deletions

View File

@ -146,6 +146,8 @@ Release 2.0.0 - UNRELEASED
MAPREDUCE-4008. ResourceManager throws MetricsException on start up MAPREDUCE-4008. ResourceManager throws MetricsException on start up
saying QueueMetrics MBean already exists (Devaraj K via tgraves) saying QueueMetrics MBean already exists (Devaraj K via tgraves)
MAPREDUCE-3867. MiniMRYarn/MiniYarn uses fixed ports (tucu)
Release 0.23.3 - UNRELEASED Release 0.23.3 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -78,6 +78,7 @@
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.DelegationToken; import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -147,6 +148,18 @@ public void start() {
+ ":" + server.getPort()); + ":" + server.getPort());
LOG.info("Instantiated MRClientService at " + this.bindAddress); LOG.info("Instantiated MRClientService at " + this.bindAddress);
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String resolvedAddress = bindAddress.getHostName() + ":" + bindAddress.getPort();
conf.set(JHAdminConfig.MR_HISTORY_ADDRESS, resolvedAddress);
String hostname = getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS);
hostname = (hostname.contains(":")) ? hostname.substring(0, hostname.indexOf(":")) : hostname;
int port = webApp.port();
resolvedAddress = hostname + ":" + port;
conf.set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS, resolvedAddress);
}
super.start(); super.start();
} }

View File

@ -83,7 +83,9 @@ public void testSocketFactory() throws IOException {
JobConf jobConf = new JobConf(); JobConf jobConf = new JobConf();
FileSystem.setDefaultUri(jobConf, fs.getUri().toString()); FileSystem.setDefaultUri(jobConf, fs.getUri().toString());
miniMRYarnCluster = initAndStartMiniMRYarnCluster(jobConf); miniMRYarnCluster = initAndStartMiniMRYarnCluster(jobConf);
JobConf jconf = new JobConf(cconf); JobConf jconf = new JobConf(miniMRYarnCluster.getConfig());
jconf.set("hadoop.rpc.socket.factory.class.default",
"org.apache.hadoop.ipc.DummySocketFactory");
jconf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); jconf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
String rmAddress = jconf.get("yarn.resourcemanager.address"); String rmAddress = jconf.get("yarn.resourcemanager.address");
String[] split = rmAddress.split(":"); String[] split = rmAddress.split(":");

View File

@ -173,7 +173,7 @@ public void testClassPath() throws IOException {
fileSys = dfs.getFileSystem(); fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString(); namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(taskTrackers, namenode, 3); mr = new MiniMRCluster(taskTrackers, namenode, 3);
JobConf jobConf = new JobConf(); JobConf jobConf = mr.createJobConf();
String result; String result;
result = launchWordCount(fileSys.getUri(), jobConf, result = launchWordCount(fileSys.getUri(), jobConf,
"The quick brown fox\nhas many silly\n" + "red fox sox\n", 3, 1); "The quick brown fox\nhas many silly\n" + "red fox sox\n", 3, 1);
@ -205,7 +205,7 @@ public void testExternalWritable()
fileSys = dfs.getFileSystem(); fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString(); namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(taskTrackers, namenode, 3); mr = new MiniMRCluster(taskTrackers, namenode, 3);
JobConf jobConf = new JobConf(); JobConf jobConf = mr.createJobConf();
String result; String result;
result = launchExternal(fileSys.getUri(), jobConf, result = launchExternal(fileSys.getUri(), jobConf,

View File

@ -20,6 +20,8 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -126,6 +128,10 @@ public JobHistoryServerWrapper() {
@Override @Override
public synchronized void start() { public synchronized void start() {
try { try {
getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
historyServer = new JobHistoryServer(); historyServer = new JobHistoryServer();
historyServer.init(getConfig()); historyServer.init(getConfig());
new Thread() { new Thread() {
@ -145,6 +151,20 @@ public void run() {
} catch (Throwable t) { } catch (Throwable t) {
throw new YarnException(t); throw new YarnException(t);
} }
//need to do this because historyServer.init creates a new Configuration
getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
historyServer.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
historyServer.getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS));
LOG.info("MiniMRYARN ResourceManager address: " +
getConfig().get(YarnConfiguration.RM_ADDRESS));
LOG.info("MiniMRYARN ResourceManager web address: " +
getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS));
LOG.info("MiniMRYARN HistoryServer address: " +
getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
LOG.info("MiniMRYARN HistoryServer web address: " +
getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS));
} }
@Override @Override

View File

@ -197,12 +197,18 @@ public static void main(String[] args) {
/** /**
*/ */
public Client() throws Exception { public Client(Configuration conf) throws Exception {
// Set up the configuration and RPC // Set up the configuration and RPC
conf = new Configuration(); this.conf = conf;
rpc = YarnRPC.create(conf); rpc = YarnRPC.create(conf);
} }
/**
*/
public Client() throws Exception {
this(new Configuration());
}
/** /**
* Helper function to print out usage * Helper function to print out usage
* @param opts Parsed command line options * @param opts Parsed command line options

View File

@ -18,11 +18,17 @@
package org.apache.hadoop.yarn.applications.distributedshell; package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import junit.framework.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -48,6 +54,14 @@ public static void setup() throws InterruptedException, IOException {
1, 1, 1); 1, 1, 1);
yarnCluster.init(conf); yarnCluster.init(conf);
yarnCluster.start(); yarnCluster.start();
URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
if (url == null) {
throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
}
yarnCluster.getConfig().set("yarn.application.classpath", new File(url.getPath()).getParent());
OutputStream os = new FileOutputStream(new File(url.getPath()));
yarnCluster.getConfig().writeXml(os);
os.close();
} }
try { try {
Thread.sleep(2000); Thread.sleep(2000);
@ -81,14 +95,14 @@ public void testDSShell() throws Exception {
}; };
LOG.info("Initializing DS Client"); LOG.info("Initializing DS Client");
Client client = new Client(); Client client = new Client(new Configuration(yarnCluster.getConfig()));
boolean initSuccess = client.init(args); boolean initSuccess = client.init(args);
assert(initSuccess); Assert.assertTrue(initSuccess);
LOG.info("Running DS Client"); LOG.info("Running DS Client");
boolean result = client.run(); boolean result = client.run();
LOG.info("Client run completed. Result=" + result); LOG.info("Client run completed. Result=" + result);
assert (result == true); Assert.assertTrue(result);
} }

View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<configuration>
<!-- Dummy (invalid) config file to be overwriten by TestDistributedShell with MiniCluster configuration. -->
</configuration>

View File

@ -538,6 +538,8 @@ public class YarnConfiguration extends Configuration {
/** Container temp directory */ /** Container temp directory */
public static final String DEFAULT_CONTAINER_TEMP_DIR = "./tmp"; public static final String DEFAULT_CONTAINER_TEMP_DIR = "./tmp";
public static final String IS_MINI_YARN_CLUSTER = YARN_PREFIX + ".is.minicluster";
public YarnConfiguration() { public YarnConfiguration() {
super(); super();
} }

View File

@ -120,6 +120,11 @@ public void start() {
} }
this.server.start(); this.server.start();
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String resolvedAddress =
this.server.getListenerAddress().getHostName() + ":" + this.server.getListenerAddress().getPort();
conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, resolvedAddress);
}
super.start(); super.start();
} }

View File

@ -119,11 +119,14 @@ public void start() {
} }
this.server.start(); this.server.start();
this.bindAddress = this.bindAddress =
NetUtils.createSocketAddr(masterServiceAddress.getHostName(), NetUtils.createSocketAddr(masterServiceAddress.getHostName(),
this.server.getPort()); this.server.getPort());
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String resolvedAddress =
this.server.getListenerAddress().getHostName() + ":" + this.server.getListenerAddress().getPort();
conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, resolvedAddress);
}
super.start(); super.start();
} }

View File

@ -150,6 +150,11 @@ public void start() {
} }
this.server.start(); this.server.start();
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String resolvedAddress =
this.server.getListenerAddress().getHostName() + ":" + this.server.getListenerAddress().getPort();
conf.set(YarnConfiguration.RM_ADDRESS, resolvedAddress);
}
super.start(); super.start();
} }

View File

@ -20,6 +20,7 @@
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -476,6 +477,15 @@ public void start() {
throw new YarnException("Failed to start secret manager threads", ie); throw new YarnException("Failed to start secret manager threads", ie);
} }
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String hostname = getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS);
hostname = (hostname.contains(":")) ? hostname.substring(0, hostname.indexOf(":")) : hostname;
int port = webApp.port();
String resolvedAddress = hostname + ":" + port;
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, resolvedAddress);
}
super.start(); super.start();
/*synchronized(shutdown) { /*synchronized(shutdown) {

View File

@ -133,6 +133,11 @@ public synchronized void start() {
} }
this.server.start(); this.server.start();
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String resolvedAddress =
server.getListenerAddress().getHostName() + ":" + server.getListenerAddress().getPort();
conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, resolvedAddress);
}
} }
@Override @Override

View File

@ -20,6 +20,9 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -114,6 +117,15 @@ public NodeManager getNodeManager(int i) {
return this.nodeManagers[i]; return this.nodeManagers[i];
} }
public static String getHostname() {
try {
return InetAddress.getLocalHost().getHostName();
}
catch (UnknownHostException ex) {
throw new RuntimeException(ex);
}
}
private class ResourceManagerWrapper extends AbstractService { private class ResourceManagerWrapper extends AbstractService {
public ResourceManagerWrapper() { public ResourceManagerWrapper() {
super(ResourceManagerWrapper.class.getName()); super(ResourceManagerWrapper.class.getName());
@ -122,6 +134,19 @@ public ResourceManagerWrapper() {
@Override @Override
public synchronized void start() { public synchronized void start() {
try { try {
getConfig().setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true);
getConfig().set(YarnConfiguration.RM_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
getConfig().set(YarnConfiguration.RM_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
getConfig().set(YarnConfiguration.RM_ADMIN_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
getConfig().set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
getConfig().set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
getConfig().set(YarnConfiguration.RM_WEBAPP_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
Store store = StoreFactory.getStore(getConfig()); Store store = StoreFactory.getStore(getConfig());
resourceManager = new ResourceManager(store) { resourceManager = new ResourceManager(store) {
@Override @Override
@ -151,6 +176,10 @@ public void run() {
} catch (Throwable t) { } catch (Throwable t) {
throw new YarnException(t); throw new YarnException(t);
} }
LOG.info("MiniYARN ResourceManager address: " +
getConfig().get(YarnConfiguration.RM_ADDRESS));
LOG.info("MiniYARN ResourceManager web address: " +
getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS));
} }
@Override @Override
@ -212,9 +241,12 @@ public synchronized void start() {
remoteLogDir.getAbsolutePath()); remoteLogDir.getAbsolutePath());
// By default AM + 2 containers // By default AM + 2 containers
getConfig().setInt(YarnConfiguration.NM_PMEM_MB, 4*1024); getConfig().setInt(YarnConfiguration.NM_PMEM_MB, 4*1024);
getConfig().set(YarnConfiguration.NM_ADDRESS, "0.0.0.0:0"); getConfig().set(YarnConfiguration.NM_ADDRESS,
getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:0"); MiniYARNCluster.getHostname() + ":0");
getConfig().set(YarnConfiguration.NM_WEBAPP_ADDRESS, "0.0.0.0:0"); getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
getConfig().set(YarnConfiguration.NM_WEBAPP_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
LOG.info("Starting NM: " + index); LOG.info("Starting NM: " + index);
nodeManagers[index].init(getConfig()); nodeManagers[index].init(getConfig());
new Thread() { new Thread() {

View File

@ -20,12 +20,13 @@
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -38,8 +39,6 @@ public class TestFileArgs extends TestStreaming
private MiniDFSCluster dfs = null; private MiniDFSCluster dfs = null;
private MiniMRCluster mr = null; private MiniMRCluster mr = null;
private FileSystem fileSys = null; private FileSystem fileSys = null;
private String strJobTracker = null;
private String strNamenode = null;
private String namenode = null; private String namenode = null;
private Configuration conf = null; private Configuration conf = null;
@ -56,8 +55,6 @@ public TestFileArgs() throws IOException
fileSys = dfs.getFileSystem(); fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().getAuthority(); namenode = fileSys.getUri().getAuthority();
mr = new MiniMRCluster(1, namenode, 1); mr = new MiniMRCluster(1, namenode, 1);
strJobTracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.createJobConf().get(JTConfig.JT_IPC_ADDRESS);
strNamenode = "fs.default.name=" + mr.createJobConf().get("fs.default.name");
map = LS_PATH; map = LS_PATH;
FileSystem.setDefaultUri(conf, "hdfs://" + namenode); FileSystem.setDefaultUri(conf, "hdfs://" + namenode);
@ -100,18 +97,16 @@ protected Configuration getConf() {
@Override @Override
protected String[] genArgs() { protected String[] genArgs() {
for (Map.Entry<String, String> entry : mr.createJobConf()) {
args.add("-jobconf");
args.add(entry.getKey() + "=" + entry.getValue());
}
args.add("-file"); args.add("-file");
args.add(new java.io.File("target/sidefile").getAbsolutePath()); args.add(new java.io.File("target/sidefile").getAbsolutePath());
args.add("-numReduceTasks"); args.add("-numReduceTasks");
args.add("0"); args.add("0");
args.add("-jobconf"); args.add("-jobconf");
args.add(strNamenode);
args.add("-jobconf");
args.add(strJobTracker);
args.add("-jobconf");
args.add("mapred.jar=" + STREAMING_JAR); args.add("mapred.jar=" + STREAMING_JAR);
args.add("-jobconf");
args.add("mapreduce.framework.name=yarn");
args.add("-verbose"); args.add("-verbose");
return super.genArgs(); return super.genArgs();
} }

View File

@ -19,14 +19,10 @@
package org.apache.hadoop.streaming; package org.apache.hadoop.streaming;
import java.io.File; import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.InputStreamReader; import java.util.Map;
import java.io.BufferedReader;
import java.util.Arrays;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipOutputStream; import java.util.zip.ZipOutputStream;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -37,12 +33,7 @@
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
import static org.junit.Assert.*;
/** /**
* This class tests cacheArchive option of streaming * This class tests cacheArchive option of streaming
@ -66,8 +57,6 @@ public class TestMultipleArchiveFiles extends TestStreaming
private MiniDFSCluster dfs = null; private MiniDFSCluster dfs = null;
private MiniMRCluster mr = null; private MiniMRCluster mr = null;
private FileSystem fileSys = null; private FileSystem fileSys = null;
private String strJobTracker = null;
private String strNamenode = null;
private String namenode = null; private String namenode = null;
public TestMultipleArchiveFiles() throws Exception { public TestMultipleArchiveFiles() throws Exception {
@ -80,8 +69,6 @@ public TestMultipleArchiveFiles() throws Exception {
fileSys = dfs.getFileSystem(); fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().getAuthority(); namenode = fileSys.getUri().getAuthority();
mr = new MiniMRCluster(1, namenode, 1); mr = new MiniMRCluster(1, namenode, 1);
strJobTracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.createJobConf().get(JTConfig.JT_IPC_ADDRESS);
strNamenode = "fs.default.name=" + mr.createJobConf().get("fs.default.name");
map = "xargs cat"; map = "xargs cat";
reduce = "cat"; reduce = "cat";
@ -123,6 +110,10 @@ protected String[] genArgs() {
String cache1 = workDir + CACHE_ARCHIVE_1 + "#symlink1"; String cache1 = workDir + CACHE_ARCHIVE_1 + "#symlink1";
String cache2 = workDir + CACHE_ARCHIVE_2 + "#symlink2"; String cache2 = workDir + CACHE_ARCHIVE_2 + "#symlink2";
for (Map.Entry<String, String> entry : mr.createJobConf()) {
args.add("-jobconf");
args.add(entry.getKey() + "=" + entry.getValue());
}
args.add("-jobconf"); args.add("-jobconf");
args.add("mapreduce.job.reduces=1"); args.add("mapreduce.job.reduces=1");
args.add("-cacheArchive"); args.add("-cacheArchive");
@ -130,13 +121,7 @@ protected String[] genArgs() {
args.add("-cacheArchive"); args.add("-cacheArchive");
args.add(cache2); args.add(cache2);
args.add("-jobconf"); args.add("-jobconf");
args.add(strNamenode);
args.add("-jobconf");
args.add(strJobTracker);
args.add("-jobconf");
args.add("mapred.jar=" + STREAMING_JAR); args.add("mapred.jar=" + STREAMING_JAR);
args.add("-jobconf");
args.add("mapreduce.framework.name=yarn");
return super.genArgs(); return super.genArgs();
} }

View File

@ -22,8 +22,9 @@
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.PrintWriter; import java.util.ArrayList;
import java.io.StringWriter; import java.util.List;
import java.util.Map;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -36,7 +37,7 @@
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.Utils; import org.apache.hadoop.mapred.Utils;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
/** /**
* This test case tests the symlink creation * This test case tests the symlink creation
* utility provided by distributed caching * utility provided by distributed caching
@ -73,15 +74,18 @@ public void testMultipleCachefiles() throws Exception
String namenode = fileSys.getUri().toString(); String namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(1, namenode, 3); mr = new MiniMRCluster(1, namenode, 3);
String strJobtracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.createJobConf().get(JTConfig.JT_IPC_ADDRESS);
String strNamenode = "fs.default.name=" + mr.createJobConf().get("fs.default.name"); List<String> args = new ArrayList<String>();
for (Map.Entry<String, String> entry : mr.createJobConf()) {
args.add("-jobconf");
args.add(entry.getKey() + "=" + entry.getValue());
}
String argv[] = new String[] { String argv[] = new String[] {
"-input", INPUT_FILE, "-input", INPUT_FILE,
"-output", OUTPUT_DIR, "-output", OUTPUT_DIR,
"-mapper", map, "-mapper", map,
"-reducer", reduce, "-reducer", reduce,
"-jobconf", strNamenode,
"-jobconf", strJobtracker,
"-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"), "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
"-jobconf", "-jobconf",
JobConf.MAPRED_MAP_TASK_JAVA_OPTS + "=" + JobConf.MAPRED_MAP_TASK_JAVA_OPTS + "=" +
@ -98,9 +102,13 @@ public void testMultipleCachefiles() throws Exception
"-cacheFile", fileSys.getUri() + CACHE_FILE + "#" + mapString, "-cacheFile", fileSys.getUri() + CACHE_FILE + "#" + mapString,
"-cacheFile", fileSys.getUri() + CACHE_FILE_2 + "#" + mapString2, "-cacheFile", fileSys.getUri() + CACHE_FILE_2 + "#" + mapString2,
"-jobconf", "mapred.jar=" + TestStreaming.STREAMING_JAR, "-jobconf", "mapred.jar=" + TestStreaming.STREAMING_JAR,
"-jobconf", "mapreduce.framework.name=yarn"
}; };
for (String arg : argv) {
args.add(arg);
}
argv = args.toArray(new String[args.size()]);
fileSys.delete(new Path(OUTPUT_DIR), true); fileSys.delete(new Path(OUTPUT_DIR), true);
DataOutputStream file = fileSys.create(new Path(INPUT_FILE)); DataOutputStream file = fileSys.create(new Path(INPUT_FILE));

View File

@ -19,6 +19,9 @@
package org.apache.hadoop.streaming; package org.apache.hadoop.streaming;
import java.io.*; import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -47,20 +50,30 @@ public class TestStreamingTaskLog {
final long USERLOG_LIMIT_KB = 5;//consider 5kb as logSize final long USERLOG_LIMIT_KB = 5;//consider 5kb as logSize
String[] genArgs() { String[] genArgs() {
return new String[] {
List<String> args = new ArrayList<String>();
for (Map.Entry<String, String> entry : mr.createJobConf()) {
args.add("-jobconf");
args.add(entry.getKey() + "=" + entry.getValue());
}
String[] argv = new String[] {
"-input", inputPath.toString(), "-input", inputPath.toString(),
"-output", outputPath.toString(), "-output", outputPath.toString(),
"-mapper", map, "-mapper", map,
"-reducer", StreamJob.REDUCE_NONE, "-reducer", StreamJob.REDUCE_NONE,
"-jobconf", "mapred.job.tracker=" + mr.createJobConf().get(JTConfig.JT_IPC_ADDRESS),
"-jobconf", "fs.default.name=" + fs.getUri().toString(),
"-jobconf", "mapred.map.tasks=1", "-jobconf", "mapred.map.tasks=1",
"-jobconf", "keep.failed.task.files=true", "-jobconf", "keep.failed.task.files=true",
"-jobconf", "mapreduce.task.userlog.limit.kb=" + USERLOG_LIMIT_KB, "-jobconf", "mapreduce.task.userlog.limit.kb=" + USERLOG_LIMIT_KB,
"-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"), "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
"-jobconf", "mapred.jar=" + TestStreaming.STREAMING_JAR, "-jobconf", "mapred.jar=" + TestStreaming.STREAMING_JAR,
"-jobconf", "mapreduce.framework.name=yarn"
}; };
for (String arg : argv) {
args.add(arg);
}
argv = args.toArray(new String[args.size()]);
return argv;
} }
/** /**

View File

@ -21,6 +21,9 @@
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -62,17 +65,20 @@ public void testSymLink() throws Exception
FileSystem fileSys = dfs.getFileSystem(); FileSystem fileSys = dfs.getFileSystem();
String namenode = fileSys.getUri().toString(); String namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(1, namenode, 3); mr = new MiniMRCluster(1, namenode, 3);
List<String> args = new ArrayList<String>();
for (Map.Entry<String, String> entry : mr.createJobConf()) {
args.add("-jobconf");
args.add(entry.getKey() + "=" + entry.getValue());
}
// During tests, the default Configuration will use a local mapred // During tests, the default Configuration will use a local mapred
// So don't specify -config or -cluster // So don't specify -config or -cluster
String strJobtracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.createJobConf().get(JTConfig.JT_IPC_ADDRESS);
String strNamenode = "fs.default.name=" + mr.createJobConf().get("fs.default.name");
String argv[] = new String[] { String argv[] = new String[] {
"-input", INPUT_FILE, "-input", INPUT_FILE,
"-output", OUTPUT_DIR, "-output", OUTPUT_DIR,
"-mapper", map, "-mapper", map,
"-reducer", reduce, "-reducer", reduce,
"-jobconf", strNamenode,
"-jobconf", strJobtracker,
"-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"), "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
"-jobconf", "-jobconf",
JobConf.MAPRED_MAP_TASK_JAVA_OPTS+ "=" + JobConf.MAPRED_MAP_TASK_JAVA_OPTS+ "=" +
@ -88,9 +94,13 @@ public void testSymLink() throws Exception
conf.get(JobConf.MAPRED_TASK_JAVA_OPTS, "")), conf.get(JobConf.MAPRED_TASK_JAVA_OPTS, "")),
"-cacheFile", fileSys.getUri() + CACHE_FILE + "#testlink", "-cacheFile", fileSys.getUri() + CACHE_FILE + "#testlink",
"-jobconf", "mapred.jar=" + TestStreaming.STREAMING_JAR, "-jobconf", "mapred.jar=" + TestStreaming.STREAMING_JAR,
"-jobconf", "mapreduce.framework.name=yarn"
}; };
for (String arg : argv) {
args.add(arg);
}
argv = args.toArray(new String[args.size()]);
fileSys.delete(new Path(OUTPUT_DIR), true); fileSys.delete(new Path(OUTPUT_DIR), true);
DataOutputStream file = fileSys.create(new Path(INPUT_FILE)); DataOutputStream file = fileSys.create(new Path(INPUT_FILE));