HDFS-13947. Review of DirectoryScanner Class. Contributed by BELUGA BEHR.

Inigo Goiri 2018-10-03 11:19:57 -07:00
parent 7051bd78b1
commit 1dc0adfac0
6 changed files with 487 additions and 436 deletions

View File

@ -709,7 +709,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY =
"dfs.datanode.directoryscan.throttle.limit.ms.per.sec";
public static final int
DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT = 1000;
DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT = -1;
public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
public static final String DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default";
public static final String DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver";

View File

@ -1086,7 +1086,7 @@ public class DataNode extends ReconfigurableBase
reason = "verifcation is not supported by SimulatedFSDataset";
}
if (reason == null) {
directoryScanner = new DirectoryScanner(this, data, conf);
directoryScanner = new DirectoryScanner(data, conf);
directoryScanner.start();
} else {
LOG.info("Periodic Directory Tree Verification scan " +

View File

@ -17,17 +17,19 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -36,23 +38,27 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.time.FastDateFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
/**
* Periodically scans the data directories for block and block metadata files.
@ -62,21 +68,17 @@ import org.apache.hadoop.util.Time;
public class DirectoryScanner implements Runnable {
private static final Logger LOG =
LoggerFactory.getLogger(DirectoryScanner.class);
private static final int MILLIS_PER_SECOND = 1000;
private static final String START_MESSAGE =
"Periodic Directory Tree Verification scan"
+ " starting at %s with interval of %dms";
private static final String START_MESSAGE_WITH_THROTTLE = START_MESSAGE
+ " and throttle limit of %dms/s";
private static final int DEFAULT_MAP_SIZE = 32768;
private final FsDatasetSpi<?> dataset;
private final ExecutorService reportCompileThreadPool;
private final ScheduledExecutorService masterThread;
private final long scanPeriodMsecs;
private final int throttleLimitMsPerSec;
private volatile boolean shouldRun = false;
private final long throttleLimitMsPerSec;
private final AtomicBoolean shouldRun = new AtomicBoolean();
private boolean retainDiffs = false;
private final DataNode datanode;
/**
* Total combined wall clock time (in milliseconds) spent by the report
@ -84,26 +86,30 @@ public class DirectoryScanner implements Runnable {
*/
@VisibleForTesting
final AtomicLong timeRunningMs = new AtomicLong(0L);
/**
* Total combined wall clock time (in milliseconds) spent by the report
* compiler threads blocked by the throttle. Used for testing purposes.
*/
@VisibleForTesting
final AtomicLong timeWaitingMs = new AtomicLong(0L);
/**
* The complete list of block differences indexed by block pool ID.
*/
@VisibleForTesting
final ScanInfoPerBlockPool diffs = new ScanInfoPerBlockPool();
/**
* Statistics about the block differences in each blockpool, indexed by
* block pool ID.
*/
@VisibleForTesting
final Map<String, Stats> stats = new HashMap<String, Stats>();
final BlockPoolReport diffs = new BlockPoolReport();
/**
* Allow retaining diffs for unit test and analysis. Defaults to false (off)
* Statistics about the block differences in each blockpool, indexed by block
* pool ID.
*/
@VisibleForTesting
final Map<String, Stats> stats;
/**
* Allow retaining diffs for unit test and analysis. Defaults to false (off).
*
* @param b whether to retain diffs
*/
@VisibleForTesting
@ -126,6 +132,7 @@ public class DirectoryScanner implements Runnable {
/**
* Create a new Stats object for the given blockpool ID.
*
* @param bpid blockpool ID
*/
public Stats(String bpid) {
@ -134,81 +141,145 @@ public class DirectoryScanner implements Runnable {
@Override
public String toString() {
return "BlockPool " + bpid
+ " Total blocks: " + totalBlocks + ", missing metadata files:"
+ missingMetaFile + ", missing block files:" + missingBlockFile
+ ", missing blocks in memory:" + missingMemoryBlocks
+ ", mismatched blocks:" + mismatchBlocks;
return "BlockPool " + bpid + " Total blocks: " + totalBlocks
+ ", missing metadata files: " + missingMetaFile
+ ", missing block files: " + missingBlockFile
+ ", missing blocks in memory: " + missingMemoryBlocks
+ ", mismatched blocks: " + mismatchBlocks;
}
}
/**
* Helper class for compiling block info reports from report compiler threads.
* Contains a volume, a set of block pool IDs, and a collection of ScanInfo
* objects. If a block pool exists but has no ScanInfo objects associated with
* it, there will be no mapping for that particular block pool.
*/
static class ScanInfoPerBlockPool extends
HashMap<String, LinkedList<ScanInfo>> {
@VisibleForTesting
public static class ScanInfoVolumeReport {
@SuppressWarnings("unused")
private static final long serialVersionUID = 1L;
private final FsVolumeSpi volume;
private final BlockPoolReport blockPoolReport;
/**
* Create a new info list.
*
* @param volume
*/
ScanInfoPerBlockPool() {super();}
ScanInfoVolumeReport(final FsVolumeSpi volume) {
this.volume = volume;
this.blockPoolReport = new BlockPoolReport();
}
/**
* Create a new info list initialized to the given expected size.
* See {@link java.util.HashMap#HashMap(int)}.
*
* @param sz initial expected size
*/
ScanInfoPerBlockPool(int sz) {super(sz);}
/**
* Merges {@code that} ScanInfoPerBlockPool into this one
*
* @param that ScanInfoPerBlockPool to merge
*/
public void addAll(ScanInfoPerBlockPool that) {
if (that == null) return;
for (Entry<String, LinkedList<ScanInfo>> entry : that.entrySet()) {
String bpid = entry.getKey();
LinkedList<ScanInfo> list = entry.getValue();
if (this.containsKey(bpid)) {
//merge that per-bpid linked list with this one
this.get(bpid).addAll(list);
} else {
//add that new bpid and its linked list to this
this.put(bpid, list);
ScanInfoVolumeReport(final FsVolumeSpi volume,
final Collection<String> blockPools) {
this.volume = volume;
this.blockPoolReport = new BlockPoolReport(blockPools);
}
public void addAll(final String bpid,
final Collection<ScanInfo> scanInfos) {
this.blockPoolReport.addAll(bpid, scanInfos);
}
public Set<String> getBlockPoolIds() {
return this.blockPoolReport.getBlockPoolIds();
}
public List<ScanInfo> getScanInfo(final String bpid) {
return this.blockPoolReport.getScanInfo(bpid);
}
public FsVolumeSpi getVolume() {
return volume;
}
@Override
public String toString() {
return "ScanInfoVolumeReport [volume=" + volume + ", blockPoolReport="
+ blockPoolReport + "]";
}
}
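For orientation, a minimal hedged sketch of how one volume's report might be filled in and read back, assuming it runs inside DirectoryScanner (so ScanInfo, FsVolumeSpi and LOG are in scope) and that a volume reference and a populated scanInfos list already exist; only methods introduced in this change are used.

// Hedged illustration only; 'volume' and 'scanInfos' are assumed to exist.
ScanInfoVolumeReport volumeReport =
    new ScanInfoVolumeReport(volume, Arrays.asList(volume.getBlockPoolList()));
for (String bpid : volume.getBlockPoolList()) {
  // scanInfos would normally be produced by volume.compileReport(bpid, ...)
  volumeReport.addAll(bpid, scanInfos);
}
for (String bpid : volumeReport.getBlockPoolIds()) {
  LOG.debug("Volume {} has {} ScanInfo entries for pool {}",
      volumeReport.getVolume(), volumeReport.getScanInfo(bpid).size(), bpid);
}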
/**
* Convert all the LinkedList values in this ScanInfoPerBlockPool map
* into sorted arrays, and return a new map of these arrays per blockpool
*
* @return a map of ScanInfo arrays per blockpool
* Helper class for compiling block info reports per block pool.
*/
public Map<String, ScanInfo[]> toSortedArrays() {
Map<String, ScanInfo[]> result =
new HashMap<String, ScanInfo[]>(this.size());
@VisibleForTesting
public static class BlockPoolReport {
for (Entry<String, LinkedList<ScanInfo>> entry : this.entrySet()) {
String bpid = entry.getKey();
LinkedList<ScanInfo> list = entry.getValue();
@SuppressWarnings("unused")
private static final long serialVersionUID = 1L;
// convert list to array
ScanInfo[] record = list.toArray(new ScanInfo[list.size()]);
private final Set<String> blockPools;
private final ListMultimap<String, ScanInfo> map;
/**
* Create a block pool report.
*
* @param volume
*/
BlockPoolReport() {
this.blockPools = new HashSet<>(2);
this.map = ArrayListMultimap.create(2, DEFAULT_MAP_SIZE);
}
/**
* Create a new block pool report initialized to the given expected size.
*
* @param blockPools initial list of known block pools
*/
BlockPoolReport(final Collection<String> blockPools) {
this.blockPools = new HashSet<>(blockPools);
this.map = ArrayListMultimap.create(blockPools.size(), DEFAULT_MAP_SIZE);
}
public void addAll(final String bpid,
final Collection<ScanInfo> scanInfos) {
this.blockPools.add(bpid);
this.map.putAll(bpid, scanInfos);
}
public void sortBlocks() {
for (final String bpid : this.map.keySet()) {
final List<ScanInfo> list = this.map.get(bpid);
// Sort array based on blockId
Arrays.sort(record);
result.put(bpid, record);
}
return result;
Collections.sort(list);
}
}
public Set<String> getBlockPoolIds() {
return Collections.unmodifiableSet(this.blockPools);
}
public List<ScanInfo> getScanInfo(final String bpid) {
return this.map.get(bpid);
}
public Collection<Map.Entry<String, ScanInfo>> getEntries() {
return Collections.unmodifiableCollection(this.map.entries());
}
public void clear() {
this.map.clear();
this.blockPools.clear();
}
@Override
public String toString() {
return "BlockPoolReport [blockPools=" + blockPools + ", map=" + map + "]";
}
}
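A hedged sketch of the aggregation step that the new scan() further down performs: the per-volume reports are folded into a single BlockPoolReport and pre-sorted before the dataset lock is taken. Here volumeReports stands for the result of getVolumeReports().

// Illustration only; mirrors the new scan() flow later in this file.
BlockPoolReport blockPoolReport = new BlockPoolReport();
for (ScanInfoVolumeReport volumeReport : volumeReports) {
  for (String bpid : volumeReport.getBlockPoolIds()) {
    blockPoolReport.addAll(bpid, volumeReport.getScanInfo(bpid));
  }
}
// Sort each pool's ScanInfo list by blockId outside of the dataset lock
blockPoolReport.sortBlocks();
for (String bpid : blockPoolReport.getBlockPoolIds()) {
  List<ScanInfo> sorted = blockPoolReport.getScanInfo(bpid);
  // 'sorted' is then compared against dataset.getFinalizedBlocks(bpid)
}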
/**
* Create a new directory scanner, but don't cycle it running yet.
@ -217,48 +288,40 @@ public class DirectoryScanner implements Runnable {
* @param dataset the dataset to scan
* @param conf the Configuration object
*/
public DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset,
Configuration conf) {
this.datanode = datanode;
public DirectoryScanner(FsDatasetSpi<?> dataset, Configuration conf) {
this.dataset = dataset;
this.stats = new HashMap<>(DEFAULT_MAP_SIZE);
int interval = (int) conf.getTimeDuration(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT,
TimeUnit.SECONDS);
scanPeriodMsecs = interval * MILLIS_PER_SECOND; //msec
int throttle =
conf.getInt(
scanPeriodMsecs = TimeUnit.SECONDS.toMillis(interval);
int throttle = conf.getInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
if ((throttle > MILLIS_PER_SECOND) || (throttle <= 0)) {
if (throttle > MILLIS_PER_SECOND) {
LOG.error(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY
+ " set to value above 1000 ms/sec. Assuming default value of " +
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
} else {
LOG.error(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY
+ " set to value below 1 ms/sec. Assuming default value of " +
if (throttle >= TimeUnit.SECONDS.toMillis(1)) {
LOG.warn(
"{} set to value above 1000 ms/sec. Assuming default value of {}",
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
throttle =
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT;
}
throttleLimitMsPerSec =
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT;
} else {
throttleLimitMsPerSec = throttle;
}
int threads =
conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
reportCompileThreadPool = Executors.newFixedThreadPool(threads,
new Daemon.DaemonFactory());
masterThread = new ScheduledThreadPoolExecutor(1,
new Daemon.DaemonFactory());
reportCompileThreadPool =
Executors.newFixedThreadPool(threads, new Daemon.DaemonFactory());
masterThread =
new ScheduledThreadPoolExecutor(1, new Daemon.DaemonFactory());
}
/**
@ -266,23 +329,14 @@ public class DirectoryScanner implements Runnable {
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds.
*/
void start() {
shouldRun = true;
long offset = ThreadLocalRandom.current().nextInt(
(int) (scanPeriodMsecs/MILLIS_PER_SECOND)) * MILLIS_PER_SECOND; //msec
long firstScanTime = Time.now() + offset;
String logMsg;
shouldRun.set(true);
long firstScanTime = ThreadLocalRandom.current().nextLong(scanPeriodMsecs);
if (throttleLimitMsPerSec < MILLIS_PER_SECOND) {
logMsg = String.format(START_MESSAGE_WITH_THROTTLE,
FastDateFormat.getInstance().format(firstScanTime), scanPeriodMsecs,
throttleLimitMsPerSec);
} else {
logMsg = String.format(START_MESSAGE,
FastDateFormat.getInstance().format(firstScanTime), scanPeriodMsecs);
}
LOG.info(
"Periodic Directory Tree Verification scan starting in {}ms with interval of {}ms and throttle limit of {}ms/s",
firstScanTime, scanPeriodMsecs, throttleLimitMsPerSec);
LOG.info(logMsg);
masterThread.scheduleAtFixedRate(this, offset, scanPeriodMsecs,
masterThread.scheduleAtFixedRate(this, firstScanTime, scanPeriodMsecs,
TimeUnit.MILLISECONDS);
}
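A hedged end-to-end sketch of driving the reworked scanner, modeled on the updated tests below; the dataset variable stands for an existing FsDatasetSpi instance and the numeric values are illustrative only.

Configuration conf = new Configuration();
// The interval is read with getTimeDuration, so a plain number means seconds
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 21600);
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
// With the new default of -1, any value <= 0 leaves throttling disabled;
// 1..999 enables it, and values of 1000 or more fall back to the default
conf.setInt(
    DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, 100);

DirectoryScanner scanner = new DirectoryScanner(dataset, conf); // no DataNode argument anymore
scanner.start();    // first run lands at a random offset within one scan period
// ... later, e.g. on datanode shutdown ...
scanner.shutdown();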
@ -293,7 +347,7 @@ public class DirectoryScanner implements Runnable {
*/
@VisibleForTesting
boolean getRunStatus() {
return shouldRun;
return shouldRun.get();
}
/**
@ -305,27 +359,29 @@ public class DirectoryScanner implements Runnable {
}
/**
* Main program loop for DirectoryScanner. Runs {@link reconcile()}
* and handles any exceptions.
* Main program loop for DirectoryScanner. Runs {@link reconcile()} and
* handles any exceptions.
*/
@Override
public void run() {
try {
if (!shouldRun) {
//shutdown has been activated
LOG.warn("this cycle terminating immediately because 'shouldRun' has been deactivated");
if (!shouldRun.get()) {
// shutdown has been activated
LOG.warn(
"This cycle terminating immediately because 'shouldRun' has been deactivated");
return;
}
//We're are okay to run - do it
try {
reconcile();
} catch (Exception e) {
//Log and continue - allows Executor to run again next cycle
LOG.error("Exception during DirectoryScanner execution - will continue next cycle", e);
// Log and continue - allows Executor to run again next cycle
LOG.error(
"Exception during DirectoryScanner execution - will continue next cycle",
e);
} catch (Error er) {
//Non-recoverable error - re-throw after logging the problem
LOG.error("System Error during DirectoryScanner execution - permanently terminating periodic scanner", er);
// Non-recoverable error - re-throw after logging the problem
LOG.error(
"System Error during DirectoryScanner execution - permanently terminating periodic scanner",
er);
throw er;
}
}
@ -333,39 +389,39 @@ public class DirectoryScanner implements Runnable {
/**
* Stops the directory scanner. This method will wait for 1 minute for the
* main thread to exit and an additional 1 minute for the report compilation
* threads to exit. If a thread does not exit in that time period, it is
* left running, and an error is logged.
* threads to exit. If a thread does not exit in that time period, it is left
* running, and an error is logged.
*/
void shutdown() {
if (!shouldRun) {
LOG.warn("DirectoryScanner: shutdown has been called, but periodic scanner not started");
} else {
LOG.warn("DirectoryScanner: shutdown has been called");
LOG.info("Shutdown has been called");
if (!shouldRun.getAndSet(false)) {
LOG.warn("Shutdown has been called, but periodic scanner not started");
}
if (masterThread != null) {
masterThread.shutdown();
}
shouldRun = false;
if (masterThread != null) masterThread.shutdown();
if (reportCompileThreadPool != null) {
reportCompileThreadPool.shutdownNow();
}
if (masterThread != null) {
try {
masterThread.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
LOG.error("interrupted while waiting for masterThread to " +
"terminate", e);
LOG.error(
"interrupted while waiting for masterThread to " + "terminate", e);
}
}
if (reportCompileThreadPool != null) {
try {
reportCompileThreadPool.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
LOG.error("interrupted while waiting for reportCompileThreadPool to " +
"terminate", e);
LOG.error("interrupted while waiting for reportCompileThreadPool to "
+ "terminate", e);
}
}
if (!retainDiffs) clear();
if (!retainDiffs) {
clear();
}
}
/**
@ -374,45 +430,54 @@ public class DirectoryScanner implements Runnable {
@VisibleForTesting
public void reconcile() throws IOException {
scan();
for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
String bpid = entry.getKey();
LinkedList<ScanInfo> diff = entry.getValue();
for (ScanInfo info : diff) {
dataset.checkAndUpdate(bpid, info);
for (final Map.Entry<String, ScanInfo> entry : diffs.getEntries()) {
dataset.checkAndUpdate(entry.getKey(), entry.getValue());
}
if (!retainDiffs) {
clear();
}
if (!retainDiffs) clear();
}
/**
* Scan for the differences between disk and in-memory blocks
* Scan only the "finalized blocks" lists of both disk and memory.
* Scan for the differences between disk and in-memory blocks. Scan only the
* "finalized blocks" lists of both disk and memory.
*/
private void scan() {
BlockPoolReport blockPoolReport = new BlockPoolReport();
clear();
Map<String, ScanInfo[]> diskReport = getDiskReport();
Collection<ScanInfoVolumeReport> volumeReports = getVolumeReports();
for (ScanInfoVolumeReport volumeReport : volumeReports) {
for (String blockPoolId : volumeReport.getBlockPoolIds()) {
List<ScanInfo> scanInfos = volumeReport.getScanInfo(blockPoolId);
blockPoolReport.addAll(blockPoolId, scanInfos);
}
}
// Pre-sort the reports outside of the lock
blockPoolReport.sortBlocks();
// Hold FSDataset lock to prevent further changes to the block map
try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) {
String bpid = entry.getKey();
ScanInfo[] blockpoolReport = entry.getValue();
try (AutoCloseableLock lock = dataset.acquireDatasetLock()) {
for (final String bpid : blockPoolReport.getBlockPoolIds()) {
List<ScanInfo> blockpoolReport = blockPoolReport.getScanInfo(bpid);
Stats statsRecord = new Stats(bpid);
stats.put(bpid, statsRecord);
LinkedList<ScanInfo> diffRecord = new LinkedList<ScanInfo>();
diffs.put(bpid, diffRecord);
Collection<ScanInfo> diffRecord = new ArrayList<>();
statsRecord.totalBlocks = blockpoolReport.length;
statsRecord.totalBlocks = blockpoolReport.size();
final List<ReplicaInfo> bl = dataset.getFinalizedBlocks(bpid);
Collections.sort(bl); // Sort based on blockId
int d = 0; // index for blockpoolReport
int m = 0; // index for memReport
while (m < bl.size() && d < blockpoolReport.length) {
while (m < bl.size() && d < blockpoolReport.size()) {
ReplicaInfo memBlock = bl.get(m);
ScanInfo info = blockpoolReport[d];
ScanInfo info = blockpoolReport.get(d);
if (info.getBlockId() < memBlock.getBlockId()) {
if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
// Block is missing in memory
@ -424,15 +489,15 @@ public class DirectoryScanner implements Runnable {
}
if (info.getBlockId() > memBlock.getBlockId()) {
// Block is missing on the disk
addDifference(diffRecord, statsRecord,
memBlock.getBlockId(), info.getVolume());
addDifference(diffRecord, statsRecord, memBlock.getBlockId(),
info.getVolume());
m++;
continue;
}
// Block file and/or metadata file exists on the disk
// Block exists in memory
if (info.getVolume().getStorageType() != StorageType.PROVIDED &&
info.getBlockFile() == null) {
if (info.getVolume().getStorageType() != StorageType.PROVIDED
&& info.getBlockFile() == null) {
// Block metadata file exists and block file is missing
addDifference(diffRecord, statsRecord, info);
} else if (info.getGenStamp() != memBlock.getGenerationStamp()
@ -442,16 +507,16 @@ public class DirectoryScanner implements Runnable {
statsRecord.mismatchBlocks++;
addDifference(diffRecord, statsRecord, info);
} else if (memBlock.compareWith(info) != 0) {
// volumeMap record and on-disk files don't match.
// volumeMap record and on-disk files do not match.
statsRecord.duplicateBlocks++;
addDifference(diffRecord, statsRecord, info);
}
d++;
if (d < blockpoolReport.length) {
// There may be multiple on-disk records for the same block, don't increment
// the memory record pointer if so.
ScanInfo nextInfo = blockpoolReport[d];
if (d < blockpoolReport.size()) {
// There may be multiple on-disk records for the same block, do not
// increment the memory record pointer if so.
ScanInfo nextInfo = blockpoolReport.get(d);
if (nextInfo.getBlockId() != info.getBlockId()) {
++m;
}
@ -461,132 +526,108 @@ public class DirectoryScanner implements Runnable {
}
while (m < bl.size()) {
ReplicaInfo current = bl.get(m++);
addDifference(diffRecord, statsRecord,
current.getBlockId(), current.getVolume());
addDifference(diffRecord, statsRecord, current.getBlockId(),
current.getVolume());
}
while (d < blockpoolReport.length) {
if (!dataset.isDeletingBlock(bpid, blockpoolReport[d].getBlockId())) {
while (d < blockpoolReport.size()) {
if (!dataset.isDeletingBlock(bpid,
blockpoolReport.get(d).getBlockId())) {
statsRecord.missingMemoryBlocks++;
addDifference(diffRecord, statsRecord, blockpoolReport[d]);
addDifference(diffRecord, statsRecord, blockpoolReport.get(d));
}
d++;
}
LOG.info(statsRecord.toString());
} //end for
} //end synchronized
diffs.addAll(bpid, diffRecord);
LOG.info("Scan Results: {}", statsRecord);
}
}
}
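To make the block-by-block comparison in scan() above concrete, a hedged walkthrough with made-up block IDs, written as comments since the classification is what matters:

// Suppose the sorted on-disk report for a pool contains blockIds {1, 2, 4}
// and the sorted in-memory finalized list contains {2, 3, 4}, where block 4
// on disk has a stale generation stamp.
//   block 1: on disk only      -> missingMemoryBlocks++ (unless it is being deleted)
//   block 2: consistent        -> no difference recorded
//   block 3: in memory only    -> missingBlockFile++ and missingMetaFile++
//   block 4: genstamp mismatch -> mismatchBlocks++
// Each difference is appended to diffRecord, merged into 'diffs', and later
// handed to dataset.checkAndUpdate(bpid, scanInfo) by reconcile().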
/**
* Add the ScanInfo object to the list of differences and adjust the stats
* accordingly. This method is called when a block is found on the disk,
* but the in-memory block is missing or does not match the block on the disk.
* accordingly. This method is called when a block is found on the disk, but
* the in-memory block is missing or does not match the block on the disk.
*
* @param diffRecord the list to which to add the info
* @param diffRecord the collection to which to add the info
* @param statsRecord the stats to update
* @param info the differing info
*/
private void addDifference(LinkedList<ScanInfo> diffRecord,
Stats statsRecord, ScanInfo info) {
private void addDifference(Collection<ScanInfo> diffRecord, Stats statsRecord,
ScanInfo info) {
statsRecord.missingMetaFile += info.getMetaFile() == null ? 1 : 0;
statsRecord.missingBlockFile += info.getBlockFile() == null ? 1 : 0;
diffRecord.add(info);
}
/**
* Add a new ScanInfo object to the list of differences and adjust the stats
* accordingly. This method is called when a block is not found on the disk.
* Add a new ScanInfo object to the collection of differences and adjust the
* stats accordingly. This method is called when a block is not found on the
* disk.
*
* @param diffRecord the list to which to add the info
* @param diffRecord the collection to which to add the info
* @param statsRecord the stats to update
* @param blockId the id of the missing block
* @param vol the volume that contains the missing block
*/
private void addDifference(LinkedList<ScanInfo> diffRecord,
Stats statsRecord, long blockId,
FsVolumeSpi vol) {
private void addDifference(Collection<ScanInfo> diffRecord, Stats statsRecord,
long blockId, FsVolumeSpi vol) {
statsRecord.missingBlockFile++;
statsRecord.missingMetaFile++;
diffRecord.add(new ScanInfo(blockId, null, null, vol));
}
/**
* Get the lists of blocks on the disks in the dataset, sorted by blockId.
* The returned map contains one entry per blockpool, keyed by the blockpool
* ID.
*
* @return a map of sorted arrays of block information
* Get the lists of blocks on the disks in the data set.
*/
@VisibleForTesting
public Map<String, ScanInfo[]> getDiskReport() {
ScanInfoPerBlockPool list = new ScanInfoPerBlockPool();
ScanInfoPerBlockPool[] dirReports = null;
public Collection<ScanInfoVolumeReport> getVolumeReports() {
List<ScanInfoVolumeReport> volReports = new ArrayList<>();
List<Future<ScanInfoVolumeReport>> compilersInProgress = new ArrayList<>();
// First get list of data directories
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
// Use an array since the threads may return out of order and
// compilersInProgress#keySet may return out of order as well.
dirReports = new ScanInfoPerBlockPool[volumes.size()];
Map<Integer, Future<ScanInfoPerBlockPool>> compilersInProgress =
new HashMap<Integer, Future<ScanInfoPerBlockPool>>();
for (int i = 0; i < volumes.size(); i++) {
if (volumes.get(i).getStorageType() == StorageType.PROVIDED) {
for (final FsVolumeSpi volume : volumes) {
// Disable scanning PROVIDED volumes to keep overhead low
continue;
}
ReportCompiler reportCompiler =
new ReportCompiler(datanode, volumes.get(i));
Future<ScanInfoPerBlockPool> result =
if (volume.getStorageType() != StorageType.PROVIDED) {
ReportCompiler reportCompiler = new ReportCompiler(volume);
Future<ScanInfoVolumeReport> result =
reportCompileThreadPool.submit(reportCompiler);
compilersInProgress.put(i, result);
compilersInProgress.add(result);
}
}
for (Entry<Integer, Future<ScanInfoPerBlockPool>> report :
compilersInProgress.entrySet()) {
Integer index = report.getKey();
for (Future<ScanInfoVolumeReport> future : compilersInProgress) {
try {
dirReports[index] = report.getValue().get();
// If our compiler threads were interrupted, give up on this run
if (dirReports[index] == null) {
dirReports = null;
final ScanInfoVolumeReport result = future.get();
if (!CollectionUtils.addIgnoreNull(volReports, result)) {
// This compiler thread was interrupted, give up on this run
volReports.clear();
break;
}
} catch (Exception ex) {
FsVolumeSpi fsVolumeSpi = volumes.get(index);
LOG.error("Error compiling report for the volume, StorageId: "
+ fsVolumeSpi.getStorageID(), ex);
// Continue scanning the other volumes
LOG.warn("Error compiling report. Continuing.", ex);
}
}
} catch (IOException e) {
LOG.error("Unexpected IOException by closing FsVolumeReference", e);
}
if (dirReports != null) {
// Compile consolidated report for all the volumes
for (ScanInfoPerBlockPool report : dirReports) {
if(report != null){
list.addAll(report);
}
}
}
return list.toSortedArrays();
return volReports;
}
/**
* The ReportCompiler class encapsulates the process of searching a datanode's
* disks for block information. It operates by performing a DFS of the
* volume to discover block information.
* disks for block information. It operates by performing a DFS of the volume
* to discover block information.
*
* When the ReportCompiler discovers block information, it creates a new
* ScanInfo object for it and adds that object to its report list. The report
* list is returned by the {@link #call()} method.
*/
public class ReportCompiler implements Callable<ScanInfoPerBlockPool> {
public class ReportCompiler implements Callable<ScanInfoVolumeReport> {
private final FsVolumeSpi volume;
private final DataNode datanode;
// Variable for tracking time spent running for throttling purposes
private final StopWatch throttleTimer = new StopWatch();
// Variable for tracking time spent running and waiting for testing
@ -594,13 +635,11 @@ public class DirectoryScanner implements Runnable {
private final StopWatch perfTimer = new StopWatch();
/**
* Create a report compiler for the given volume on the given datanode.
* Create a report compiler for the given volume.
*
* @param datanode the target datanode
* @param volume the target volume
*/
public ReportCompiler(DataNode datanode, FsVolumeSpi volume) {
this.datanode = datanode;
public ReportCompiler(FsVolumeSpi volume) {
this.volume = volume;
}
@ -608,12 +647,13 @@ public class DirectoryScanner implements Runnable {
* Run this report compiler thread.
*
* @return the block info report list
* @throws IOException if the block pool isn't found
* @throws IOException if the block pool is not found
*/
@Override
public ScanInfoPerBlockPool call() throws IOException {
public ScanInfoVolumeReport call() throws IOException {
String[] bpList = volume.getBlockPoolList();
ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length);
ScanInfoVolumeReport result =
new ScanInfoVolumeReport(volume, Arrays.asList(bpList));
perfTimer.start();
throttleTimer.start();
for (String bpid : bpList) {
@ -623,34 +663,46 @@ public class DirectoryScanner implements Runnable {
throttleTimer.reset().start();
try {
result.put(bpid, volume.compileReport(bpid, report, this));
// ScanInfos are added directly to 'report' list
volume.compileReport(bpid, report, this);
result.addAll(bpid, report);
} catch (InterruptedException ex) {
// Exit quickly and flag the scanner to do the same
result = null;
break;
}
}
LOG.trace("Scanner volume report: {}", result);
return result;
}
/**
* Called by the thread before each potential disk scan so that a pause
* can be optionally inserted to limit the number of scans per second.
* The limit is controlled by
* Called by the thread before each potential disk scan so that a pause can
* be optionally inserted to limit the number of scans per second. The limit
* is controlled by
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY}.
*/
public void throttle() throws InterruptedException {
accumulateTimeRunning();
if ((throttleLimitMsPerSec < 1000) &&
(throttleTimer.now(TimeUnit.MILLISECONDS) > throttleLimitMsPerSec)) {
Thread.sleep(MILLIS_PER_SECOND - throttleLimitMsPerSec);
if (throttleLimitMsPerSec > 0L) {
final long runningTime = throttleTimer.now(TimeUnit.MILLISECONDS);
if (runningTime >= throttleLimitMsPerSec) {
final long sleepTime;
if (runningTime >= 1000L) {
LOG.warn("Unable to throttle within the second. Blocking for 1s.");
sleepTime = 1000L;
} else {
// Sleep for the expected time plus any time processing ran over
final long overTime = runningTime - throttleLimitMsPerSec;
sleepTime = (1000L - throttleLimitMsPerSec) + overTime;
}
Thread.sleep(sleepTime);
throttleTimer.reset().start();
}
accumulateTimeWaiting();
}
}
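A hedged numeric illustration of the sleep computed by throttle() above, assuming a throttle limit of 100 ms/s:

// throttleLimitMsPerSec = 100 (illustrative value only)
// compiler ran 100 ms  -> overTime = 0   -> sleep (1000 - 100) + 0  = 900 ms
// compiler ran 150 ms  -> overTime = 50  -> sleep (1000 - 100) + 50 = 950 ms
// compiler ran 1200 ms -> cannot throttle within the second: warn, sleep 1000 ms
// limit <= 0 (the new default of -1) -> the branch is skipped, no sleep at all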
/**
* Helper method to measure time running.
@ -679,4 +731,5 @@ public class DirectoryScanner implements Runnable {
|| name.startsWith(Block.BLOCK_FILE_PREFIX);
}
}
}

View File

@ -33,6 +33,7 @@ import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@ -44,26 +45,23 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
@ -73,14 +71,17 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Time;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests {@link DirectoryScanner} handling of differences
* between blocks on the disk and block in memory.
* Tests {@link DirectoryScanner} handling of differences between blocks on the
* disk and block in memory.
*/
public class TestDirectoryScanner {
private static final Logger LOG =
@ -110,21 +111,20 @@ public class TestDirectoryScanner {
LazyPersistTestCase.initCacheManipulator();
}
/** create a file with a length of <code>fileLen</code> */
private List<LocatedBlock> createFile(String fileNamePrefix,
long fileLen,
/** create a file with a length of <code>fileLen</code>. */
private List<LocatedBlock> createFile(String fileNamePrefix, long fileLen,
boolean isLazyPersist) throws IOException {
FileSystem fs = cluster.getFileSystem();
Path filePath = new Path("/" + fileNamePrefix + ".dat");
DFSTestUtil.createFile(
fs, filePath, isLazyPersist, 1024, fileLen,
DFSTestUtil.createFile(fs, filePath, isLazyPersist, 1024, fileLen,
BLOCK_LENGTH, (short) 1, r.nextLong(), false);
return client.getLocatedBlocks(filePath.toString(), 0, fileLen).getLocatedBlocks();
return client.getLocatedBlocks(filePath.toString(), 0, fileLen)
.getLocatedBlocks();
}
/** Truncate a block file */
/** Truncate a block file. */
private long truncateBlockFile() throws IOException {
try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
try (AutoCloseableLock lock = fds.acquireDatasetLock()) {
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
File f = new File(b.getBlockURI());
File mf = new File(b.getMetadataURI());
@ -149,7 +149,7 @@ public class TestDirectoryScanner {
/** Delete a block file */
private long deleteBlockFile() {
try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
try (AutoCloseableLock lock = fds.acquireDatasetLock()) {
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
File f = new File(b.getBlockURI());
File mf = new File(b.getMetadataURI());
@ -165,7 +165,7 @@ public class TestDirectoryScanner {
/** Delete block meta file */
private long deleteMetaFile() {
try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
try (AutoCloseableLock lock = fds.acquireDatasetLock()) {
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
// Delete a metadata file
if (b.metadataExists() && b.deleteMetadata()) {
@ -179,11 +179,12 @@ public class TestDirectoryScanner {
/**
* Duplicate the given block on all volumes.
*
* @param blockId
* @throws IOException
*/
private void duplicateBlock(long blockId) throws IOException {
try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
try (AutoCloseableLock lock = fds.acquireDatasetLock()) {
ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
try (FsDatasetSpi.FsVolumeReferences volumes =
fds.getFsVolumeReferences()) {
@ -199,16 +200,14 @@ public class TestDirectoryScanner {
URI destRoot = v.getStorageLocation().getUri();
String relativeBlockPath =
sourceRoot.relativize(sourceBlock.toURI())
.getPath();
sourceRoot.relativize(sourceBlock.toURI()).getPath();
String relativeMetaPath =
sourceRoot.relativize(sourceMeta.toURI())
.getPath();
sourceRoot.relativize(sourceMeta.toURI()).getPath();
File destBlock = new File(new File(destRoot).toString(),
relativeBlockPath);
File destMeta = new File(new File(destRoot).toString(),
relativeMetaPath);
File destBlock =
new File(new File(destRoot).toString(), relativeBlockPath);
File destMeta =
new File(new File(destRoot).toString(), relativeMetaPath);
destBlock.getParentFile().mkdirs();
FileUtils.copyFile(sourceBlock, destBlock);
@ -223,7 +222,7 @@ public class TestDirectoryScanner {
}
}
/** Get a random blockId that is not used already */
/** Get a random blockId that is not used already. */
private long getFreeBlockId() {
long id = rand.nextLong();
while (true) {
@ -244,14 +243,15 @@ public class TestDirectoryScanner {
+ Block.METADATA_EXTENSION;
}
/** Create a block file in a random volume*/
/** Create a block file in a random volume. */
private long createBlockFile() throws IOException {
long id = getFreeBlockId();
try (FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) {
try (
FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) {
int numVolumes = volumes.size();
int index = rand.nextInt(numVolumes - 1);
File finalizedDir = ((FsVolumeImpl) volumes.get(index))
.getFinalizedDir(bpid);
File finalizedDir =
((FsVolumeImpl) volumes.get(index)).getFinalizedDir(bpid);
File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) {
LOG.info("Created block file " + file.getName());
@ -260,14 +260,14 @@ public class TestDirectoryScanner {
return id;
}
/** Create a metafile in a random volume*/
/** Create a metafile in a random volume */
private long createMetaFile() throws IOException {
long id = getFreeBlockId();
try (FsDatasetSpi.FsVolumeReferences refs = fds.getFsVolumeReferences()) {
int numVolumes = refs.size();
int index = rand.nextInt(numVolumes - 1);
File finalizedDir = ((FsVolumeImpl) refs.get(index))
.getFinalizedDir(bpid);
File finalizedDir =
((FsVolumeImpl) refs.get(index)).getFinalizedDir(bpid);
File file = new File(finalizedDir, getMetaFile(id));
if (file.createNewFile()) {
LOG.info("Created metafile " + file.getName());
@ -276,7 +276,7 @@ public class TestDirectoryScanner {
return id;
}
/** Create block file and corresponding metafile in a random volume */
/** Create block file and corresponding metafile in a random volume. */
private long createBlockMetaFile() throws IOException {
long id = getFreeBlockId();
@ -332,22 +332,22 @@ public class TestDirectoryScanner {
verifyStats(totalBlocks, diffsize, missingMetaFile, missingBlockFile,
missingMemoryBlocks, mismatchBlocks, duplicateBlocks);
} catch (AssertionError ex) {
LOG.warn("Assertion Error", ex);
return false;
}
return true;
}, 50, 2000);
}, 100, 2000);
}
private void verifyStats(long totalBlocks, int diffsize, long missingMetaFile,
long missingBlockFile, long missingMemoryBlocks, long mismatchBlocks,
long duplicateBlocks) {
assertTrue(scanner.diffs.containsKey(bpid));
LinkedList<FsVolumeSpi.ScanInfo> diff = scanner.diffs.get(bpid);
assertTrue(scanner.stats.containsKey(bpid));
DirectoryScanner.Stats stats = scanner.stats.get(bpid);
Collection<FsVolumeSpi.ScanInfo> diff = scanner.diffs.getScanInfo(bpid);
assertEquals(diffsize, diff.size());
DirectoryScanner.Stats stats = scanner.stats.get(bpid);
assertNotNull(stats);
assertEquals(totalBlocks, stats.totalBlocks);
assertEquals(missingMetaFile, stats.missingMetaFile);
assertEquals(missingBlockFile, stats.missingBlockFile);
@ -356,20 +356,18 @@ public class TestDirectoryScanner {
assertEquals(duplicateBlocks, stats.duplicateBlocks);
}
@Test (timeout=300000)
@Test(timeout = 300000)
public void testRetainBlockOnPersistentStorage() throws Exception {
cluster = new MiniDFSCluster
.Builder(CONF)
.storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
.numDataNodes(1)
.build();
cluster = new MiniDFSCluster.Builder(CONF)
.storageTypes(
new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
.numDataNodes(1).build();
try {
cluster.waitActive();
DataNode dataNode = cluster.getDataNodes().get(0);
bpid = cluster.getNamesystem().getBlockPoolId();
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
client = cluster.getFileSystem().getClient();
scanner = new DirectoryScanner(dataNode, fds, CONF);
scanner = new DirectoryScanner(fds, CONF);
scanner.setRetainDiffs(true);
FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
@ -397,24 +395,22 @@ public class TestDirectoryScanner {
}
}
@Test (timeout=300000)
@Test(timeout = 300000)
public void testDeleteBlockOnTransientStorage() throws Exception {
cluster = new MiniDFSCluster
.Builder(CONF)
.storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
.numDataNodes(1)
.build();
cluster = new MiniDFSCluster.Builder(CONF)
.storageTypes(
new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
.numDataNodes(1).build();
try {
cluster.waitActive();
bpid = cluster.getNamesystem().getBlockPoolId();
DataNode dataNode = cluster.getDataNodes().get(0);
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
client = cluster.getFileSystem().getClient();
scanner = new DirectoryScanner(dataNode, fds, CONF);
scanner = new DirectoryScanner(fds, CONF);
scanner.setRetainDiffs(true);
FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
// Create a file file on RAM_DISK
// Create a file on RAM_DISK
List<LocatedBlock> blocks =
createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, true);
@ -440,7 +436,7 @@ public class TestDirectoryScanner {
}
}
@Test (timeout=600000)
@Test(timeout = 600000)
public void testDirectoryScanner() throws Exception {
// Run the test with and without parallel scanning
for (int parallelism = 1; parallelism < 3; parallelism++) {
@ -457,8 +453,8 @@ public class TestDirectoryScanner {
client = cluster.getFileSystem().getClient();
CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
parallelism);
DataNode dataNode = cluster.getDataNodes().get(0);
scanner = new DirectoryScanner(dataNode, fds, CONF);
scanner = new DirectoryScanner(fds, CONF);
scanner.setRetainDiffs(true);
// Add files with 100 blocks
@ -492,7 +488,7 @@ public class TestDirectoryScanner {
// Test5: A metafile exists for which there is no block file and
// a block in memory
blockId = createMetaFile();
scan(totalBlocks+1, 1, 0, 1, 1, 0);
scan(totalBlocks + 1, 1, 0, 1, 1, 0);
File metafile = new File(getMetaFile(blockId));
assertTrue(!metafile.exists());
scan(totalBlocks, 0, 0, 0, 0, 0);
@ -521,7 +517,7 @@ public class TestDirectoryScanner {
scan(totalBlocks, 0, 0, 0, 0, 0);
// Test9: create a bunch of blocks files
for (int i = 0; i < 10 ; i++) {
for (int i = 0; i < 10; i++) {
blockId = createBlockFile();
}
totalBlocks += 10;
@ -529,14 +525,14 @@ public class TestDirectoryScanner {
scan(totalBlocks, 0, 0, 0, 0, 0);
// Test10: create a bunch of metafiles
for (int i = 0; i < 10 ; i++) {
for (int i = 0; i < 10; i++) {
blockId = createMetaFile();
}
scan(totalBlocks+10, 10, 0, 10, 10, 0);
scan(totalBlocks + 10, 10, 0, 10, 10, 0);
scan(totalBlocks, 0, 0, 0, 0, 0);
// Test11: create a bunch block files and meta files
for (int i = 0; i < 10 ; i++) {
for (int i = 0; i < 10; i++) {
blockId = createBlockMetaFile();
}
totalBlocks += 10;
@ -544,7 +540,7 @@ public class TestDirectoryScanner {
scan(totalBlocks, 0, 0, 0, 0, 0);
// Test12: truncate block files to test block length mismatch
for (int i = 0; i < 10 ; i++) {
for (int i = 0; i < 10; i++) {
truncateBlockFile();
}
scan(totalBlocks, 10, 0, 0, 0, 10);
@ -557,8 +553,8 @@ public class TestDirectoryScanner {
deleteMetaFile();
deleteBlockFile();
truncateBlockFile();
scan(totalBlocks+3, 6, 2, 2, 3, 2);
scan(totalBlocks+1, 0, 0, 0, 0, 0);
scan(totalBlocks + 3, 6, 2, 2, 3, 2);
scan(totalBlocks + 1, 0, 0, 0, 0, 0);
// Test14: make sure no throttling is happening
assertTrue("Throttle appears to be engaged",
@ -567,7 +563,8 @@ public class TestDirectoryScanner {
scanner.timeRunningMs.get() > 0L);
// Test15: validate clean shutdown of DirectoryScanner
////assertTrue(scanner.getRunStatus()); //assumes "real" FSDataset, not sim
//// assertTrue(scanner.getRunStatus()); //assumes "real" FSDataset, not
// sim
scanner.shutdown();
assertFalse(scanner.getRunStatus());
@ -586,13 +583,13 @@ public class TestDirectoryScanner {
* comparing the time spent waiting to the time spent running.
*
* The block pool has to be large, or the ratio will be off. The throttle
* allows the report compiler thread to finish its current cycle when
* blocking it, so the ratio will always be a little lower than expected.
* The smaller the block pool, the further off the ratio will be.
* allows the report compiler thread to finish its current cycle when blocking
* it, so the ratio will always be a little lower than expected. The smaller
* the block pool, the further off the ratio will be.
*
* @throws Exception thrown on unexpected failure
*/
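As a hedged back-of-the-envelope check of the bounds used below: with the throttle limit set to 100 ms/s, each report compiler thread should run for roughly 100 ms and then wait roughly 900 ms out of every second, so the measured ratio should land near 900 / 100 = 9; the test accepts 7 to 10 because the throttle lets the current cycle finish before pausing, which drags the ratio slightly below the ideal value.

// Illustrative arithmetic only (not measured values):
//   limit = 100 ms/s  => running ~100 ms, waiting ~900 ms per elapsed second
//   expected ratio    =  timeWaitingMs / timeRunningMs  ~=  900 / 100  =  9
//   accepted range    =  7f .. 10f (see the retry loop below)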
@Test (timeout=600000)
@Test(timeout = 600000)
public void testThrottling() throws Exception {
Configuration conf = new Configuration(CONF);
@ -611,10 +608,9 @@ public class TestDirectoryScanner {
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
100);
DataNode dataNode = cluster.getDataNodes().get(0);
final int maxBlocksPerFile = (int) DFSConfigKeys
.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT;
final int maxBlocksPerFile =
(int) DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT;
int numBlocksToCreate = blocks;
while (numBlocksToCreate > 0) {
final int toCreate = Math.min(maxBlocksPerFile, numBlocksToCreate);
@ -627,7 +623,7 @@ public class TestDirectoryScanner {
int retries = maxRetries;
while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) {
scanner = new DirectoryScanner(dataNode, fds, conf);
scanner = new DirectoryScanner(fds, conf);
ratio = runThrottleTest(blocks);
retries -= 1;
}
@ -645,7 +641,7 @@ public class TestDirectoryScanner {
retries = maxRetries;
while ((retries > 0) && ((ratio < 2.75f) || (ratio > 4.5f))) {
scanner = new DirectoryScanner(dataNode, fds, conf);
scanner = new DirectoryScanner(fds, conf);
ratio = runThrottleTest(blocks);
retries -= 1;
}
@ -664,7 +660,7 @@ public class TestDirectoryScanner {
retries = maxRetries;
while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) {
scanner = new DirectoryScanner(dataNode, fds, conf);
scanner = new DirectoryScanner(fds, conf);
ratio = runThrottleTest(blocks);
retries -= 1;
}
@ -675,7 +671,7 @@ public class TestDirectoryScanner {
assertTrue("Throttle is too permissive", ratio >= 7f);
// Test with no limit
scanner = new DirectoryScanner(dataNode, fds, CONF);
scanner = new DirectoryScanner(fds, CONF);
scanner.setRetainDiffs(true);
scan(blocks, 0, 0, 0, 0, 0);
scanner.shutdown();
@ -698,7 +694,7 @@ public class TestDirectoryScanner {
try {
while ((retries > 0) && (ratio < 10)) {
scanner = new DirectoryScanner(dataNode, fds, conf);
scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
final AtomicLong nowMs = new AtomicLong();
@ -728,7 +724,7 @@ public class TestDirectoryScanner {
}
ratio =
(float)scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
(float) scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
retries -= 1;
}
} finally {
@ -737,8 +733,7 @@ public class TestDirectoryScanner {
// We just want to test that it waits a lot, but it also runs some
LOG.info("RATIO: " + ratio);
assertTrue("Throttle is too permissive",
ratio > 10);
assertTrue("Throttle is too permissive", ratio > 8);
assertTrue("Report complier threads logged no execution time",
scanner.timeRunningMs.get() > 0L);
@ -746,7 +741,7 @@ public class TestDirectoryScanner {
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
0);
scanner = new DirectoryScanner(dataNode, fds, conf);
scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
scan(blocks, 0, 0, 0, 0, 0);
scanner.shutdown();
@ -761,7 +756,7 @@ public class TestDirectoryScanner {
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
1000);
scanner = new DirectoryScanner(dataNode, fds, conf);
scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
scan(blocks, 0, 0, 0, 0, 0);
scanner.shutdown();
@ -777,9 +772,8 @@ public class TestDirectoryScanner {
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
10);
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
1);
scanner = new DirectoryScanner(dataNode, fds, conf);
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
scanner.start();
@ -805,7 +799,7 @@ public class TestDirectoryScanner {
scanner.shutdown();
assertFalse(scanner.getRunStatus());
return (float)scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
return (float) scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
}
private void verifyAddition(long blockId, long genStamp, long size) {
@ -902,6 +896,7 @@ public class TestDirectoryScanner {
throw new UnsupportedOperationException();
}
@SuppressWarnings("rawtypes")
@Override
public FsDatasetSpi getDataset() {
throw new UnsupportedOperationException();
@ -923,8 +918,8 @@ public class TestDirectoryScanner {
}
@Override
public byte[] loadLastPartialChunkChecksum(
File blockFile, File metaFile) throws IOException {
public byte[] loadLastPartialChunkChecksum(File blockFile, File metaFile)
throws IOException {
return null;
}
@ -945,7 +940,6 @@ public class TestDirectoryScanner {
return null;
}
@Override
public VolumeCheckResult check(VolumeCheckContext context)
throws Exception {
@ -987,7 +981,7 @@ public class TestDirectoryScanner {
assertNull(scanInfo.getMetaFile());
}
@Test(timeout=120000)
@Test(timeout = 120000)
public void TestScanInfo() throws Exception {
testScanInfoObject(123,
new File(TEST_VOLUME.getFinalizedDir(BPID_1).getAbsolutePath(),
@ -998,13 +992,10 @@ public class TestDirectoryScanner {
new File(TEST_VOLUME.getFinalizedDir(BPID_1).getAbsolutePath(),
"blk_123"),
null);
testScanInfoObject(523,
null,
testScanInfoObject(523, null,
new File(TEST_VOLUME.getFinalizedDir(BPID_1).getAbsolutePath(),
"blk_123__1009.meta"));
testScanInfoObject(789,
null,
null);
testScanInfoObject(789, null, null);
testScanInfoObject(456);
testScanInfoObject(123,
new File(TEST_VOLUME.getFinalizedDir(BPID_2).getAbsolutePath(),
@ -1027,7 +1018,6 @@ public class TestDirectoryScanner {
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
client = cluster.getFileSystem().getClient();
CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
DataNode dataNode = cluster.getDataNodes().get(0);
// Add files with 2 blocks
createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 2, false);
@ -1047,7 +1037,7 @@ public class TestDirectoryScanner {
FsDatasetSpi<? extends FsVolumeSpi> spyFds = Mockito.spy(fds);
Mockito.doReturn(volReferences).when(spyFds).getFsVolumeReferences();
scanner = new DirectoryScanner(dataNode, spyFds, CONF);
scanner = new DirectoryScanner(spyFds, CONF);
scanner.setRetainDiffs(true);
scanner.reconcile();
} finally {
@ -1061,28 +1051,27 @@ public class TestDirectoryScanner {
@Test
public void testDirectoryScannerInFederatedCluster() throws Exception {
//Create Federated cluster with two nameservices and one DN
// Create Federated cluster with two nameservices and one DN
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF)
.nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))
.numDataNodes(1).build()) {
cluster.waitActive();
cluster.transitionToActive(1);
cluster.transitionToActive(3);
DataNode dataNode = cluster.getDataNodes().get(0);
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
//Create one block in first nameservice
// Create one block in first nameservice
FileSystem fs = cluster.getFileSystem(1);
int bp1Files = 1;
writeFile(fs, bp1Files);
//Create two blocks in second nameservice
// Create two blocks in second nameservice
FileSystem fs2 = cluster.getFileSystem(3);
int bp2Files = 2;
writeFile(fs2, bp2Files);
//Call the Directory scanner
scanner = new DirectoryScanner(dataNode, fds, CONF);
// Call the Directory scanner
scanner = new DirectoryScanner(fds, CONF);
scanner.setRetainDiffs(true);
scanner.reconcile();
//Check blocks in corresponding BP
// Check blocks in corresponding BP
GenericTestUtils.waitFor(() -> {
try {

View File

@ -34,6 +34,7 @@ import java.io.Writer;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -66,6 +67,7 @@ import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfoVolumeReport;
import org.apache.hadoop.hdfs.server.datanode.FinalizedProvidedReplica;
import org.apache.hadoop.hdfs.server.datanode.ProvidedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
@ -73,9 +75,9 @@ import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.TestProvidedReplicaImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.StringUtils;
@ -183,7 +185,6 @@ public class TestProvidedImpl {
public static class TestFileRegionBlockAliasMap
extends BlockAliasMap<FileRegion> {
private Configuration conf;
private int minId;
private int numBlocks;
private Iterator<FileRegion> suppliedIterator;
@ -592,11 +593,13 @@ public class TestProvidedImpl {
@Test
public void testScannerWithProvidedVolumes() throws Exception {
DirectoryScanner scanner = new DirectoryScanner(datanode, dataset, conf);
Map<String, FsVolumeSpi.ScanInfo[]> report = scanner.getDiskReport();
DirectoryScanner scanner = new DirectoryScanner(dataset, conf);
Collection<ScanInfoVolumeReport> reports = scanner.getVolumeReports();
// no blocks should be reported for the Provided volume as long as
// the directoryScanner is disabled.
assertEquals(0, report.get(BLOCK_POOL_IDS[CHOSEN_BP_ID]).length);
for (ScanInfoVolumeReport report : reports) {
assertEquals(0, report.getScanInfo(BLOCK_POOL_IDS[CHOSEN_BP_ID]).size());
}
}
/**

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
@ -63,14 +64,19 @@ public class TestListCorruptFileBlocks {
@Test (timeout=300000)
public void testListCorruptFilesCorruptedBlock() throws Exception {
MiniDFSCluster cluster = null;
Random random = new Random();
try {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1); // datanode scans directories
conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 3 * 1000); // datanode sends block reports
// datanode scans directories
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
// datanode sends block reports
conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 3 * 1000);
// Set short retry timeouts so this test runs faster
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = cluster.getFileSystem();
@ -84,8 +90,8 @@ public class TestListCorruptFileBlocks {
final NameNode namenode = cluster.getNameNode();
Collection<FSNamesystem.CorruptFileBlockInfo> badFiles = namenode.
getNamesystem().listCorruptFileBlocks("/", null);
assertTrue("Namenode has " + badFiles.size()
+ " corrupt files. Expecting None.", badFiles.size() == 0);
assertEquals("Namenode has " + badFiles.size()
+ " corrupt files. Expecting None.", 0, badFiles.size());
// Now deliberately corrupt one block
String bpid = cluster.getNamesystem().getBlockPoolId();
@ -101,7 +107,7 @@ public class TestListCorruptFileBlocks {
long position = channel.size() - 2;
int length = 2;
byte[] buffer = new byte[length];
random.nextBytes(buffer);
new Random(13L).nextBytes(buffer);
channel.write(ByteBuffer.wrap(buffer), position);
file.close();
LOG.info("Deliberately corrupting file " + metaFile.getName() +
@ -134,7 +140,6 @@ public class TestListCorruptFileBlocks {
@Test (timeout=300000)
public void testListCorruptFileBlocksInSafeMode() throws Exception {
MiniDFSCluster cluster = null;
Random random = new Random();
try {
Configuration conf = new HdfsConfiguration();
@ -164,8 +169,8 @@ public class TestListCorruptFileBlocks {
// fetch bad file list from namenode. There should be none.
Collection<FSNamesystem.CorruptFileBlockInfo> badFiles =
cluster.getNameNode().getNamesystem().listCorruptFileBlocks("/", null);
assertTrue("Namenode has " + badFiles.size()
+ " corrupt files. Expecting None.", badFiles.size() == 0);
assertEquals("Namenode has " + badFiles.size()
+ " corrupt files. Expecting None.", 0, badFiles.size());
// Now deliberately corrupt one block
File storageDir = cluster.getInstanceStorageDir(0, 0);
@ -181,7 +186,7 @@ public class TestListCorruptFileBlocks {
long position = channel.size() - 2;
int length = 2;
byte[] buffer = new byte[length];
random.nextBytes(buffer);
new Random(13L).nextBytes(buffer);
channel.write(ByteBuffer.wrap(buffer), position);
file.close();
LOG.info("Deliberately corrupting file " + metaFile.getName() +
@ -318,9 +323,9 @@ public class TestListCorruptFileBlocks {
}
// Validate we get all the corrupt files
LOG.info("Namenode has bad files. " + numCorrupt);
assertTrue(numCorrupt == 3);
// test the paging here
assertEquals(3, numCorrupt);
// test the paging here
FSNamesystem.CorruptFileBlockInfo[] cfb = corruptFileBlocks
.toArray(new FSNamesystem.CorruptFileBlockInfo[0]);
// now get the 2nd and 3rd file that is corrupt
@ -331,7 +336,7 @@ public class TestListCorruptFileBlocks {
FSNamesystem.CorruptFileBlockInfo[] ncfb = nextCorruptFileBlocks
.toArray(new FSNamesystem.CorruptFileBlockInfo[0]);
numCorrupt = nextCorruptFileBlocks.size();
assertTrue(numCorrupt == 2);
assertEquals(2, numCorrupt);
assertTrue(ncfb[0].block.getBlockName()
.equalsIgnoreCase(cfb[1].block.getBlockName()));
@ -339,14 +344,14 @@ public class TestListCorruptFileBlocks {
namenode.getNamesystem()
.listCorruptFileBlocks("/corruptData", cookie);
numCorrupt = corruptFileBlocks.size();
assertTrue(numCorrupt == 0);
assertEquals(0, numCorrupt);
// Do a listing on a dir which doesn't have any corrupt blocks and
// validate
util.createFiles(fs, "/goodData");
corruptFileBlocks =
namenode.getNamesystem().listCorruptFileBlocks("/goodData", null);
numCorrupt = corruptFileBlocks.size();
assertTrue(numCorrupt == 0);
assertEquals(0, numCorrupt);
util.cleanup(fs, "/corruptData");
util.cleanup(fs, "/goodData");
} finally {
@ -390,7 +395,7 @@ public class TestListCorruptFileBlocks {
RemoteIterator<Path> corruptFileBlocks =
dfs.listCorruptFileBlocks(new Path("/corruptData"));
int numCorrupt = countPaths(corruptFileBlocks);
assertTrue(numCorrupt == 0);
assertEquals(0, numCorrupt);
// delete the blocks
String bpid = cluster.getNamesystem().getBlockPoolId();
// For loop through number of datadirectories per datanode (2)
@ -426,7 +431,7 @@ public class TestListCorruptFileBlocks {
}
// Validate we get all the corrupt files
LOG.info("Namenode has bad files. " + numCorrupt);
assertTrue(numCorrupt == 3);
assertEquals(3, numCorrupt);
util.cleanup(fs, "/corruptData");
util.cleanup(fs, "/goodData");
@ -465,8 +470,9 @@ public class TestListCorruptFileBlocks {
final NameNode namenode = cluster.getNameNode();
Collection<FSNamesystem.CorruptFileBlockInfo> badFiles = namenode.
getNamesystem().listCorruptFileBlocks("/srcdat2", null);
assertTrue("Namenode has " + badFiles.size() + " corrupt files. Expecting none.",
badFiles.size() == 0);
assertEquals(
"Namenode has " + badFiles.size() + " corrupt files. Expecting none.",
0, badFiles.size());
// Now deliberately blocks from all files
final String bpid = cluster.getNamesystem().getBlockPoolId();
@ -555,7 +561,7 @@ public class TestListCorruptFileBlocks {
RemoteIterator<Path> corruptFileBlocks = dfs
.listCorruptFileBlocks(new Path("corruptData"));
int numCorrupt = countPaths(corruptFileBlocks);
assertTrue(numCorrupt == 0);
assertEquals(0, numCorrupt);
// delete the blocks
String bpid = cluster.getNamesystem().getBlockPoolId();
@ -589,7 +595,7 @@ public class TestListCorruptFileBlocks {
}
// Validate we get all the corrupt files
LOG.info("Namenode has bad files. " + numCorrupt);
assertTrue("Failed to get corrupt files!", numCorrupt == 3);
assertEquals("Failed to get corrupt files!", 3, numCorrupt);
util.cleanup(fs, "corruptData");
} finally {