HDFS-8873. Allow the directoryScanner to be rate-limited. Contributed by Daniel Templeton.

This commit is contained in:
Zhe Zhang 2017-04-27 15:04:10 -07:00
parent 5e67eb7560
commit 8ad9efbe14
5 changed files with 565 additions and 33 deletions

View File

@ -75,6 +75,9 @@ Release 2.7.4 - UNRELEASED
HDFS-11466. Change dfs.namenode.write-lock-reporting-threshold-ms default HDFS-11466. Change dfs.namenode.write-lock-reporting-threshold-ms default
from 1000ms to 5000ms. (wang via zhz) from 1000ms to 5000ms. (wang via zhz)
HDFS-8873. Allow the directoryScanner to be rate-limited (Daniel Templeton
via Colin P. McCabe)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-10896. Move lock logging logic from FSNamesystem into FSNamesystemLock. HDFS-10896. Move lock logging logic from FSNamesystem into FSNamesystemLock.

View File

@ -481,6 +481,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600; public static final int DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600;
public static final String DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads"; public static final String DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads";
public static final int DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1; public static final int DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1;
public static final String
DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY =
"dfs.datanode.directoryscan.throttle.limit.ms.per.sec";
public static final int
DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT = 1000;
public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface"; public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
public static final String DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default"; public static final String DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default";
public static final String DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver"; public static final String DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver";

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
@ -32,6 +33,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -47,6 +49,7 @@ import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
/** /**
@ -56,27 +59,59 @@ import org.apache.hadoop.util.Time;
@InterfaceAudience.Private @InterfaceAudience.Private
public class DirectoryScanner implements Runnable { public class DirectoryScanner implements Runnable {
private static final Log LOG = LogFactory.getLog(DirectoryScanner.class); private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
private static final int MILLIS_PER_SECOND = 1000;
private static final String START_MESSAGE =
"Periodic Directory Tree Verification scan"
+ " starting at %dms with interval of %dms";
private static final String START_MESSAGE_WITH_THROTTLE = START_MESSAGE
+ " and throttle limit of %dms/s";
private final FsDatasetSpi<?> dataset; private final FsDatasetSpi<?> dataset;
private final ExecutorService reportCompileThreadPool; private final ExecutorService reportCompileThreadPool;
private final ScheduledExecutorService masterThread; private final ScheduledExecutorService masterThread;
private final long scanPeriodMsecs; private final long scanPeriodMsecs;
private final int throttleLimitMsPerSec;
private volatile boolean shouldRun = false; private volatile boolean shouldRun = false;
private boolean retainDiffs = false; private boolean retainDiffs = false;
private final DataNode datanode; private final DataNode datanode;
/**
* Total combined wall clock time (in milliseconds) spent by the report
* compiler threads executing. Used for testing purposes.
*/
@VisibleForTesting
final AtomicLong timeRunningMs = new AtomicLong(0L);
/**
* Total combined wall clock time (in milliseconds) spent by the report
* compiler threads blocked by the throttle. Used for testing purposes.
*/
@VisibleForTesting
final AtomicLong timeWaitingMs = new AtomicLong(0L);
/**
* The complete list of block differences indexed by block pool ID.
*/
@VisibleForTesting
final ScanInfoPerBlockPool diffs = new ScanInfoPerBlockPool(); final ScanInfoPerBlockPool diffs = new ScanInfoPerBlockPool();
/**
* Statistics about the block differences in each blockpool, indexed by
* block pool ID.
*/
@VisibleForTesting
final Map<String, Stats> stats = new HashMap<String, Stats>(); final Map<String, Stats> stats = new HashMap<String, Stats>();
/** /**
* Allow retaining diffs for unit test and analysis * Allow retaining diffs for unit test and analysis. Defaults to false (off)
* @param b - defaults to false (off) * @param b whether to retain diffs
*/ */
@VisibleForTesting
void setRetainDiffs(boolean b) { void setRetainDiffs(boolean b) {
retainDiffs = b; retainDiffs = b;
} }
/** Stats tracked for reporting and testing, per blockpool */ /**
* Stats tracked for reporting and testing, per blockpool
*/
@VisibleForTesting
static class Stats { static class Stats {
final String bpid; final String bpid;
long totalBlocks = 0; long totalBlocks = 0;
@ -86,6 +121,10 @@ public class DirectoryScanner implements Runnable {
long mismatchBlocks = 0; long mismatchBlocks = 0;
long duplicateBlocks = 0; long duplicateBlocks = 0;
/**
* Create a new Stats object for the given blockpool ID.
* @param bpid blockpool ID
*/
public Stats(String bpid) { public Stats(String bpid) {
this.bpid = bpid; this.bpid = bpid;
} }
@ -100,17 +139,31 @@ public class DirectoryScanner implements Runnable {
} }
} }
/**
* Helper class for compiling block info reports from report compiler threads.
*/
static class ScanInfoPerBlockPool extends static class ScanInfoPerBlockPool extends
HashMap<String, LinkedList<ScanInfo>> { HashMap<String, LinkedList<ScanInfo>> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
/**
* Create a new info list.
*/
ScanInfoPerBlockPool() {super();} ScanInfoPerBlockPool() {super();}
/**
* Create a new info list initialized to the given expected size.
* See {@link java.util.HashMap#HashMap(int)}.
*
* @param sz initial expected size
*/
ScanInfoPerBlockPool(int sz) {super(sz);} ScanInfoPerBlockPool(int sz) {super(sz);}
/** /**
* Merges {@code that} ScanInfoPerBlockPool into this one * Merges {@code that} ScanInfoPerBlockPool into this one
*
* @param the ScanInfoPerBlockPool to merge
*/ */
public void addAll(ScanInfoPerBlockPool that) { public void addAll(ScanInfoPerBlockPool that) {
if (that == null) return; if (that == null) return;
@ -132,6 +185,7 @@ public class DirectoryScanner implements Runnable {
/** /**
* Convert all the LinkedList values in this ScanInfoPerBlockPool map * Convert all the LinkedList values in this ScanInfoPerBlockPool map
* into sorted arrays, and return a new map of these arrays per blockpool * into sorted arrays, and return a new map of these arrays per blockpool
*
* @return a map of ScanInfo arrays per blockpool * @return a map of ScanInfo arrays per blockpool
*/ */
public Map<String, ScanInfo[]> toSortedArrays() { public Map<String, ScanInfo[]> toSortedArrays() {
@ -208,6 +262,9 @@ public class DirectoryScanner implements Runnable {
* For example, the condensed version of /foo//bar is /foo/bar * For example, the condensed version of /foo//bar is /foo/bar
* Unlike {@link File#getCanonicalPath()}, this will never perform I/O * Unlike {@link File#getCanonicalPath()}, this will never perform I/O
* on the filesystem. * on the filesystem.
*
* @param path the path to condense
* @return the condensed path
*/ */
private static String getCondensedPath(String path) { private static String getCondensedPath(String path) {
return CONDENSED_PATH_REGEX.matcher(path). return CONDENSED_PATH_REGEX.matcher(path).
@ -230,6 +287,15 @@ public class DirectoryScanner implements Runnable {
throw new RuntimeException(prefix + " is not a prefix of " + fullPath); throw new RuntimeException(prefix + " is not a prefix of " + fullPath);
} }
/**
* Create a ScanInfo object for a block. This constructor will examine
* the block data and meta-data files.
*
* @param blockId the block ID
* @param blockFile the path to the block data file
* @param metaFile the path to the block meta-data file
* @param vol the volume that contains the block
*/
ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) { ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) {
this.blockId = blockId; this.blockId = blockId;
String condensedVolPath = vol == null ? null : String condensedVolPath = vol == null ? null :
@ -248,15 +314,31 @@ public class DirectoryScanner implements Runnable {
this.volume = vol; this.volume = vol;
} }
/**
* Returns the block data file.
*
* @return the block data file
*/
File getBlockFile() { File getBlockFile() {
return (blockSuffix == null) ? null : return (blockSuffix == null) ? null :
new File(volume.getBasePath(), blockSuffix); new File(volume.getBasePath(), blockSuffix);
} }
/**
* Return the length of the data block. The length returned is the length
* cached when this object was created.
*
* @return the length of the data block
*/
long getBlockFileLength() { long getBlockFileLength() {
return blockFileLength; return blockFileLength;
} }
/**
* Returns the block meta data file or null if there isn't one.
*
* @return the block meta data file
*/
File getMetaFile() { File getMetaFile() {
if (metaSuffix == null) { if (metaSuffix == null) {
return null; return null;
@ -267,10 +349,20 @@ public class DirectoryScanner implements Runnable {
} }
} }
/**
* Returns the block ID.
*
* @return the block ID
*/
long getBlockId() { long getBlockId() {
return blockId; return blockId;
} }
/**
* Returns the volume that contains the block that this object describes.
*
* @return the volume
*/
FsVolumeSpi getVolume() { FsVolumeSpi getVolume() {
return volume; return volume;
} }
@ -309,12 +401,44 @@ public class DirectoryScanner implements Runnable {
} }
} }
/**
* Create a new directory scanner, but don't cycle it running yet.
*
* @param datanode the parent datanode
* @param dataset the dataset to scan
* @param conf the Configuration object
*/
DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) { DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) {
this.datanode = datanode; this.datanode = datanode;
this.dataset = dataset; this.dataset = dataset;
int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT); DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT);
scanPeriodMsecs = interval * 1000L; //msec scanPeriodMsecs = interval * MILLIS_PER_SECOND; //msec
int throttle =
conf.getInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
if ((throttle > MILLIS_PER_SECOND) || (throttle <= 0)) {
if (throttle > MILLIS_PER_SECOND) {
LOG.error(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY
+ " set to value above 1000 ms/sec. Assuming default value of " +
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
} else {
LOG.error(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY
+ " set to value below 1 ms/sec. Assuming default value of " +
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
}
throttleLimitMsPerSec =
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT;
} else {
throttleLimitMsPerSec = throttle;
}
int threads = int threads =
conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT); DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
@ -325,29 +449,50 @@ public class DirectoryScanner implements Runnable {
new Daemon.DaemonFactory()); new Daemon.DaemonFactory());
} }
/**
* Start the scanner. The scanner will run every
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds.
*/
void start() { void start() {
shouldRun = true; shouldRun = true;
long offset = DFSUtil.getRandom().nextInt((int) (scanPeriodMsecs/1000L)) * 1000L; //msec long offset = DFSUtil.getRandom().nextInt(
(int) (scanPeriodMsecs/MILLIS_PER_SECOND)) * MILLIS_PER_SECOND; //msec
long firstScanTime = Time.now() + offset; long firstScanTime = Time.now() + offset;
LOG.info("Periodic Directory Tree Verification scan starting at " String logMsg;
+ firstScanTime + " with interval " + scanPeriodMsecs);
if (throttleLimitMsPerSec < MILLIS_PER_SECOND) {
logMsg = String.format(START_MESSAGE_WITH_THROTTLE, firstScanTime,
scanPeriodMsecs, throttleLimitMsPerSec);
} else {
logMsg = String.format(START_MESSAGE, firstScanTime, scanPeriodMsecs);
}
LOG.info(logMsg);
masterThread.scheduleAtFixedRate(this, offset, scanPeriodMsecs, masterThread.scheduleAtFixedRate(this, offset, scanPeriodMsecs,
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
} }
// for unit test /**
* Return whether the scanner has been started.
*
* @return whether the scanner has been started
*/
@VisibleForTesting
boolean getRunStatus() { boolean getRunStatus() {
return shouldRun; return shouldRun;
} }
/**
* Clear the current cache of diffs and statistics.
*/
private void clear() { private void clear() {
diffs.clear(); diffs.clear();
stats.clear(); stats.clear();
} }
/** /**
* Main program loop for DirectoryScanner * Main program loop for DirectoryScanner. Runs {@link reconcile()}
* Runs "reconcile()" periodically under the masterThread. * and handles any exceptions.
*/ */
@Override @Override
public void run() { public void run() {
@ -371,6 +516,12 @@ public class DirectoryScanner implements Runnable {
} }
} }
/**
* Stops the directory scanner. This method will wait for 1 minute for the
* main thread to exit and an additional 1 minute for the report compilation
* threads to exit. If a thread does not exit in that time period, it is
* left running, and an error is logged.
*/
void shutdown() { void shutdown() {
if (!shouldRun) { if (!shouldRun) {
LOG.warn("DirectoryScanner: shutdown has been called, but periodic scanner not started"); LOG.warn("DirectoryScanner: shutdown has been called, but periodic scanner not started");
@ -379,7 +530,11 @@ public class DirectoryScanner implements Runnable {
} }
shouldRun = false; shouldRun = false;
if (masterThread != null) masterThread.shutdown(); if (masterThread != null) masterThread.shutdown();
if (reportCompileThreadPool != null) reportCompileThreadPool.shutdown();
if (reportCompileThreadPool != null) {
reportCompileThreadPool.shutdownNow();
}
if (masterThread != null) { if (masterThread != null) {
try { try {
masterThread.awaitTermination(1, TimeUnit.MINUTES); masterThread.awaitTermination(1, TimeUnit.MINUTES);
@ -402,6 +557,7 @@ public class DirectoryScanner implements Runnable {
/** /**
* Reconcile differences between disk and in-memory blocks * Reconcile differences between disk and in-memory blocks
*/ */
@VisibleForTesting
void reconcile() throws IOException { void reconcile() throws IOException {
scan(); scan();
for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) { for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
@ -420,7 +576,7 @@ public class DirectoryScanner implements Runnable {
* Scan for the differences between disk and in-memory blocks * Scan for the differences between disk and in-memory blocks
* Scan only the "finalized blocks" lists of both disk and memory. * Scan only the "finalized blocks" lists of both disk and memory.
*/ */
void scan() { private void scan() {
clear(); clear();
Map<String, ScanInfo[]> diskReport = getDiskReport(); Map<String, ScanInfo[]> diskReport = getDiskReport();
@ -508,8 +664,13 @@ public class DirectoryScanner implements Runnable {
} }
/** /**
* Block is found on the disk. In-memory block is missing or does not match * Add the ScanInfo object to the list of differences and adjust the stats
* the block on the disk * accordingly. This method is called when a block is found on the disk,
* but the in-memory block is missing or does not match the block on the disk.
*
* @param diffRecord the list to which to add the info
* @param statsRecord the stats to update
* @param info the differing info
*/ */
private void addDifference(LinkedList<ScanInfo> diffRecord, private void addDifference(LinkedList<ScanInfo> diffRecord,
Stats statsRecord, ScanInfo info) { Stats statsRecord, ScanInfo info) {
@ -518,7 +679,15 @@ public class DirectoryScanner implements Runnable {
diffRecord.add(info); diffRecord.add(info);
} }
/** Block is not found on the disk */ /**
* Add a new ScanInfo object to the list of differences and adjust the stats
* accordingly. This method is called when a block is not found on the disk.
*
* @param diffRecord the list to which to add the info
* @param statsRecord the stats to update
* @param blockId the id of the missing block
* @param vol the volume that contains the missing block
*/
private void addDifference(LinkedList<ScanInfo> diffRecord, private void addDifference(LinkedList<ScanInfo> diffRecord,
Stats statsRecord, long blockId, Stats statsRecord, long blockId,
FsVolumeSpi vol) { FsVolumeSpi vol) {
@ -538,7 +707,13 @@ public class DirectoryScanner implements Runnable {
return false; return false;
} }
/** Get lists of blocks on the disk sorted by blockId, per blockpool */ /**
* Get the lists of blocks on the disks in the dataset, sorted by blockId.
* The returned map contains one entry per blockpool, keyed by the blockpool
* ID.
*
* @return a map of sorted arrays of block information
*/
private Map<String, ScanInfo[]> getDiskReport() { private Map<String, ScanInfo[]> getDiskReport() {
// First get list of data directories // First get list of data directories
final List<? extends FsVolumeSpi> volumes = dataset.getVolumes(); final List<? extends FsVolumeSpi> volumes = dataset.getVolumes();
@ -564,6 +739,12 @@ public class DirectoryScanner implements Runnable {
compilersInProgress.entrySet()) { compilersInProgress.entrySet()) {
try { try {
dirReports[report.getKey()] = report.getValue().get(); dirReports[report.getKey()] = report.getValue().get();
// If our compiler threads were interrupted, give up on this run
if (dirReports[report.getKey()] == null) {
dirReports = null;
break;
}
} catch (Exception ex) { } catch (Exception ex) {
LOG.error("Error compiling report", ex); LOG.error("Error compiling report", ex);
// Propagate ex to DataBlockScanner to deal with // Propagate ex to DataBlockScanner to deal with
@ -573,48 +754,106 @@ public class DirectoryScanner implements Runnable {
// Compile consolidated report for all the volumes // Compile consolidated report for all the volumes
ScanInfoPerBlockPool list = new ScanInfoPerBlockPool(); ScanInfoPerBlockPool list = new ScanInfoPerBlockPool();
if (dirReports != null) {
for (int i = 0; i < volumes.size(); i++) { for (int i = 0; i < volumes.size(); i++) {
if (isValid(dataset, volumes.get(i))) { if (isValid(dataset, volumes.get(i))) {
// volume is still valid // volume is still valid
list.addAll(dirReports[i]); list.addAll(dirReports[i]);
} }
} }
}
return list.toSortedArrays(); return list.toSortedArrays();
} }
/**
* Helper method to determine if a file name is consistent with a block.
* meta-data file
*
* @param blockId the block ID
* @param metaFile the file to check
* @return whether the file name is a block meta-data file name
*/
private static boolean isBlockMetaFile(String blockId, String metaFile) { private static boolean isBlockMetaFile(String blockId, String metaFile) {
return metaFile.startsWith(blockId) return metaFile.startsWith(blockId)
&& metaFile.endsWith(Block.METADATA_EXTENSION); && metaFile.endsWith(Block.METADATA_EXTENSION);
} }
private static class ReportCompiler /**
implements Callable<ScanInfoPerBlockPool> { * The ReportCompiler class encapsulates the process of searching a datanode's
* disks for block information. It operates by performing a DFS of the
* volume to discover block information.
*
* When the ReportCompiler discovers block information, it create a new
* ScanInfo object for it and adds that object to its report list. The report
* list is returned by the {@link #call()} method.
*/
private class ReportCompiler implements Callable<ScanInfoPerBlockPool> {
private final FsVolumeSpi volume; private final FsVolumeSpi volume;
private final DataNode datanode; private final DataNode datanode;
// Variable for tracking time spent running for throttling purposes
private final StopWatch throttleTimer = new StopWatch();
// Variable for tracking time spent running and waiting for testing
// purposes
private final StopWatch perfTimer = new StopWatch();
/**
* Create a report compiler for the given volume on the given datanode.
*
* @param datanode the target datanode
* @param volume the target volume
*/
public ReportCompiler(DataNode datanode, FsVolumeSpi volume) { public ReportCompiler(DataNode datanode, FsVolumeSpi volume) {
this.datanode = datanode; this.datanode = datanode;
this.volume = volume; this.volume = volume;
} }
/**
* Run this report compiler thread.
*
* @return the block info report list
* @throws IOException if the block pool isn't found
*/
@Override @Override
public ScanInfoPerBlockPool call() throws Exception { public ScanInfoPerBlockPool call() throws IOException {
String[] bpList = volume.getBlockPoolList(); String[] bpList = volume.getBlockPoolList();
ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length); ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length);
for (String bpid : bpList) { for (String bpid : bpList) {
LinkedList<ScanInfo> report = new LinkedList<ScanInfo>(); LinkedList<ScanInfo> report = new LinkedList<>();
File bpFinalizedDir = volume.getFinalizedDir(bpid); File bpFinalizedDir = volume.getFinalizedDir(bpid);
perfTimer.start();
throttleTimer.start();
try {
result.put(bpid, result.put(bpid,
compileReport(volume, bpFinalizedDir, bpFinalizedDir, report)); compileReport(volume, bpFinalizedDir, bpFinalizedDir, report));
} catch (InterruptedException ex) {
// Exit quickly and flag the scanner to do the same
result = null;
break;
}
} }
return result; return result;
} }
/** Compile list {@link ScanInfo} for the blocks in the directory <dir> */ /**
* Compile a list of {@link ScanInfo} for the blocks in the directory
* given by {@code dir}.
*
* @param vol the volume that contains the directory to scan
* @param bpFinalizedDir the root directory of the directory to scan
* @param dir the directory to scan
* @param report the list onto which blocks reports are placed
*/
private LinkedList<ScanInfo> compileReport(FsVolumeSpi vol, private LinkedList<ScanInfo> compileReport(FsVolumeSpi vol,
File bpFinalizedDir, File dir, LinkedList<ScanInfo> report) { File bpFinalizedDir, File dir, LinkedList<ScanInfo> report)
throws InterruptedException {
File[] files; File[] files;
throttle();
try { try {
files = FileUtil.listFiles(dir); files = FileUtil.listFiles(dir);
} catch (IOException ioe) { } catch (IOException ioe) {
@ -632,6 +871,12 @@ public class DirectoryScanner implements Runnable {
* blk_<blockid>_<genstamp>.meta * blk_<blockid>_<genstamp>.meta
*/ */
for (int i = 0; i < files.length; i++) { for (int i = 0; i < files.length; i++) {
// Make sure this thread can make a timely exit. With a low throttle
// rate, completing a run can take a looooong time.
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (files[i].isDirectory()) { if (files[i].isDirectory()) {
compileReport(vol, bpFinalizedDir, files[i], report); compileReport(vol, bpFinalizedDir, files[i], report);
continue; continue;
@ -680,5 +925,40 @@ public class DirectoryScanner implements Runnable {
+ ", expected block file path: " + expBlockFile); + ", expected block file path: " + expBlockFile);
} }
} }
/**
* Called by the thread before each potential disk scan so that a pause
* can be optionally inserted to limit the number of scans per second.
* The limit is controlled by
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY}.
*/
private void throttle() throws InterruptedException {
accumulateTimeRunning();
if ((throttleLimitMsPerSec < 1000) &&
(throttleTimer.now(TimeUnit.MILLISECONDS) > throttleLimitMsPerSec)) {
Thread.sleep(MILLIS_PER_SECOND - throttleLimitMsPerSec);
throttleTimer.reset().start();
}
accumulateTimeWaiting();
}
/**
* Helper method to measure time running.
*/
private void accumulateTimeRunning() {
timeRunningMs.getAndAdd(perfTimer.now(TimeUnit.MILLISECONDS));
perfTimer.reset().start();
}
/**
* Helper method to measure time waiting.
*/
private void accumulateTimeWaiting() {
timeWaitingMs.getAndAdd(perfTimer.now(TimeUnit.MILLISECONDS));
perfTimer.reset().start();
}
} }
} }

View File

@ -600,6 +600,26 @@
</description> </description>
</property> </property>
<property>
<name>dfs.datanode.directoryscan.throttle.limit.ms.per.sec</name>
<value>0</value>
<description>The report compilation threads are limited to only running for
a given number of milliseconds per second, as configured by the
property. The limit is taken per thread, not in aggregate, e.g. setting
a limit of 100ms for 4 compiler threads will result in each thread being
limited to 100ms, not 25ms.
Note that the throttle does not interrupt the report compiler threads, so the
actual running time of the threads per second will typically be somewhat
higher than the throttle limit, usually by no more than 20%.
Setting this limit to 1000 disables compiler thread throttling. Only
values between 1 and 1000 are valid. Setting an invalid value will result
in the throttle being disbled and an error message being logged. 1000 is
the default setting.
</description>
</property>
<property> <property>
<name>dfs.heartbeat.interval</name> <name>dfs.heartbeat.interval</name>
<value>3</value> <value>3</value>

View File

@ -33,6 +33,10 @@ import java.nio.channels.FileChannel;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -55,6 +59,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.Test; import org.junit.Test;
/** /**
@ -494,7 +499,13 @@ public class TestDirectoryScanner {
scan(totalBlocks+3, 6, 2, 2, 3, 2); scan(totalBlocks+3, 6, 2, 2, 3, 2);
scan(totalBlocks+1, 0, 0, 0, 0, 0); scan(totalBlocks+1, 0, 0, 0, 0, 0);
// Test14: validate clean shutdown of DirectoryScanner // Test14: make sure no throttling is happening
assertTrue("Throttle appears to be engaged",
scanner.timeWaitingMs.get() < 10L);
assertTrue("Report complier threads logged no execution time",
scanner.timeRunningMs.get() > 0L);
// Test15: validate clean shutdown of DirectoryScanner
////assertTrue(scanner.getRunStatus()); //assumes "real" FSDataset, not sim ////assertTrue(scanner.getRunStatus()); //assumes "real" FSDataset, not sim
scanner.shutdown(); scanner.shutdown();
assertFalse(scanner.getRunStatus()); assertFalse(scanner.getRunStatus());
@ -508,6 +519,219 @@ public class TestDirectoryScanner {
} }
} }
/**
* Test that the timeslice throttle limits the report compiler thread's
* execution time correctly. We test by scanning a large block pool and
* comparing the time spent waiting to the time spent running.
*
* The block pool has to be large, or the ratio will be off. The throttle
* allows the report compiler thread to finish its current cycle when
* blocking it, so the ratio will always be a little lower than expected.
* The smaller the block pool, the further off the ratio will be.
*
* @throws Exception thrown on unexpected failure
*/
@Test (timeout=300000)
public void testThrottling() throws Exception {
Configuration conf = new Configuration(CONF);
// We need lots of blocks so the report compiler threads have enough to
// keep them busy while we watch them.
int blocks = 20000;
int maxRetries = 3;
cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
bpid = cluster.getNamesystem().getBlockPoolId();
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
client = cluster.getFileSystem().getClient();
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
100);
DataNode dataNode = cluster.getDataNodes().get(0);
createFile(GenericTestUtils.getMethodName(),
BLOCK_LENGTH * blocks, false);
float ratio = 0.0f;
int retries = maxRetries;
while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) {
scanner = new DirectoryScanner(dataNode, fds, conf);
ratio = runThrottleTest(blocks);
retries -= 1;
}
// Waiting should be about 9x running.
LOG.info("RATIO: " + ratio);
assertTrue("Throttle is too restrictive", ratio <= 10f);
assertTrue("Throttle is too permissive", ratio >= 7f);
// Test with a different limit
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
200);
ratio = 0.0f;
retries = maxRetries;
while ((retries > 0) && ((ratio < 3f) || (ratio > 4.5f))) {
scanner = new DirectoryScanner(dataNode, fds, conf);
ratio = runThrottleTest(blocks);
retries -= 1;
}
// Waiting should be about 4x running.
LOG.info("RATIO: " + ratio);
assertTrue("Throttle is too restrictive", ratio <= 4.5f);
assertTrue("Throttle is too permissive", ratio >= 3.0f);
// Test with more than 1 thread
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 3);
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
100);
ratio = 0.0f;
retries = maxRetries;
while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) {
scanner = new DirectoryScanner(dataNode, fds, conf);
ratio = runThrottleTest(blocks);
retries -= 1;
}
// Waiting should be about 9x running.
LOG.info("RATIO: " + ratio);
assertTrue("Throttle is too restrictive", ratio <= 10f);
assertTrue("Throttle is too permissive", ratio >= 7f);
// Test with no limit
scanner = new DirectoryScanner(dataNode, fds, CONF);
scanner.setRetainDiffs(true);
scan(blocks, 0, 0, 0, 0, 0);
scanner.shutdown();
assertFalse(scanner.getRunStatus());
assertTrue("Throttle appears to be engaged",
scanner.timeWaitingMs.get() < 10L);
assertTrue("Report complier threads logged no execution time",
scanner.timeRunningMs.get() > 0L);
// Test with a 1ms limit. This also tests whether the scanner can be
// shutdown cleanly in mid stride.
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
1);
ratio = 0.0f;
retries = maxRetries;
ScheduledExecutorService interruptor =
Executors.newScheduledThreadPool(maxRetries);
try {
while ((retries > 0) && (ratio < 10)) {
scanner = new DirectoryScanner(dataNode, fds, conf);
scanner.setRetainDiffs(true);
final AtomicLong nowMs = new AtomicLong();
// Stop the scanner after 2 seconds because otherwise it will take an
// eternity to complete it's run
interruptor.schedule(new Runnable() {
@Override
public void run() {
scanner.shutdown();
nowMs.set(Time.monotonicNow());
}
}, 2L, TimeUnit.SECONDS);
scanner.reconcile();
assertFalse(scanner.getRunStatus());
LOG.info("Scanner took " + (Time.monotonicNow() - nowMs.get())
+ "ms to shutdown");
assertTrue("Scanner took too long to shutdown",
Time.monotonicNow() - nowMs.get() < 1000L);
ratio =
(float)scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
retries -= 1;
}
} finally {
interruptor.shutdown();
}
// We just want to test that it waits a lot, but it also runs some
LOG.info("RATIO: " + ratio);
assertTrue("Throttle is too permissive",
ratio > 10);
assertTrue("Report complier threads logged no execution time",
scanner.timeRunningMs.get() > 0L);
// Test with a 0 limit, i.e. disabled
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
0);
scanner = new DirectoryScanner(dataNode, fds, conf);
scanner.setRetainDiffs(true);
scan(blocks, 0, 0, 0, 0, 0);
scanner.shutdown();
assertFalse(scanner.getRunStatus());
assertTrue("Throttle appears to be engaged",
scanner.timeWaitingMs.get() < 10L);
assertTrue("Report complier threads logged no execution time",
scanner.timeRunningMs.get() > 0L);
// Test with a 1000 limit, i.e. disabled
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
1000);
scanner = new DirectoryScanner(dataNode, fds, conf);
scanner.setRetainDiffs(true);
scan(blocks, 0, 0, 0, 0, 0);
scanner.shutdown();
assertFalse(scanner.getRunStatus());
assertTrue("Throttle appears to be engaged",
scanner.timeWaitingMs.get() < 10L);
assertTrue("Report complier threads logged no execution time",
scanner.timeRunningMs.get() > 0L);
// Test that throttle works from regular start
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
10);
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
1);
scanner = new DirectoryScanner(dataNode, fds, conf);
scanner.setRetainDiffs(true);
scanner.start();
int count = 50;
while ((count > 0) && (scanner.timeWaitingMs.get() < 500L)) {
Thread.sleep(100L);
count -= 1;
}
scanner.shutdown();
assertFalse(scanner.getRunStatus());
assertTrue("Throttle does not appear to be engaged", count > 0);
} finally {
cluster.shutdown();
}
}
private float runThrottleTest(int blocks) throws IOException {
scanner.setRetainDiffs(true);
scan(blocks, 0, 0, 0, 0, 0);
scanner.shutdown();
assertFalse(scanner.getRunStatus());
return (float)scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
}
private void verifyAddition(long blockId, long genStamp, long size) { private void verifyAddition(long blockId, long genStamp, long size) {
final ReplicaInfo replicainfo; final ReplicaInfo replicainfo;
replicainfo = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId); replicainfo = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);