MAPREDUCE-3867. MiniMRYarn/MiniYarn uses fixed ports (tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1328109 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2012-04-19 20:33:11 +00:00
parent 7d2466c0c8
commit df654cca49
20 changed files with 213 additions and 62 deletions

View File

@ -253,6 +253,8 @@ Release 2.0.0 - UNRELEASED
MAPREDUCE-4008. ResourceManager throws MetricsException on start up
saying QueueMetrics MBean already exists (Devaraj K via tgraves)
MAPREDUCE-3867. MiniMRYarn/MiniYarn uses fixed ports (tucu)
Release 0.23.3 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -78,6 +78,7 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -147,6 +148,18 @@ public class HistoryClientService extends AbstractService {
+ ":" + server.getPort());
LOG.info("Instantiated MRClientService at " + this.bindAddress);
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String resolvedAddress = bindAddress.getHostName() + ":" + bindAddress.getPort();
conf.set(JHAdminConfig.MR_HISTORY_ADDRESS, resolvedAddress);
String hostname = getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS);
hostname = (hostname.contains(":")) ? hostname.substring(0, hostname.indexOf(":")) : hostname;
int port = webApp.port();
resolvedAddress = hostname + ":" + port;
conf.set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS, resolvedAddress);
}
super.start();
}

View File

@ -83,7 +83,9 @@ public class TestSocketFactory {
JobConf jobConf = new JobConf();
FileSystem.setDefaultUri(jobConf, fs.getUri().toString());
miniMRYarnCluster = initAndStartMiniMRYarnCluster(jobConf);
JobConf jconf = new JobConf(cconf);
JobConf jconf = new JobConf(miniMRYarnCluster.getConfig());
jconf.set("hadoop.rpc.socket.factory.class.default",
"org.apache.hadoop.ipc.DummySocketFactory");
jconf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
String rmAddress = jconf.get("yarn.resourcemanager.address");
String[] split = rmAddress.split(":");

View File

@ -173,7 +173,7 @@ public class TestMiniMRClasspath {
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(taskTrackers, namenode, 3);
JobConf jobConf = new JobConf();
JobConf jobConf = mr.createJobConf();
String result;
result = launchWordCount(fileSys.getUri(), jobConf,
"The quick brown fox\nhas many silly\n" + "red fox sox\n", 3, 1);
@ -205,7 +205,7 @@ public class TestMiniMRClasspath {
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(taskTrackers, namenode, 3);
JobConf jobConf = new JobConf();
JobConf jobConf = mr.createJobConf();
String result;
result = launchExternal(fileSys.getUri(), jobConf,

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.v2;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -126,6 +128,10 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
@Override
public synchronized void start() {
try {
getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
historyServer = new JobHistoryServer();
historyServer.init(getConfig());
new Thread() {
@ -145,6 +151,20 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
} catch (Throwable t) {
throw new YarnException(t);
}
//need to do this because historyServer.init creates a new Configuration
getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
historyServer.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
historyServer.getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS));
LOG.info("MiniMRYARN ResourceManager address: " +
getConfig().get(YarnConfiguration.RM_ADDRESS));
LOG.info("MiniMRYARN ResourceManager web address: " +
getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS));
LOG.info("MiniMRYARN HistoryServer address: " +
getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
LOG.info("MiniMRYARN HistoryServer web address: " +
getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS));
}
@Override

View File

@ -197,12 +197,18 @@ public class Client {
/**
*/
public Client() throws Exception {
public Client(Configuration conf) throws Exception {
// Set up the configuration and RPC
conf = new Configuration();
this.conf = conf;
rpc = YarnRPC.create(conf);
}
/**
*/
public Client() throws Exception {
this(new Configuration());
}
/**
* Helper function to print out usage
* @param opts Parsed command line options

View File

@ -18,11 +18,17 @@
package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.junit.AfterClass;
@ -48,6 +54,14 @@ public class TestDistributedShell {
1, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
if (url == null) {
throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
}
yarnCluster.getConfig().set("yarn.application.classpath", new File(url.getPath()).getParent());
OutputStream os = new FileOutputStream(new File(url.getPath()));
yarnCluster.getConfig().writeXml(os);
os.close();
}
try {
Thread.sleep(2000);
@ -81,14 +95,14 @@ public class TestDistributedShell {
};
LOG.info("Initializing DS Client");
Client client = new Client();
Client client = new Client(new Configuration(yarnCluster.getConfig()));
boolean initSuccess = client.init(args);
assert(initSuccess);
Assert.assertTrue(initSuccess);
LOG.info("Running DS Client");
boolean result = client.run();
LOG.info("Client run completed. Result=" + result);
assert (result == true);
Assert.assertTrue(result);
}

View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<configuration>
<!-- Dummy (invalid) config file to be overwriten by TestDistributedShell with MiniCluster configuration. -->
</configuration>

View File

@ -538,6 +538,8 @@ public class YarnConfiguration extends Configuration {
/** Container temp directory */
public static final String DEFAULT_CONTAINER_TEMP_DIR = "./tmp";
public static final String IS_MINI_YARN_CLUSTER = YARN_PREFIX + ".is.minicluster";
public YarnConfiguration() {
super();
}

View File

@ -120,6 +120,11 @@ public class AdminService extends AbstractService implements RMAdminProtocol {
}
this.server.start();
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String resolvedAddress =
this.server.getListenerAddress().getHostName() + ":" + this.server.getListenerAddress().getPort();
conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, resolvedAddress);
}
super.start();
}

View File

@ -119,11 +119,14 @@ public class ApplicationMasterService extends AbstractService implements
}
this.server.start();
this.bindAddress =
NetUtils.createSocketAddr(masterServiceAddress.getHostName(),
this.server.getPort());
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String resolvedAddress =
this.server.getListenerAddress().getHostName() + ":" + this.server.getListenerAddress().getPort();
conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, resolvedAddress);
}
super.start();
}

View File

@ -150,6 +150,11 @@ public class ClientRMService extends AbstractService implements
}
this.server.start();
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String resolvedAddress =
this.server.getListenerAddress().getHostName() + ":" + this.server.getListenerAddress().getPort();
conf.set(YarnConfiguration.RM_ADDRESS, resolvedAddress);
}
super.start();
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.net.InetAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -475,6 +476,15 @@ public class ResourceManager extends CompositeService implements Recoverable {
} catch(IOException ie) {
throw new YarnException("Failed to start secret manager threads", ie);
}
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String hostname = getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS);
hostname = (hostname.contains(":")) ? hostname.substring(0, hostname.indexOf(":")) : hostname;
int port = webApp.port();
String resolvedAddress = hostname + ":" + port;
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, resolvedAddress);
}
super.start();

View File

@ -133,6 +133,11 @@ public class ResourceTrackerService extends AbstractService implements
}
this.server.start();
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String resolvedAddress =
server.getListenerAddress().getHostName() + ":" + server.getListenerAddress().getPort();
conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, resolvedAddress);
}
}
@Override

View File

@ -20,6 +20,9 @@ package org.apache.hadoop.yarn.server;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -113,7 +116,16 @@ public class MiniYARNCluster extends CompositeService {
public NodeManager getNodeManager(int i) {
return this.nodeManagers[i];
}
public static String getHostname() {
try {
return InetAddress.getLocalHost().getHostName();
}
catch (UnknownHostException ex) {
throw new RuntimeException(ex);
}
}
private class ResourceManagerWrapper extends AbstractService {
public ResourceManagerWrapper() {
super(ResourceManagerWrapper.class.getName());
@ -122,6 +134,19 @@ public class MiniYARNCluster extends CompositeService {
@Override
public synchronized void start() {
try {
getConfig().setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true);
getConfig().set(YarnConfiguration.RM_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
getConfig().set(YarnConfiguration.RM_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
getConfig().set(YarnConfiguration.RM_ADMIN_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
getConfig().set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
getConfig().set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
getConfig().set(YarnConfiguration.RM_WEBAPP_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
Store store = StoreFactory.getStore(getConfig());
resourceManager = new ResourceManager(store) {
@Override
@ -151,6 +176,10 @@ public class MiniYARNCluster extends CompositeService {
} catch (Throwable t) {
throw new YarnException(t);
}
LOG.info("MiniYARN ResourceManager address: " +
getConfig().get(YarnConfiguration.RM_ADDRESS));
LOG.info("MiniYARN ResourceManager web address: " +
getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS));
}
@Override
@ -212,9 +241,12 @@ public class MiniYARNCluster extends CompositeService {
remoteLogDir.getAbsolutePath());
// By default AM + 2 containers
getConfig().setInt(YarnConfiguration.NM_PMEM_MB, 4*1024);
getConfig().set(YarnConfiguration.NM_ADDRESS, "0.0.0.0:0");
getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:0");
getConfig().set(YarnConfiguration.NM_WEBAPP_ADDRESS, "0.0.0.0:0");
getConfig().set(YarnConfiguration.NM_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
getConfig().set(YarnConfiguration.NM_WEBAPP_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
LOG.info("Starting NM: " + index);
nodeManagers[index].init(getConfig());
new Thread() {

View File

@ -20,12 +20,13 @@ package org.apache.hadoop.streaming;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.junit.After;
import org.junit.Before;
@ -38,8 +39,6 @@ public class TestFileArgs extends TestStreaming
private MiniDFSCluster dfs = null;
private MiniMRCluster mr = null;
private FileSystem fileSys = null;
private String strJobTracker = null;
private String strNamenode = null;
private String namenode = null;
private Configuration conf = null;
@ -56,8 +55,6 @@ public class TestFileArgs extends TestStreaming
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().getAuthority();
mr = new MiniMRCluster(1, namenode, 1);
strJobTracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.createJobConf().get(JTConfig.JT_IPC_ADDRESS);
strNamenode = "fs.default.name=" + mr.createJobConf().get("fs.default.name");
map = LS_PATH;
FileSystem.setDefaultUri(conf, "hdfs://" + namenode);
@ -100,18 +97,16 @@ public class TestFileArgs extends TestStreaming
@Override
protected String[] genArgs() {
for (Map.Entry<String, String> entry : mr.createJobConf()) {
args.add("-jobconf");
args.add(entry.getKey() + "=" + entry.getValue());
}
args.add("-file");
args.add(new java.io.File("target/sidefile").getAbsolutePath());
args.add("-numReduceTasks");
args.add("0");
args.add("-jobconf");
args.add(strNamenode);
args.add("-jobconf");
args.add(strJobTracker);
args.add("-jobconf");
args.add("mapred.jar=" + STREAMING_JAR);
args.add("-jobconf");
args.add("mapreduce.framework.name=yarn");
args.add("-verbose");
return super.genArgs();
}

View File

@ -19,14 +19,10 @@
package org.apache.hadoop.streaming;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.io.BufferedReader;
import java.util.Arrays;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipOutputStream;
import org.apache.commons.logging.Log;
@ -37,12 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* This class tests cacheArchive option of streaming
@ -66,8 +57,6 @@ public class TestMultipleArchiveFiles extends TestStreaming
private MiniDFSCluster dfs = null;
private MiniMRCluster mr = null;
private FileSystem fileSys = null;
private String strJobTracker = null;
private String strNamenode = null;
private String namenode = null;
public TestMultipleArchiveFiles() throws Exception {
@ -80,8 +69,6 @@ public class TestMultipleArchiveFiles extends TestStreaming
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().getAuthority();
mr = new MiniMRCluster(1, namenode, 1);
strJobTracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.createJobConf().get(JTConfig.JT_IPC_ADDRESS);
strNamenode = "fs.default.name=" + mr.createJobConf().get("fs.default.name");
map = "xargs cat";
reduce = "cat";
@ -123,6 +110,10 @@ public class TestMultipleArchiveFiles extends TestStreaming
String cache1 = workDir + CACHE_ARCHIVE_1 + "#symlink1";
String cache2 = workDir + CACHE_ARCHIVE_2 + "#symlink2";
for (Map.Entry<String, String> entry : mr.createJobConf()) {
args.add("-jobconf");
args.add(entry.getKey() + "=" + entry.getValue());
}
args.add("-jobconf");
args.add("mapreduce.job.reduces=1");
args.add("-cacheArchive");
@ -130,13 +121,7 @@ public class TestMultipleArchiveFiles extends TestStreaming
args.add("-cacheArchive");
args.add(cache2);
args.add("-jobconf");
args.add(strNamenode);
args.add("-jobconf");
args.add(strJobTracker);
args.add("-jobconf");
args.add("mapred.jar=" + STREAMING_JAR);
args.add("-jobconf");
args.add("mapreduce.framework.name=yarn");
return super.genArgs();
}

View File

@ -22,8 +22,9 @@ import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.junit.Test;
import static org.junit.Assert.*;
@ -36,7 +37,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.Utils;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
/**
* This test case tests the symlink creation
* utility provided by distributed caching
@ -73,15 +74,18 @@ public class TestMultipleCachefiles
String namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(1, namenode, 3);
String strJobtracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.createJobConf().get(JTConfig.JT_IPC_ADDRESS);
String strNamenode = "fs.default.name=" + mr.createJobConf().get("fs.default.name");
List<String> args = new ArrayList<String>();
for (Map.Entry<String, String> entry : mr.createJobConf()) {
args.add("-jobconf");
args.add(entry.getKey() + "=" + entry.getValue());
}
String argv[] = new String[] {
"-input", INPUT_FILE,
"-output", OUTPUT_DIR,
"-mapper", map,
"-reducer", reduce,
"-jobconf", strNamenode,
"-jobconf", strJobtracker,
"-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
"-jobconf",
JobConf.MAPRED_MAP_TASK_JAVA_OPTS + "=" +
@ -98,9 +102,13 @@ public class TestMultipleCachefiles
"-cacheFile", fileSys.getUri() + CACHE_FILE + "#" + mapString,
"-cacheFile", fileSys.getUri() + CACHE_FILE_2 + "#" + mapString2,
"-jobconf", "mapred.jar=" + TestStreaming.STREAMING_JAR,
"-jobconf", "mapreduce.framework.name=yarn"
};
for (String arg : argv) {
args.add(arg);
}
argv = args.toArray(new String[args.size()]);
fileSys.delete(new Path(OUTPUT_DIR), true);
DataOutputStream file = fileSys.create(new Path(INPUT_FILE));

View File

@ -19,6 +19,9 @@
package org.apache.hadoop.streaming;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -47,20 +50,30 @@ public class TestStreamingTaskLog {
final long USERLOG_LIMIT_KB = 5;//consider 5kb as logSize
String[] genArgs() {
return new String[] {
List<String> args = new ArrayList<String>();
for (Map.Entry<String, String> entry : mr.createJobConf()) {
args.add("-jobconf");
args.add(entry.getKey() + "=" + entry.getValue());
}
String[] argv = new String[] {
"-input", inputPath.toString(),
"-output", outputPath.toString(),
"-mapper", map,
"-reducer", StreamJob.REDUCE_NONE,
"-jobconf", "mapred.job.tracker=" + mr.createJobConf().get(JTConfig.JT_IPC_ADDRESS),
"-jobconf", "fs.default.name=" + fs.getUri().toString(),
"-jobconf", "mapred.map.tasks=1",
"-jobconf", "keep.failed.task.files=true",
"-jobconf", "mapreduce.task.userlog.limit.kb=" + USERLOG_LIMIT_KB,
"-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
"-jobconf", "mapred.jar=" + TestStreaming.STREAMING_JAR,
"-jobconf", "mapreduce.framework.name=yarn"
};
for (String arg : argv) {
args.add(arg);
}
argv = args.toArray(new String[args.size()]);
return argv;
}
/**

View File

@ -21,6 +21,9 @@ package org.apache.hadoop.streaming;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.junit.Test;
import static org.junit.Assert.*;
@ -62,17 +65,20 @@ public class TestSymLink
FileSystem fileSys = dfs.getFileSystem();
String namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(1, namenode, 3);
List<String> args = new ArrayList<String>();
for (Map.Entry<String, String> entry : mr.createJobConf()) {
args.add("-jobconf");
args.add(entry.getKey() + "=" + entry.getValue());
}
// During tests, the default Configuration will use a local mapred
// So don't specify -config or -cluster
String strJobtracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.createJobConf().get(JTConfig.JT_IPC_ADDRESS);
String strNamenode = "fs.default.name=" + mr.createJobConf().get("fs.default.name");
String argv[] = new String[] {
"-input", INPUT_FILE,
"-output", OUTPUT_DIR,
"-mapper", map,
"-reducer", reduce,
"-jobconf", strNamenode,
"-jobconf", strJobtracker,
"-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
"-jobconf",
JobConf.MAPRED_MAP_TASK_JAVA_OPTS+ "=" +
@ -88,9 +94,13 @@ public class TestSymLink
conf.get(JobConf.MAPRED_TASK_JAVA_OPTS, "")),
"-cacheFile", fileSys.getUri() + CACHE_FILE + "#testlink",
"-jobconf", "mapred.jar=" + TestStreaming.STREAMING_JAR,
"-jobconf", "mapreduce.framework.name=yarn"
};
for (String arg : argv) {
args.add(arg);
}
argv = args.toArray(new String[args.size()]);
fileSys.delete(new Path(OUTPUT_DIR), true);
DataOutputStream file = fileSys.create(new Path(INPUT_FILE));