svn merge -c 1367789 FIXES: MAPREDUCE-4234. SortValidator.java is incompatible with multi-user or parallel use (due to a /tmp file with static name) (Robert Evans via jeagles)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1367793 13f79535-47bb-0310-9956-ffa450edef68
commit d40b6be2f5
parent bc65eefdd3
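In short, the diff below does two things in SortValidator.java: it replaces the hard-coded output directory /tmp/sortvalidate/recordstatschecker with a per-run directory derived from hadoop.tmp.dir plus a random UUID, and it wraps the validation in try/finally blocks so the SequenceFile.Reader is closed and the temporary output is deleted even if the job or the record check fails. A minimal, self-contained sketch of the same collision-free temp-path pattern (the DemoTmpPath class and "demo" subdirectory are hypothetical illustrations, not part of this commit):

import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DemoTmpPath {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Unique per invocation: hadoop.tmp.dir + "demo" + a random UUID,
    // so parallel or multi-user runs never share the same directory.
    Path outputPath = new Path(
        new Path(conf.get("hadoop.tmp.dir", "/tmp"), "demo"),
        UUID.randomUUID().toString());

    if (fs.exists(outputPath)) {
      fs.delete(outputPath, true);   // defensive cleanup, as in the patch
    }
    try {
      // ... run the job and validate its output here ...
    } finally {
      fs.delete(outputPath, true);   // always clean up, even on failure
    }
  }
}

Because each run gets its own directory, concurrent invocations no longer clobber one another's results, and the finally block keeps hadoop.tmp.dir from accumulating stale output.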
@@ -661,6 +661,10 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4457. mr job invalid transition TA_TOO_MANY_FETCH_FAILURE at
     FAILED (Robert Evans via tgraves)
 
+    MAPREDUCE-4234. SortValidator.java is incompatible with multi-user or
+    parallel use (due to a /tmp file with static name) (Robert Evans via
+    jeagles)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -33,7 +33,6 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.fs.*;
@@ -345,7 +344,8 @@ public class SortValidator extends Configured implements Tool {
 
       FileInputFormat.setInputPaths(jobConf, sortInput);
       FileInputFormat.addInputPath(jobConf, sortOutput);
-      Path outputPath = new Path("/tmp/sortvalidate/recordstatschecker");
+      Path outputPath = new Path(new Path(jobConf.get("hadoop.tmp.dir", "/tmp"),
+          "sortvalidate"), UUID.randomUUID().toString());
       if (defaultfs.exists(outputPath)) {
         defaultfs.delete(outputPath, true);
       }
@@ -365,6 +365,7 @@ public class SortValidator extends Configured implements Tool {
       Date startTime = new Date();
       System.out.println("Job started: " + startTime);
       JobClient.runJob(jobConf);
+      try {
       Date end_time = new Date();
       System.out.println("Job ended: " + end_time);
       System.out.println("The job took " +
@@ -374,22 +375,34 @@ public class SortValidator extends Configured implements Tool {
       // framework's sort-input and sort-output match
       SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs,
         new Path(outputPath, "part-00000"), defaults);
+      try {
       IntWritable k1 = new IntWritable();
       IntWritable k2 = new IntWritable();
       RecordStatsWritable v1 = new RecordStatsWritable();
       RecordStatsWritable v2 = new RecordStatsWritable();
       if (!stats.next(k1, v1)) {
-        throw new IOException("Failed to read record #1 from reduce's output");
+        throw new IOException(
+            "Failed to read record #1 from reduce's output");
       }
       if (!stats.next(k2, v2)) {
-        throw new IOException("Failed to read record #2 from reduce's output");
+        throw new IOException(
+            "Failed to read record #2 from reduce's output");
       }
 
-      if ((v1.getBytes() != v2.getBytes()) || (v1.getRecords() != v2.getRecords()) ||
+      if ((v1.getBytes() != v2.getBytes()) ||
+          (v1.getRecords() != v2.getRecords()) ||
           v1.getChecksum() != v2.getChecksum()) {
         throw new IOException("(" +
-          v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum() + ") v/s (" +
-          v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum() + ")");
+          v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum()
+          + ") v/s (" +
+          v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum()
+          + ")");
+        }
+      } finally {
+        stats.close();
+      }
+    } finally {
+      defaultfs.delete(outputPath, true);
       }
     }
 