MAPREDUCE-7315. LocatedFileStatusFetcher to collect/publish IOStatistics. (#2579)

Part of the HADOOP-16830 IOStatistics API feature.

If the source FileSystem's listing RemoteIterators
implement IOStatisticsSource, these are collected and served through
the IOStatisticsSource API. If they are not: getIOStatistics() returns
null.

Only the listing statistics are collected; FileSystem.globStatus() doesn't
provide any, so IO use there is not included in the aggregate results.

Contributed by Steve Loughran.

Change-Id: Iff1485297c2c7e181b54eaf1d2c4f80faeee7cfa
This commit is contained in:
Steve Loughran 2020-12-31 16:02:10 +00:00
parent 57abfae136
commit 5be450393c
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
1 changed files with 58 additions and 2 deletions

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@ -37,6 +38,9 @@ import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
@ -52,6 +56,9 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
/**
* Utility class to fetch block locations for specified Input paths using a
* configured number of threads.
@ -60,7 +67,7 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors;
* configuration.
*/
@Private
public class LocatedFileStatusFetcher {
public class LocatedFileStatusFetcher implements IOStatisticsSource {
public static final Logger LOG =
LoggerFactory.getLogger(LocatedFileStatusFetcher.class.getName());
@ -87,6 +94,12 @@ public class LocatedFileStatusFetcher {
private volatile Throwable unknownError;
/**
* Demand created IO Statistics: only if the filesystem
* returns statistics does this fetch collect them.
*/
private IOStatisticsSnapshot iostats;
/**
* Instantiate.
* The newApi switch is only used to configure what exception is raised
@ -227,6 +240,45 @@ public class LocatedFileStatusFetcher {
}
}
/**
* Return any IOStatistics collected during listing.
* @return IO stats accrued.
*/
@Override
public synchronized IOStatistics getIOStatistics() {
return iostats;
}
/**
* Add the statistics of an individual thread's scan.
* @param stats possibly null statistics.
*/
private void addResultStatistics(IOStatistics stats) {
if (stats != null) {
// demand creation of IO statistics.
synchronized (this) {
LOG.debug("Adding IOStatistics: {}", stats);
if (iostats == null) {
// demand create the statistics
iostats = snapshotIOStatistics(stats);
} else {
iostats.aggregate(stats);
}
}
}
}
@Override
public String toString() {
final IOStatistics ioStatistics = getIOStatistics();
StringJoiner stringJoiner = new StringJoiner(", ",
LocatedFileStatusFetcher.class.getSimpleName() + "[", "]");
if (ioStatistics != null) {
stringJoiner.add("IOStatistics=" + ioStatistics);
}
return stringJoiner.toString();
}
/**
* Retrieves block locations for the given @link {@link FileStatus}, and adds
* additional paths to the process queue if required.
@ -265,6 +317,8 @@ public class LocatedFileStatusFetcher {
}
}
}
// aggregate any stats
result.stats = retrieveIOStatistics(iter);
} else {
result.locatedFileStatuses.add(fileStatus);
}
@ -275,6 +329,7 @@ public class LocatedFileStatusFetcher {
private List<FileStatus> locatedFileStatuses = new LinkedList<>();
private List<FileStatus> dirsNeedingRecursiveCalls = new LinkedList<>();
private FileSystem fs;
private IOStatistics stats;
}
}
@ -289,6 +344,7 @@ public class LocatedFileStatusFetcher {
@Override
public void onSuccess(ProcessInputDirCallable.Result result) {
try {
addResultStatistics(result.stats);
if (!result.locatedFileStatuses.isEmpty()) {
resultQueue.add(result.locatedFileStatuses);
}