MAPREDUCE-3822. Changed FS counter computation to use all occurrences of the same FS scheme, instead of randomly using one. (Contributed by Mahadev Konar)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1241721 13f79535-47bb-0310-9956-ffa450edef68

Parent: 47c5d51bd8
Commit: 5fdfa2b4bd
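Background for the diff below: FileSystem.getAllStatistics() can return more than one Statistics object for the same URI scheme, and before this patch the task picked whichever one matched first, so the per-scheme FileSystemCounter values could reflect only part of the I/O. The standalone sketch below is illustrative only and not part of the commit (the class name and main() harness are made up, and it assumes hadoop-common on the classpath); it mirrors the patched getInputBytes() helper and shows why summing the per-scheme list matters.

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.FileSystem.Statistics;

public class FsCounterSumDemo {
  // Mirrors the patched getInputBytes() helper: sum the bytes of every
  // Statistics object that shares a scheme instead of reading just one.
  static long getInputBytes(List<Statistics> stats) {
    if (stats == null) return 0;
    long bytesRead = 0;
    for (Statistics stat : stats) {
      bytesRead += stat.getBytesRead();
    }
    return bytesRead;
  }

  public static void main(String[] args) {
    // Two Statistics objects for the same scheme, as getAllStatistics() may
    // hand back when several FileSystem instances report that scheme.
    Statistics a = new Statistics("file");
    Statistics b = new Statistics("file");
    a.incrementBytesRead(100);
    b.incrementBytesRead(42);
    // The pre-patch code consulted a single matching object (100 or 42,
    // effectively at random); the per-scheme sum reports the full 142.
    System.out.println(getInputBytes(Arrays.asList(a, b)));
  }
}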
CHANGES.txt
@@ -767,6 +767,10 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3827. Changed Counters to use ConcurrentSkipListMap for
     performance. (vinodkv via acmurthy)

+    MAPREDUCE-3822. Changed FS counter computation to use all occurences of
+    the same FS scheme, instead of randomly using one. (Mahadev Konar via
+    sseth)
+
 Release 0.23.0 - 2011-11-01

   INCOMPATIBLE CHANGES
MapTask.java
@@ -141,7 +141,7 @@ class MapTask extends Task {
     private TaskReporter reporter;
     private long bytesInPrev = -1;
     private long bytesInCurr = -1;
-    private final Statistics fsStats;
+    private final List<Statistics> fsStats;

     TrackedRecordReader(TaskReporter reporter, JobConf job)
         throws IOException{
@@ -149,7 +149,7 @@ class MapTask extends Task {
       fileInputByteCounter = reporter.getCounter(FileInputFormatCounter.BYTES_READ);
       this.reporter = reporter;

-      Statistics matchedStats = null;
+      List<Statistics> matchedStats = null;
       if (this.reporter.getInputSplit() instanceof FileSplit) {
         matchedStats = getFsStatistics(((FileSplit) this.reporter
             .getInputSplit()).getPath(), job);
@@ -210,8 +210,13 @@ class MapTask extends Task {
       return reporter;
     }

-    private long getInputBytes(Statistics stats) {
-      return stats == null ? 0 : stats.getBytesRead();
+    private long getInputBytes(List<Statistics> stats) {
+      if (stats == null) return 0;
+      long bytesRead = 0;
+      for (Statistics stat: stats) {
+        bytesRead = bytesRead + stat.getBytesRead();
+      }
+      return bytesRead;
     }
   }

@@ -426,7 +431,7 @@ class MapTask extends Task {
     private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
     private final org.apache.hadoop.mapreduce.Counter fileInputByteCounter;
     private final TaskReporter reporter;
-    private final Statistics fsStats;
+    private final List<Statistics> fsStats;

     NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
         org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
@@ -439,7 +444,7 @@ class MapTask extends Task {
       this.fileInputByteCounter = reporter
           .getCounter(FileInputFormatCounter.BYTES_READ);

-      Statistics matchedStats = null;
+      List <Statistics> matchedStats = null;
       if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
         matchedStats = getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split)
             .getPath(), taskContext.getConfiguration());
@@ -498,8 +503,13 @@ class MapTask extends Task {
       return result;
     }

-    private long getInputBytes(Statistics stats) {
-      return stats == null ? 0 : stats.getBytesRead();
+    private long getInputBytes(List<Statistics> stats) {
+      if (stats == null) return 0;
+      long bytesRead = 0;
+      for (Statistics stat: stats) {
+        bytesRead = bytesRead + stat.getBytesRead();
+      }
+      return bytesRead;
     }
   }

@@ -554,7 +564,7 @@ class MapTask extends Task {

     private final Counters.Counter mapOutputRecordCounter;
     private final Counters.Counter fileOutputByteCounter;
-    private final Statistics fsStats;
+    private final List<Statistics> fsStats;

     @SuppressWarnings("unchecked")
     NewDirectOutputCollector(MRJobConfig jobContext,
@@ -566,7 +576,7 @@ class MapTask extends Task {
       fileOutputByteCounter = reporter
           .getCounter(FileOutputFormatCounter.BYTES_WRITTEN);

-      Statistics matchedStats = null;
+      List<Statistics> matchedStats = null;
       if (outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
         matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
             .getOutputPath(taskContext), taskContext.getConfiguration());
@@ -603,8 +613,13 @@ class MapTask extends Task {
       }
     }

-    private long getOutputBytes(Statistics stats) {
-      return stats == null ? 0 : stats.getBytesWritten();
+    private long getOutputBytes(List<Statistics> stats) {
+      if (stats == null) return 0;
+      long bytesWritten = 0;
+      for (Statistics stat: stats) {
+        bytesWritten = bytesWritten + stat.getBytesWritten();
+      }
+      return bytesWritten;
     }
   }

@@ -735,7 +750,7 @@ class MapTask extends Task {

     private final Counters.Counter mapOutputRecordCounter;
     private final Counters.Counter fileOutputByteCounter;
-    private final Statistics fsStats;
+    private final List<Statistics> fsStats;

     @SuppressWarnings("unchecked")
     public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
@@ -750,7 +765,7 @@ class MapTask extends Task {
       fileOutputByteCounter = reporter
           .getCounter(FileOutputFormatCounter.BYTES_WRITTEN);

-      Statistics matchedStats = null;
+      List<Statistics> matchedStats = null;
       if (outputFormat instanceof FileOutputFormat) {
         matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job);
       }
@@ -785,8 +800,13 @@ class MapTask extends Task {
       mapOutputRecordCounter.increment(1);
     }

-    private long getOutputBytes(Statistics stats) {
-      return stats == null ? 0 : stats.getBytesWritten();
+    private long getOutputBytes(List<Statistics> stats) {
+      if (stats == null) return 0;
+      long bytesWritten = 0;
+      for (Statistics stat: stats) {
+        bytesWritten = bytesWritten + stat.getBytesWritten();
+      }
+      return bytesWritten;
     }
   }

ReduceTask.java
@@ -476,14 +476,14 @@ public class ReduceTask extends Task {
     private final RecordWriter<K, V> real;
     private final org.apache.hadoop.mapred.Counters.Counter reduceOutputCounter;
     private final org.apache.hadoop.mapred.Counters.Counter fileOutputByteCounter;
-    private final Statistics fsStats;
+    private final List<Statistics> fsStats;

     @SuppressWarnings({ "deprecation", "unchecked" })
     public OldTrackingRecordWriter(ReduceTask reduce, JobConf job,
         TaskReporter reporter, String finalName) throws IOException {
       this.reduceOutputCounter = reduce.reduceOutputCounter;
       this.fileOutputByteCounter = reduce.fileOutputByteCounter;
-      Statistics matchedStats = null;
+      List<Statistics> matchedStats = null;
       if (job.getOutputFormat() instanceof FileOutputFormat) {
         matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job);
       }
@@ -514,8 +514,13 @@ public class ReduceTask extends Task {
       fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
     }

-    private long getOutputBytes(Statistics stats) {
-      return stats == null ? 0 : stats.getBytesWritten();
+    private long getOutputBytes(List<Statistics> stats) {
+      if (stats == null) return 0;
+      long bytesWritten = 0;
+      for (Statistics stat: stats) {
+        bytesWritten = bytesWritten + stat.getBytesWritten();
+      }
+      return bytesWritten;
     }
   }

@@ -524,7 +529,7 @@ public class ReduceTask extends Task {
     private final org.apache.hadoop.mapreduce.RecordWriter<K,V> real;
     private final org.apache.hadoop.mapreduce.Counter outputRecordCounter;
     private final org.apache.hadoop.mapreduce.Counter fileOutputByteCounter;
-    private final Statistics fsStats;
+    private final List<Statistics> fsStats;

     @SuppressWarnings("unchecked")
     NewTrackingRecordWriter(ReduceTask reduce,
@@ -533,7 +538,7 @@ public class ReduceTask extends Task {
       this.outputRecordCounter = reduce.reduceOutputCounter;
       this.fileOutputByteCounter = reduce.fileOutputByteCounter;

-      Statistics matchedStats = null;
+      List<Statistics> matchedStats = null;
       if (reduce.outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
         matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
             .getOutputPath(taskContext), taskContext.getConfiguration());
@@ -566,8 +571,13 @@ public class ReduceTask extends Task {
       outputRecordCounter.increment(1);
     }

-    private long getOutputBytes(Statistics stats) {
-      return stats == null ? 0 : stats.getBytesWritten();
+    private long getOutputBytes(List<Statistics> stats) {
+      if (stats == null) return 0;
+      long bytesWritten = 0;
+      for (Statistics stat: stats) {
+        bytesWritten = bytesWritten + stat.getBytesWritten();
+      }
+      return bytesWritten;
     }
   }

Task.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.text.NumberFormat;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -326,14 +327,13 @@ abstract public class Task implements Writable, Configurable {
    * the path.
    * @return a Statistics instance, or null if none is found for the scheme.
    */
-  protected static Statistics getFsStatistics(Path path, Configuration conf) throws IOException {
-    Statistics matchedStats = null;
+  protected static List<Statistics> getFsStatistics(Path path, Configuration conf) throws IOException {
+    List<Statistics> matchedStats = new ArrayList<FileSystem.Statistics>();
     path = path.getFileSystem(conf).makeQualified(path);
     String scheme = path.toUri().getScheme();
     for (Statistics stats : FileSystem.getAllStatistics()) {
       if (stats.getScheme().equals(scheme)) {
-        matchedStats = stats;
-        break;
+        matchedStats.add(stats);
       }
     }
     return matchedStats;
@@ -866,41 +866,53 @@ abstract public class Task implements Writable, Configurable {
    * system and only creates the counters when they are needed.
    */
   class FileSystemStatisticUpdater {
-    private FileSystem.Statistics stats;
+    private List<FileSystem.Statistics> stats;
     private Counters.Counter readBytesCounter, writeBytesCounter,
         readOpsCounter, largeReadOpsCounter, writeOpsCounter;
-
-    FileSystemStatisticUpdater(FileSystem.Statistics stats) {
+    private String scheme;
+    FileSystemStatisticUpdater(List<FileSystem.Statistics> stats, String scheme) {
       this.stats = stats;
+      this.scheme = scheme;
     }

     void updateCounters() {
-      String scheme = stats.getScheme();
       if (readBytesCounter == null) {
         readBytesCounter = counters.findCounter(scheme,
             FileSystemCounter.BYTES_READ);
       }
-      readBytesCounter.setValue(stats.getBytesRead());
       if (writeBytesCounter == null) {
         writeBytesCounter = counters.findCounter(scheme,
             FileSystemCounter.BYTES_WRITTEN);
       }
-      writeBytesCounter.setValue(stats.getBytesWritten());
       if (readOpsCounter == null) {
         readOpsCounter = counters.findCounter(scheme,
             FileSystemCounter.READ_OPS);
       }
-      readOpsCounter.setValue(stats.getReadOps());
       if (largeReadOpsCounter == null) {
         largeReadOpsCounter = counters.findCounter(scheme,
             FileSystemCounter.LARGE_READ_OPS);
       }
-      largeReadOpsCounter.setValue(stats.getLargeReadOps());
       if (writeOpsCounter == null) {
         writeOpsCounter = counters.findCounter(scheme,
             FileSystemCounter.WRITE_OPS);
       }
-      writeOpsCounter.setValue(stats.getWriteOps());
+      long readBytes = 0;
+      long writeBytes = 0;
+      long readOps = 0;
+      long largeReadOps = 0;
+      long writeOps = 0;
+      for (FileSystem.Statistics stat: stats) {
+        readBytes = readBytes + stat.getBytesRead();
+        writeBytes = writeBytes + stat.getBytesWritten();
+        readOps = readOps + stat.getReadOps();
+        largeReadOps = largeReadOps + stat.getLargeReadOps();
+        writeOps = writeOps + stat.getWriteOps();
+      }
+      readBytesCounter.setValue(readBytes);
+      writeBytesCounter.setValue(writeBytes);
+      readOpsCounter.setValue(readOps);
+      largeReadOpsCounter.setValue(largeReadOps);
+      writeOpsCounter.setValue(writeOps);
     }
   }

@@ -911,16 +923,28 @@ abstract public class Task implements Writable, Configurable {
       new HashMap<String, FileSystemStatisticUpdater>();

   private synchronized void updateCounters() {
+    Map<String, List<FileSystem.Statistics>> map = new
+        HashMap<String, List<FileSystem.Statistics>>();
     for(Statistics stat: FileSystem.getAllStatistics()) {
       String uriScheme = stat.getScheme();
-      FileSystemStatisticUpdater updater = statisticUpdaters.get(uriScheme);
-      if(updater==null) {//new FileSystem has been found in the cache
-        updater = new FileSystemStatisticUpdater(stat);
-        statisticUpdaters.put(uriScheme, updater);
+      if (map.containsKey(uriScheme)) {
+        List<FileSystem.Statistics> list = map.get(uriScheme);
+        list.add(stat);
+      } else {
+        List<FileSystem.Statistics> list = new ArrayList<FileSystem.Statistics>();
+        list.add(stat);
+        map.put(uriScheme, list);
       }
-      updater.updateCounters();
     }

+    for (Map.Entry<String, List<FileSystem.Statistics>> entry: map.entrySet()) {
+      FileSystemStatisticUpdater updater = statisticUpdaters.get(entry.getKey());
+      if(updater==null) {//new FileSystem has been found in the cache
+        updater = new FileSystemStatisticUpdater(entry.getValue(), entry.getKey());
+        statisticUpdaters.put(entry.getKey(), updater);
+      }
+      updater.updateCounters();
+    }
     gcUpdater.incrementGcCounter();
     updateResourceCounters();
   }

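Taken together, the Task.java changes amount to the aggregation sketched below: group every live Statistics object by scheme, then sum each group before publishing per-scheme values. This is a condensed, illustrative rewrite for readability only; the class and method names are not from the patch, and it assumes hadoop-common on the classpath.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;

public final class SchemeAggregationSketch {
  // Returns, per scheme, the summed {bytes read, bytes written} over every
  // Statistics object currently registered for that scheme.
  public static Map<String, long[]> aggregate() {
    Map<String, List<Statistics>> byScheme = new HashMap<String, List<Statistics>>();
    for (Statistics stat : FileSystem.getAllStatistics()) {
      List<Statistics> list = byScheme.get(stat.getScheme());
      if (list == null) {
        list = new ArrayList<Statistics>();
        byScheme.put(stat.getScheme(), list);
      }
      list.add(stat);
    }
    Map<String, long[]> totals = new HashMap<String, long[]>();
    for (Map.Entry<String, List<Statistics>> entry : byScheme.entrySet()) {
      long read = 0;
      long written = 0;
      for (Statistics stat : entry.getValue()) {
        read += stat.getBytesRead();
        written += stat.getBytesWritten();
      }
      totals.put(entry.getKey(), new long[] { read, written });
    }
    return totals;
  }
}

The patched updateCounters() performs the same grouping but feeds the sums into the job's FileSystemCounter counters, and also covers read, large-read, and write operation counts.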
Counter.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce;

 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.io.Writable;

 /**
@@ -73,6 +74,7 @@ public interface Counter extends Writable {
    */
   void increment(long incr);

+  @Private
   /**
    * Return the underlying object if this is a facade.
    * @return the undelying object.
CounterGroupBase.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.counters;

 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Counter;
@@ -99,6 +100,7 @@ public interface CounterGroupBase<T extends Counter>
    */
   void incrAllCounters(CounterGroupBase<T> rightGroup);

+  @Private
   /**
    * Exposes the underlying group type if a facade.
    * @return the underlying object that this object is wrapping up.