MAPREDUCE-5399. Unnecessary Configuration instantiation in IFileInputStream slows down merge. (Stanislav Barton via Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1510811 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
95f9a515aa
commit
a00a729729
|
@ -647,6 +647,9 @@ Release 2.1.0-beta - 2013-08-06
|
||||||
MAPREDUCE-5419. TestSlive is getting FileNotFound Exception (Robert Parker
|
MAPREDUCE-5419. TestSlive is getting FileNotFound Exception (Robert Parker
|
||||||
via jlowe)
|
via jlowe)
|
||||||
|
|
||||||
|
MAPREDUCE-5399. Unnecessary Configuration instantiation in IFileInputStream
|
||||||
|
slows down merge. (Stanislav Barton via Sandy Ryza)
|
||||||
|
|
||||||
BREAKDOWN OF HADOOP-8562 SUBTASKS
|
BREAKDOWN OF HADOOP-8562 SUBTASKS
|
||||||
|
|
||||||
MAPREDUCE-4739. Some MapReduce tests fail to find winutils.
|
MAPREDUCE-4739. Some MapReduce tests fail to find winutils.
|
||||||
|
|
|
@ -81,6 +81,8 @@ public class BackupStore<K,V> {
|
||||||
private boolean inReset = false;
|
private boolean inReset = false;
|
||||||
private boolean clearMarkFlag = false;
|
private boolean clearMarkFlag = false;
|
||||||
private boolean lastSegmentEOF = false;
|
private boolean lastSegmentEOF = false;
|
||||||
|
|
||||||
|
private Configuration conf;
|
||||||
|
|
||||||
public BackupStore(Configuration conf, TaskAttemptID taskid)
|
public BackupStore(Configuration conf, TaskAttemptID taskid)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -106,6 +108,8 @@ public class BackupStore<K,V> {
|
||||||
fileCache = new FileCache(conf);
|
fileCache = new FileCache(conf);
|
||||||
tid = taskid;
|
tid = taskid;
|
||||||
|
|
||||||
|
this.conf = conf;
|
||||||
|
|
||||||
LOG.info("Created a new BackupStore with a memory of " + maxSize);
|
LOG.info("Created a new BackupStore with a memory of " + maxSize);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -500,7 +504,7 @@ public class BackupStore<K,V> {
|
||||||
Reader<K, V> reader =
|
Reader<K, V> reader =
|
||||||
new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null,
|
new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null,
|
||||||
(org.apache.hadoop.mapred.TaskAttemptID) tid,
|
(org.apache.hadoop.mapred.TaskAttemptID) tid,
|
||||||
dataOut.getData(), 0, usedSize);
|
dataOut.getData(), 0, usedSize, conf);
|
||||||
Segment<K, V> segment = new Segment<K, V>(reader, false);
|
Segment<K, V> segment = new Segment<K, V>(reader, false);
|
||||||
segmentList.add(segment);
|
segmentList.add(segment);
|
||||||
LOG.debug("Added Memory Segment to List. List Size is " +
|
LOG.debug("Added Memory Segment to List. List Size is " +
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.DataInputBuffer;
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
import org.apache.hadoop.mapred.IFile.Reader;
|
import org.apache.hadoop.mapred.IFile.Reader;
|
||||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
|
@ -39,11 +40,11 @@ public class InMemoryReader<K, V> extends Reader<K, V> {
|
||||||
DataInputBuffer memDataIn = new DataInputBuffer();
|
DataInputBuffer memDataIn = new DataInputBuffer();
|
||||||
private int start;
|
private int start;
|
||||||
private int length;
|
private int length;
|
||||||
|
|
||||||
public InMemoryReader(MergeManagerImpl<K,V> merger, TaskAttemptID taskAttemptId,
|
public InMemoryReader(MergeManagerImpl<K,V> merger, TaskAttemptID taskAttemptId,
|
||||||
byte[] data, int start, int length)
|
byte[] data, int start, int length, Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
super(null, null, length - start, null, null);
|
super(conf, null, length - start, null, null);
|
||||||
this.merger = merger;
|
this.merger = merger;
|
||||||
this.taskAttemptId = taskAttemptId;
|
this.taskAttemptId = taskAttemptId;
|
||||||
|
|
||||||
|
|
|
@ -613,7 +613,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
|
||||||
fullSize -= size;
|
fullSize -= size;
|
||||||
Reader<K,V> reader = new InMemoryReader<K,V>(MergeManagerImpl.this,
|
Reader<K,V> reader = new InMemoryReader<K,V>(MergeManagerImpl.this,
|
||||||
mo.getMapId(),
|
mo.getMapId(),
|
||||||
data, 0, (int)size);
|
data, 0, (int)size, jobConf);
|
||||||
inMemorySegments.add(new Segment<K,V>(reader, true,
|
inMemorySegments.add(new Segment<K,V>(reader, true,
|
||||||
(mo.isPrimaryMapOutput() ?
|
(mo.isPrimaryMapOutput() ?
|
||||||
mergedMapOutputsCounter : null)));
|
mergedMapOutputsCounter : null)));
|
||||||
|
|
Loading…
Reference in New Issue