diff --git a/mapreduce/CHANGES.txt b/mapreduce/CHANGES.txt index 4a65e84b7bf..7b91e48b350 100644 --- a/mapreduce/CHANGES.txt +++ b/mapreduce/CHANGES.txt @@ -40,6 +40,9 @@ Trunk (unreleased changes) IMPROVEMENTS + MAPREDUCE-2365. Add counters to track bytes (read,written) via + File(Input,Output)Format. (Siddharth Seth via acmurthy) + MAPREDUCE-2680. Display queue name in job client CLI. (acmurthy) MAPREDUCE-2679. Minor changes to sync trunk with MR-279 branch. (acmurthy) diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java b/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java index 2751c6b3bcc..7e1ed31ef86 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java +++ b/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java @@ -38,7 +38,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter; import org.apache.hadoop.util.StringUtils; /** @@ -457,11 +457,10 @@ public class Counters implements Writable, Iterable { */ public synchronized Counter findCounter(String group, String name) { if (name.equals("MAP_INPUT_BYTES")) { - group = FileInputFormat.COUNTER_GROUP; - name = FileInputFormat.BYTES_READ; LOG.warn("Counter name MAP_INPUT_BYTES is deprecated. " + "Use FileInputFormatCounters as group name and " + " BYTES_READ as counter name instead"); + return findCounter(FileInputFormatCounter.BYTES_READ); } return getGroup(group).getCounterForName(name); } diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java b/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java index 2265c38f158..44ba9a7e68a 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java +++ b/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java @@ -35,7 +35,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; @@ -51,12 +51,12 @@ import org.apache.hadoop.io.serializer.Serializer; import org.apache.hadoop.mapred.IFile.Writer; import org.apache.hadoop.mapred.Merger.Segment; import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator; -import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskCounter; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter; import org.apache.hadoop.mapreduce.lib.map.WrappedMapper; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex; import org.apache.hadoop.mapreduce.task.MapContextImpl; import org.apache.hadoop.util.IndexedSortable; @@ -141,20 +141,31 @@ class MapTask extends Task { class TrackedRecordReader implements RecordReader { private RecordReader rawIn; - private Counters.Counter inputByteCounter; + private Counters.Counter fileInputByteCounter; private Counters.Counter inputRecordCounter; private TaskReporter reporter; - private long beforePos = -1; 
- private long afterPos = -1; + private long bytesInPrev = -1; + private long bytesInCurr = -1; + private final Statistics fsStats; - TrackedRecordReader(RecordReader raw, TaskReporter reporter) + TrackedRecordReader(TaskReporter reporter, JobConf job) throws IOException{ - rawIn = raw; inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS); - inputByteCounter = reporter.getCounter( - FileInputFormat.COUNTER_GROUP, - FileInputFormat.BYTES_READ); + fileInputByteCounter = reporter.getCounter(FileInputFormatCounter.BYTES_READ); this.reporter = reporter; + + Statistics matchedStats = null; + if (this.reporter.getInputSplit() instanceof FileSplit) { + matchedStats = getFsStatistics(((FileSplit) this.reporter + .getInputSplit()).getPath(), job); + } + fsStats = matchedStats; + + bytesInPrev = getInputBytes(fsStats); + rawIn = job.getInputFormat().getRecordReader(reporter.getInputSplit(), + job, reporter); + bytesInCurr = getInputBytes(fsStats); + fileInputByteCounter.increment(bytesInCurr - bytesInPrev); } public K createKey() { @@ -176,26 +187,37 @@ class MapTask extends Task { protected void incrCounters() { inputRecordCounter.increment(1); - inputByteCounter.increment(afterPos - beforePos); } protected synchronized boolean moveToNext(K key, V value) throws IOException { - beforePos = getPos(); + bytesInPrev = getInputBytes(fsStats); boolean ret = rawIn.next(key, value); - afterPos = getPos(); + bytesInCurr = getInputBytes(fsStats); + fileInputByteCounter.increment(bytesInCurr - bytesInPrev); reporter.setProgress(getProgress()); return ret; } public long getPos() throws IOException { return rawIn.getPos(); } - public void close() throws IOException { rawIn.close(); } + + public void close() throws IOException { + bytesInPrev = getInputBytes(fsStats); + rawIn.close(); + bytesInCurr = getInputBytes(fsStats); + fileInputByteCounter.increment(bytesInCurr - bytesInPrev); + } + public float getProgress() throws IOException { return rawIn.getProgress(); } TaskReporter getTaskReporter() { return reporter; } + + private long getInputBytes(Statistics stats) { + return stats == null ? 0 : stats.getBytesRead(); + } } /** @@ -210,9 +232,9 @@ class MapTask extends Task { private Counters.Counter skipRecCounter; private long recIndex = -1; - SkippingRecordReader(RecordReader raw, TaskUmbilicalProtocol umbilical, - TaskReporter reporter) throws IOException{ - super(raw, reporter); + SkippingRecordReader(TaskUmbilicalProtocol umbilical, + TaskReporter reporter, JobConf job) throws IOException{ + super(reporter, job); this.umbilical = umbilical; this.skipRecCounter = reporter.getCounter(TaskCounter.MAP_SKIPPED_RECORDS); this.toWriteSkipRecs = toWriteSkipRecs() && @@ -356,11 +378,9 @@ class MapTask extends Task { updateJobWithSplit(job, inputSplit); reporter.setInputSplit(inputSplit); - RecordReader rawIn = // open input - job.getInputFormat().getRecordReader(inputSplit, job, reporter); RecordReader in = isSkipping() ? 
- new SkippingRecordReader(rawIn, umbilical, reporter) : - new TrackedRecordReader(rawIn, reporter); + new SkippingRecordReader(umbilical, reporter, job) : + new TrackedRecordReader(reporter, job); job.setBoolean(JobContext.SKIP_RECORDS, isSkipping()); @@ -409,18 +429,40 @@ class MapTask extends Task { extends org.apache.hadoop.mapreduce.RecordReader { private final org.apache.hadoop.mapreduce.RecordReader real; private final org.apache.hadoop.mapreduce.Counter inputRecordCounter; + private final org.apache.hadoop.mapreduce.Counter fileInputByteCounter; private final TaskReporter reporter; + private final Statistics fsStats; - NewTrackingRecordReader(org.apache.hadoop.mapreduce.RecordReader real, - TaskReporter reporter) { - this.real = real; + NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split, + org.apache.hadoop.mapreduce.InputFormat inputFormat, + TaskReporter reporter, + org.apache.hadoop.mapreduce.TaskAttemptContext taskContext) + throws InterruptedException, IOException { this.reporter = reporter; - this.inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS); + this.inputRecordCounter = reporter + .getCounter(TaskCounter.MAP_INPUT_RECORDS); + this.fileInputByteCounter = reporter + .getCounter(FileInputFormatCounter.BYTES_READ); + + Statistics matchedStats = null; + if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) { + matchedStats = getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split) + .getPath(), taskContext.getConfiguration()); + } + fsStats = matchedStats; + + long bytesInPrev = getInputBytes(fsStats); + this.real = inputFormat.createRecordReader(split, taskContext); + long bytesInCurr = getInputBytes(fsStats); + fileInputByteCounter.increment(bytesInCurr - bytesInPrev); } @Override public void close() throws IOException { + long bytesInPrev = getInputBytes(fsStats); real.close(); + long bytesInCurr = getInputBytes(fsStats); + fileInputByteCounter.increment(bytesInCurr - bytesInPrev); } @Override @@ -442,18 +484,28 @@ class MapTask extends Task { public void initialize(org.apache.hadoop.mapreduce.InputSplit split, org.apache.hadoop.mapreduce.TaskAttemptContext context ) throws IOException, InterruptedException { + long bytesInPrev = getInputBytes(fsStats); real.initialize(split, context); + long bytesInCurr = getInputBytes(fsStats); + fileInputByteCounter.increment(bytesInCurr - bytesInPrev); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { + long bytesInPrev = getInputBytes(fsStats); boolean result = real.nextKeyValue(); + long bytesInCurr = getInputBytes(fsStats); if (result) { inputRecordCounter.increment(1); } + fileInputByteCounter.increment(bytesInCurr - bytesInPrev); reporter.setProgress(getProgress()); return result; } + + private long getInputBytes(Statistics stats) { + return stats == null ? 
0 : stats.getBytesRead(); + } } /** @@ -506,15 +558,30 @@ class MapTask extends Task { private final TaskReporter reporter; private final Counters.Counter mapOutputRecordCounter; + private final Counters.Counter fileOutputByteCounter; + private final Statistics fsStats; @SuppressWarnings("unchecked") NewDirectOutputCollector(MRJobConfig jobContext, JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) throws IOException, ClassNotFoundException, InterruptedException { this.reporter = reporter; + mapOutputRecordCounter = reporter + .getCounter(TaskCounter.MAP_OUTPUT_RECORDS); + fileOutputByteCounter = reporter + .getCounter(FileOutputFormatCounter.BYTES_WRITTEN); + + Statistics matchedStats = null; + if (outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) { + matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat + .getOutputPath(taskContext), taskContext.getConfiguration()); + } + fsStats = matchedStats; + + long bytesOutPrev = getOutputBytes(fsStats); out = outputFormat.getRecordWriter(taskContext); - mapOutputRecordCounter = - reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS); + long bytesOutCurr = getOutputBytes(fsStats); + fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); } @Override @@ -522,7 +589,10 @@ class MapTask extends Task { public void write(K key, V value) throws IOException, InterruptedException { reporter.progress(); + long bytesOutPrev = getOutputBytes(fsStats); out.write(key, value); + long bytesOutCurr = getOutputBytes(fsStats); + fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); mapOutputRecordCounter.increment(1); } @@ -531,9 +601,16 @@ class MapTask extends Task { throws IOException,InterruptedException { reporter.progress(); if (out != null) { + long bytesOutPrev = getOutputBytes(fsStats); out.close(context); + long bytesOutCurr = getOutputBytes(fsStats); + fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); } } + + private long getOutputBytes(Statistics stats) { + return stats == null ? 
0 : stats.getBytesWritten(); + } } private class NewOutputCollector @@ -609,7 +686,7 @@ class MapTask extends Task { org.apache.hadoop.mapreduce.RecordReader input = new NewTrackingRecordReader - (inputFormat.createRecordReader(split, taskContext), reporter); + (split, inputFormat, reporter, taskContext); job.setBoolean(JobContext.SKIP_RECORDS, isSkipping()); org.apache.hadoop.mapreduce.RecordWriter output = null; @@ -662,6 +739,8 @@ class MapTask extends Task { private TaskReporter reporter = null; private final Counters.Counter mapOutputRecordCounter; + private final Counters.Counter fileOutputByteCounter; + private final Statistics fsStats; @SuppressWarnings("unchecked") public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical, @@ -670,14 +749,30 @@ class MapTask extends Task { String finalName = getOutputName(getPartition()); FileSystem fs = FileSystem.get(job); - out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter); - + OutputFormat outputFormat = job.getOutputFormat(); mapOutputRecordCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS); + + fileOutputByteCounter = reporter + .getCounter(FileOutputFormatCounter.BYTES_WRITTEN); + + Statistics matchedStats = null; + if (outputFormat instanceof FileOutputFormat) { + matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job); + } + fsStats = matchedStats; + + long bytesOutPrev = getOutputBytes(fsStats); + out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter); + long bytesOutCurr = getOutputBytes(fsStats); + fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); } public void close() throws IOException { if (this.out != null) { + long bytesOutPrev = getOutputBytes(fsStats); out.close(this.reporter); + long bytesOutCurr = getOutputBytes(fsStats); + fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); } } @@ -688,10 +783,16 @@ class MapTask extends Task { public void collect(K key, V value, int partition) throws IOException { reporter.progress(); + long bytesOutPrev = getOutputBytes(fsStats); out.write(key, value); + long bytesOutCurr = getOutputBytes(fsStats); + fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); mapOutputRecordCounter.increment(1); } - + + private long getOutputBytes(Statistics stats) { + return stats == null ? 
0 : stats.getBytesWritten(); + } } private class MapOutputBuffer @@ -757,6 +858,7 @@ class MapTask extends Task { // Counters final Counters.Counter mapOutputByteCounter; final Counters.Counter mapOutputRecordCounter; + final Counters.Counter fileOutputByteCounter; final ArrayList indexCacheList = new ArrayList(); @@ -823,6 +925,8 @@ class MapTask extends Task { mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES); mapOutputRecordCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS); + fileOutputByteCounter = reporter + .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES); // compression if (job.getCompressMapOutput()) { @@ -1317,6 +1421,8 @@ class MapTask extends Task { // release sort buffer before the merge kvbuffer = null; mergeParts(); + Path outputPath = mapOutputFile.getOutputFile(); + fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen()); } public void close() { } diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java b/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java index 31cc5873929..0225982139b 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java +++ b/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java @@ -34,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.RawComparator; @@ -48,6 +49,7 @@ import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator; import org.apache.hadoop.mapred.TaskTracker.TaskInProgress; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskCounter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; import org.apache.hadoop.mapreduce.task.reduce.Shuffle; import org.apache.hadoop.util.Progress; @@ -95,6 +97,8 @@ public class ReduceTask extends Task { getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS); private Counters.Counter reduceCombineOutputCounter = getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS); + private Counters.Counter fileOutputByteCounter = + getCounters().findCounter(FileOutputFormatCounter.BYTES_WRITTEN); // A custom comparator for map output files. Here the ordering is determined // by the file's size and path. 
In case of files with same size and different @@ -407,17 +411,14 @@ public class ReduceTask extends Task { // make output collector String finalName = getOutputName(getPartition()); - FileSystem fs = FileSystem.get(job); + final RecordWriter out = new OldTrackingRecordWriter( + this, job, reporter, finalName); - final RecordWriter out = - job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter); - OutputCollector collector = new OutputCollector() { public void collect(OUTKEY key, OUTVALUE value) throws IOException { out.write(key, value); - reduceOutputCounter.increment(1); // indicate that progress update needs to be sent reporter.progress(); } @@ -465,28 +466,104 @@ public class ReduceTask extends Task { } } + static class OldTrackingRecordWriter implements RecordWriter { + + private final RecordWriter real; + private final org.apache.hadoop.mapred.Counters.Counter reduceOutputCounter; + private final org.apache.hadoop.mapred.Counters.Counter fileOutputByteCounter; + private final Statistics fsStats; + + @SuppressWarnings({ "deprecation", "unchecked" }) + public OldTrackingRecordWriter(ReduceTask reduce, JobConf job, + TaskReporter reporter, String finalName) throws IOException { + this.reduceOutputCounter = reduce.reduceOutputCounter; + this.fileOutputByteCounter = reduce.fileOutputByteCounter; + Statistics matchedStats = null; + if (job.getOutputFormat() instanceof FileOutputFormat) { + matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job); + } + fsStats = matchedStats; + + FileSystem fs = FileSystem.get(job); + long bytesOutPrev = getOutputBytes(fsStats); + this.real = job.getOutputFormat().getRecordWriter(fs, job, finalName, + reporter); + long bytesOutCurr = getOutputBytes(fsStats); + fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); + } + + @Override + public void write(K key, V value) throws IOException { + long bytesOutPrev = getOutputBytes(fsStats); + real.write(key, value); + long bytesOutCurr = getOutputBytes(fsStats); + fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); + reduceOutputCounter.increment(1); + } + + @Override + public void close(Reporter reporter) throws IOException { + long bytesOutPrev = getOutputBytes(fsStats); + real.close(reporter); + long bytesOutCurr = getOutputBytes(fsStats); + fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); + } + + private long getOutputBytes(Statistics stats) { + return stats == null ? 
0 : stats.getBytesWritten(); + } + } + static class NewTrackingRecordWriter extends org.apache.hadoop.mapreduce.RecordWriter { private final org.apache.hadoop.mapreduce.RecordWriter real; private final org.apache.hadoop.mapreduce.Counter outputRecordCounter; - - NewTrackingRecordWriter(org.apache.hadoop.mapreduce.RecordWriter real, - org.apache.hadoop.mapreduce.Counter recordCounter) { - this.real = real; - this.outputRecordCounter = recordCounter; + private final org.apache.hadoop.mapreduce.Counter fileOutputByteCounter; + private final Statistics fsStats; + + @SuppressWarnings("unchecked") + NewTrackingRecordWriter(ReduceTask reduce, + org.apache.hadoop.mapreduce.TaskAttemptContext taskContext) + throws InterruptedException, IOException { + this.outputRecordCounter = reduce.reduceOutputCounter; + this.fileOutputByteCounter = reduce.fileOutputByteCounter; + + Statistics matchedStats = null; + if (reduce.outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) { + matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat + .getOutputPath(taskContext), taskContext.getConfiguration()); + } + + fsStats = matchedStats; + + long bytesOutPrev = getOutputBytes(fsStats); + this.real = (org.apache.hadoop.mapreduce.RecordWriter) reduce.outputFormat + .getRecordWriter(taskContext); + long bytesOutCurr = getOutputBytes(fsStats); + fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { + long bytesOutPrev = getOutputBytes(fsStats); real.close(context); + long bytesOutCurr = getOutputBytes(fsStats); + fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); } @Override public void write(K key, V value) throws IOException, InterruptedException { + long bytesOutPrev = getOutputBytes(fsStats); real.write(key,value); + long bytesOutCurr = getOutputBytes(fsStats); + fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); outputRecordCounter.increment(1); } + + private long getOutputBytes(Statistics stats) { + return stats == null ? 0 : stats.getBytesWritten(); + } } @SuppressWarnings("unchecked") @@ -529,11 +606,8 @@ public class ReduceTask extends Task { org.apache.hadoop.mapreduce.Reducer reducer = (org.apache.hadoop.mapreduce.Reducer) ReflectionUtils.newInstance(taskContext.getReducerClass(), job); - org.apache.hadoop.mapreduce.RecordWriter output = - (org.apache.hadoop.mapreduce.RecordWriter) - outputFormat.getRecordWriter(taskContext); org.apache.hadoop.mapreduce.RecordWriter trackedRW = - new NewTrackingRecordWriter(output, reduceOutputCounter); + new NewTrackingRecordWriter(this, taskContext); job.setBoolean("mapred.skip.on", isSkipping()); job.setBoolean(JobContext.SKIP_RECORDS, isSkipping()); org.apache.hadoop.mapreduce.Reducer.Context @@ -545,6 +619,6 @@ public class ReduceTask extends Task { reporter, comparator, keyClass, valueClass); reducer.run(reducerContext); - output.close(reducerContext); + trackedRW.close(reducerContext); } } diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/Task.java b/mapreduce/src/java/org/apache/hadoop/mapred/Task.java index 3c0d3f3b10e..f5abb3022a5 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapred/Task.java +++ b/mapreduce/src/java/org/apache/hadoop/mapred/Task.java @@ -289,6 +289,28 @@ abstract public class Task implements Writable, Configurable { } } + /** + * Gets a handle to the Statistics instance based on the scheme associated + * with path. + * + * @param path the path. 
+ * @param conf the configuration to extract the scheme from if not part of + * the path. + * @return a Statistics instance, or null if none is found for the scheme. + */ + protected static Statistics getFsStatistics(Path path, Configuration conf) throws IOException { + Statistics matchedStats = null; + path = path.getFileSystem(conf).makeQualified(path); + String scheme = path.toUri().getScheme(); + for (Statistics stats : FileSystem.getAllStatistics()) { + if (stats.getScheme().equals(scheme)) { + matchedStats = stats; + break; + } + } + return matchedStats; + } + /** * Get skipRanges. */ diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.java index ecdc717fcf9..42ef067aceb 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.java +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.java @@ -29,6 +29,7 @@ public enum TaskCounter { MAP_OUTPUT_RECORDS, MAP_SKIPPED_RECORDS, MAP_OUTPUT_BYTES, + MAP_OUTPUT_MATERIALIZED_BYTES, SPLIT_RAW_BYTES, COMBINE_INPUT_RECORDS, COMBINE_OUTPUT_RECORDS, diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties b/mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties index 9650012bf66..9cfdea6e937 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties @@ -17,6 +17,7 @@ CounterGroupName= Map-Reduce Framework MAP_INPUT_RECORDS.name= Map input records MAP_OUTPUT_RECORDS.name= Map output records MAP_OUTPUT_BYTES.name= Map output bytes +MAP_OUTPUT_MATERIALIZED_BYTES.name= Map output materialized bytes MAP_SKIPPED_RECORDS.name= Map skipped records COMBINE_INPUT_RECORDS.name= Combine input records COMBINE_OUTPUT_RECORDS.name= Combine output records diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java index 90bafb4865f..781715dbeef 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java @@ -54,9 +54,6 @@ import org.apache.hadoop.util.StringUtils; @InterfaceAudience.Public @InterfaceStability.Stable public abstract class FileInputFormat extends InputFormat { - public static final String COUNTER_GROUP = - "FileInputFormatCounters"; - public static final String BYTES_READ = "BYTES_READ"; public static final String INPUT_DIR = "mapreduce.input.fileinputformat.inputdir"; public static final String SPLIT_MAXSIZE = diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java index 7affcd100a4..e1dcee0b24b 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java @@ -35,7 +35,6 @@ import org.apache.hadoop.io.compress.SplitCompressionInputStream; import org.apache.hadoop.io.compress.SplittableCompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.Decompressor; -import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -63,7 +62,6 @@ public class 
LineRecordReader extends RecordReader { private int maxLineLength; private LongWritable key = null; private Text value = null; - private Counter inputByteCounter; private CompressionCodec codec; private Decompressor decompressor; private byte[] recordDelimiterBytes; @@ -78,8 +76,6 @@ public class LineRecordReader extends RecordReader { public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; - inputByteCounter = context.getCounter( - FileInputFormat.COUNTER_GROUP, FileInputFormat.BYTES_READ); Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE); start = split.getStart(); @@ -174,7 +170,6 @@ public class LineRecordReader extends RecordReader { break; } pos += newSize; - inputByteCounter.increment(newSize); if (newSize < maxLineLength) { break; } diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java index b6b81d30b11..06d913fe070 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java @@ -27,9 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.MapContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -44,16 +42,12 @@ public class SequenceFileRecordReader extends RecordReader { private K key = null; private V value = null; protected Configuration conf; - private Counter inputByteCounter; - private long pos; - + @Override public void initialize(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException { FileSplit fileSplit = (FileSplit) split; - inputByteCounter = ((MapContext)context).getCounter( - FileInputFormat.COUNTER_GROUP, FileInputFormat.BYTES_READ); conf = context.getConfiguration(); Path path = fileSplit.getPath(); FileSystem fs = path.getFileSystem(conf); @@ -74,8 +68,7 @@ public class SequenceFileRecordReader extends RecordReader { if (!more) { return false; } - inputByteCounter.increment(in.getPosition()-pos); - pos = in.getPosition(); + long pos = in.getPosition(); key = (K) in.next(key); if (key == null || (pos >= end && in.syncSeen())) { more = false; diff --git a/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java b/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java index 6984d7001f7..a02f15e2f6f 100644 --- a/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java +++ b/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java @@ -150,7 +150,9 @@ public static final String OUTDIR = "mapreduce.output.fileoutputformat.outputdir * @param outputDir the {@link Path} of the output directory for * the map-reduce job. 
*/ - public static void setOutputPath(Job job, Path outputDir) { + public static void setOutputPath(Job job, Path outputDir) throws IOException { + outputDir = outputDir.getFileSystem(job.getConfiguration()).makeQualified( + outputDir); job.getConfiguration().set(FileOutputFormat.OUTDIR, outputDir.toString()); } diff --git a/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java b/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java index 3f0d9421c54..5d7e21af26c 100644 --- a/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java +++ b/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java @@ -42,9 +42,10 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter; /** * This is an wordcount application that tests the count of records @@ -58,6 +59,26 @@ import org.apache.hadoop.mapreduce.TaskType; */ public class TestJobCounters { + private void validateFileCounters(Counters counter, long fileBytesRead, + long fileBytesWritten, long mapOutputBytes, + long mapOutputMaterializedBytes) { + assertTrue(counter.findCounter(FileInputFormatCounter.BYTES_READ) + .getValue() != 0); + assertEquals(fileBytesRead, + counter.findCounter(FileInputFormatCounter.BYTES_READ).getValue()); + + assertTrue(counter.findCounter(FileOutputFormatCounter.BYTES_WRITTEN) + .getValue() != 0); + + if (mapOutputBytes >= 0) { + assertTrue(counter.findCounter(TaskCounter.MAP_OUTPUT_BYTES).getValue() != 0); + } + if (mapOutputMaterializedBytes >= 0) { + assertTrue(counter.findCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES) + .getValue() != 0); + } + } + private void validateCounters(Counters counter, long spillRecCnt, long mapInputRecords, long mapOutputRecords) { // Check if the numer of Spilled Records is same as expected @@ -108,6 +129,19 @@ public class TestJobCounters { private static Path OUT_DIR = null; private static Path testdir = null; + private static Path[] inFiles = new Path[5]; + + private static long getFileSize(Path path) throws IOException { + FileSystem fs = FileSystem.getLocal(new Configuration()); + long len = 0; + len += fs.getFileStatus(path).getLen(); + Path crcPath = new Path(path.getParent(), "." 
+ path.getName() + ".crc"); + if (fs.exists(crcPath)) { + len += fs.getFileStatus(crcPath).getLen(); + } + return len; + } + @BeforeClass public static void initPaths() throws IOException { final Configuration conf = new Configuration(); @@ -125,11 +159,15 @@ public class TestJobCounters { if (!fs.mkdirs(IN_DIR)) { throw new IOException("Mkdirs failed to create " + IN_DIR); } - // create 3 input files each with 5*2k words - createWordsFile(new Path(IN_DIR, "input5_2k_1"), conf); - createWordsFile(new Path(IN_DIR, "input5_2k_2"), conf); - createWordsFile(new Path(IN_DIR, "input5_2k_3"), conf); + for (int i = 0; i < inFiles.length; i++) { + inFiles[i] = new Path(IN_DIR, "input5_2k_" + i); + } + + // create 3 input files each with 5*2k words + createWordsFile(inFiles[0], conf); + createWordsFile(inFiles[1], conf); + createWordsFile(inFiles[2], conf); } @AfterClass @@ -181,8 +219,12 @@ public class TestJobCounters { JobConf conf = createConfiguration(); conf.setNumMapTasks(3); conf.setInt(JobContext.IO_SORT_FACTOR, 2); - removeWordsFile(new Path(IN_DIR, "input5_2k_4"), conf); - removeWordsFile(new Path(IN_DIR, "input5_2k_5"), conf); + removeWordsFile(inFiles[3], conf); + removeWordsFile(inFiles[4], conf); + long inputSize = 0; + inputSize += getFileSize(inFiles[0]); + inputSize += getFileSize(inFiles[1]); + inputSize += getFileSize(inFiles[2]); FileInputFormat.setInputPaths(conf, IN_DIR); FileOutputFormat.setOutputPath(conf, new Path(OUT_DIR, "outputO0")); @@ -211,6 +253,7 @@ public class TestJobCounters { // 3 files, 5120 = 5 * 1024 rec/file = 15360 input records // 4 records/line = 61440 output records validateCounters(c1, 90112, 15360, 61440); + validateFileCounters(c1, inputSize, 0, 0, 0); } @@ -218,8 +261,13 @@ public class TestJobCounters { public void testOldCounterB() throws Exception { JobConf conf = createConfiguration(); - createWordsFile(new Path(IN_DIR, "input5_2k_4"), conf); - removeWordsFile(new Path(IN_DIR, "input5_2k_5"), conf); + createWordsFile(inFiles[3], conf); + removeWordsFile(inFiles[4], conf); + long inputSize = 0; + inputSize += getFileSize(inFiles[0]); + inputSize += getFileSize(inFiles[1]); + inputSize += getFileSize(inFiles[2]); + inputSize += getFileSize(inFiles[3]); conf.setNumMapTasks(4); conf.setInt(JobContext.IO_SORT_FACTOR, 2); FileInputFormat.setInputPaths(conf, IN_DIR); @@ -239,13 +287,20 @@ public class TestJobCounters { // 4 files, 5120 = 5 * 1024 rec/file = 15360 input records // 4 records/line = 81920 output records validateCounters(c1, 131072, 20480, 81920); + validateFileCounters(c1, inputSize, 0, 0, 0); } @Test public void testOldCounterC() throws Exception { JobConf conf = createConfiguration(); - createWordsFile(new Path(IN_DIR, "input5_2k_4"), conf); - createWordsFile(new Path(IN_DIR, "input5_2k_5"), conf); + createWordsFile(inFiles[3], conf); + createWordsFile(inFiles[4], conf); + long inputSize = 0; + inputSize += getFileSize(inFiles[0]); + inputSize += getFileSize(inFiles[1]); + inputSize += getFileSize(inFiles[2]); + inputSize += getFileSize(inFiles[3]); + inputSize += getFileSize(inFiles[4]); conf.setNumMapTasks(4); conf.setInt(JobContext.IO_SORT_FACTOR, 3); FileInputFormat.setInputPaths(conf, IN_DIR); @@ -260,6 +315,31 @@ public class TestJobCounters { // 5 files, 5120 = 5 * 1024 rec/file = 15360 input records // 4 records/line = 102400 output records validateCounters(c1, 147456, 25600, 102400); + validateFileCounters(c1, inputSize, 0, 0, 0); + } + + @Test + public void testOldCounterD() throws Exception { + JobConf conf = 
createConfiguration(); + conf.setNumMapTasks(3); + conf.setInt(JobContext.IO_SORT_FACTOR, 2); + conf.setNumReduceTasks(0); + removeWordsFile(inFiles[3], conf); + removeWordsFile(inFiles[4], conf); + long inputSize = 0; + inputSize += getFileSize(inFiles[0]); + inputSize += getFileSize(inFiles[1]); + inputSize += getFileSize(inFiles[2]); + FileInputFormat.setInputPaths(conf, IN_DIR); + FileOutputFormat.setOutputPath(conf, new Path(OUT_DIR, "outputO3")); + + RunningJob myJob = JobClient.runJob(conf); + Counters c1 = myJob.getCounters(); + + // No Reduces. Will go through the direct output collector. Spills=0 + + validateCounters(c1, 0, 15360, 61440); + validateFileCounters(c1, inputSize, 0, -1, -1); } @Test @@ -267,8 +347,12 @@ public class TestJobCounters { final Job job = createJob(); final Configuration conf = job.getConfiguration(); conf.setInt(JobContext.IO_SORT_FACTOR, 2); - removeWordsFile(new Path(IN_DIR, "input5_2k_4"), conf); - removeWordsFile(new Path(IN_DIR, "input5_2k_5"), conf); + removeWordsFile(inFiles[3], conf); + removeWordsFile(inFiles[4], conf); + long inputSize = 0; + inputSize += getFileSize(inFiles[0]); + inputSize += getFileSize(inFiles[1]); + inputSize += getFileSize(inFiles[2]); org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths( job, IN_DIR); org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath( @@ -276,6 +360,7 @@ public class TestJobCounters { assertTrue(job.waitForCompletion(true)); final Counters c1 = Counters.downgrade(job.getCounters()); validateCounters(c1, 90112, 15360, 61440); + validateFileCounters(c1, inputSize, 0, 0, 0); } @Test @@ -283,8 +368,13 @@ public class TestJobCounters { final Job job = createJob(); final Configuration conf = job.getConfiguration(); conf.setInt(JobContext.IO_SORT_FACTOR, 2); - createWordsFile(new Path(IN_DIR, "input5_2k_4"), conf); - removeWordsFile(new Path(IN_DIR, "input5_2k_5"), conf); + createWordsFile(inFiles[3], conf); + removeWordsFile(inFiles[4], conf); + long inputSize = 0; + inputSize += getFileSize(inFiles[0]); + inputSize += getFileSize(inFiles[1]); + inputSize += getFileSize(inFiles[2]); + inputSize += getFileSize(inFiles[3]); org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths( job, IN_DIR); org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath( @@ -292,6 +382,7 @@ public class TestJobCounters { assertTrue(job.waitForCompletion(true)); final Counters c1 = Counters.downgrade(job.getCounters()); validateCounters(c1, 131072, 20480, 81920); + validateFileCounters(c1, inputSize, 0, 0, 0); } @Test @@ -299,8 +390,14 @@ public class TestJobCounters { final Job job = createJob(); final Configuration conf = job.getConfiguration(); conf.setInt(JobContext.IO_SORT_FACTOR, 3); - createWordsFile(new Path(IN_DIR, "input5_2k_4"), conf); - createWordsFile(new Path(IN_DIR, "input5_2k_5"), conf); + createWordsFile(inFiles[3], conf); + createWordsFile(inFiles[4], conf); + long inputSize = 0; + inputSize += getFileSize(inFiles[0]); + inputSize += getFileSize(inFiles[1]); + inputSize += getFileSize(inFiles[2]); + inputSize += getFileSize(inFiles[3]); + inputSize += getFileSize(inFiles[4]); org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths( job, IN_DIR); org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath( @@ -308,6 +405,29 @@ public class TestJobCounters { assertTrue(job.waitForCompletion(true)); final Counters c1 = Counters.downgrade(job.getCounters()); validateCounters(c1, 147456, 25600, 102400); + validateFileCounters(c1, inputSize, 
0, 0, 0); + } + + @Test + public void testNewCounterD() throws Exception { + final Job job = createJob(); + final Configuration conf = job.getConfiguration(); + conf.setInt(JobContext.IO_SORT_FACTOR, 2); + job.setNumReduceTasks(0); + removeWordsFile(inFiles[3], conf); + removeWordsFile(inFiles[4], conf); + long inputSize = 0; + inputSize += getFileSize(inFiles[0]); + inputSize += getFileSize(inFiles[1]); + inputSize += getFileSize(inFiles[2]); + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, + IN_DIR); + org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, + new Path(OUT_DIR, "outputN3")); + assertTrue(job.waitForCompletion(true)); + final Counters c1 = Counters.downgrade(job.getCounters()); + validateCounters(c1, 0, 15360, 61440); + validateFileCounters(c1, inputSize, 0, -1, -1); } /** diff --git a/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java b/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java index 50584107a61..d32f8c615a4 100644 --- a/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java +++ b/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java @@ -34,6 +34,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.lib.IdentityMapper; import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.mapred.lib.NullOutputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.ToolRunner; @@ -101,10 +102,8 @@ public class TestMiniMRDFSSort extends TestCase { Sort sort = new Sort(); assertEquals(ToolRunner.run(job, sort, sortArgs), 0); org.apache.hadoop.mapreduce.Counters counters = sort.getResult().getCounters(); - long mapInput = counters.findCounter( - org.apache.hadoop.mapreduce.lib.input.FileInputFormat.COUNTER_GROUP, - org.apache.hadoop.mapreduce.lib.input.FileInputFormat.BYTES_READ). 
- getValue(); + long mapInput = counters.findCounter(FileInputFormatCounter.BYTES_READ) + .getValue(); long hdfsRead = counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP, "HDFS_BYTES_READ").getValue(); // the hdfs read should be between 100% and 110% of the map input bytes diff --git a/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java b/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java index 318364b512d..638f5f27988 100644 --- a/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java +++ b/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java @@ -38,6 +38,7 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter; import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; @@ -147,8 +148,7 @@ public class TestMapReduceLocal extends TestCase { out); Counters ctrs = job.getCounters(); System.out.println("Counters: " + ctrs); - long mapIn = ctrs.findCounter(FileInputFormat.COUNTER_GROUP, - FileInputFormat.BYTES_READ).getValue(); + long mapIn = ctrs.findCounter(FileInputFormatCounter.BYTES_READ).getValue(); assertTrue(mapIn != 0); long combineIn = ctrs.findCounter(COUNTER_GROUP, "COMBINE_INPUT_RECORDS").getValue();
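
Reviewer note (not part of the patch): the new tracking readers/writers all follow the same byte-delta pattern — look up the FileSystem.Statistics instance whose scheme matches the split/output path (as the new Task.getFsStatistics() helper does), snapshot getBytesRead()/getBytesWritten() immediately before and after the wrapped I/O call, and charge the difference to FileInputFormatCounter.BYTES_READ or FileOutputFormatCounter.BYTES_WRITTEN. Below is a minimal standalone sketch of that pattern; the class name, method name, and main() driver are purely illustrative and are not part of this change. On the local filesystem the delta typically also includes the .crc sidecar bytes, which is why the test's getFileSize() adds the crc file length.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Path;

/** Illustrative sketch only; names here are hypothetical, not part of the patch. */
public class FsByteDeltaSketch {

  /** Mirrors Task.getFsStatistics(): pick the Statistics whose scheme matches the path. */
  static Statistics statsForPath(Path path, Configuration conf) throws IOException {
    path = path.getFileSystem(conf).makeQualified(path);
    String scheme = path.toUri().getScheme();
    for (Statistics stats : FileSystem.getAllStatistics()) {
      if (stats.getScheme().equals(scheme)) {
        return stats;
      }
    }
    return null; // no FileSystem registered for this scheme yet
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path out = new Path("file:///tmp/fs-byte-delta-sketch.txt");
    Statistics stats = statsForPath(out, conf);

    // Snapshot before the wrapped I/O call, as the tracking record writers do.
    long before = (stats == null) ? 0 : stats.getBytesWritten();

    FSDataOutputStream os = out.getFileSystem(conf).create(out, true);
    try {
      os.writeBytes("hello, counters\n");
    } finally {
      os.close();
    }

    // Snapshot after; in MapTask/ReduceTask this delta is added to
    // FileOutputFormatCounter.BYTES_WRITTEN via Counter.increment().
    long after = (stats == null) ? 0 : stats.getBytesWritten();
    System.out.println("bytes written delta = " + (after - before));
  }
}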