MAPREDUCE-5478. TeraInputFormat unnecessarily defines its own FileSplit subclass (Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1517055 13f79535-47bb-0310-9956-ffa450edef68
commit 5b8b923cd1
parent 80ead68bfc
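For readers skimming the diff below: the removed TeraFileSplit subclass existed only to cache and overwrite its preferred hosts in memory. A plain org.apache.hadoop.mapreduce.lib.input.FileSplit already accepts hosts in its constructor and hands them back from getLocations(), so no subclass is needed. A minimal, self-contained sketch of that behavior (the file path and host names are made up for illustration, not taken from the patch):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class PlainFileSplitSketch {
      public static void main(String[] args) throws Exception {
        // A plain FileSplit carries its preferred hosts without subclassing.
        FileSplit split = new FileSplit(
            new Path("/terasort/in/part-00000"),  // hypothetical input file
            0L, 128L * 1024 * 1024,               // offset and length in bytes
            new String[] {"host-a", "host-b"});   // hypothetical preferred hosts

        // getLocations() returns the hosts that were passed to the constructor.
        for (String host : split.getLocations()) {
          System.out.println(split.getPath() + " prefers " + host);
        }
      }
    }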
CHANGES.txt
@@ -44,6 +44,9 @@ Release 2.1.1-beta - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-5478. TeraInputFormat unnecessarily defines its own FileSplit
+    subclass (Sandy Ryza)
+
   OPTIMIZATIONS
 
     MAPREDUCE-5446. TestJobHistoryEvents and TestJobHistoryParsing have race
TeraInputFormat.java
@@ -60,48 +60,6 @@ public class TeraInputFormat extends FileInputFormat<Text,Text> {
   private static MRJobConfig lastContext = null;
   private static List<InputSplit> lastResult = null;
 
-  static class TeraFileSplit extends FileSplit {
-    static private String[] ZERO_LOCATIONS = new String[0];
-
-    private String[] locations;
-
-    public TeraFileSplit() {
-      locations = ZERO_LOCATIONS;
-    }
-    public TeraFileSplit(Path file, long start, long length, String[] hosts) {
-      super(file, start, length, hosts);
-      try {
-        locations = super.getLocations();
-      } catch (IOException e) {
-        locations = ZERO_LOCATIONS;
-      }
-    }
-
-    // XXXXXX should this also be null-protected?
-    protected void setLocations(String[] hosts) {
-      locations = hosts;
-    }
-
-    @Override
-    public String[] getLocations() {
-      return locations;
-    }
-
-    public String toString() {
-      StringBuffer result = new StringBuffer();
-      result.append(getPath());
-      result.append(" from ");
-      result.append(getStart());
-      result.append(" length ");
-      result.append(getLength());
-      for(String host: getLocations()) {
-        result.append(" ");
-        result.append(host);
-      }
-      return result.toString();
-    }
-  }
-
   static class TextSampler implements IndexedSortable {
     private ArrayList<Text> records = new ArrayList<Text>();
 
@@ -325,11 +283,6 @@ public class TeraInputFormat extends FileInputFormat<Text,Text> {
     return new TeraRecordReader();
   }
 
-  protected FileSplit makeSplit(Path file, long start, long length,
-                                String[] hosts) {
-    return new TeraFileSplit(file, start, length, hosts);
-  }
-
   @Override
   public List<InputSplit> getSplits(JobContext job) throws IOException {
     if (job == lastContext) {
@@ -343,7 +296,7 @@ public class TeraInputFormat extends FileInputFormat<Text,Text> {
     System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
     if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) {
       TeraScheduler scheduler = new TeraScheduler(
-        lastResult.toArray(new TeraFileSplit[0]), job.getConfiguration());
+        lastResult.toArray(new FileSplit[0]), job.getConfiguration());
       lastResult = scheduler.getNewFileSplits();
       t3 = System.currentTimeMillis();
       System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
TeraScheduler.java
@@ -24,7 +24,6 @@ import java.util.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.terasort.TeraInputFormat.TeraFileSplit;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
@@ -214,8 +213,9 @@ class TeraScheduler {
     for(int i=0; i < splits.length; ++i) {
       if (splits[i].isAssigned) {
         // copy the split and fix up the locations
-        ((TeraFileSplit) realSplits[i]).setLocations
-            (new String[]{splits[i].locations.get(0).hostname});
+        String[] newLocations = {splits[i].locations.get(0).hostname};
+        realSplits[i] = new FileSplit(realSplits[i].getPath(),
+            realSplits[i].getStart(), realSplits[i].getLength(), newLocations);
         result[left++] = realSplits[i];
       } else {
         result[right--] = realSplits[i];
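The TeraScheduler hunk above replaces in-place mutation of a TeraFileSplit with construction of a fresh FileSplit that copies the path, offset, and length and swaps in the chosen host. A standalone sketch of that same pattern (the helper name is illustrative and not part of the patch):

    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    class SplitRelocationSketch {
      // Rebuild a split with a new preferred host instead of mutating the
      // original, mirroring the TeraScheduler change above.
      static FileSplit withHost(FileSplit original, String hostname) {
        return new FileSplit(original.getPath(), original.getStart(),
            original.getLength(), new String[] {hostname});
      }
    }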