HDFS-8873. Allow the directoryScanner to be rate-limited (Daniel Templeton via Colin P. McCabe)

(cherry picked from commit 7a3c381b39)
This commit is contained in:
Colin Patrick Mccabe 2015-09-26 04:09:06 -07:00
parent 85c0cb0075
commit e5dc723d48
5 changed files with 570 additions and 29 deletions

View File

@ -626,6 +626,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9133. ExternalBlockReader and ReplicaAccessor need to return -1 on read
when at EOF. (Colin Patrick McCabe via Lei (Eddy) Xu)
HDFS-8873. Allow the directoryScanner to be rate-limited (Daniel Templeton
via Colin P. McCabe)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -399,6 +399,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600;
public static final String DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads";
public static final int DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1;
public static final String
DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY =
"dfs.datanode.directoryscan.throttle.limit.ms.per.sec";
public static final int
DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT = 1000;
public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
public static final String DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default";
public static final String DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver";

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
@ -33,6 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -47,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.Time;
/**
@ -56,27 +59,59 @@ import org.apache.hadoop.util.Time;
@InterfaceAudience.Private
public class DirectoryScanner implements Runnable {
private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
private static final int MILLIS_PER_SECOND = 1000;
private static final String START_MESSAGE =
"Periodic Directory Tree Verification scan"
+ " starting at %dms with interval of %dms";
private static final String START_MESSAGE_WITH_THROTTLE = START_MESSAGE
+ " and throttle limit of %dms/s";
private final FsDatasetSpi<?> dataset;
private final ExecutorService reportCompileThreadPool;
private final ScheduledExecutorService masterThread;
private final long scanPeriodMsecs;
private final int throttleLimitMsPerSec;
private volatile boolean shouldRun = false;
private boolean retainDiffs = false;
private final DataNode datanode;
/**
* Total combined wall clock time (in milliseconds) spent by the report
* compiler threads executing. Used for testing purposes.
*/
@VisibleForTesting
final AtomicLong timeRunningMs = new AtomicLong(0L);
/**
* Total combined wall clock time (in milliseconds) spent by the report
* compiler threads blocked by the throttle. Used for testing purposes.
*/
@VisibleForTesting
final AtomicLong timeWaitingMs = new AtomicLong(0L);
/**
* The complete list of block differences indexed by block pool ID.
*/
@VisibleForTesting
final ScanInfoPerBlockPool diffs = new ScanInfoPerBlockPool();
/**
* Statistics about the block differences in each blockpool, indexed by
* block pool ID.
*/
@VisibleForTesting
final Map<String, Stats> stats = new HashMap<String, Stats>();
/**
* Allow retaining diffs for unit test and analysis
* @param b - defaults to false (off)
* Allow retaining diffs for unit test and analysis. Defaults to false (off)
* @param b whether to retain diffs
*/
@VisibleForTesting
void setRetainDiffs(boolean b) {
retainDiffs = b;
}
/** Stats tracked for reporting and testing, per blockpool */
/**
* Stats tracked for reporting and testing, per blockpool
*/
@VisibleForTesting
static class Stats {
final String bpid;
long totalBlocks = 0;
@ -86,6 +121,10 @@ public class DirectoryScanner implements Runnable {
long mismatchBlocks = 0;
long duplicateBlocks = 0;
/**
* Create a new Stats object for the given blockpool ID.
* @param bpid blockpool ID
*/
public Stats(String bpid) {
this.bpid = bpid;
}
@ -100,17 +139,31 @@ public class DirectoryScanner implements Runnable {
}
}
/**
* Helper class for compiling block info reports from report compiler threads.
*/
static class ScanInfoPerBlockPool extends
HashMap<String, LinkedList<ScanInfo>> {
private static final long serialVersionUID = 1L;
/**
* Create a new info list.
*/
ScanInfoPerBlockPool() {super();}
/**
* Create a new info list initialized to the given expected size.
* See {@link java.util.HashMap#HashMap(int)}.
*
* @param sz initial expected size
*/
ScanInfoPerBlockPool(int sz) {super(sz);}
/**
* Merges {@code that} ScanInfoPerBlockPool into this one
*
* @param the ScanInfoPerBlockPool to merge
*/
public void addAll(ScanInfoPerBlockPool that) {
if (that == null) return;
@ -132,6 +185,7 @@ public class DirectoryScanner implements Runnable {
/**
* Convert all the LinkedList values in this ScanInfoPerBlockPool map
* into sorted arrays, and return a new map of these arrays per blockpool
*
* @return a map of ScanInfo arrays per blockpool
*/
public Map<String, ScanInfo[]> toSortedArrays() {
@ -208,6 +262,9 @@ public class DirectoryScanner implements Runnable {
* For example, the condensed version of /foo//bar is /foo/bar
* Unlike {@link File#getCanonicalPath()}, this will never perform I/O
* on the filesystem.
*
* @param path the path to condense
* @return the condensed path
*/
private static String getCondensedPath(String path) {
return CONDENSED_PATH_REGEX.matcher(path).
@ -230,6 +287,15 @@ public class DirectoryScanner implements Runnable {
throw new RuntimeException(prefix + " is not a prefix of " + fullPath);
}
/**
* Create a ScanInfo object for a block. This constructor will examine
* the block data and meta-data files.
*
* @param blockId the block ID
* @param blockFile the path to the block data file
* @param metaFile the path to the block meta-data file
* @param vol the volume that contains the block
*/
ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) {
this.blockId = blockId;
String condensedVolPath = vol == null ? null :
@ -248,15 +314,31 @@ public class DirectoryScanner implements Runnable {
this.volume = vol;
}
/**
* Returns the block data file.
*
* @return the block data file
*/
File getBlockFile() {
return (blockSuffix == null) ? null :
new File(volume.getBasePath(), blockSuffix);
}
/**
* Return the length of the data block. The length returned is the length
* cached when this object was created.
*
* @return the length of the data block
*/
long getBlockFileLength() {
return blockFileLength;
}
/**
* Returns the block meta data file or null if there isn't one.
*
* @return the block meta data file
*/
File getMetaFile() {
if (metaSuffix == null) {
return null;
@ -267,10 +349,20 @@ public class DirectoryScanner implements Runnable {
}
}
/**
* Returns the block ID.
*
* @return the block ID
*/
long getBlockId() {
return blockId;
}
/**
* Returns the volume that contains the block that this object describes.
*
* @return the volume
*/
FsVolumeSpi getVolume() {
return volume;
}
@ -309,12 +401,44 @@ public class DirectoryScanner implements Runnable {
}
}
/**
* Create a new directory scanner, but don't cycle it running yet.
*
* @param datanode the parent datanode
* @param dataset the dataset to scan
* @param conf the Configuration object
*/
DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) {
this.datanode = datanode;
this.dataset = dataset;
int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT);
scanPeriodMsecs = interval * 1000L; //msec
scanPeriodMsecs = interval * MILLIS_PER_SECOND; //msec
int throttle =
conf.getInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
if ((throttle > MILLIS_PER_SECOND) || (throttle <= 0)) {
if (throttle > MILLIS_PER_SECOND) {
LOG.error(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY
+ " set to value above 1000 ms/sec. Assuming default value of " +
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
} else {
LOG.error(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY
+ " set to value below 1 ms/sec. Assuming default value of " +
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
}
throttleLimitMsPerSec =
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT;
} else {
throttleLimitMsPerSec = throttle;
}
int threads =
conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
@ -325,30 +449,50 @@ public class DirectoryScanner implements Runnable {
new Daemon.DaemonFactory());
}
/**
* Start the scanner. The scanner will run every
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds.
*/
void start() {
shouldRun = true;
long offset = ThreadLocalRandom.current().nextInt(
(int) (scanPeriodMsecs/1000L)) * 1000L; //msec
(int) (scanPeriodMsecs/MILLIS_PER_SECOND)) * MILLIS_PER_SECOND; //msec
long firstScanTime = Time.now() + offset;
LOG.info("Periodic Directory Tree Verification scan starting at "
+ firstScanTime + " with interval " + scanPeriodMsecs);
String logMsg;
if (throttleLimitMsPerSec < MILLIS_PER_SECOND) {
logMsg = String.format(START_MESSAGE_WITH_THROTTLE, firstScanTime,
scanPeriodMsecs, throttleLimitMsPerSec);
} else {
logMsg = String.format(START_MESSAGE, firstScanTime, scanPeriodMsecs);
}
LOG.info(logMsg);
masterThread.scheduleAtFixedRate(this, offset, scanPeriodMsecs,
TimeUnit.MILLISECONDS);
}
// for unit test
/**
* Return whether the scanner has been started.
*
* @return whether the scanner has been started
*/
@VisibleForTesting
boolean getRunStatus() {
return shouldRun;
}
/**
* Clear the current cache of diffs and statistics.
*/
private void clear() {
diffs.clear();
stats.clear();
}
/**
* Main program loop for DirectoryScanner
* Runs "reconcile()" periodically under the masterThread.
* Main program loop for DirectoryScanner. Runs {@link reconcile()}
* and handles any exceptions.
*/
@Override
public void run() {
@ -372,6 +516,12 @@ public class DirectoryScanner implements Runnable {
}
}
/**
* Stops the directory scanner. This method will wait for 1 minute for the
* main thread to exit and an additional 1 minute for the report compilation
* threads to exit. If a thread does not exit in that time period, it is
* left running, and an error is logged.
*/
void shutdown() {
if (!shouldRun) {
LOG.warn("DirectoryScanner: shutdown has been called, but periodic scanner not started");
@ -380,7 +530,11 @@ public class DirectoryScanner implements Runnable {
}
shouldRun = false;
if (masterThread != null) masterThread.shutdown();
if (reportCompileThreadPool != null) reportCompileThreadPool.shutdown();
if (reportCompileThreadPool != null) {
reportCompileThreadPool.shutdownNow();
}
if (masterThread != null) {
try {
masterThread.awaitTermination(1, TimeUnit.MINUTES);
@ -403,6 +557,7 @@ public class DirectoryScanner implements Runnable {
/**
* Reconcile differences between disk and in-memory blocks
*/
@VisibleForTesting
void reconcile() throws IOException {
scan();
for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
@ -421,7 +576,7 @@ public class DirectoryScanner implements Runnable {
* Scan for the differences between disk and in-memory blocks
* Scan only the "finalized blocks" lists of both disk and memory.
*/
void scan() {
private void scan() {
clear();
Map<String, ScanInfo[]> diskReport = getDiskReport();
@ -509,8 +664,13 @@ public class DirectoryScanner implements Runnable {
}
/**
* Block is found on the disk. In-memory block is missing or does not match
* the block on the disk
* Add the ScanInfo object to the list of differences and adjust the stats
* accordingly. This method is called when a block is found on the disk,
* but the in-memory block is missing or does not match the block on the disk.
*
* @param diffRecord the list to which to add the info
* @param statsRecord the stats to update
* @param info the differing info
*/
private void addDifference(LinkedList<ScanInfo> diffRecord,
Stats statsRecord, ScanInfo info) {
@ -519,7 +679,15 @@ public class DirectoryScanner implements Runnable {
diffRecord.add(info);
}
/** Block is not found on the disk */
/**
* Add a new ScanInfo object to the list of differences and adjust the stats
* accordingly. This method is called when a block is not found on the disk.
*
* @param diffRecord the list to which to add the info
* @param statsRecord the stats to update
* @param blockId the id of the missing block
* @param vol the volume that contains the missing block
*/
private void addDifference(LinkedList<ScanInfo> diffRecord,
Stats statsRecord, long blockId,
FsVolumeSpi vol) {
@ -528,7 +696,13 @@ public class DirectoryScanner implements Runnable {
diffRecord.add(new ScanInfo(blockId, null, null, vol));
}
/** Get lists of blocks on the disk sorted by blockId, per blockpool */
/**
* Get the lists of blocks on the disks in the dataset, sorted by blockId.
* The returned map contains one entry per blockpool, keyed by the blockpool
* ID.
*
* @return a map of sorted arrays of block information
*/
private Map<String, ScanInfo[]> getDiskReport() {
ScanInfoPerBlockPool list = new ScanInfoPerBlockPool();
ScanInfoPerBlockPool[] dirReports = null;
@ -555,6 +729,12 @@ public class DirectoryScanner implements Runnable {
compilersInProgress.entrySet()) {
try {
dirReports[report.getKey()] = report.getValue().get();
// If our compiler threads were interrupted, give up on this run
if (dirReports[report.getKey()] == null) {
dirReports = null;
break;
}
} catch (Exception ex) {
LOG.error("Error compiling report", ex);
// Propagate ex to DataBlockScanner to deal with
@ -573,38 +753,102 @@ public class DirectoryScanner implements Runnable {
return list.toSortedArrays();
}
/**
* Helper method to determine if a file name is consistent with a block.
* meta-data file
*
* @param blockId the block ID
* @param metaFile the file to check
* @return whether the file name is a block meta-data file name
*/
private static boolean isBlockMetaFile(String blockId, String metaFile) {
return metaFile.startsWith(blockId)
&& metaFile.endsWith(Block.METADATA_EXTENSION);
}
private static class ReportCompiler
implements Callable<ScanInfoPerBlockPool> {
/**
* The ReportCompiler class encapsulates the process of searching a datanode's
* disks for block information. It operates by performing a DFS of the
* volume to discover block information.
*
* When the ReportCompiler discovers block information, it create a new
* ScanInfo object for it and adds that object to its report list. The report
* list is returned by the {@link #call()} method.
*/
private class ReportCompiler implements Callable<ScanInfoPerBlockPool> {
private final FsVolumeSpi volume;
private final DataNode datanode;
// Variable for tracking time spent running for throttling purposes
private final StopWatch throttleTimer = new StopWatch();
// Variable for tracking time spent running and waiting for testing
// purposes
private final StopWatch perfTimer = new StopWatch();
/**
* The associated thread. Used for testing purposes only.
*/
@VisibleForTesting
Thread currentThread;
/**
* Create a report compiler for the given volume on the given datanode.
*
* @param datanode the target datanode
* @param volume the target volume
*/
public ReportCompiler(DataNode datanode, FsVolumeSpi volume) {
this.datanode = datanode;
this.volume = volume;
}
/**
* Run this report compiler thread.
*
* @return the block info report list
* @throws IOException if the block pool isn't found
*/
@Override
public ScanInfoPerBlockPool call() throws Exception {
public ScanInfoPerBlockPool call() throws IOException {
currentThread = Thread.currentThread();
String[] bpList = volume.getBlockPoolList();
ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length);
for (String bpid : bpList) {
LinkedList<ScanInfo> report = new LinkedList<ScanInfo>();
LinkedList<ScanInfo> report = new LinkedList<>();
File bpFinalizedDir = volume.getFinalizedDir(bpid);
perfTimer.start();
throttleTimer.start();
try {
result.put(bpid,
compileReport(volume, bpFinalizedDir, bpFinalizedDir, report));
} catch (InterruptedException ex) {
// Exit quickly and flag the scanner to do the same
result = null;
break;
}
}
return result;
}
/** Compile list {@link ScanInfo} for the blocks in the directory <dir> */
/**
* Compile a list of {@link ScanInfo} for the blocks in the directory
* given by {@code dir}.
*
* @param vol the volume that contains the directory to scan
* @param bpFinalizedDir the root directory of the directory to scan
* @param dir the directory to scan
* @param report the list onto which blocks reports are placed
*/
private LinkedList<ScanInfo> compileReport(FsVolumeSpi vol,
File bpFinalizedDir, File dir, LinkedList<ScanInfo> report) {
File bpFinalizedDir, File dir, LinkedList<ScanInfo> report)
throws InterruptedException {
File[] files;
throttle();
try {
files = FileUtil.listFiles(dir);
} catch (IOException ioe) {
@ -622,6 +866,12 @@ public class DirectoryScanner implements Runnable {
* blk_<blockid>_<genstamp>.meta
*/
for (int i = 0; i < files.length; i++) {
// Make sure this thread can make a timely exit. With a low throttle
// rate, completing a run can take a looooong time.
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (files[i].isDirectory()) {
compileReport(vol, bpFinalizedDir, files[i], report);
continue;
@ -668,5 +918,40 @@ public class DirectoryScanner implements Runnable {
+ " has to be upgraded to block ID-based layout");
}
}
/**
* Called by the thread before each potential disk scan so that a pause
* can be optionally inserted to limit the number of scans per second.
* The limit is controlled by
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY}.
*/
private void throttle() throws InterruptedException {
accumulateTimeRunning();
if ((throttleLimitMsPerSec < 1000) &&
(throttleTimer.now(TimeUnit.MILLISECONDS) > throttleLimitMsPerSec)) {
Thread.sleep(MILLIS_PER_SECOND - throttleLimitMsPerSec);
throttleTimer.reset().start();
}
accumulateTimeWaiting();
}
/**
* Helper method to measure time running.
*/
private void accumulateTimeRunning() {
timeRunningMs.getAndAdd(perfTimer.now(TimeUnit.MILLISECONDS));
perfTimer.reset().start();
}
/**
* Helper method to measure time waiting.
*/
private void accumulateTimeWaiting() {
timeWaitingMs.getAndAdd(perfTimer.now(TimeUnit.MILLISECONDS));
perfTimer.reset().start();
}
}
}

View File

@ -635,6 +635,26 @@
</description>
</property>
<property>
<name>dfs.datanode.directoryscan.throttle.limit.ms.per.sec</name>
<value>0</value>
<description>The report compilation threads are limited to only running for
a given number of milliseconds per second, as configured by the
property. The limit is taken per thread, not in aggregate, e.g. setting
a limit of 100ms for 4 compiler threads will result in each thread being
limited to 100ms, not 25ms.
Note that the throttle does not interrupt the report compiler threads, so the
actual running time of the threads per second will typically be somewhat
higher than the throttle limit, usually by no more than 20%.
Setting this limit to 1000 disables compiler thread throttling. Only
values between 1 and 1000 are valid. Setting an invalid value will result
in the throttle being disbled and an error message being logged. 1000 is
the default setting.
</description>
</property>
<property>
<name>dfs.heartbeat.interval</name>
<value>3</value>

View File

@ -33,6 +33,10 @@ import java.nio.channels.FileChannel;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
@ -56,6 +60,8 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.Before;
import org.junit.Test;
/**
@ -84,6 +90,11 @@ public class TestDirectoryScanner {
Long.MAX_VALUE);
}
@Before
public void setup() {
LazyPersistTestCase.initCacheManipulator();
}
/** create a file with a length of <code>fileLen</code> */
private List<LocatedBlock> createFile(String fileNamePrefix,
long fileLen,
@ -311,7 +322,6 @@ public class TestDirectoryScanner {
@Test (timeout=300000)
public void testRetainBlockOnPersistentStorage() throws Exception {
LazyPersistTestCase.initCacheManipulator();
cluster = new MiniDFSCluster
.Builder(CONF)
.storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
@ -353,7 +363,6 @@ public class TestDirectoryScanner {
@Test (timeout=300000)
public void testDeleteBlockOnTransientStorage() throws Exception {
LazyPersistTestCase.initCacheManipulator();
cluster = new MiniDFSCluster
.Builder(CONF)
.storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
@ -515,7 +524,13 @@ public class TestDirectoryScanner {
scan(totalBlocks+3, 6, 2, 2, 3, 2);
scan(totalBlocks+1, 0, 0, 0, 0, 0);
// Test14: validate clean shutdown of DirectoryScanner
// Test14: make sure no throttling is happening
assertTrue("Throttle appears to be engaged",
scanner.timeWaitingMs.get() < 10L);
assertTrue("Report complier threads logged no execution time",
scanner.timeRunningMs.get() > 0L);
// Test15: validate clean shutdown of DirectoryScanner
////assertTrue(scanner.getRunStatus()); //assumes "real" FSDataset, not sim
scanner.shutdown();
assertFalse(scanner.getRunStatus());
@ -529,6 +544,219 @@ public class TestDirectoryScanner {
}
}
/**
* Test that the timeslice throttle limits the report compiler thread's
* execution time correctly. We test by scanning a large block pool and
* comparing the time spent waiting to the time spent running.
*
* The block pool has to be large, or the ratio will be off. The throttle
* allows the report compiler thread to finish its current cycle when
* blocking it, so the ratio will always be a little lower than expected.
* The smaller the block pool, the further off the ratio will be.
*
* @throws Exception thrown on unexpected failure
*/
@Test (timeout=300000)
public void testThrottling() throws Exception {
Configuration conf = new Configuration(CONF);
// We need lots of blocks so the report compiler threads have enough to
// keep them busy while we watch them.
int blocks = 20000;
int maxRetries = 3;
cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
bpid = cluster.getNamesystem().getBlockPoolId();
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
client = cluster.getFileSystem().getClient();
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
100);
DataNode dataNode = cluster.getDataNodes().get(0);
createFile(GenericTestUtils.getMethodName(),
BLOCK_LENGTH * blocks, false);
float ratio = 0.0f;
int retries = maxRetries;
while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) {
scanner = new DirectoryScanner(dataNode, fds, conf);
ratio = runThrottleTest(blocks);
retries -= 1;
}
// Waiting should be about 9x running.
LOG.info("RATIO: " + ratio);
assertTrue("Throttle is too restrictive", ratio <= 10f);
assertTrue("Throttle is too permissive", ratio >= 7f);
// Test with a different limit
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
200);
ratio = 0.0f;
retries = maxRetries;
while ((retries > 0) && ((ratio < 3f) || (ratio > 4.5f))) {
scanner = new DirectoryScanner(dataNode, fds, conf);
ratio = runThrottleTest(blocks);
retries -= 1;
}
// Waiting should be about 4x running.
LOG.info("RATIO: " + ratio);
assertTrue("Throttle is too restrictive", ratio <= 4.5f);
assertTrue("Throttle is too permissive", ratio >= 3.0f);
// Test with more than 1 thread
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 3);
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
100);
ratio = 0.0f;
retries = maxRetries;
while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) {
scanner = new DirectoryScanner(dataNode, fds, conf);
ratio = runThrottleTest(blocks);
retries -= 1;
}
// Waiting should be about 9x running.
LOG.info("RATIO: " + ratio);
assertTrue("Throttle is too restrictive", ratio <= 10f);
assertTrue("Throttle is too permissive", ratio >= 7f);
// Test with no limit
scanner = new DirectoryScanner(dataNode, fds, CONF);
scanner.setRetainDiffs(true);
scan(blocks, 0, 0, 0, 0, 0);
scanner.shutdown();
assertFalse(scanner.getRunStatus());
assertTrue("Throttle appears to be engaged",
scanner.timeWaitingMs.get() < 10L);
assertTrue("Report complier threads logged no execution time",
scanner.timeRunningMs.get() > 0L);
// Test with a 1ms limit. This also tests whether the scanner can be
// shutdown cleanly in mid stride.
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
1);
ratio = 0.0f;
retries = maxRetries;
ScheduledExecutorService interruptor =
Executors.newScheduledThreadPool(maxRetries);
try {
while ((retries > 0) && (ratio < 10)) {
scanner = new DirectoryScanner(dataNode, fds, conf);
scanner.setRetainDiffs(true);
final AtomicLong nowMs = new AtomicLong();
// Stop the scanner after 2 seconds because otherwise it will take an
// eternity to complete it's run
interruptor.schedule(new Runnable() {
@Override
public void run() {
scanner.shutdown();
nowMs.set(Time.monotonicNow());
}
}, 2L, TimeUnit.SECONDS);
scanner.reconcile();
assertFalse(scanner.getRunStatus());
LOG.info("Scanner took " + (Time.monotonicNow() - nowMs.get())
+ "ms to shutdown");
assertTrue("Scanner took too long to shutdown",
Time.monotonicNow() - nowMs.get() < 1000L);
ratio =
(float)scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
retries -= 1;
}
} finally {
interruptor.shutdown();
}
// We just want to test that it waits a lot, but it also runs some
LOG.info("RATIO: " + ratio);
assertTrue("Throttle is too permissive",
ratio > 10);
assertTrue("Report complier threads logged no execution time",
scanner.timeRunningMs.get() > 0L);
// Test with a 0 limit, i.e. disabled
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
0);
scanner = new DirectoryScanner(dataNode, fds, conf);
scanner.setRetainDiffs(true);
scan(blocks, 0, 0, 0, 0, 0);
scanner.shutdown();
assertFalse(scanner.getRunStatus());
assertTrue("Throttle appears to be engaged",
scanner.timeWaitingMs.get() < 10L);
assertTrue("Report complier threads logged no execution time",
scanner.timeRunningMs.get() > 0L);
// Test with a 1000 limit, i.e. disabled
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
1000);
scanner = new DirectoryScanner(dataNode, fds, conf);
scanner.setRetainDiffs(true);
scan(blocks, 0, 0, 0, 0, 0);
scanner.shutdown();
assertFalse(scanner.getRunStatus());
assertTrue("Throttle appears to be engaged",
scanner.timeWaitingMs.get() < 10L);
assertTrue("Report complier threads logged no execution time",
scanner.timeRunningMs.get() > 0L);
// Test that throttle works from regular start
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
10);
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
1);
scanner = new DirectoryScanner(dataNode, fds, conf);
scanner.setRetainDiffs(true);
scanner.start();
int count = 50;
while ((count > 0) && (scanner.timeWaitingMs.get() < 500L)) {
Thread.sleep(100L);
count -= 1;
}
scanner.shutdown();
assertFalse(scanner.getRunStatus());
assertTrue("Throttle does not appear to be engaged", count > 0);
} finally {
cluster.shutdown();
}
}
private float runThrottleTest(int blocks) throws IOException {
scanner.setRetainDiffs(true);
scan(blocks, 0, 0, 0, 0, 0);
scanner.shutdown();
assertFalse(scanner.getRunStatus());
return (float)scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
}
private void verifyAddition(long blockId, long genStamp, long size) {
final ReplicaInfo replicainfo;
replicainfo = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);