MAPREDUCE-4146. Support limits on task status string length and number of block locations in branch-2. Contributed by Ahmed Radwan.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1343755 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e576bac2f7
commit
45be2c085d
|
@ -126,6 +126,9 @@ Release 2.0.1-alpha - UNRELEASED
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
|
MAPREDUCE-4146. Support limits on task status string length and number of
|
||||||
|
block locations in branch-2. (Ahmed Radwan via tomwhite)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -53,7 +53,6 @@ import org.apache.hadoop.io.WritableUtils;
|
||||||
import org.apache.hadoop.io.serializer.Deserializer;
|
import org.apache.hadoop.io.serializer.Deserializer;
|
||||||
import org.apache.hadoop.io.serializer.SerializationFactory;
|
import org.apache.hadoop.io.serializer.SerializationFactory;
|
||||||
import org.apache.hadoop.mapred.IFile.Writer;
|
import org.apache.hadoop.mapred.IFile.Writer;
|
||||||
import org.apache.hadoop.mapreduce.Counter;
|
|
||||||
import org.apache.hadoop.mapreduce.FileSystemCounter;
|
import org.apache.hadoop.mapreduce.FileSystemCounter;
|
||||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||||
import org.apache.hadoop.mapreduce.TaskCounter;
|
import org.apache.hadoop.mapreduce.TaskCounter;
|
||||||
|
@ -570,6 +569,20 @@ abstract public class Task implements Writable, Configurable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static String normalizeStatus(String status, Configuration conf) {
|
||||||
|
// Check to see if the status string is too long
|
||||||
|
// and truncate it if needed.
|
||||||
|
int progressStatusLength = conf.getInt(
|
||||||
|
MRConfig.PROGRESS_STATUS_LEN_LIMIT_KEY,
|
||||||
|
MRConfig.PROGRESS_STATUS_LEN_LIMIT_DEFAULT);
|
||||||
|
if (status.length() > progressStatusLength) {
|
||||||
|
LOG.warn("Task status: \"" + status + "\" truncated to max limit ("
|
||||||
|
+ progressStatusLength + " characters)");
|
||||||
|
status = status.substring(0, progressStatusLength);
|
||||||
|
}
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
protected class TaskReporter
|
protected class TaskReporter
|
||||||
|
@ -603,7 +616,7 @@ abstract public class Task implements Writable, Configurable {
|
||||||
return progressFlag.getAndSet(false);
|
return progressFlag.getAndSet(false);
|
||||||
}
|
}
|
||||||
public void setStatus(String status) {
|
public void setStatus(String status) {
|
||||||
taskProgress.setStatus(status);
|
taskProgress.setStatus(normalizeStatus(status, conf));
|
||||||
// indicate that progress update needs to be sent
|
// indicate that progress update needs to be sent
|
||||||
setProgressFlag();
|
setProgressFlag();
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,4 +71,12 @@ public interface MRConfig {
|
||||||
|
|
||||||
public static final String TASK_LOCAL_OUTPUT_CLASS =
|
public static final String TASK_LOCAL_OUTPUT_CLASS =
|
||||||
"mapreduce.task.local.output.class";
|
"mapreduce.task.local.output.class";
|
||||||
|
|
||||||
|
public static final String PROGRESS_STATUS_LEN_LIMIT_KEY =
|
||||||
|
"mapreduce.task.max.status.length";
|
||||||
|
public static final int PROGRESS_STATUS_LEN_LIMIT_DEFAULT = 512;
|
||||||
|
|
||||||
|
public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 10;
|
||||||
|
public static final String MAX_BLOCK_LOCATIONS_KEY =
|
||||||
|
"mapreduce.job.max.split.locations";
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.io.serializer.Serializer;
|
||||||
import org.apache.hadoop.mapreduce.InputSplit;
|
import org.apache.hadoop.mapreduce.InputSplit;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
|
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
|
||||||
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
|
import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
@ -48,6 +49,7 @@ public class JobSplitWriter {
|
||||||
|
|
||||||
private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
|
private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
|
||||||
private static final byte[] SPLIT_FILE_HEADER;
|
private static final byte[] SPLIT_FILE_HEADER;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
try {
|
try {
|
||||||
SPLIT_FILE_HEADER = "SPL".getBytes("UTF-8");
|
SPLIT_FILE_HEADER = "SPL".getBytes("UTF-8");
|
||||||
|
@ -82,7 +84,7 @@ public class JobSplitWriter {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
FSDataOutputStream out = createFile(fs,
|
FSDataOutputStream out = createFile(fs,
|
||||||
JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
|
JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
|
||||||
SplitMetaInfo[] info = writeOldSplits(splits, out);
|
SplitMetaInfo[] info = writeOldSplits(splits, out, conf);
|
||||||
out.close();
|
out.close();
|
||||||
writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
|
writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
|
||||||
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
|
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
|
||||||
|
@ -114,6 +116,8 @@ public class JobSplitWriter {
|
||||||
if (array.length != 0) {
|
if (array.length != 0) {
|
||||||
SerializationFactory factory = new SerializationFactory(conf);
|
SerializationFactory factory = new SerializationFactory(conf);
|
||||||
int i = 0;
|
int i = 0;
|
||||||
|
int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
|
||||||
|
MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
|
||||||
long offset = out.getPos();
|
long offset = out.getPos();
|
||||||
for(T split: array) {
|
for(T split: array) {
|
||||||
long prevCount = out.getPos();
|
long prevCount = out.getPos();
|
||||||
|
@ -123,9 +127,15 @@ public class JobSplitWriter {
|
||||||
serializer.open(out);
|
serializer.open(out);
|
||||||
serializer.serialize(split);
|
serializer.serialize(split);
|
||||||
long currCount = out.getPos();
|
long currCount = out.getPos();
|
||||||
|
String[] locations = split.getLocations();
|
||||||
|
if (locations.length > maxBlockLocations) {
|
||||||
|
throw new IOException("Max block location exceeded for split: "
|
||||||
|
+ split + " splitsize: " + locations.length +
|
||||||
|
" maxsize: " + maxBlockLocations);
|
||||||
|
}
|
||||||
info[i++] =
|
info[i++] =
|
||||||
new JobSplit.SplitMetaInfo(
|
new JobSplit.SplitMetaInfo(
|
||||||
split.getLocations(), offset,
|
locations, offset,
|
||||||
split.getLength());
|
split.getLength());
|
||||||
offset += currCount - prevCount;
|
offset += currCount - prevCount;
|
||||||
}
|
}
|
||||||
|
@ -135,18 +145,26 @@ public class JobSplitWriter {
|
||||||
|
|
||||||
private static SplitMetaInfo[] writeOldSplits(
|
private static SplitMetaInfo[] writeOldSplits(
|
||||||
org.apache.hadoop.mapred.InputSplit[] splits,
|
org.apache.hadoop.mapred.InputSplit[] splits,
|
||||||
FSDataOutputStream out) throws IOException {
|
FSDataOutputStream out, Configuration conf) throws IOException {
|
||||||
SplitMetaInfo[] info = new SplitMetaInfo[splits.length];
|
SplitMetaInfo[] info = new SplitMetaInfo[splits.length];
|
||||||
if (splits.length != 0) {
|
if (splits.length != 0) {
|
||||||
int i = 0;
|
int i = 0;
|
||||||
long offset = out.getPos();
|
long offset = out.getPos();
|
||||||
|
int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
|
||||||
|
MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
|
||||||
for(org.apache.hadoop.mapred.InputSplit split: splits) {
|
for(org.apache.hadoop.mapred.InputSplit split: splits) {
|
||||||
long prevLen = out.getPos();
|
long prevLen = out.getPos();
|
||||||
Text.writeString(out, split.getClass().getName());
|
Text.writeString(out, split.getClass().getName());
|
||||||
split.write(out);
|
split.write(out);
|
||||||
long currLen = out.getPos();
|
long currLen = out.getPos();
|
||||||
|
String[] locations = split.getLocations();
|
||||||
|
if (locations.length > maxBlockLocations) {
|
||||||
|
throw new IOException("Max block location exceeded for split: "
|
||||||
|
+ split + " splitsize: " + locations.length +
|
||||||
|
" maxsize: " + maxBlockLocations);
|
||||||
|
}
|
||||||
info[i++] = new JobSplit.SplitMetaInfo(
|
info[i++] = new JobSplit.SplitMetaInfo(
|
||||||
split.getLocations(), offset,
|
locations, offset,
|
||||||
split.getLength());
|
split.getLength());
|
||||||
offset += currLen - prevLen;
|
offset += currLen - prevLen;
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.task;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.mapred.Task;
|
||||||
import org.apache.hadoop.mapreduce.Counter;
|
import org.apache.hadoop.mapreduce.Counter;
|
||||||
import org.apache.hadoop.mapreduce.Counters;
|
import org.apache.hadoop.mapreduce.Counters;
|
||||||
import org.apache.hadoop.mapreduce.StatusReporter;
|
import org.apache.hadoop.mapreduce.StatusReporter;
|
||||||
|
@ -92,8 +93,9 @@ public class TaskAttemptContextImpl extends JobContextImpl
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void setStatus(String status) {
|
public void setStatus(String status) {
|
||||||
setStatusString(status);
|
String normalizedStatus = Task.normalizeStatus(status, conf);
|
||||||
reporter.setStatus(status);
|
setStatusString(normalizedStatus);
|
||||||
|
reporter.setStatus(normalizedStatus);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class DummyReporter extends StatusReporter {
|
public static class DummyReporter extends StatusReporter {
|
||||||
|
|
|
@ -0,0 +1,176 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.DataOutput;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.IntWritable;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hadoop.io.WritableComparable;
|
||||||
|
import org.apache.hadoop.io.WritableUtils;
|
||||||
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A JUnit test to test limits on block locations
|
||||||
|
*/
|
||||||
|
public class TestBlockLimits extends TestCase {
|
||||||
|
private static String TEST_ROOT_DIR = new File(System.getProperty(
|
||||||
|
"test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
|
||||||
|
|
||||||
|
public void testWithLimits() throws IOException, InterruptedException,
|
||||||
|
ClassNotFoundException {
|
||||||
|
MiniMRClientCluster mr = null;
|
||||||
|
try {
|
||||||
|
mr = MiniMRClientClusterFactory.create(this.getClass(), 2,
|
||||||
|
new Configuration());
|
||||||
|
runCustomFormat(mr);
|
||||||
|
} finally {
|
||||||
|
if (mr != null) {
|
||||||
|
mr.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void runCustomFormat(MiniMRClientCluster mr) throws IOException {
|
||||||
|
JobConf job = new JobConf(mr.getConfig());
|
||||||
|
FileSystem fileSys = FileSystem.get(job);
|
||||||
|
Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local");
|
||||||
|
Path outDir = new Path(testDir, "out");
|
||||||
|
System.out.println("testDir= " + testDir);
|
||||||
|
fileSys.delete(testDir, true);
|
||||||
|
job.setInputFormat(MyInputFormat.class);
|
||||||
|
job.setOutputFormat(MyOutputFormat.class);
|
||||||
|
job.setOutputKeyClass(Text.class);
|
||||||
|
job.setOutputValueClass(IntWritable.class);
|
||||||
|
|
||||||
|
job.setMapperClass(MyMapper.class);
|
||||||
|
job.setReducerClass(MyReducer.class);
|
||||||
|
job.setNumMapTasks(100);
|
||||||
|
job.setNumReduceTasks(1);
|
||||||
|
job.set("non.std.out", outDir.toString());
|
||||||
|
try {
|
||||||
|
JobClient.runJob(job);
|
||||||
|
assertTrue(false);
|
||||||
|
} catch (IOException ie) {
|
||||||
|
System.out.println("Failed job " + StringUtils.stringifyException(ie));
|
||||||
|
} finally {
|
||||||
|
fileSys.delete(testDir, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
static class MyMapper extends MapReduceBase implements
|
||||||
|
Mapper<WritableComparable, Writable, WritableComparable, Writable> {
|
||||||
|
|
||||||
|
public void map(WritableComparable key, Writable value,
|
||||||
|
OutputCollector<WritableComparable, Writable> out, Reporter reporter)
|
||||||
|
throws IOException {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class MyReducer extends MapReduceBase implements
|
||||||
|
Reducer<WritableComparable, Writable, WritableComparable, Writable> {
|
||||||
|
public void reduce(WritableComparable key, Iterator<Writable> values,
|
||||||
|
OutputCollector<WritableComparable, Writable> output, Reporter reporter)
|
||||||
|
throws IOException {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class MyInputFormat implements InputFormat<IntWritable, Text> {
|
||||||
|
|
||||||
|
private static class MySplit implements InputSplit {
|
||||||
|
int first;
|
||||||
|
int length;
|
||||||
|
|
||||||
|
public MySplit() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public MySplit(int first, int length) {
|
||||||
|
this.first = first;
|
||||||
|
this.length = length;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String[] getLocations() {
|
||||||
|
return new String[200];
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getLength() {
|
||||||
|
return length;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void write(DataOutput out) throws IOException {
|
||||||
|
WritableUtils.writeVInt(out, first);
|
||||||
|
WritableUtils.writeVInt(out, length);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void readFields(DataInput in) throws IOException {
|
||||||
|
first = WritableUtils.readVInt(in);
|
||||||
|
length = WritableUtils.readVInt(in);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public InputSplit[] getSplits(JobConf job, int numSplits)
|
||||||
|
throws IOException {
|
||||||
|
return new MySplit[] { new MySplit(0, 1), new MySplit(1, 3),
|
||||||
|
new MySplit(4, 2) };
|
||||||
|
}
|
||||||
|
|
||||||
|
public RecordReader<IntWritable, Text> getRecordReader(InputSplit split,
|
||||||
|
JobConf job, Reporter reporter) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
static class MyOutputFormat implements OutputFormat {
|
||||||
|
static class MyRecordWriter implements RecordWriter<Object, Object> {
|
||||||
|
|
||||||
|
public MyRecordWriter(Path outputFile, JobConf job) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
public void write(Object key, Object value) throws IOException {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close(Reporter reporter) throws IOException {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
|
||||||
|
String name, Progressable progress) throws IOException {
|
||||||
|
return new MyRecordWriter(new Path(job.get("non.std.out")), job);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void checkOutputSpecs(FileSystem ignored, JobConf job)
|
||||||
|
throws IOException {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.mapred;
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import java.io.DataOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
|
||||||
|
@ -25,10 +26,15 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.io.LongWritable;
|
import org.apache.hadoop.io.LongWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -99,6 +105,27 @@ public class TestReporter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static class StatusLimitMapper extends
|
||||||
|
org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void map(LongWritable key, Text value, Context context)
|
||||||
|
throws IOException {
|
||||||
|
StringBuilder sb = new StringBuilder(512);
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
sb.append("a");
|
||||||
|
}
|
||||||
|
context.setStatus(sb.toString());
|
||||||
|
int progressStatusLength = context.getConfiguration().getInt(
|
||||||
|
MRConfig.PROGRESS_STATUS_LEN_LIMIT_KEY,
|
||||||
|
MRConfig.PROGRESS_STATUS_LEN_LIMIT_DEFAULT);
|
||||||
|
|
||||||
|
if (context.getStatus().length() > progressStatusLength) {
|
||||||
|
throw new IOException("Status is not truncated");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test {@link Reporter}'s progress for a map-only job.
|
* Test {@link Reporter}'s progress for a map-only job.
|
||||||
* This will make sure that only the map phase decides the attempt's progress.
|
* This will make sure that only the map phase decides the attempt's progress.
|
||||||
|
@ -166,7 +193,6 @@ public class TestReporter {
|
||||||
/**
|
/**
|
||||||
* Test {@link Reporter}'s progress for map-reduce job.
|
* Test {@link Reporter}'s progress for map-reduce job.
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
@Test
|
@Test
|
||||||
public void testReporterProgressForMRJob() throws IOException {
|
public void testReporterProgressForMRJob() throws IOException {
|
||||||
Path test = new Path(testRootTempDir, "testReporterProgressForMRJob");
|
Path test = new Path(testRootTempDir, "testReporterProgressForMRJob");
|
||||||
|
@ -186,4 +212,39 @@ public class TestReporter {
|
||||||
|
|
||||||
assertTrue("Job failed", job.isSuccessful());
|
assertTrue("Job failed", job.isSuccessful());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStatusLimit() throws IOException, InterruptedException,
|
||||||
|
ClassNotFoundException {
|
||||||
|
Path test = new Path(testRootTempDir, "testStatusLimit");
|
||||||
|
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
Path inDir = new Path(test, "in");
|
||||||
|
Path outDir = new Path(test, "out");
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
if (fs.exists(inDir)) {
|
||||||
|
fs.delete(inDir, true);
|
||||||
|
}
|
||||||
|
fs.mkdirs(inDir);
|
||||||
|
DataOutputStream file = fs.create(new Path(inDir, "part-" + 0));
|
||||||
|
file.writeBytes("testStatusLimit");
|
||||||
|
file.close();
|
||||||
|
|
||||||
|
if (fs.exists(outDir)) {
|
||||||
|
fs.delete(outDir, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
Job job = Job.getInstance(conf, "testStatusLimit");
|
||||||
|
|
||||||
|
job.setMapperClass(StatusLimitMapper.class);
|
||||||
|
job.setNumReduceTasks(0);
|
||||||
|
|
||||||
|
FileInputFormat.addInputPath(job, inDir);
|
||||||
|
FileOutputFormat.setOutputPath(job, outDir);
|
||||||
|
|
||||||
|
job.waitForCompletion(true);
|
||||||
|
|
||||||
|
assertTrue("Job failed", job.isSuccessful());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue