Merge -r 1209280:1209281 from trunk to branch-0.23. Fixes: MAPREDUCE-3369
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1209283 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
162413d138
commit
df4a1a9aab
|
@ -72,6 +72,9 @@ Release 0.23.1 - Unreleased
|
||||||
MAPREDUCE-3169 amendment. Deprecate MiniMRCluster. (Ahmed Radwan via
|
MAPREDUCE-3169 amendment. Deprecate MiniMRCluster. (Ahmed Radwan via
|
||||||
sseth)
|
sseth)
|
||||||
|
|
||||||
|
MAPREDUCE-3369. Migrate MR1 tests to run on MR2 using the new interfaces
|
||||||
|
introduced in MAPREDUCE-3169. (Ahmed Radwan via tomwhite)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -59,19 +59,6 @@ public class TestNoDefaultsJobConf extends HadoopTestCase {
|
||||||
|
|
||||||
JobConf conf = new JobConf(false);
|
JobConf conf = new JobConf(false);
|
||||||
|
|
||||||
//seeding JT and NN info into non-defaults (empty jobconf)
|
|
||||||
String jobTrackerAddress = createJobConf().get(JTConfig.JT_IPC_ADDRESS);
|
|
||||||
if (jobTrackerAddress == null) {
|
|
||||||
jobTrackerAddress = "local";
|
|
||||||
}
|
|
||||||
conf.set(JTConfig.JT_IPC_ADDRESS, jobTrackerAddress);
|
|
||||||
if (jobTrackerAddress == "local") {
|
|
||||||
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
|
|
||||||
}
|
|
||||||
|
|
||||||
conf.set("fs.default.name", createJobConf().get("fs.default.name"));
|
conf.set("fs.default.name", createJobConf().get("fs.default.name"));
|
||||||
|
|
||||||
conf.setJobName("mr");
|
conf.setJobName("mr");
|
|
@ -0,0 +1,214 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Abstract Test case class to run MR in local or cluster mode and in local FS
|
||||||
|
* or DFS.
|
||||||
|
*
|
||||||
|
* The Hadoop instance is started and stopped on each test method.
|
||||||
|
*
|
||||||
|
* If using DFS the filesystem is reformated at each start (test method).
|
||||||
|
*
|
||||||
|
* Job Configurations should be created using a configuration returned by the
|
||||||
|
* 'createJobConf()' method.
|
||||||
|
*/
|
||||||
|
public abstract class HadoopTestCase extends TestCase {
|
||||||
|
public static final int LOCAL_MR = 1;
|
||||||
|
public static final int CLUSTER_MR = 2;
|
||||||
|
public static final int LOCAL_FS = 4;
|
||||||
|
public static final int DFS_FS = 8;
|
||||||
|
|
||||||
|
private boolean localMR;
|
||||||
|
private boolean localFS;
|
||||||
|
|
||||||
|
private int taskTrackers;
|
||||||
|
private int dataNodes;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a testcase for local or cluster MR using DFS.
|
||||||
|
*
|
||||||
|
* The DFS will be formatted regardless if there was one or not before in the
|
||||||
|
* given location.
|
||||||
|
*
|
||||||
|
* @param mrMode indicates if the MR should be local (LOCAL_MR) or cluster
|
||||||
|
* (CLUSTER_MR)
|
||||||
|
* @param fsMode indicates if the FS should be local (LOCAL_FS) or DFS (DFS_FS)
|
||||||
|
*
|
||||||
|
* local FS when using relative PATHs)
|
||||||
|
*
|
||||||
|
* @param taskTrackers number of task trackers to start when using cluster
|
||||||
|
*
|
||||||
|
* @param dataNodes number of data nodes to start when using DFS
|
||||||
|
*
|
||||||
|
* @throws IOException thrown if the base directory cannot be set.
|
||||||
|
*/
|
||||||
|
public HadoopTestCase(int mrMode, int fsMode, int taskTrackers, int dataNodes)
|
||||||
|
throws IOException {
|
||||||
|
if (mrMode != LOCAL_MR && mrMode != CLUSTER_MR) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Invalid MapRed mode, must be LOCAL_MR or CLUSTER_MR");
|
||||||
|
}
|
||||||
|
if (fsMode != LOCAL_FS && fsMode != DFS_FS) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Invalid FileSystem mode, must be LOCAL_FS or DFS_FS");
|
||||||
|
}
|
||||||
|
if (taskTrackers < 1) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Invalid taskTrackers value, must be greater than 0");
|
||||||
|
}
|
||||||
|
if (dataNodes < 1) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Invalid dataNodes value, must be greater than 0");
|
||||||
|
}
|
||||||
|
localMR = (mrMode == LOCAL_MR);
|
||||||
|
localFS = (fsMode == LOCAL_FS);
|
||||||
|
/*
|
||||||
|
JobConf conf = new JobConf();
|
||||||
|
fsRoot = conf.get("hadoop.tmp.dir");
|
||||||
|
|
||||||
|
if (fsRoot == null) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"hadoop.tmp.dir is not defined");
|
||||||
|
}
|
||||||
|
|
||||||
|
fsRoot = fsRoot.replace(' ', '+') + "/fs";
|
||||||
|
|
||||||
|
File file = new File(fsRoot);
|
||||||
|
if (!file.exists()) {
|
||||||
|
if (!file.mkdirs()) {
|
||||||
|
throw new RuntimeException("Could not create FS base path: " + file);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
this.taskTrackers = taskTrackers;
|
||||||
|
this.dataNodes = dataNodes;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicates if the MR is running in local or cluster mode.
|
||||||
|
*
|
||||||
|
* @return returns TRUE if the MR is running locally, FALSE if running in
|
||||||
|
* cluster mode.
|
||||||
|
*/
|
||||||
|
public boolean isLocalMR() {
|
||||||
|
return localMR;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicates if the filesystem is local or DFS.
|
||||||
|
*
|
||||||
|
* @return returns TRUE if the filesystem is local, FALSE if it is DFS.
|
||||||
|
*/
|
||||||
|
public boolean isLocalFS() {
|
||||||
|
return localFS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private MiniDFSCluster dfsCluster = null;
|
||||||
|
private MiniMRCluster mrCluster = null;
|
||||||
|
private FileSystem fileSystem = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates Hadoop instance based on constructor configuration before
|
||||||
|
* a test case is run.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
if (localFS) {
|
||||||
|
fileSystem = FileSystem.getLocal(new JobConf());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
dfsCluster = new MiniDFSCluster(new JobConf(), dataNodes, true, null);
|
||||||
|
fileSystem = dfsCluster.getFileSystem();
|
||||||
|
}
|
||||||
|
if (localMR) {
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
//noinspection deprecation
|
||||||
|
mrCluster = new MiniMRCluster(taskTrackers, fileSystem.getUri().toString(), 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Destroys Hadoop instance based on constructor configuration after
|
||||||
|
* a test case is run.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected void tearDown() throws Exception {
|
||||||
|
try {
|
||||||
|
if (mrCluster != null) {
|
||||||
|
mrCluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex) {
|
||||||
|
System.out.println(ex);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
if (dfsCluster != null) {
|
||||||
|
dfsCluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex) {
|
||||||
|
System.out.println(ex);
|
||||||
|
}
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the Filesystem in use.
|
||||||
|
*
|
||||||
|
* TestCases should use this Filesystem as it
|
||||||
|
* is properly configured with the workingDir for relative PATHs.
|
||||||
|
*
|
||||||
|
* @return the filesystem used by Hadoop.
|
||||||
|
*/
|
||||||
|
protected FileSystem getFileSystem() {
|
||||||
|
return fileSystem;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a job configuration preconfigured to run against the Hadoop
|
||||||
|
* managed by the testcase.
|
||||||
|
* @return configuration that works on the testcase Hadoop instance
|
||||||
|
*/
|
||||||
|
protected JobConf createJobConf() {
|
||||||
|
if (localMR) {
|
||||||
|
JobConf conf = new JobConf();
|
||||||
|
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return mrCluster.createJobConf();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,224 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import org.mortbay.jetty.Server;
|
||||||
|
import org.mortbay.jetty.servlet.Context;
|
||||||
|
import org.mortbay.jetty.servlet.ServletHolder;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.io.IntWritable;
|
||||||
|
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
|
||||||
|
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
import javax.servlet.http.HttpServlet;
|
||||||
|
import javax.servlet.ServletException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.DataOutputStream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base class to test Job end notification in local and cluster mode.
|
||||||
|
*
|
||||||
|
* Starts up hadoop on Local or Cluster mode (by extending of the
|
||||||
|
* HadoopTestCase class) and it starts a servlet engine that hosts
|
||||||
|
* a servlet that will receive the notification of job finalization.
|
||||||
|
*
|
||||||
|
* The notification servlet returns a HTTP 400 the first time is called
|
||||||
|
* and a HTTP 200 the second time, thus testing retry.
|
||||||
|
*
|
||||||
|
* In both cases local file system is used (this is irrelevant for
|
||||||
|
* the tested functionality)
|
||||||
|
*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public abstract class NotificationTestCase extends HadoopTestCase {
|
||||||
|
|
||||||
|
protected NotificationTestCase(int mode) throws IOException {
|
||||||
|
super(mode, HadoopTestCase.LOCAL_FS, 1, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int port;
|
||||||
|
private String contextPath = "/notification";
|
||||||
|
private String servletPath = "/mapred";
|
||||||
|
private Server webServer;
|
||||||
|
|
||||||
|
private void startHttpServer() throws Exception {
|
||||||
|
|
||||||
|
// Create the webServer
|
||||||
|
if (webServer != null) {
|
||||||
|
webServer.stop();
|
||||||
|
webServer = null;
|
||||||
|
}
|
||||||
|
webServer = new Server(0);
|
||||||
|
|
||||||
|
Context context = new Context(webServer, contextPath);
|
||||||
|
|
||||||
|
// create servlet handler
|
||||||
|
context.addServlet(new ServletHolder(new NotificationServlet()),
|
||||||
|
servletPath);
|
||||||
|
|
||||||
|
// Start webServer
|
||||||
|
webServer.start();
|
||||||
|
port = webServer.getConnectors()[0].getLocalPort();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void stopHttpServer() throws Exception {
|
||||||
|
if (webServer != null) {
|
||||||
|
webServer.stop();
|
||||||
|
webServer.destroy();
|
||||||
|
webServer = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class NotificationServlet extends HttpServlet {
|
||||||
|
public static int counter = 0;
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
protected void doGet(HttpServletRequest req, HttpServletResponse res)
|
||||||
|
throws ServletException, IOException {
|
||||||
|
switch (counter) {
|
||||||
|
case 0:
|
||||||
|
{
|
||||||
|
assertTrue(req.getQueryString().contains("SUCCEEDED"));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case 2:
|
||||||
|
{
|
||||||
|
assertTrue(req.getQueryString().contains("KILLED"));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case 4:
|
||||||
|
{
|
||||||
|
assertTrue(req.getQueryString().contains("FAILED"));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (counter % 2 == 0) {
|
||||||
|
res.sendError(HttpServletResponse.SC_BAD_REQUEST, "forcing error");
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
res.setStatus(HttpServletResponse.SC_OK);
|
||||||
|
}
|
||||||
|
counter++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getNotificationUrlTemplate() {
|
||||||
|
return "http://localhost:" + port + contextPath + servletPath +
|
||||||
|
"?jobId=$jobId&jobStatus=$jobStatus";
|
||||||
|
}
|
||||||
|
|
||||||
|
protected JobConf createJobConf() {
|
||||||
|
JobConf conf = super.createJobConf();
|
||||||
|
conf.setJobEndNotificationURI(getNotificationUrlTemplate());
|
||||||
|
conf.setInt(JobContext.MR_JOB_END_RETRY_ATTEMPTS, 3);
|
||||||
|
conf.setInt(JobContext.MR_JOB_END_RETRY_INTERVAL, 200);
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
startHttpServer();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void tearDown() throws Exception {
|
||||||
|
stopHttpServer();
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testMR() throws Exception {
|
||||||
|
System.out.println(launchWordCount(this.createJobConf(),
|
||||||
|
"a b c d e f g h", 1, 1));
|
||||||
|
Thread.sleep(2000);
|
||||||
|
assertEquals(2, NotificationServlet.counter);
|
||||||
|
|
||||||
|
Path inDir = new Path("notificationjob/input");
|
||||||
|
Path outDir = new Path("notificationjob/output");
|
||||||
|
|
||||||
|
// Hack for local FS that does not have the concept of a 'mounting point'
|
||||||
|
if (isLocalFS()) {
|
||||||
|
String localPathRoot = System.getProperty("test.build.data","/tmp")
|
||||||
|
.toString().replace(' ', '+');;
|
||||||
|
inDir = new Path(localPathRoot, inDir);
|
||||||
|
outDir = new Path(localPathRoot, outDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
// run a job with KILLED status
|
||||||
|
System.out.println(UtilsForTests.runJobKill(this.createJobConf(), inDir,
|
||||||
|
outDir).getID());
|
||||||
|
Thread.sleep(2000);
|
||||||
|
assertEquals(4, NotificationServlet.counter);
|
||||||
|
|
||||||
|
// run a job with FAILED status
|
||||||
|
System.out.println(UtilsForTests.runJobFail(this.createJobConf(), inDir,
|
||||||
|
outDir).getID());
|
||||||
|
Thread.sleep(2000);
|
||||||
|
assertEquals(6, NotificationServlet.counter);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String launchWordCount(JobConf conf,
|
||||||
|
String input,
|
||||||
|
int numMaps,
|
||||||
|
int numReduces) throws IOException {
|
||||||
|
Path inDir = new Path("testing/wc/input");
|
||||||
|
Path outDir = new Path("testing/wc/output");
|
||||||
|
|
||||||
|
// Hack for local FS that does not have the concept of a 'mounting point'
|
||||||
|
if (isLocalFS()) {
|
||||||
|
String localPathRoot = System.getProperty("test.build.data","/tmp")
|
||||||
|
.toString().replace(' ', '+');;
|
||||||
|
inDir = new Path(localPathRoot, inDir);
|
||||||
|
outDir = new Path(localPathRoot, outDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
fs.delete(outDir, true);
|
||||||
|
if (!fs.mkdirs(inDir)) {
|
||||||
|
throw new IOException("Mkdirs failed to create " + inDir.toString());
|
||||||
|
}
|
||||||
|
{
|
||||||
|
DataOutputStream file = fs.create(new Path(inDir, "part-0"));
|
||||||
|
file.writeBytes(input);
|
||||||
|
file.close();
|
||||||
|
}
|
||||||
|
conf.setJobName("wordcount");
|
||||||
|
conf.setInputFormat(TextInputFormat.class);
|
||||||
|
|
||||||
|
// the keys are words (strings)
|
||||||
|
conf.setOutputKeyClass(Text.class);
|
||||||
|
// the values are counts (ints)
|
||||||
|
conf.setOutputValueClass(IntWritable.class);
|
||||||
|
|
||||||
|
conf.setMapperClass(WordCount.MapClass.class);
|
||||||
|
conf.setCombinerClass(WordCount.Reduce.class);
|
||||||
|
conf.setReducerClass(WordCount.Reduce.class);
|
||||||
|
|
||||||
|
FileInputFormat.setInputPaths(conf, inDir);
|
||||||
|
FileOutputFormat.setOutputPath(conf, outDir);
|
||||||
|
conf.setNumMapTasks(numMaps);
|
||||||
|
conf.setNumReduceTasks(numReduces);
|
||||||
|
JobClient.runJob(conf);
|
||||||
|
return MapReduceTestUtil.readOutput(outDir, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,584 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import java.io.*;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.conf.Configured;
|
||||||
|
import org.apache.hadoop.io.BytesWritable;
|
||||||
|
import org.apache.hadoop.io.IntWritable;
|
||||||
|
import org.apache.hadoop.io.SequenceFile;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hadoop.io.WritableComparable;
|
||||||
|
import org.apache.hadoop.io.WritableComparator;
|
||||||
|
import org.apache.hadoop.io.WritableUtils;
|
||||||
|
import org.apache.hadoop.mapred.lib.HashPartitioner;
|
||||||
|
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
|
||||||
|
import org.apache.hadoop.util.Tool;
|
||||||
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
import org.apache.hadoop.fs.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A set of utilities to validate the <b>sort</b> of the map-reduce framework.
|
||||||
|
* This utility program has 2 main parts:
|
||||||
|
* 1. Checking the records' statistics
|
||||||
|
* a) Validates the no. of bytes and records in sort's input & output.
|
||||||
|
* b) Validates the xor of the md5's of each key/value pair.
|
||||||
|
* c) Ensures same key/value is present in both input and output.
|
||||||
|
* 2. Check individual records to ensure each record is present in both
|
||||||
|
* the input and the output of the sort (expensive on large data-sets).
|
||||||
|
*
|
||||||
|
* To run: bin/hadoop jar build/hadoop-examples.jar sortvalidate
|
||||||
|
* [-m <i>maps</i>] [-r <i>reduces</i>] [-deep]
|
||||||
|
* -sortInput <i>sort-in-dir</i> -sortOutput <i>sort-out-dir</i>
|
||||||
|
*/
|
||||||
|
public class SortValidator extends Configured implements Tool {
|
||||||
|
|
||||||
|
static private final IntWritable sortInput = new IntWritable(1);
|
||||||
|
static private final IntWritable sortOutput = new IntWritable(2);
|
||||||
|
static public String SORT_REDUCES =
|
||||||
|
"mapreduce.sortvalidator.sort.reduce.tasks";
|
||||||
|
static public String MAPS_PER_HOST = "mapreduce.sortvalidator.mapsperhost";
|
||||||
|
static public String REDUCES_PER_HOST =
|
||||||
|
"mapreduce.sortvalidator.reducesperhost";
|
||||||
|
static void printUsage() {
|
||||||
|
System.err.println("sortvalidate [-m <maps>] [-r <reduces>] [-deep] " +
|
||||||
|
"-sortInput <sort-input-dir> -sortOutput <sort-output-dir>");
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
static private IntWritable deduceInputFile(JobConf job) {
|
||||||
|
Path[] inputPaths = FileInputFormat.getInputPaths(job);
|
||||||
|
Path inputFile = new Path(job.get(JobContext.MAP_INPUT_FILE));
|
||||||
|
|
||||||
|
// value == one for sort-input; value == two for sort-output
|
||||||
|
return (inputFile.getParent().equals(inputPaths[0])) ?
|
||||||
|
sortInput : sortOutput;
|
||||||
|
}
|
||||||
|
|
||||||
|
static private byte[] pair(BytesWritable a, BytesWritable b) {
|
||||||
|
byte[] pairData = new byte[a.getLength()+ b.getLength()];
|
||||||
|
System.arraycopy(a.getBytes(), 0, pairData, 0, a.getLength());
|
||||||
|
System.arraycopy(b.getBytes(), 0, pairData, a.getLength(), b.getLength());
|
||||||
|
return pairData;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final PathFilter sortPathsFilter = new PathFilter() {
|
||||||
|
public boolean accept(Path path) {
|
||||||
|
return (path.getName().startsWith("part-"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A simple map-reduce job which checks consistency of the
|
||||||
|
* MapReduce framework's sort by checking:
|
||||||
|
* a) Records are sorted correctly
|
||||||
|
* b) Keys are partitioned correctly
|
||||||
|
* c) The input and output have same no. of bytes and records.
|
||||||
|
* d) The input and output have the correct 'checksum' by xor'ing
|
||||||
|
* the md5 of each record.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public static class RecordStatsChecker {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generic way to get <b>raw</b> data from a {@link Writable}.
|
||||||
|
*/
|
||||||
|
static class Raw {
|
||||||
|
/**
|
||||||
|
* Get raw data bytes from a {@link Writable}
|
||||||
|
* @param writable {@link Writable} object from whom to get the raw data
|
||||||
|
* @return raw data of the writable
|
||||||
|
*/
|
||||||
|
public byte[] getRawBytes(Writable writable) {
|
||||||
|
return writable.toString().getBytes();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get number of raw data bytes of the {@link Writable}
|
||||||
|
* @param writable {@link Writable} object from whom to get the raw data
|
||||||
|
* length
|
||||||
|
* @return number of raw data bytes
|
||||||
|
*/
|
||||||
|
public int getRawBytesLength(Writable writable) {
|
||||||
|
return writable.toString().getBytes().length;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Specialization of {@link Raw} for {@link BytesWritable}.
|
||||||
|
*/
|
||||||
|
static class RawBytesWritable extends Raw {
|
||||||
|
public byte[] getRawBytes(Writable bw) {
|
||||||
|
return ((BytesWritable)bw).getBytes();
|
||||||
|
}
|
||||||
|
public int getRawBytesLength(Writable bw) {
|
||||||
|
return ((BytesWritable)bw).getLength();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Specialization of {@link Raw} for {@link Text}.
|
||||||
|
*/
|
||||||
|
static class RawText extends Raw {
|
||||||
|
public byte[] getRawBytes(Writable text) {
|
||||||
|
return ((Text)text).getBytes();
|
||||||
|
}
|
||||||
|
public int getRawBytesLength(Writable text) {
|
||||||
|
return ((Text)text).getLength();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Raw createRaw(Class rawClass) {
|
||||||
|
if (rawClass == Text.class) {
|
||||||
|
return new RawText();
|
||||||
|
} else if (rawClass == BytesWritable.class) {
|
||||||
|
System.err.println("Returning " + RawBytesWritable.class);
|
||||||
|
return new RawBytesWritable();
|
||||||
|
}
|
||||||
|
return new Raw();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class RecordStatsWritable implements Writable {
|
||||||
|
private long bytes = 0;
|
||||||
|
private long records = 0;
|
||||||
|
private int checksum = 0;
|
||||||
|
|
||||||
|
public RecordStatsWritable() {}
|
||||||
|
|
||||||
|
public RecordStatsWritable(long bytes, long records, int checksum) {
|
||||||
|
this.bytes = bytes;
|
||||||
|
this.records = records;
|
||||||
|
this.checksum = checksum;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void write(DataOutput out) throws IOException {
|
||||||
|
WritableUtils.writeVLong(out, bytes);
|
||||||
|
WritableUtils.writeVLong(out, records);
|
||||||
|
WritableUtils.writeVInt(out, checksum);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void readFields(DataInput in) throws IOException {
|
||||||
|
bytes = WritableUtils.readVLong(in);
|
||||||
|
records = WritableUtils.readVLong(in);
|
||||||
|
checksum = WritableUtils.readVInt(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getBytes() { return bytes; }
|
||||||
|
public long getRecords() { return records; }
|
||||||
|
public int getChecksum() { return checksum; }
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Map extends MapReduceBase
|
||||||
|
implements Mapper<WritableComparable, Writable,
|
||||||
|
IntWritable, RecordStatsWritable> {
|
||||||
|
|
||||||
|
private IntWritable key = null;
|
||||||
|
private WritableComparable prevKey = null;
|
||||||
|
private Class<? extends WritableComparable> keyClass;
|
||||||
|
private Partitioner<WritableComparable, Writable> partitioner = null;
|
||||||
|
private int partition = -1;
|
||||||
|
private int noSortReducers = -1;
|
||||||
|
private long recordId = -1;
|
||||||
|
|
||||||
|
private Raw rawKey;
|
||||||
|
private Raw rawValue;
|
||||||
|
|
||||||
|
public void configure(JobConf job) {
|
||||||
|
// 'key' == sortInput for sort-input; key == sortOutput for sort-output
|
||||||
|
key = deduceInputFile(job);
|
||||||
|
|
||||||
|
if (key == sortOutput) {
|
||||||
|
partitioner = new HashPartitioner<WritableComparable, Writable>();
|
||||||
|
|
||||||
|
// Figure the 'current' partition and no. of reduces of the 'sort'
|
||||||
|
try {
|
||||||
|
URI inputURI = new URI(job.get(JobContext.MAP_INPUT_FILE));
|
||||||
|
String inputFile = inputURI.getPath();
|
||||||
|
// part file is of the form part-r-xxxxx
|
||||||
|
partition = Integer.valueOf(inputFile.substring(
|
||||||
|
inputFile.lastIndexOf("part") + 7)).intValue();
|
||||||
|
noSortReducers = job.getInt(SORT_REDUCES, -1);
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.err.println("Caught: " + e);
|
||||||
|
System.exit(-1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void map(WritableComparable key, Writable value,
|
||||||
|
OutputCollector<IntWritable, RecordStatsWritable> output,
|
||||||
|
Reporter reporter) throws IOException {
|
||||||
|
// Set up rawKey and rawValue on the first call to 'map'
|
||||||
|
if (recordId == -1) {
|
||||||
|
rawKey = createRaw(key.getClass());
|
||||||
|
rawValue = createRaw(value.getClass());
|
||||||
|
}
|
||||||
|
++recordId;
|
||||||
|
|
||||||
|
if (this.key == sortOutput) {
|
||||||
|
// Check if keys are 'sorted' if this
|
||||||
|
// record is from sort's output
|
||||||
|
if (prevKey == null) {
|
||||||
|
prevKey = key;
|
||||||
|
keyClass = prevKey.getClass();
|
||||||
|
} else {
|
||||||
|
// Sanity check
|
||||||
|
if (keyClass != key.getClass()) {
|
||||||
|
throw new IOException("Type mismatch in key: expected " +
|
||||||
|
keyClass.getName() + ", received " +
|
||||||
|
key.getClass().getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if they were sorted correctly
|
||||||
|
if (prevKey.compareTo(key) > 0) {
|
||||||
|
throw new IOException("The 'map-reduce' framework wrongly" +
|
||||||
|
" classifed (" + prevKey + ") > (" +
|
||||||
|
key + ") "+ "for record# " + recordId);
|
||||||
|
}
|
||||||
|
prevKey = key;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the sorted output is 'partitioned' right
|
||||||
|
int keyPartition =
|
||||||
|
partitioner.getPartition(key, value, noSortReducers);
|
||||||
|
if (partition != keyPartition) {
|
||||||
|
throw new IOException("Partitions do not match for record# " +
|
||||||
|
recordId + " ! - '" + partition + "' v/s '" +
|
||||||
|
keyPartition + "'");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Construct the record-stats and output (this.key, record-stats)
|
||||||
|
byte[] keyBytes = rawKey.getRawBytes(key);
|
||||||
|
int keyBytesLen = rawKey.getRawBytesLength(key);
|
||||||
|
byte[] valueBytes = rawValue.getRawBytes(value);
|
||||||
|
int valueBytesLen = rawValue.getRawBytesLength(value);
|
||||||
|
|
||||||
|
int keyValueChecksum =
|
||||||
|
(WritableComparator.hashBytes(keyBytes, keyBytesLen) ^
|
||||||
|
WritableComparator.hashBytes(valueBytes, valueBytesLen));
|
||||||
|
|
||||||
|
output.collect(this.key,
|
||||||
|
new RecordStatsWritable((keyBytesLen+valueBytesLen),
|
||||||
|
1, keyValueChecksum)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Reduce extends MapReduceBase
|
||||||
|
implements Reducer<IntWritable, RecordStatsWritable,
|
||||||
|
IntWritable, RecordStatsWritable> {
|
||||||
|
|
||||||
|
public void reduce(IntWritable key, Iterator<RecordStatsWritable> values,
|
||||||
|
OutputCollector<IntWritable,
|
||||||
|
RecordStatsWritable> output,
|
||||||
|
Reporter reporter) throws IOException {
|
||||||
|
long bytes = 0;
|
||||||
|
long records = 0;
|
||||||
|
int xor = 0;
|
||||||
|
while (values.hasNext()) {
|
||||||
|
RecordStatsWritable stats = values.next();
|
||||||
|
bytes += stats.getBytes();
|
||||||
|
records += stats.getRecords();
|
||||||
|
xor ^= stats.getChecksum();
|
||||||
|
}
|
||||||
|
|
||||||
|
output.collect(key, new RecordStatsWritable(bytes, records, xor));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class NonSplitableSequenceFileInputFormat
|
||||||
|
extends SequenceFileInputFormat {
|
||||||
|
protected boolean isSplitable(FileSystem fs, Path filename) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void checkRecords(Configuration defaults,
|
||||||
|
Path sortInput, Path sortOutput) throws IOException {
|
||||||
|
FileSystem inputfs = sortInput.getFileSystem(defaults);
|
||||||
|
FileSystem outputfs = sortOutput.getFileSystem(defaults);
|
||||||
|
FileSystem defaultfs = FileSystem.get(defaults);
|
||||||
|
JobConf jobConf = new JobConf(defaults, RecordStatsChecker.class);
|
||||||
|
jobConf.setJobName("sortvalidate-recordstats-checker");
|
||||||
|
|
||||||
|
int noSortReduceTasks =
|
||||||
|
outputfs.listStatus(sortOutput, sortPathsFilter).length;
|
||||||
|
jobConf.setInt(SORT_REDUCES, noSortReduceTasks);
|
||||||
|
int noSortInputpaths = inputfs.listStatus(sortInput).length;
|
||||||
|
|
||||||
|
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
|
||||||
|
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
|
||||||
|
|
||||||
|
jobConf.setOutputKeyClass(IntWritable.class);
|
||||||
|
jobConf.setOutputValueClass(RecordStatsChecker.RecordStatsWritable.class);
|
||||||
|
|
||||||
|
jobConf.setMapperClass(Map.class);
|
||||||
|
jobConf.setCombinerClass(Reduce.class);
|
||||||
|
jobConf.setReducerClass(Reduce.class);
|
||||||
|
|
||||||
|
jobConf.setNumMapTasks(noSortReduceTasks);
|
||||||
|
jobConf.setNumReduceTasks(1);
|
||||||
|
|
||||||
|
FileInputFormat.setInputPaths(jobConf, sortInput);
|
||||||
|
FileInputFormat.addInputPath(jobConf, sortOutput);
|
||||||
|
Path outputPath = new Path("/tmp/sortvalidate/recordstatschecker");
|
||||||
|
if (defaultfs.exists(outputPath)) {
|
||||||
|
defaultfs.delete(outputPath, true);
|
||||||
|
}
|
||||||
|
FileOutputFormat.setOutputPath(jobConf, outputPath);
|
||||||
|
|
||||||
|
// Uncomment to run locally in a single process
|
||||||
|
//job_conf.set(JTConfig.JT, "local");
|
||||||
|
Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
|
||||||
|
System.out.println("\nSortValidator.RecordStatsChecker: Validate sort " +
|
||||||
|
"from " + inputPaths[0] + " (" +
|
||||||
|
noSortInputpaths + " files), " +
|
||||||
|
inputPaths[1] + " (" +
|
||||||
|
noSortReduceTasks +
|
||||||
|
" files) into " +
|
||||||
|
FileOutputFormat.getOutputPath(jobConf) +
|
||||||
|
" with 1 reducer.");
|
||||||
|
Date startTime = new Date();
|
||||||
|
System.out.println("Job started: " + startTime);
|
||||||
|
JobClient.runJob(jobConf);
|
||||||
|
Date end_time = new Date();
|
||||||
|
System.out.println("Job ended: " + end_time);
|
||||||
|
System.out.println("The job took " +
|
||||||
|
(end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
|
||||||
|
|
||||||
|
// Check to ensure that the statistics of the
|
||||||
|
// framework's sort-input and sort-output match
|
||||||
|
SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs,
|
||||||
|
new Path(outputPath, "part-00000"), defaults);
|
||||||
|
IntWritable k1 = new IntWritable();
|
||||||
|
IntWritable k2 = new IntWritable();
|
||||||
|
RecordStatsWritable v1 = new RecordStatsWritable();
|
||||||
|
RecordStatsWritable v2 = new RecordStatsWritable();
|
||||||
|
if (!stats.next(k1, v1)) {
|
||||||
|
throw new IOException("Failed to read record #1 from reduce's output");
|
||||||
|
}
|
||||||
|
if (!stats.next(k2, v2)) {
|
||||||
|
throw new IOException("Failed to read record #2 from reduce's output");
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((v1.getBytes() != v2.getBytes()) || (v1.getRecords() != v2.getRecords()) ||
|
||||||
|
v1.getChecksum() != v2.getChecksum()) {
|
||||||
|
throw new IOException("(" +
|
||||||
|
v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum() + ") v/s (" +
|
||||||
|
v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum() + ")");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A simple map-reduce task to check if the input and the output
|
||||||
|
* of the framework's sort is consistent by ensuring each record
|
||||||
|
* is present in both the input and the output.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public static class RecordChecker {
|
||||||
|
|
||||||
|
public static class Map extends MapReduceBase
|
||||||
|
implements Mapper<BytesWritable, BytesWritable,
|
||||||
|
BytesWritable, IntWritable> {
|
||||||
|
|
||||||
|
private IntWritable value = null;
|
||||||
|
|
||||||
|
public void configure(JobConf job) {
|
||||||
|
// value == one for sort-input; value == two for sort-output
|
||||||
|
value = deduceInputFile(job);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void map(BytesWritable key,
|
||||||
|
BytesWritable value,
|
||||||
|
OutputCollector<BytesWritable, IntWritable> output,
|
||||||
|
Reporter reporter) throws IOException {
|
||||||
|
// newKey = (key, value)
|
||||||
|
BytesWritable keyValue = new BytesWritable(pair(key, value));
|
||||||
|
|
||||||
|
// output (newKey, value)
|
||||||
|
output.collect(keyValue, this.value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Reduce extends MapReduceBase
|
||||||
|
implements Reducer<BytesWritable, IntWritable,
|
||||||
|
BytesWritable, IntWritable> {
|
||||||
|
|
||||||
|
public void reduce(BytesWritable key, Iterator<IntWritable> values,
|
||||||
|
OutputCollector<BytesWritable, IntWritable> output,
|
||||||
|
Reporter reporter) throws IOException {
|
||||||
|
int ones = 0;
|
||||||
|
int twos = 0;
|
||||||
|
while (values.hasNext()) {
|
||||||
|
IntWritable count = values.next();
|
||||||
|
if (count.equals(sortInput)) {
|
||||||
|
++ones;
|
||||||
|
} else if (count.equals(sortOutput)) {
|
||||||
|
++twos;
|
||||||
|
} else {
|
||||||
|
throw new IOException("Invalid 'value' of " + count.get() +
|
||||||
|
" for (key,value): " + key.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check to ensure there are equal no. of ones and twos
|
||||||
|
if (ones != twos) {
|
||||||
|
throw new IOException("Illegal ('one', 'two'): (" + ones + ", " + twos +
|
||||||
|
") for (key, value): " + key.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void checkRecords(Configuration defaults, int noMaps, int noReduces,
|
||||||
|
Path sortInput, Path sortOutput) throws IOException {
|
||||||
|
JobConf jobConf = new JobConf(defaults, RecordChecker.class);
|
||||||
|
jobConf.setJobName("sortvalidate-record-checker");
|
||||||
|
|
||||||
|
jobConf.setInputFormat(SequenceFileInputFormat.class);
|
||||||
|
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
|
||||||
|
|
||||||
|
jobConf.setOutputKeyClass(BytesWritable.class);
|
||||||
|
jobConf.setOutputValueClass(IntWritable.class);
|
||||||
|
|
||||||
|
jobConf.setMapperClass(Map.class);
|
||||||
|
jobConf.setReducerClass(Reduce.class);
|
||||||
|
|
||||||
|
JobClient client = new JobClient(jobConf);
|
||||||
|
ClusterStatus cluster = client.getClusterStatus();
|
||||||
|
if (noMaps == -1) {
|
||||||
|
noMaps = cluster.getTaskTrackers() *
|
||||||
|
jobConf.getInt(MAPS_PER_HOST, 10);
|
||||||
|
}
|
||||||
|
if (noReduces == -1) {
|
||||||
|
noReduces = (int) (cluster.getMaxReduceTasks() * 0.9);
|
||||||
|
String sortReduces = jobConf.get(REDUCES_PER_HOST);
|
||||||
|
if (sortReduces != null) {
|
||||||
|
noReduces = cluster.getTaskTrackers() *
|
||||||
|
Integer.parseInt(sortReduces);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jobConf.setNumMapTasks(noMaps);
|
||||||
|
jobConf.setNumReduceTasks(noReduces);
|
||||||
|
|
||||||
|
FileInputFormat.setInputPaths(jobConf, sortInput);
|
||||||
|
FileInputFormat.addInputPath(jobConf, sortOutput);
|
||||||
|
Path outputPath = new Path("/tmp/sortvalidate/recordchecker");
|
||||||
|
FileSystem fs = FileSystem.get(defaults);
|
||||||
|
if (fs.exists(outputPath)) {
|
||||||
|
fs.delete(outputPath, true);
|
||||||
|
}
|
||||||
|
FileOutputFormat.setOutputPath(jobConf, outputPath);
|
||||||
|
|
||||||
|
// Uncomment to run locally in a single process
|
||||||
|
//job_conf.set(JTConfig.JT, "local");
|
||||||
|
Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
|
||||||
|
System.out.println("\nSortValidator.RecordChecker: Running on " +
|
||||||
|
cluster.getTaskTrackers() +
|
||||||
|
" nodes to validate sort from " +
|
||||||
|
inputPaths[0] + ", " +
|
||||||
|
inputPaths[1] + " into " +
|
||||||
|
FileOutputFormat.getOutputPath(jobConf) +
|
||||||
|
" with " + noReduces + " reduces.");
|
||||||
|
Date startTime = new Date();
|
||||||
|
System.out.println("Job started: " + startTime);
|
||||||
|
JobClient.runJob(jobConf);
|
||||||
|
Date end_time = new Date();
|
||||||
|
System.out.println("Job ended: " + end_time);
|
||||||
|
System.out.println("The job took " +
|
||||||
|
(end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The main driver for sort-validator program.
|
||||||
|
* Invoke this method to submit the map/reduce job.
|
||||||
|
* @throws IOException When there is communication problems with the
|
||||||
|
* job tracker.
|
||||||
|
*/
|
||||||
|
public int run(String[] args) throws Exception {
|
||||||
|
Configuration defaults = getConf();
|
||||||
|
|
||||||
|
int noMaps = -1, noReduces = -1;
|
||||||
|
Path sortInput = null, sortOutput = null;
|
||||||
|
boolean deepTest = false;
|
||||||
|
for(int i=0; i < args.length; ++i) {
|
||||||
|
try {
|
||||||
|
if ("-m".equals(args[i])) {
|
||||||
|
noMaps = Integer.parseInt(args[++i]);
|
||||||
|
} else if ("-r".equals(args[i])) {
|
||||||
|
noReduces = Integer.parseInt(args[++i]);
|
||||||
|
} else if ("-sortInput".equals(args[i])){
|
||||||
|
sortInput = new Path(args[++i]);
|
||||||
|
} else if ("-sortOutput".equals(args[i])){
|
||||||
|
sortOutput = new Path(args[++i]);
|
||||||
|
} else if ("-deep".equals(args[i])) {
|
||||||
|
deepTest = true;
|
||||||
|
} else {
|
||||||
|
printUsage();
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
} catch (NumberFormatException except) {
|
||||||
|
System.err.println("ERROR: Integer expected instead of " + args[i]);
|
||||||
|
printUsage();
|
||||||
|
return -1;
|
||||||
|
} catch (ArrayIndexOutOfBoundsException except) {
|
||||||
|
System.err.println("ERROR: Required parameter missing from " +
|
||||||
|
args[i-1]);
|
||||||
|
printUsage();
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sanity check
|
||||||
|
if (sortInput == null || sortOutput == null) {
|
||||||
|
printUsage();
|
||||||
|
return -2;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the records are consistent and sorted correctly
|
||||||
|
RecordStatsChecker.checkRecords(defaults, sortInput, sortOutput);
|
||||||
|
|
||||||
|
// Check if the same records are present in sort's inputs & outputs
|
||||||
|
if (deepTest) {
|
||||||
|
RecordChecker.checkRecords(defaults, noMaps, noReduces, sortInput,
|
||||||
|
sortOutput);
|
||||||
|
}
|
||||||
|
|
||||||
|
System.out.println("\nSUCCESS! Validated the MapReduce framework's 'sort'" +
|
||||||
|
" successfully.");
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
int res = ToolRunner.run(new Configuration(), new SortValidator(), args);
|
||||||
|
System.exit(res);
|
||||||
|
}
|
||||||
|
}
|
|
@ -48,7 +48,6 @@ public class TestSpecialCharactersInOutputPath extends TestCase {
|
||||||
private static final String OUTPUT_FILENAME = "result[0]";
|
private static final String OUTPUT_FILENAME = "result[0]";
|
||||||
|
|
||||||
public static boolean launchJob(URI fileSys,
|
public static boolean launchJob(URI fileSys,
|
||||||
String jobTracker,
|
|
||||||
JobConf conf,
|
JobConf conf,
|
||||||
int numMaps,
|
int numMaps,
|
||||||
int numReduces) throws IOException {
|
int numReduces) throws IOException {
|
||||||
|
@ -68,8 +67,6 @@ public class TestSpecialCharactersInOutputPath extends TestCase {
|
||||||
|
|
||||||
// use WordCount example
|
// use WordCount example
|
||||||
FileSystem.setDefaultUri(conf, fileSys);
|
FileSystem.setDefaultUri(conf, fileSys);
|
||||||
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
|
|
||||||
conf.set(JTConfig.JT_IPC_ADDRESS, jobTracker);
|
|
||||||
conf.setJobName("foo");
|
conf.setJobName("foo");
|
||||||
|
|
||||||
conf.setInputFormat(TextInputFormat.class);
|
conf.setInputFormat(TextInputFormat.class);
|
||||||
|
@ -113,11 +110,9 @@ public class TestSpecialCharactersInOutputPath extends TestCase {
|
||||||
fileSys = dfs.getFileSystem();
|
fileSys = dfs.getFileSystem();
|
||||||
namenode = fileSys.getUri().toString();
|
namenode = fileSys.getUri().toString();
|
||||||
mr = new MiniMRCluster(taskTrackers, namenode, 2);
|
mr = new MiniMRCluster(taskTrackers, namenode, 2);
|
||||||
final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
|
|
||||||
JobConf jobConf = new JobConf();
|
JobConf jobConf = new JobConf();
|
||||||
boolean result;
|
boolean result;
|
||||||
result = launchJob(fileSys.getUri(), jobTrackerName, jobConf,
|
result = launchJob(fileSys.getUri(), jobConf, 3, 1);
|
||||||
3, 1);
|
|
||||||
assertTrue(result);
|
assertTrue(result);
|
||||||
|
|
||||||
} finally {
|
} finally {
|
|
@ -0,0 +1,787 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.DataOutputStream;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.text.DecimalFormat;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Enumeration;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
import org.apache.hadoop.io.BytesWritable;
|
||||||
|
import org.apache.hadoop.io.SequenceFile;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hadoop.io.LongWritable;
|
||||||
|
import org.apache.hadoop.io.WritableComparable;
|
||||||
|
import org.apache.hadoop.io.SequenceFile.CompressionType;
|
||||||
|
import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
|
||||||
|
import org.apache.hadoop.mapred.lib.IdentityMapper;
|
||||||
|
import org.apache.hadoop.mapred.lib.IdentityReducer;
|
||||||
|
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
|
||||||
|
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utilities used in unit test.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class UtilsForTests {
|
||||||
|
|
||||||
|
static final Log LOG = LogFactory.getLog(UtilsForTests.class);
|
||||||
|
|
||||||
|
final static long KB = 1024L * 1;
|
||||||
|
final static long MB = 1024L * KB;
|
||||||
|
final static long GB = 1024L * MB;
|
||||||
|
final static long TB = 1024L * GB;
|
||||||
|
final static long PB = 1024L * TB;
|
||||||
|
final static Object waitLock = new Object();
|
||||||
|
|
||||||
|
static DecimalFormat dfm = new DecimalFormat("####.000");
|
||||||
|
static DecimalFormat ifm = new DecimalFormat("###,###,###,###,###");
|
||||||
|
|
||||||
|
public static String dfmt(double d) {
|
||||||
|
return dfm.format(d);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String ifmt(double d) {
|
||||||
|
return ifm.format(d);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String formatBytes(long numBytes) {
|
||||||
|
StringBuffer buf = new StringBuffer();
|
||||||
|
boolean bDetails = true;
|
||||||
|
double num = numBytes;
|
||||||
|
|
||||||
|
if (numBytes < KB) {
|
||||||
|
buf.append(numBytes + " B");
|
||||||
|
bDetails = false;
|
||||||
|
} else if (numBytes < MB) {
|
||||||
|
buf.append(dfmt(num / KB) + " KB");
|
||||||
|
} else if (numBytes < GB) {
|
||||||
|
buf.append(dfmt(num / MB) + " MB");
|
||||||
|
} else if (numBytes < TB) {
|
||||||
|
buf.append(dfmt(num / GB) + " GB");
|
||||||
|
} else if (numBytes < PB) {
|
||||||
|
buf.append(dfmt(num / TB) + " TB");
|
||||||
|
} else {
|
||||||
|
buf.append(dfmt(num / PB) + " PB");
|
||||||
|
}
|
||||||
|
if (bDetails) {
|
||||||
|
buf.append(" (" + ifmt(numBytes) + " bytes)");
|
||||||
|
}
|
||||||
|
return buf.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String formatBytes2(long numBytes) {
|
||||||
|
StringBuffer buf = new StringBuffer();
|
||||||
|
long u = 0;
|
||||||
|
if (numBytes >= TB) {
|
||||||
|
u = numBytes / TB;
|
||||||
|
numBytes -= u * TB;
|
||||||
|
buf.append(u + " TB ");
|
||||||
|
}
|
||||||
|
if (numBytes >= GB) {
|
||||||
|
u = numBytes / GB;
|
||||||
|
numBytes -= u * GB;
|
||||||
|
buf.append(u + " GB ");
|
||||||
|
}
|
||||||
|
if (numBytes >= MB) {
|
||||||
|
u = numBytes / MB;
|
||||||
|
numBytes -= u * MB;
|
||||||
|
buf.append(u + " MB ");
|
||||||
|
}
|
||||||
|
if (numBytes >= KB) {
|
||||||
|
u = numBytes / KB;
|
||||||
|
numBytes -= u * KB;
|
||||||
|
buf.append(u + " KB ");
|
||||||
|
}
|
||||||
|
buf.append(u + " B"); //even if zero
|
||||||
|
return buf.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
static final String regexpSpecials = "[]()?*+|.!^-\\~@";
|
||||||
|
|
||||||
|
public static String regexpEscape(String plain) {
|
||||||
|
StringBuffer buf = new StringBuffer();
|
||||||
|
char[] ch = plain.toCharArray();
|
||||||
|
int csup = ch.length;
|
||||||
|
for (int c = 0; c < csup; c++) {
|
||||||
|
if (regexpSpecials.indexOf(ch[c]) != -1) {
|
||||||
|
buf.append("\\");
|
||||||
|
}
|
||||||
|
buf.append(ch[c]);
|
||||||
|
}
|
||||||
|
return buf.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String safeGetCanonicalPath(File f) {
|
||||||
|
try {
|
||||||
|
String s = f.getCanonicalPath();
|
||||||
|
return (s == null) ? f.toString() : s;
|
||||||
|
} catch (IOException io) {
|
||||||
|
return f.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String slurp(File f) throws IOException {
|
||||||
|
int len = (int) f.length();
|
||||||
|
byte[] buf = new byte[len];
|
||||||
|
FileInputStream in = new FileInputStream(f);
|
||||||
|
String contents = null;
|
||||||
|
try {
|
||||||
|
in.read(buf, 0, len);
|
||||||
|
contents = new String(buf, "UTF-8");
|
||||||
|
} finally {
|
||||||
|
in.close();
|
||||||
|
}
|
||||||
|
return contents;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String slurpHadoop(Path p, FileSystem fs) throws IOException {
|
||||||
|
int len = (int) fs.getFileStatus(p).getLen();
|
||||||
|
byte[] buf = new byte[len];
|
||||||
|
InputStream in = fs.open(p);
|
||||||
|
String contents = null;
|
||||||
|
try {
|
||||||
|
in.read(buf, 0, len);
|
||||||
|
contents = new String(buf, "UTF-8");
|
||||||
|
} finally {
|
||||||
|
in.close();
|
||||||
|
}
|
||||||
|
return contents;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String rjustify(String s, int width) {
|
||||||
|
if (s == null) s = "null";
|
||||||
|
if (width > s.length()) {
|
||||||
|
s = getSpace(width - s.length()) + s;
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String ljustify(String s, int width) {
|
||||||
|
if (s == null) s = "null";
|
||||||
|
if (width > s.length()) {
|
||||||
|
s = s + getSpace(width - s.length());
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
static char[] space;
|
||||||
|
static {
|
||||||
|
space = new char[300];
|
||||||
|
Arrays.fill(space, '\u0020');
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getSpace(int len) {
|
||||||
|
if (len > space.length) {
|
||||||
|
space = new char[Math.max(len, 2 * space.length)];
|
||||||
|
Arrays.fill(space, '\u0020');
|
||||||
|
}
|
||||||
|
return new String(space, 0, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets job status from the jobtracker given the jobclient and the job id
|
||||||
|
*/
|
||||||
|
static JobStatus getJobStatus(JobClient jc, JobID id) throws IOException {
|
||||||
|
JobStatus[] statuses = jc.getAllJobs();
|
||||||
|
for (JobStatus jobStatus : statuses) {
|
||||||
|
if (jobStatus.getJobID().equals(id)) {
|
||||||
|
return jobStatus;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A utility that waits for specified amount of time
|
||||||
|
*/
|
||||||
|
public static void waitFor(long duration) {
|
||||||
|
try {
|
||||||
|
synchronized (waitLock) {
|
||||||
|
waitLock.wait(duration);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ie) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for the jobtracker to be RUNNING.
|
||||||
|
*/
|
||||||
|
static void waitForJobTracker(JobClient jobClient) {
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
ClusterStatus status = jobClient.getClusterStatus();
|
||||||
|
while (status.getJobTrackerStatus() != JobTrackerStatus.RUNNING) {
|
||||||
|
waitFor(100);
|
||||||
|
status = jobClient.getClusterStatus();
|
||||||
|
}
|
||||||
|
break; // means that the jt is ready
|
||||||
|
} catch (IOException ioe) {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Waits until all the jobs at the jobtracker complete.
|
||||||
|
*/
|
||||||
|
static void waitTillDone(JobClient jobClient) throws IOException {
|
||||||
|
// Wait for the last job to complete
|
||||||
|
while (true) {
|
||||||
|
boolean shouldWait = false;
|
||||||
|
for (JobStatus jobStatuses : jobClient.getAllJobs()) {
|
||||||
|
if (jobStatuses.getRunState() != JobStatus.SUCCEEDED
|
||||||
|
&& jobStatuses.getRunState() != JobStatus.FAILED
|
||||||
|
&& jobStatuses.getRunState() != JobStatus.KILLED) {
|
||||||
|
shouldWait = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (shouldWait) {
|
||||||
|
waitFor(100);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configure a waiting job
|
||||||
|
*/
|
||||||
|
static void configureWaitingJobConf(JobConf jobConf, Path inDir,
|
||||||
|
Path outputPath, int numMaps, int numRed,
|
||||||
|
String jobName, String mapSignalFilename,
|
||||||
|
String redSignalFilename)
|
||||||
|
throws IOException {
|
||||||
|
jobConf.setJobName(jobName);
|
||||||
|
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
|
||||||
|
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
|
||||||
|
FileInputFormat.setInputPaths(jobConf, inDir);
|
||||||
|
FileOutputFormat.setOutputPath(jobConf, outputPath);
|
||||||
|
jobConf.setMapperClass(UtilsForTests.HalfWaitingMapper.class);
|
||||||
|
jobConf.setReducerClass(IdentityReducer.class);
|
||||||
|
jobConf.setOutputKeyClass(BytesWritable.class);
|
||||||
|
jobConf.setOutputValueClass(BytesWritable.class);
|
||||||
|
jobConf.setInputFormat(RandomInputFormat.class);
|
||||||
|
jobConf.setNumMapTasks(numMaps);
|
||||||
|
jobConf.setNumReduceTasks(numRed);
|
||||||
|
jobConf.setJar("build/test/mapred/testjar/testjob.jar");
|
||||||
|
jobConf.set(getTaskSignalParameter(true), mapSignalFilename);
|
||||||
|
jobConf.set(getTaskSignalParameter(false), redSignalFilename);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Commonly used map and reduce classes
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Map is a Mapper that just waits for a file to be created on the dfs. The
|
||||||
|
* file creation is a signal to the mappers and hence acts as a waiting job.
|
||||||
|
*/
|
||||||
|
|
||||||
|
static class WaitingMapper
|
||||||
|
extends MapReduceBase
|
||||||
|
implements Mapper<WritableComparable, Writable,
|
||||||
|
WritableComparable, Writable> {
|
||||||
|
|
||||||
|
FileSystem fs = null;
|
||||||
|
Path signal;
|
||||||
|
int id = 0;
|
||||||
|
int totalMaps = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks if the map task needs to wait. By default all the maps will wait.
|
||||||
|
* This method needs to be overridden to make a custom waiting mapper.
|
||||||
|
*/
|
||||||
|
public boolean shouldWait(int id) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a signal file on which the map task should wait. By default all
|
||||||
|
* the maps wait on a single file passed as test.mapred.map.waiting.target.
|
||||||
|
* This method needs to be overridden to make a custom waiting mapper
|
||||||
|
*/
|
||||||
|
public Path getSignalFile(int id) {
|
||||||
|
return signal;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** The waiting function. The map exits once it gets a signal. Here the
|
||||||
|
* signal is the file existence.
|
||||||
|
*/
|
||||||
|
public void map(WritableComparable key, Writable val,
|
||||||
|
OutputCollector<WritableComparable, Writable> output,
|
||||||
|
Reporter reporter)
|
||||||
|
throws IOException {
|
||||||
|
if (shouldWait(id)) {
|
||||||
|
if (fs != null) {
|
||||||
|
while (!fs.exists(getSignalFile(id))) {
|
||||||
|
try {
|
||||||
|
reporter.progress();
|
||||||
|
synchronized (this) {
|
||||||
|
this.wait(1000); // wait for 1 sec
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
System.out.println("Interrupted while the map was waiting for "
|
||||||
|
+ " the signal.");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new IOException("Could not get the DFS!!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void configure(JobConf conf) {
|
||||||
|
try {
|
||||||
|
String taskId = conf.get(JobContext.TASK_ATTEMPT_ID);
|
||||||
|
id = Integer.parseInt(taskId.split("_")[4]);
|
||||||
|
totalMaps = Integer.parseInt(conf.get(JobContext.NUM_MAPS));
|
||||||
|
fs = FileSystem.get(conf);
|
||||||
|
signal = new Path(conf.get(getTaskSignalParameter(true)));
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
System.out.println("Got an exception while obtaining the filesystem");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Only the later half of the maps wait for the signal while the rest
|
||||||
|
* complete immediately.
|
||||||
|
*/
|
||||||
|
static class HalfWaitingMapper extends WaitingMapper {
|
||||||
|
@Override
|
||||||
|
public boolean shouldWait(int id) {
|
||||||
|
return id >= (totalMaps / 2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reduce that just waits for a file to be created on the dfs. The
|
||||||
|
* file creation is a signal to the reduce.
|
||||||
|
*/
|
||||||
|
|
||||||
|
static class WaitingReducer extends MapReduceBase
|
||||||
|
implements Reducer<WritableComparable, Writable,
|
||||||
|
WritableComparable, Writable> {
|
||||||
|
|
||||||
|
FileSystem fs = null;
|
||||||
|
Path signal;
|
||||||
|
|
||||||
|
/** The waiting function. The reduce exits once it gets a signal. Here the
|
||||||
|
* signal is the file existence.
|
||||||
|
*/
|
||||||
|
public void reduce(WritableComparable key, Iterator<Writable> val,
|
||||||
|
OutputCollector<WritableComparable, Writable> output,
|
||||||
|
Reporter reporter)
|
||||||
|
throws IOException {
|
||||||
|
if (fs != null) {
|
||||||
|
while (!fs.exists(signal)) {
|
||||||
|
try {
|
||||||
|
reporter.progress();
|
||||||
|
synchronized (this) {
|
||||||
|
this.wait(1000); // wait for 1 sec
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
System.out.println("Interrupted while the map was waiting for the"
|
||||||
|
+ " signal.");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new IOException("Could not get the DFS!!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void configure(JobConf conf) {
|
||||||
|
try {
|
||||||
|
fs = FileSystem.get(conf);
|
||||||
|
signal = new Path(conf.get(getTaskSignalParameter(false)));
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
System.out.println("Got an exception while obtaining the filesystem");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static String getTaskSignalParameter(boolean isMap) {
|
||||||
|
return isMap
|
||||||
|
? "test.mapred.map.waiting.target"
|
||||||
|
: "test.mapred.reduce.waiting.target";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Signal the maps/reduces to start.
|
||||||
|
*/
|
||||||
|
static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys,
|
||||||
|
String mapSignalFile,
|
||||||
|
String reduceSignalFile, int replication)
|
||||||
|
throws IOException {
|
||||||
|
writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(mapSignalFile),
|
||||||
|
(short)replication);
|
||||||
|
writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(reduceSignalFile),
|
||||||
|
(short)replication);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Signal the maps/reduces to start.
|
||||||
|
*/
|
||||||
|
static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys,
|
||||||
|
boolean isMap, String mapSignalFile,
|
||||||
|
String reduceSignalFile)
|
||||||
|
throws IOException {
|
||||||
|
// signal the maps to complete
|
||||||
|
writeFile(dfs.getNameNode(), fileSys.getConf(),
|
||||||
|
isMap
|
||||||
|
? new Path(mapSignalFile)
|
||||||
|
: new Path(reduceSignalFile), (short)1);
|
||||||
|
}
|
||||||
|
|
||||||
|
static String getSignalFile(Path dir) {
|
||||||
|
return (new Path(dir, "signal")).toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
static String getMapSignalFile(Path dir) {
|
||||||
|
return (new Path(dir, "map-signal")).toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
static String getReduceSignalFile(Path dir) {
|
||||||
|
return (new Path(dir, "reduce-signal")).toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
static void writeFile(NameNode namenode, Configuration conf, Path name,
|
||||||
|
short replication) throws IOException {
|
||||||
|
FileSystem fileSys = FileSystem.get(conf);
|
||||||
|
SequenceFile.Writer writer =
|
||||||
|
SequenceFile.createWriter(fileSys, conf, name,
|
||||||
|
BytesWritable.class, BytesWritable.class,
|
||||||
|
CompressionType.NONE);
|
||||||
|
writer.append(new BytesWritable(), new BytesWritable());
|
||||||
|
writer.close();
|
||||||
|
fileSys.setReplication(name, replication);
|
||||||
|
DFSTestUtil.waitReplication(fileSys, name, replication);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Input formats
|
||||||
|
/**
|
||||||
|
* A custom input format that creates virtual inputs of a single string
|
||||||
|
* for each map.
|
||||||
|
*/
|
||||||
|
public static class RandomInputFormat implements InputFormat<Text, Text> {
|
||||||
|
|
||||||
|
public InputSplit[] getSplits(JobConf job,
|
||||||
|
int numSplits) throws IOException {
|
||||||
|
InputSplit[] result = new InputSplit[numSplits];
|
||||||
|
Path outDir = FileOutputFormat.getOutputPath(job);
|
||||||
|
for(int i=0; i < result.length; ++i) {
|
||||||
|
result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i),
|
||||||
|
0, 1, (String[])null);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
static class RandomRecordReader implements RecordReader<Text, Text> {
|
||||||
|
Path name;
|
||||||
|
public RandomRecordReader(Path p) {
|
||||||
|
name = p;
|
||||||
|
}
|
||||||
|
public boolean next(Text key, Text value) {
|
||||||
|
if (name != null) {
|
||||||
|
key.set(name.getName());
|
||||||
|
name = null;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
public Text createKey() {
|
||||||
|
return new Text();
|
||||||
|
}
|
||||||
|
public Text createValue() {
|
||||||
|
return new Text();
|
||||||
|
}
|
||||||
|
public long getPos() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
public void close() {}
|
||||||
|
public float getProgress() {
|
||||||
|
return 0.0f;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public RecordReader<Text, Text> getRecordReader(InputSplit split,
|
||||||
|
JobConf job,
|
||||||
|
Reporter reporter)
|
||||||
|
throws IOException {
|
||||||
|
return new RandomRecordReader(((FileSplit) split).getPath());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start a job and return its RunningJob object
|
||||||
|
static RunningJob runJob(JobConf conf, Path inDir, Path outDir)
|
||||||
|
throws IOException {
|
||||||
|
return runJob(conf, inDir, outDir, conf.getNumMapTasks(), conf.getNumReduceTasks());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start a job and return its RunningJob object
|
||||||
|
static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
|
||||||
|
int numReds) throws IOException {
|
||||||
|
|
||||||
|
String input = "The quick brown fox\n" + "has many silly\n"
|
||||||
|
+ "red fox sox\n";
|
||||||
|
|
||||||
|
// submit the job and wait for it to complete
|
||||||
|
return runJob(conf, inDir, outDir, numMaps, numReds, input);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start a job with the specified input and return its RunningJob object
|
||||||
|
static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
|
||||||
|
int numReds, String input) throws IOException {
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
if (fs.exists(outDir)) {
|
||||||
|
fs.delete(outDir, true);
|
||||||
|
}
|
||||||
|
if (!fs.exists(inDir)) {
|
||||||
|
fs.mkdirs(inDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < numMaps; ++i) {
|
||||||
|
DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
|
||||||
|
file.writeBytes(input);
|
||||||
|
file.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
conf.setInputFormat(TextInputFormat.class);
|
||||||
|
conf.setOutputKeyClass(LongWritable.class);
|
||||||
|
conf.setOutputValueClass(Text.class);
|
||||||
|
|
||||||
|
FileInputFormat.setInputPaths(conf, inDir);
|
||||||
|
FileOutputFormat.setOutputPath(conf, outDir);
|
||||||
|
conf.setNumMapTasks(numMaps);
|
||||||
|
conf.setNumReduceTasks(numReds);
|
||||||
|
|
||||||
|
JobClient jobClient = new JobClient(conf);
|
||||||
|
RunningJob job = jobClient.submitJob(conf);
|
||||||
|
|
||||||
|
return job;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run a job that will be succeeded and wait until it completes
|
||||||
|
public static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
|
||||||
|
throws IOException {
|
||||||
|
conf.setJobName("test-job-succeed");
|
||||||
|
conf.setMapperClass(IdentityMapper.class);
|
||||||
|
conf.setReducerClass(IdentityReducer.class);
|
||||||
|
|
||||||
|
RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
|
||||||
|
while (!job.isComplete()) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return job;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run a job that will be failed and wait until it completes
|
||||||
|
public static RunningJob runJobFail(JobConf conf, Path inDir, Path outDir)
|
||||||
|
throws IOException {
|
||||||
|
conf.setJobName("test-job-fail");
|
||||||
|
conf.setMapperClass(FailMapper.class);
|
||||||
|
conf.setReducerClass(IdentityReducer.class);
|
||||||
|
conf.setMaxMapAttempts(1);
|
||||||
|
|
||||||
|
RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
|
||||||
|
while (!job.isComplete()) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return job;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run a job that will be killed and wait until it completes
|
||||||
|
public static RunningJob runJobKill(JobConf conf, Path inDir, Path outDir)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
conf.setJobName("test-job-kill");
|
||||||
|
conf.setMapperClass(KillMapper.class);
|
||||||
|
conf.setReducerClass(IdentityReducer.class);
|
||||||
|
|
||||||
|
RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
|
||||||
|
while (job.getJobState() != JobStatus.RUNNING) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
job.killJob();
|
||||||
|
while (job.cleanupProgress() == 0.0f) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(10);
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return job;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleans up files/dirs inline. CleanupQueue deletes in a separate thread
|
||||||
|
* asynchronously.
|
||||||
|
*/
|
||||||
|
public static class InlineCleanupQueue extends CleanupQueue {
|
||||||
|
List<String> stalePaths = new ArrayList<String>();
|
||||||
|
|
||||||
|
public InlineCleanupQueue() {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addToQueue(PathDeletionContext... contexts) {
|
||||||
|
// delete paths in-line
|
||||||
|
for (PathDeletionContext context : contexts) {
|
||||||
|
try {
|
||||||
|
if (!deletePath(context)) {
|
||||||
|
LOG.warn("Stale path " + context.fullPath);
|
||||||
|
stalePaths.add(context.fullPath);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Caught exception while deleting path "
|
||||||
|
+ context.fullPath);
|
||||||
|
LOG.info(StringUtils.stringifyException(e));
|
||||||
|
stalePaths.add(context.fullPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class FakeClock extends Clock {
|
||||||
|
long time = 0;
|
||||||
|
|
||||||
|
public void advance(long millis) {
|
||||||
|
time += millis;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
long getTime() {
|
||||||
|
return time;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Mapper that fails
|
||||||
|
static class FailMapper extends MapReduceBase implements
|
||||||
|
Mapper<WritableComparable, Writable, WritableComparable, Writable> {
|
||||||
|
|
||||||
|
public void map(WritableComparable key, Writable value,
|
||||||
|
OutputCollector<WritableComparable, Writable> out, Reporter reporter)
|
||||||
|
throws IOException {
|
||||||
|
//NOTE- the next line is required for the TestDebugScript test to succeed
|
||||||
|
System.err.println("failing map");
|
||||||
|
throw new RuntimeException("failing map");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mapper that sleeps for a long time.
|
||||||
|
// Used for running a job that will be killed
|
||||||
|
static class KillMapper extends MapReduceBase implements
|
||||||
|
Mapper<WritableComparable, Writable, WritableComparable, Writable> {
|
||||||
|
|
||||||
|
public void map(WritableComparable key, Writable value,
|
||||||
|
OutputCollector<WritableComparable, Writable> out, Reporter reporter)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// Do nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void setUpConfigFile(Properties confProps, File configFile)
|
||||||
|
throws IOException {
|
||||||
|
Configuration config = new Configuration(false);
|
||||||
|
FileOutputStream fos = new FileOutputStream(configFile);
|
||||||
|
|
||||||
|
for (Enumeration<?> e = confProps.propertyNames(); e.hasMoreElements();) {
|
||||||
|
String key = (String) e.nextElement();
|
||||||
|
config.set(key, confProps.getProperty(key));
|
||||||
|
}
|
||||||
|
|
||||||
|
config.writeXml(fos);
|
||||||
|
fos.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This creates a file in the dfs
|
||||||
|
* @param dfs FileSystem Local File System where file needs to be picked
|
||||||
|
* @param URIPATH Path dfs path where file needs to be copied
|
||||||
|
* @param permission FsPermission File permission
|
||||||
|
* @return returns the DataOutputStream
|
||||||
|
*/
|
||||||
|
public static DataOutputStream
|
||||||
|
createTmpFileDFS(FileSystem dfs, Path URIPATH,
|
||||||
|
FsPermission permission, String input) throws Exception {
|
||||||
|
//Creating the path with the file
|
||||||
|
DataOutputStream file =
|
||||||
|
FileSystem.create(dfs, URIPATH, permission);
|
||||||
|
file.writeBytes(input);
|
||||||
|
file.close();
|
||||||
|
return file;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This formats the long tasktracker name to just the FQDN
|
||||||
|
* @param taskTrackerLong String The long format of the tasktracker string
|
||||||
|
* @return String The FQDN of the tasktracker
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public static String getFQDNofTT (String taskTrackerLong) throws Exception {
|
||||||
|
//Getting the exact FQDN of the tasktracker from the tasktracker string.
|
||||||
|
String[] firstSplit = taskTrackerLong.split("_");
|
||||||
|
String tmpOutput = firstSplit[1];
|
||||||
|
String[] secondSplit = tmpOutput.split(":");
|
||||||
|
String tmpTaskTracker = secondSplit[0];
|
||||||
|
return tmpTaskTracker;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,159 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.StringTokenizer;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.conf.Configured;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.IntWritable;
|
||||||
|
import org.apache.hadoop.io.LongWritable;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapred.FileInputFormat;
|
||||||
|
import org.apache.hadoop.mapred.FileOutputFormat;
|
||||||
|
import org.apache.hadoop.mapred.JobClient;
|
||||||
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
|
import org.apache.hadoop.mapred.MapReduceBase;
|
||||||
|
import org.apache.hadoop.mapred.Mapper;
|
||||||
|
import org.apache.hadoop.mapred.OutputCollector;
|
||||||
|
import org.apache.hadoop.mapred.Reducer;
|
||||||
|
import org.apache.hadoop.mapred.Reporter;
|
||||||
|
import org.apache.hadoop.util.Tool;
|
||||||
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is an example Hadoop Map/Reduce application.
|
||||||
|
* It reads the text input files, breaks each line into words
|
||||||
|
* and counts them. The output is a locally sorted list of words and the
|
||||||
|
* count of how often they occurred.
|
||||||
|
*
|
||||||
|
* To run: bin/hadoop jar build/hadoop-examples.jar wordcount
|
||||||
|
* [-m <i>maps</i>] [-r <i>reduces</i>] <i>in-dir</i> <i>out-dir</i>
|
||||||
|
*/
|
||||||
|
public class WordCount extends Configured implements Tool {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Counts the words in each line.
|
||||||
|
* For each line of input, break the line into words and emit them as
|
||||||
|
* (<b>word</b>, <b>1</b>).
|
||||||
|
*/
|
||||||
|
public static class MapClass extends MapReduceBase
|
||||||
|
implements Mapper<LongWritable, Text, Text, IntWritable> {
|
||||||
|
|
||||||
|
private final static IntWritable one = new IntWritable(1);
|
||||||
|
private Text word = new Text();
|
||||||
|
|
||||||
|
public void map(LongWritable key, Text value,
|
||||||
|
OutputCollector<Text, IntWritable> output,
|
||||||
|
Reporter reporter) throws IOException {
|
||||||
|
String line = value.toString();
|
||||||
|
StringTokenizer itr = new StringTokenizer(line);
|
||||||
|
while (itr.hasMoreTokens()) {
|
||||||
|
word.set(itr.nextToken());
|
||||||
|
output.collect(word, one);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A reducer class that just emits the sum of the input values.
|
||||||
|
*/
|
||||||
|
public static class Reduce extends MapReduceBase
|
||||||
|
implements Reducer<Text, IntWritable, Text, IntWritable> {
|
||||||
|
|
||||||
|
public void reduce(Text key, Iterator<IntWritable> values,
|
||||||
|
OutputCollector<Text, IntWritable> output,
|
||||||
|
Reporter reporter) throws IOException {
|
||||||
|
int sum = 0;
|
||||||
|
while (values.hasNext()) {
|
||||||
|
sum += values.next().get();
|
||||||
|
}
|
||||||
|
output.collect(key, new IntWritable(sum));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int printUsage() {
|
||||||
|
System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
|
||||||
|
ToolRunner.printGenericCommandUsage(System.out);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The main driver for word count map/reduce program.
|
||||||
|
* Invoke this method to submit the map/reduce job.
|
||||||
|
* @throws IOException When there is communication problems with the
|
||||||
|
* job tracker.
|
||||||
|
*/
|
||||||
|
public int run(String[] args) throws Exception {
|
||||||
|
JobConf conf = new JobConf(getConf(), WordCount.class);
|
||||||
|
conf.setJobName("wordcount");
|
||||||
|
|
||||||
|
// the keys are words (strings)
|
||||||
|
conf.setOutputKeyClass(Text.class);
|
||||||
|
// the values are counts (ints)
|
||||||
|
conf.setOutputValueClass(IntWritable.class);
|
||||||
|
|
||||||
|
conf.setMapperClass(MapClass.class);
|
||||||
|
conf.setCombinerClass(Reduce.class);
|
||||||
|
conf.setReducerClass(Reduce.class);
|
||||||
|
|
||||||
|
List<String> other_args = new ArrayList<String>();
|
||||||
|
for(int i=0; i < args.length; ++i) {
|
||||||
|
try {
|
||||||
|
if ("-m".equals(args[i])) {
|
||||||
|
conf.setNumMapTasks(Integer.parseInt(args[++i]));
|
||||||
|
} else if ("-r".equals(args[i])) {
|
||||||
|
conf.setNumReduceTasks(Integer.parseInt(args[++i]));
|
||||||
|
} else {
|
||||||
|
other_args.add(args[i]);
|
||||||
|
}
|
||||||
|
} catch (NumberFormatException except) {
|
||||||
|
System.out.println("ERROR: Integer expected instead of " + args[i]);
|
||||||
|
return printUsage();
|
||||||
|
} catch (ArrayIndexOutOfBoundsException except) {
|
||||||
|
System.out.println("ERROR: Required parameter missing from " +
|
||||||
|
args[i-1]);
|
||||||
|
return printUsage();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Make sure there are exactly 2 parameters left.
|
||||||
|
if (other_args.size() != 2) {
|
||||||
|
System.out.println("ERROR: Wrong number of parameters: " +
|
||||||
|
other_args.size() + " instead of 2.");
|
||||||
|
return printUsage();
|
||||||
|
}
|
||||||
|
FileInputFormat.setInputPaths(conf, other_args.get(0));
|
||||||
|
FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
|
||||||
|
|
||||||
|
JobClient.runJob(conf);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
int res = ToolRunner.run(new Configuration(), new WordCount(), args);
|
||||||
|
System.exit(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,154 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapred.jobcontrol;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.text.NumberFormat;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.LongWritable;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapred.FileInputFormat;
|
||||||
|
import org.apache.hadoop.mapred.FileOutputFormat;
|
||||||
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
|
import org.apache.hadoop.mapred.MapReduceBase;
|
||||||
|
import org.apache.hadoop.mapred.Mapper;
|
||||||
|
import org.apache.hadoop.mapred.OutputCollector;
|
||||||
|
import org.apache.hadoop.mapred.Reducer;
|
||||||
|
import org.apache.hadoop.mapred.Reporter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility methods used in various Job Control unit tests.
|
||||||
|
*/
|
||||||
|
public class JobControlTestUtils {
|
||||||
|
|
||||||
|
static private Random rand = new Random();
|
||||||
|
|
||||||
|
private static NumberFormat idFormat = NumberFormat.getInstance();
|
||||||
|
|
||||||
|
static {
|
||||||
|
idFormat.setMinimumIntegerDigits(4);
|
||||||
|
idFormat.setGroupingUsed(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleans the data from the passed Path in the passed FileSystem.
|
||||||
|
*
|
||||||
|
* @param fs FileSystem to delete data from.
|
||||||
|
* @param dirPath Path to be deleted.
|
||||||
|
* @throws IOException If an error occurs cleaning the data.
|
||||||
|
*/
|
||||||
|
static void cleanData(FileSystem fs, Path dirPath) throws IOException {
|
||||||
|
fs.delete(dirPath, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generates a string of random digits.
|
||||||
|
*
|
||||||
|
* @return A random string.
|
||||||
|
*/
|
||||||
|
private static String generateRandomWord() {
|
||||||
|
return idFormat.format(rand.nextLong());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generates a line of random text.
|
||||||
|
*
|
||||||
|
* @return A line of random text.
|
||||||
|
*/
|
||||||
|
private static String generateRandomLine() {
|
||||||
|
long r = rand.nextLong() % 7;
|
||||||
|
long n = r + 20;
|
||||||
|
StringBuffer sb = new StringBuffer();
|
||||||
|
for (int i = 0; i < n; i++) {
|
||||||
|
sb.append(generateRandomWord()).append(" ");
|
||||||
|
}
|
||||||
|
sb.append("\n");
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generates data that can be used for Job Control tests.
|
||||||
|
*
|
||||||
|
* @param fs FileSystem to create data in.
|
||||||
|
* @param dirPath Path to create the data in.
|
||||||
|
* @throws IOException If an error occurs creating the data.
|
||||||
|
*/
|
||||||
|
static void generateData(FileSystem fs, Path dirPath) throws IOException {
|
||||||
|
FSDataOutputStream out = fs.create(new Path(dirPath, "data.txt"));
|
||||||
|
for (int i = 0; i < 10000; i++) {
|
||||||
|
String line = generateRandomLine();
|
||||||
|
out.write(line.getBytes("UTF-8"));
|
||||||
|
}
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a simple copy job.
|
||||||
|
*
|
||||||
|
* @param indirs List of input directories.
|
||||||
|
* @param outdir Output directory.
|
||||||
|
* @return JobConf initialised for a simple copy job.
|
||||||
|
* @throws Exception If an error occurs creating job configuration.
|
||||||
|
*/
|
||||||
|
static JobConf createCopyJob(List<Path> indirs, Path outdir) throws Exception {
|
||||||
|
|
||||||
|
Configuration defaults = new Configuration();
|
||||||
|
JobConf theJob = new JobConf(defaults, TestJobControl.class);
|
||||||
|
theJob.setJobName("DataMoveJob");
|
||||||
|
|
||||||
|
FileInputFormat.setInputPaths(theJob, indirs.toArray(new Path[0]));
|
||||||
|
theJob.setMapperClass(DataCopy.class);
|
||||||
|
FileOutputFormat.setOutputPath(theJob, outdir);
|
||||||
|
theJob.setOutputKeyClass(Text.class);
|
||||||
|
theJob.setOutputValueClass(Text.class);
|
||||||
|
theJob.setReducerClass(DataCopy.class);
|
||||||
|
theJob.setNumMapTasks(12);
|
||||||
|
theJob.setNumReduceTasks(4);
|
||||||
|
return theJob;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple Mapper and Reducer implementation which copies data it reads in.
|
||||||
|
*/
|
||||||
|
public static class DataCopy extends MapReduceBase implements
|
||||||
|
Mapper<LongWritable, Text, Text, Text>, Reducer<Text, Text, Text, Text> {
|
||||||
|
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,
|
||||||
|
Reporter reporter) throws IOException {
|
||||||
|
output.collect(new Text(key.toString()), value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void reduce(Text key, Iterator<Text> values,
|
||||||
|
OutputCollector<Text, Text> output, Reporter reporter)
|
||||||
|
throws IOException {
|
||||||
|
Text dumbKey = new Text("");
|
||||||
|
while (values.hasNext()) {
|
||||||
|
Text data = values.next();
|
||||||
|
output.collect(dumbKey, data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue