Merge r1299139 and r1299144 from trunk for HDFS-3056.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1299146 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2012-03-10 02:06:00 +00:00
parent cac1864734
commit cd4ce55758
9 changed files with 462 additions and 394 deletions

View File: CHANGES.txt (hadoop-hdfs)

@@ -45,8 +45,6 @@ Release 0.23.3 - UNRELEASED
HDFS-2899. Service protocol changes in DatanodeProtocol to add multiple
storages. (suresh)
HDFS-3021. Use generic type to declare FSDatasetInterface. (szetszwo)
HDFS-2430. The number of failed or low-resource volumes the NN can tolerate
should be configurable. (atm)
@@ -122,6 +120,11 @@ Release 0.23.3 - UNRELEASED
HDFS-3014. FSEditLogOp and its subclasses should have toString() method.
(Sho Shimauchi via atm)
HDFS-3021. Use generic type to declare FSDatasetInterface. (szetszwo)
HDFS-3056. Add a new interface RollingLogs for DataBlockScanner logging.
(szetszwo)
OPTIMIZATIONS
HDFS-2477. Optimize computing the diff between a block report and the
namenode state. (Tomasz Nykiel via hairong)

View File: BlockPoolSliceScanner.java

@@ -18,15 +18,9 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintStream;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collections;
@@ -34,7 +28,11 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -61,41 +59,43 @@ class BlockPoolSliceScanner {
public static final Log LOG = LogFactory.getLog(BlockPoolSliceScanner.class);
private static final String DATA_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
private static final int MAX_SCAN_RATE = 8 * 1024 * 1024; // 8MB per sec
private static final int MIN_SCAN_RATE = 1 * 1024 * 1024; // 1MB per sec
private static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; // three weeks
private static final String VERIFICATION_PREFIX = "dncp_block_verification.log";
static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; // three weeks
private final String blockPoolId;
private final long scanPeriod;
private final AtomicLong lastScanTime = new AtomicLong();
private static final String dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS";
static final String verificationLogFile = "dncp_block_verification.log";
static final int verficationLogLimit = 5; // * numBlocks.
private long scanPeriod = DEFAULT_SCAN_PERIOD_HOURS * 3600 * 1000;
private DataNode datanode;
private final DataNode datanode;
private final FSDatasetInterface<? extends FSVolumeInterface> dataset;
// sorted set
private TreeSet<BlockScanInfo> blockInfoSet;
private HashMap<Block, BlockScanInfo> blockMap;
private final SortedSet<BlockScanInfo> blockInfoSet
= new TreeSet<BlockScanInfo>();
private final Map<Block, BlockScanInfo> blockMap
= new HashMap<Block, BlockScanInfo>();
// processedBlocks keeps track of which blocks are scanned
// since the last run.
private HashMap<Long, Integer> processedBlocks;
private volatile HashMap<Long, Integer> processedBlocks;
private long totalScans = 0;
private long totalScanErrors = 0;
private long totalTransientErrors = 0;
private long totalBlocksScannedInLastRun = 0; // Used for test only
private final AtomicInteger totalBlocksScannedInLastRun = new AtomicInteger(); // Used for test only
private long currentPeriodStart = System.currentTimeMillis();
private long bytesLeft = 0; // Bytes to scan in this period
private long totalBytesToScan = 0;
private LogFileHandler verificationLog;
private final LogFileHandler verificationLog;
private DataTransferThrottler throttler = null;
private final DataTransferThrottler throttler = new DataTransferThrottler(
200, MAX_SCAN_RATE);
private static enum ScanType {
VERIFICATION_SCAN, // scanned as part of periodic verification
@@ -133,29 +133,48 @@ class BlockPoolSliceScanner {
}
}
BlockPoolSliceScanner(DataNode datanode,
BlockPoolSliceScanner(String bpid, DataNode datanode,
FSDatasetInterface<? extends FSVolumeInterface> dataset,
Configuration conf, String bpid) {
Configuration conf) {
this.datanode = datanode;
this.dataset = dataset;
this.blockPoolId = bpid;
scanPeriod = conf.getInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
long hours = conf.getInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT);
if ( scanPeriod <= 0 ) {
scanPeriod = DEFAULT_SCAN_PERIOD_HOURS;
if (hours <= 0) {
hours = DEFAULT_SCAN_PERIOD_HOURS;
}
scanPeriod *= 3600 * 1000;
LOG.info("Periodic Block Verification scan initialized with interval " + scanPeriod + ".");
this.scanPeriod = hours * 3600 * 1000;
LOG.info("Periodic Block Verification Scanner initialized with interval "
+ hours + " hours for block pool " + bpid + ".");
// get the list of blocks and arrange them in random order
List<Block> arr = dataset.getFinalizedBlocks(blockPoolId);
Collections.shuffle(arr);
long scanTime = -1;
for (Block block : arr) {
BlockScanInfo info = new BlockScanInfo( block );
info.lastScanTime = scanTime--;
//still keep 'info.lastScanType' as NONE.
addBlockInfo(info);
}
RollingLogs rollingLogs = null;
try {
rollingLogs = dataset.createRollingLogs(blockPoolId, VERIFICATION_PREFIX);
} catch (IOException e) {
LOG.warn("Could not open verfication log. " +
"Verification times are not stored.");
}
verificationLog = rollingLogs == null? null: new LogFileHandler(rollingLogs);
}
String getBlockPoolId() {
return blockPoolId;
}
synchronized boolean isInitialized() {
return throttler != null;
}
private void updateBytesToScan(long len, long lastScanTime) {
// len could be negative when a block is deleted.
totalBytesToScan += len;
@@ -197,51 +216,6 @@
}
}
void init() throws IOException {
// get the list of blocks and arrange them in random order
List<Block> arr = dataset.getFinalizedBlocks(blockPoolId);
Collections.shuffle(arr);
blockInfoSet = new TreeSet<BlockScanInfo>();
blockMap = new HashMap<Block, BlockScanInfo>();
long scanTime = -1;
for (Block block : arr) {
BlockScanInfo info = new BlockScanInfo( block );
info.lastScanTime = scanTime--;
//still keep 'info.lastScanType' as NONE.
addBlockInfo(info);
}
/* Pick the first directory that has any existing scanner log.
* otherwise, pick the first directory.
*/
File dir = null;
final List<? extends FSVolumeInterface> volumes = dataset.getVolumes();
for (FSVolumeInterface vol : volumes) {
File bpDir = vol.getDirectory(blockPoolId);
if (LogFileHandler.isFilePresent(bpDir, verificationLogFile)) {
dir = bpDir;
break;
}
}
if (dir == null) {
dir = volumes.get(0).getDirectory(blockPoolId);
}
try {
// max lines will be updated later during initialization.
verificationLog = new LogFileHandler(dir, verificationLogFile, 100);
} catch (IOException e) {
LOG.warn("Could not open verfication log. " +
"Verification times are not stored.");
}
synchronized (this) {
throttler = new DataTransferThrottler(200, MAX_SCAN_RATE);
}
}
private synchronized long getNewBlockScanTime() {
/* If there are a lot of blocks, this returns a random time within
* the scan period. Otherwise something sooner.
@@ -255,10 +229,6 @@
/** Adds block to list of blocks */
synchronized void addBlock(ExtendedBlock block) {
if (!isInitialized()) {
return;
}
BlockScanInfo info = blockMap.get(block.getLocalBlock());
if ( info != null ) {
LOG.warn("Adding an already existing block " + block);
@@ -274,20 +244,19 @@
/** Deletes the block from internal structures */
synchronized void deleteBlock(Block block) {
if (!isInitialized()) {
return;
}
BlockScanInfo info = blockMap.get(block);
if ( info != null ) {
delBlockInfo(info);
}
}
/** @return the last scan time */
synchronized long getLastScanTime(Block block) {
if (!isInitialized()) {
return 0;
/** @return the last scan time for the block pool. */
long getLastScanTime() {
return lastScanTime.get();
}
/** @return the last scan time for the given block. */
synchronized long getLastScanTime(Block block) {
BlockScanInfo info = blockMap.get(block);
return info == null? 0: info.lastScanTime;
}
@@ -302,9 +271,6 @@
private synchronized void updateScanStatus(Block block,
ScanType type,
boolean scanOk) {
if (!isInitialized()) {
return;
}
BlockScanInfo info = blockMap.get(block);
if ( info != null ) {
@@ -325,9 +291,9 @@
return;
}
LogFileHandler log = verificationLog;
if (log != null) {
log.appendLine(now, block.getGenerationStamp(), block.getBlockId());
if (verificationLog != null) {
verificationLog.append(now, block.getGenerationStamp(),
block.getBlockId());
}
}
@@ -342,6 +308,7 @@
}
static private class LogEntry {
long blockId = -1;
long verificationTime = -1;
long genStamp = GenerationStamp.GRANDFATHER_GENERATION_STAMP;
@@ -355,6 +322,14 @@
private static Pattern entryPattern =
Pattern.compile("\\G\\s*([^=\\p{Space}]+)=\"(.*?)\"\\s*");
static String toString(long verificationTime, long genStamp, long blockId,
DateFormat dateFormat) {
return "\ndate=\"" + dateFormat.format(new Date(verificationTime))
+ "\"\t time=\"" + verificationTime
+ "\"\t genstamp=\"" + genStamp
+ "\"\t id=\"" + blockId + "\"";
}
static LogEntry parseEntry(String line) {
LogEntry entry = new LogEntry();
@@ -491,8 +466,8 @@
}
// Used for tests only
long getBlocksScannedInLastRun() {
return totalBlocksScannedInLastRun;
int getBlocksScannedInLastRun() {
return totalBlocksScannedInLastRun.get();
}
/**
@@ -503,33 +478,19 @@
* to exit.
*/
private boolean assignInitialVerificationTimes() {
int numBlocks = 1;
LogFileHandler log = null;
synchronized (this) {
log = verificationLog;
numBlocks = Math.max(blockMap.size(), 1);
}
//First updates the last verification times from the log file.
if (verificationLog != null) {
long now = System.currentTimeMillis();
LogFileHandler.Reader logReader[] = new LogFileHandler.Reader[2];
RollingLogs.LineIterator logIterator = null;
try {
if (log != null) {
logReader[0] = log.getCurrentFileReader();
logReader[1] = log.getPreviousFileReader();
}
} catch (IOException e) {
LOG.warn("Could not read previous verification times", e);
}
try {
for (LogFileHandler.Reader reader : logReader) {
logIterator = verificationLog.logs.iterator(false);
// update verification times from the verificationLog.
while (logReader != null && reader.hasNext()) {
while (logIterator.hasNext()) {
if (!datanode.shouldRun
|| datanode.blockScanner.blockScannerThread.isInterrupted()) {
return false;
}
LogEntry entry = LogEntry.parseEntry(reader.next());
LogEntry entry = LogEntry.parseEntry(logIterator.next());
if (entry != null) {
updateBlockInfo(entry);
if (now - entry.verificationTime < scanPeriod) {
@@ -540,35 +501,35 @@
updateBytesLeft(-info.block.getNumBytes());
processedBlocks.put(entry.blockId, 1);
}
if (reader.file == log.prevFile) {
if (logIterator.isPrevious()) {
// write the log entry to current file
// so that the entry is preserved for later runs.
log.appendLine(entry.verificationTime, entry.genStamp,
verificationLog.append(entry.verificationTime, entry.genStamp,
entry.blockId);
}
}
}
}
}
}
} catch (IOException e) {
LOG.warn("Failed to read previous verification times.", e);
} finally {
IOUtils.closeStream(logReader[0]);
IOUtils.closeStream(logReader[1]);
IOUtils.closeStream(logIterator);
}
}
/* Initially spread the block reads over half of
* MIN_SCAN_PERIOD so that we don't keep scanning the
* blocks too quickly when restarted.
*/
long verifyInterval = (long) (Math.min( scanPeriod/2.0/numBlocks,
10*60*1000 ));
long lastScanTime = System.currentTimeMillis() - scanPeriod;
/* Before this loop, entries in blockInfoSet that are not
* updated above have lastScanTime of <= 0 . Loop until first entry has
* lastModificationTime > 0.
*/
synchronized (this) {
final int numBlocks = Math.max(blockMap.size(), 1);
// Initially spread the block reads over half of scan period
// so that we don't keep scanning the blocks too quickly when restarted.
long verifyInterval = Math.min(scanPeriod/(2L * numBlocks), 10*60*1000L);
long lastScanTime = System.currentTimeMillis() - scanPeriod;
if (!blockInfoSet.isEmpty()) {
BlockScanInfo info;
while ((info = blockInfoSet.first()).lastScanTime < 0) {
@@ -587,11 +548,6 @@
bytesLeft += len;
}
static File getCurrentFile(FSVolumeInterface vol, String bpid) throws IOException {
return LogFileHandler.getCurrentFile(vol.getDirectory(bpid),
BlockPoolSliceScanner.verificationLogFile);
}
private synchronized void startNewPeriod() {
LOG.info("Starting a new period : work left in prev period : "
+ String.format("%.2f%%", totalBytesToScan == 0 ? 0
@@ -604,26 +560,21 @@
void scanBlockPoolSlice() {
startNewPeriod();
if (processedBlocks != null) {
totalBlocksScannedInLastRun = processedBlocks.size();
}
// Create a new processedBlocks structure
processedBlocks = new HashMap<Long, Integer>();
if (verificationLog != null) {
try {
verificationLog.openCurFile();
} catch (FileNotFoundException ex) {
LOG.warn("Could not open current file");
}
}
if (!assignInitialVerificationTimes()) {
return;
}
// Start scanning
try {
scan();
} finally {
totalBlocksScannedInLastRun.set(processedBlocks.size());
lastScanTime.set(System.currentTimeMillis());
}
}
public void scan() {
private void scan() {
if (LOG.isDebugEnabled()) {
LOG.debug("Starting to scan blockpool: " + blockPoolId);
}
@@ -663,7 +614,7 @@
private synchronized void cleanUp() {
if (verificationLog != null) {
try {
verificationLog.roll();
verificationLog.logs.roll();
} catch (IOException ex) {
LOG.warn("Received exception: ", ex);
verificationLog.close();
@@ -686,7 +637,7 @@
int inScanPeriod = 0;
int neverScanned = 0;
DateFormat dateFormat = new SimpleDateFormat(dateFormatString);
DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT);
int total = blockInfoSet.size();
@@ -751,190 +702,32 @@
/**
* This class takes care of the log file used to store the last verification
* times of the blocks. It rolls the current file when it is too big etc.
* If there is an error while writing, it stops updating with an error
* message.
* times of the blocks.
*/
private static class LogFileHandler {
private final DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT);
private static final String curFileSuffix = ".curr";
private static final String prevFileSuffix = ".prev";
private final DateFormat dateFormat = new SimpleDateFormat(dateFormatString);
private final RollingLogs logs;
static File getCurrentFile(File dir, String filePrefix) {
return new File(dir, filePrefix + curFileSuffix);
private LogFileHandler(RollingLogs logs) {
this.logs = logs;
}
public Reader getPreviousFileReader() throws IOException {
return new Reader(prevFile);
}
public Reader getCurrentFileReader() throws IOException {
return new Reader(curFile);
}
static boolean isFilePresent(File dir, String filePrefix) {
return new File(dir, filePrefix + curFileSuffix).exists() ||
new File(dir, filePrefix + prevFileSuffix).exists();
}
private File curFile;
private File prevFile;
private PrintStream out;
/**
* Opens the log file for appending.
* Note that rolling will happen only after "updateLineCount()" is
* called. This is so that line count could be updated in a separate
* thread without delaying start up.
*
* @param dir where the logs files are located.
* @param filePrefix prefix of the file.
* @param maxNumLines max lines in a file (it's a soft limit).
* @throws IOException
*/
LogFileHandler(File dir, String filePrefix, int maxNumLines)
throws IOException {
curFile = new File(dir, filePrefix + curFileSuffix);
prevFile = new File(dir, filePrefix + prevFileSuffix);
}
/**
* Append "\n" + line.
* If the log file needs to be rolled, it will be done after
* appending the text.
* This does not throw IOException when there is an error while
* appending. Currently does not throw an error even if rolling
* fails (maybe it should?).
* return true if append was successful.
*/
synchronized boolean appendLine(String line) {
if (out == null) {
return false;
}
out.println();
out.print(line);
return true;
}
boolean appendLine(long verificationTime, long genStamp, long blockId) {
return appendLine("date=\""
+ dateFormat.format(new Date(verificationTime)) + "\"\t " + "time=\""
+ verificationTime + "\"\t " + "genstamp=\"" + genStamp + "\"\t "
+ "id=\"" + blockId + "\"");
}
private synchronized void openCurFile() throws FileNotFoundException {
close();
out = new PrintStream(new FileOutputStream(curFile, true));
}
private void roll() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Rolling current file: " + curFile.getAbsolutePath()
+ " to previous file: " + prevFile.getAbsolutePath());
}
if (!prevFile.delete() && prevFile.exists()) {
throw new IOException("Could not delete " + prevFile);
}
close();
if (!curFile.renameTo(prevFile)) {
throw new IOException("Could not rename " + curFile +
" to " + prevFile);
}
}
synchronized void close() {
if (out != null) {
out.close();
out = null;
}
}
/**
* This is used to read the lines in order.
* If the data is not read completely (i.e., until hasNext() returns
* false), it needs to be explicitly closed.
*/
private static class Reader implements Iterator<String>, Closeable {
BufferedReader reader;
File file;
String line;
boolean closed = false;
private Reader(File file) throws IOException {
reader = null;
this.file = file;
readNext();
}
private boolean openFile() throws IOException {
if (file == null) {
return false;
}
if (reader != null ) {
reader.close();
reader = null;
}
if (file.exists()) {
reader = new BufferedReader(new FileReader(file));
return true;
} else {
return false;
}
}
// read next line if possible.
private void readNext() throws IOException {
line = null;
if (reader == null) {
openFile();
}
void append(long verificationTime, long genStamp, long blockId) {
final String m = LogEntry.toString(verificationTime, genStamp, blockId,
dateFormat);
try {
if (reader != null && (line = reader.readLine()) != null) {
return;
}
} finally {
if (!hasNext()) {
close();
}
}
}
public boolean hasNext() {
return line != null;
}
public String next() {
String curLine = line;
try {
readNext();
logs.appender().append(m);
} catch (IOException e) {
LOG.info("Could not read next line in LogHandler", e);
LOG.warn("Failed to append to " + logs + ", m=" + m, e);
}
return curLine;
}
public void remove() {
throw new RuntimeException("remove() is not supported.");
}
public void close() throws IOException {
if (!closed) {
void close() {
try {
if (reader != null) {
reader.close();
}
} finally {
file = null;
reader = null;
closed = true;
}
}
logs.appender().close();
} catch (IOException e) {
LOG.warn("Failed to close the appender of " + logs, e);
}
}
}
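
For reference, the verification log written above stores one line per scanned block as tab-separated name="value" pairs: LogEntry.toString renders an entry (with a leading newline so each append starts a fresh line) and LogEntry.parseEntry reads it back using the \G-anchored entryPattern, which matches consecutive pairs left to right. Below is a minimal, self-contained sketch of that round trip; the class name and sample values are illustrative, while the format string and regex are the ones in the diff.

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class LogEntryRoundTrip {
      // Format and pattern as in BlockPoolSliceScanner above.
      static final String DATA_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
      static final Pattern ENTRY_PATTERN =
          Pattern.compile("\\G\\s*([^=\\p{Space}]+)=\"(.*?)\"\\s*");

      public static void main(String[] args) {
        final long now = System.currentTimeMillis();
        // Render an entry the way LogEntry.toString does (leading newline omitted).
        final String line = "date=\""
            + new SimpleDateFormat(DATA_FORMAT).format(new Date(now))
            + "\"\t time=\"" + now
            + "\"\t genstamp=\"" + 1001
            + "\"\t id=\"" + 42 + "\"";

        // Parse it back the way LogEntry.parseEntry does: \G anchors each
        // match at the end of the previous one, so pairs are read in order.
        final Matcher m = ENTRY_PATTERN.matcher(line);
        while (m.find()) {
          System.out.println(m.group(1) + " -> " + m.group(2));
        }
      }
    }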

View File: DataBlockScanner.java

@@ -18,9 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;
import javax.servlet.http.HttpServlet;
@@ -132,40 +130,26 @@ public class DataBlockScanner implements Runnable {
waitForInit(currentBpId);
synchronized (this) {
if (getBlockPoolSetSize() > 0) {
// Find nextBpId by finding the last modified current log file, if any
long lastScanTime = -1;
Iterator<String> bpidIterator = blockPoolScannerMap.keySet()
.iterator();
while (bpidIterator.hasNext()) {
String bpid = bpidIterator.next();
for (FSDatasetInterface.FSVolumeInterface vol : dataset.getVolumes()) {
try {
File currFile = BlockPoolSliceScanner.getCurrentFile(vol, bpid);
if (currFile.exists()) {
long lastModified = currFile.lastModified();
if (lastScanTime < lastModified) {
lastScanTime = lastModified;
// Find nextBpId by the minimum of the last scan time
long lastScanTime = 0;
for (String bpid : blockPoolScannerMap.keySet()) {
final long t = getBPScanner(bpid).getLastScanTime();
if (t != 0L) {
if (bpid == null || t < lastScanTime) {
lastScanTime = t;
nextBpId = bpid;
}
}
} catch (IOException e) {
LOG.warn("Received exception: ", e);
}
}
}
// nextBpId can still be null if no current log is found,
// find nextBpId sequentially.
if (nextBpId == null) {
if ("".equals(currentBpId)) {
nextBpId = blockPoolScannerMap.firstKey();
} else {
nextBpId = blockPoolScannerMap.higherKey(currentBpId);
if (nextBpId == null) {
nextBpId = blockPoolScannerMap.firstKey();
}
}
}
if (nextBpId != null) {
return getBPScanner(nextBpId);
}
@@ -206,12 +190,8 @@
}
}
public synchronized boolean isInitialized(String bpid) {
BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
if (bpScanner != null) {
return bpScanner.isInitialized();
}
return false;
boolean isInitialized(String bpid) {
return getBPScanner(bpid) != null;
}
public synchronized void printBlockReport(StringBuilder buffer,
@@ -260,14 +240,8 @@
if (blockPoolScannerMap.get(blockPoolId) != null) {
return;
}
BlockPoolSliceScanner bpScanner = new BlockPoolSliceScanner(datanode, dataset,
conf, blockPoolId);
try {
bpScanner.init();
} catch (IOException ex) {
LOG.warn("Failed to initialized block scanner for pool id="+blockPoolId);
return;
}
BlockPoolSliceScanner bpScanner = new BlockPoolSliceScanner(blockPoolId,
datanode, dataset, conf);
blockPoolScannerMap.put(blockPoolId, bpScanner);
LOG.info("Added bpid=" + blockPoolId + " to blockPoolScannerMap, new size="
+ blockPoolScannerMap.size());
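
The replacement logic above picks the next block pool by the smallest recorded last scan time, instead of probing the modification times of per-volume log files. A simplified sketch of that policy follows, with a plain map standing in for blockPoolScannerMap plus getBPScanner(bpid).getLastScanTime(); note that the diff's inner test reads "bpid == null", whereas this sketch uses "next == null", which matches the stated intent of taking the minimum.

    import java.util.TreeMap;

    class NextPoolChooser {
      // Prefer the pool scanned longest ago; if no pool has a recorded scan
      // time yet, advance sequentially through the keys, wrapping around.
      // Assumes a non-empty map, as the caller checks getBlockPoolSetSize() > 0.
      static String choose(TreeMap<String, Long> lastScanTimes, String currentBpId) {
        String next = null;
        long best = 0;
        for (final String bpid : lastScanTimes.keySet()) {
          final long t = lastScanTimes.get(bpid);
          if (t != 0L && (next == null || t < best)) {
            best = t;
            next = bpid;
          }
        }
        if (next == null) {
          next = "".equals(currentBpId) ? lastScanTimes.firstKey()
              : lastScanTimes.higherKey(currentBpId);
          if (next == null) {
            next = lastScanTimes.firstKey(); // wrap around
          }
        }
        return next;
      }
    }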

View File: DataNode.java

@@ -38,7 +38,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
@@ -48,8 +47,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTPS_ENABLE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
@@ -137,7 +134,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.Param;
@@ -393,7 +389,7 @@ public class DataNode extends Configured
boolean isBlockTokenEnabled;
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
public DataBlockScanner blockScanner = null;
volatile DataBlockScanner blockScanner = null;
private DirectoryScanner directoryScanner = null;
/** Activated plug-ins. */

View File: FSDataset.java

@@ -18,13 +18,16 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
@@ -37,6 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
@@ -612,8 +616,8 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
}
@Override
public File getDirectory(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getDirectory();
public String getPath(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath();
}
@Override
@@ -2301,7 +2305,7 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
DataNode.LOG.warn("Metadata file in memory "
+ memMetaFile.getAbsolutePath()
+ " does not match file found by scan "
+ diskMetaFile.getAbsolutePath());
+ (diskMetaFile == null? null: diskMetaFile.getAbsolutePath()));
}
} else {
// Metadata file corresponding to block in memory is missing
@@ -2612,4 +2616,220 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
datafile.getAbsolutePath(), metafile.getAbsolutePath());
return info;
}
@Override
public RollingLogs createRollingLogs(String bpid, String prefix
) throws IOException {
String dir = null;
final List<FSVolume> volumes = getVolumes();
for (FSVolume vol : volumes) {
String bpDir = vol.getPath(bpid);
if (RollingLogsImpl.isFilePresent(bpDir, prefix)) {
dir = bpDir;
break;
}
}
if (dir == null) {
dir = volumes.get(0).getPath(bpid);
}
return new RollingLogsImpl(dir, prefix);
}
static class RollingLogsImpl implements RollingLogs {
private static final String CURR_SUFFIX = ".curr";
private static final String PREV_SUFFIX = ".prev";
static boolean isFilePresent(String dir, String filePrefix) {
return new File(dir, filePrefix + CURR_SUFFIX).exists() ||
new File(dir, filePrefix + PREV_SUFFIX).exists();
}
private final File curr;
private final File prev;
private PrintStream out; // requires synchronized access
private Appender appender = new Appender() {
@Override
public Appendable append(CharSequence csq) {
synchronized(RollingLogsImpl.this) {
if (out == null) {
throw new IllegalStateException(RollingLogsImpl.this
+ " is not yet opened.");
}
out.print(csq);
}
return this;
}
@Override
public Appendable append(char c) {
throw new UnsupportedOperationException();
}
@Override
public Appendable append(CharSequence csq, int start, int end) {
throw new UnsupportedOperationException();
}
@Override
public void close() {
synchronized(RollingLogsImpl.this) {
if (out != null) {
out.close();
out = null;
}
}
}
};
private final AtomicInteger numReaders = new AtomicInteger();
private RollingLogsImpl(String dir, String filePrefix) throws FileNotFoundException{
curr = new File(dir, filePrefix + CURR_SUFFIX);
prev = new File(dir, filePrefix + PREV_SUFFIX);
out = new PrintStream(new FileOutputStream(curr, true));
}
@Override
public Reader iterator(boolean skipPrevFile) throws IOException {
numReaders.incrementAndGet();
return new Reader(skipPrevFile);
}
@Override
public Appender appender() {
return appender;
}
@Override
public boolean roll() throws IOException {
if (numReaders.get() > 0) {
return false;
}
if (!prev.delete() && prev.exists()) {
throw new IOException("Failed to delete " + prev);
}
synchronized(this) {
appender.close();
final boolean renamed = curr.renameTo(prev);
out = new PrintStream(new FileOutputStream(curr, true));
if (!renamed) {
throw new IOException("Failed to rename " + curr + " to " + prev);
}
}
return true;
}
@Override
public String toString() {
return curr.toString();
}
/**
* This is used to read the lines in order.
* If the data is not read completely (i.e., until hasNext() returns
* false), it needs to be explicitly closed.
*/
private class Reader implements RollingLogs.LineIterator {
private File file;
private BufferedReader reader;
private String line;
private boolean closed = false;
private Reader(boolean skipPrevFile) throws IOException {
reader = null;
file = skipPrevFile? curr : prev;
readNext();
}
@Override
public boolean isPrevious() {
return file == prev;
}
private boolean openFile() throws IOException {
for(int i=0; i<2; i++) {
if (reader != null || i > 0) {
// move to next file
file = isPrevious()? curr : null;
}
if (file == null) {
return false;
}
if (file.exists()) {
break;
}
}
if (reader != null ) {
reader.close();
reader = null;
}
reader = new BufferedReader(new FileReader(file));
return true;
}
// read next line if possible.
private void readNext() throws IOException {
line = null;
try {
if (reader != null && (line = reader.readLine()) != null) {
return;
}
if (line == null) {
// move to the next file.
if (openFile()) {
readNext();
}
}
} finally {
if (!hasNext()) {
close();
}
}
}
@Override
public boolean hasNext() {
return line != null;
}
@Override
public String next() {
String curLine = line;
try {
readNext();
} catch (IOException e) {
DataBlockScanner.LOG.warn("Failed to read next line.", e);
}
return curLine;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException {
if (!closed) {
try {
if (reader != null) {
reader.close();
}
} finally {
file = null;
reader = null;
closed = true;
final int n = numReaders.decrementAndGet();
assert(n >= 0);
}
}
}
}
}
}
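
One subtlety in the Reader above: iteration starts at the previous file and falls through to the current one, and isPrevious() tells the caller which file the last line came from. BlockPoolSliceScanner uses that to re-append entries found in the previous file, since the next roll() deletes it. Here is a hedged sketch of the pattern, assuming a RollingLogs instance of the interface added later in this commit; the replay method and the in-memory update are illustrative.

    import java.io.IOException;

    class PreserveAcrossRoll {
      // Sketch of what assignInitialVerificationTimes() does with the iterator.
      static void replay(RollingLogs logs) throws IOException {
        final RollingLogs.LineIterator it = logs.iterator(false); // include .prev
        try {
          while (it.hasNext()) {
            final String line = it.next();
            // ... update in-memory scan times from the line ...
            if (it.isPrevious()) {
              // Lines read from .prev must be copied forward, because the
              // next roll() deletes that file. Entries are newline-prefixed.
              logs.appender().append("\n" + line);
            }
          }
        } finally {
          it.close(); // decrements numReaders, re-enabling roll()
        }
      }
    }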

View File: FSDatasetInterface.java

@@ -88,13 +88,21 @@ public interface FSDatasetInterface<V extends FSDatasetInterface.FSVolumeInterfa
/** @return the available storage space in bytes. */
public long getAvailable() throws IOException;
/** @return the directory for the block pool. */
public File getDirectory(String bpid) throws IOException;
/** @return the path to the volume */
public String getPath(String bpid) throws IOException;
/** @return the directory for the finalized blocks in the block pool. */
public File getFinalizedDir(String bpid) throws IOException;
}
/**
* Create rolling logs.
*
* @param prefix the prefix of the log names.
* @return rolling logs
*/
public RollingLogs createRollingLogs(String bpid, String prefix) throws IOException;
/** @return a list of volumes. */
public List<V> getVolumes();

View File: RollingLogs.java (new file)

@@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
/**
* Rolling logs consist of a current log and a previous log.
* When the roll operation is invoked, the previous log is deleted and
* the current log becomes the new previous log.
* The implementation should support a single appender and multiple readers.
*/
public interface RollingLogs {
/**
* To iterate the lines of the logs.
*/
public interface LineIterator extends Iterator<String>, Closeable {
/** Is the iterator currently iterating over the previous log? */
public boolean isPrevious();
}
/**
* To append text to the logs.
*/
public interface Appender extends Appendable, Closeable {
}
/**
* Create an iterator to iterate the lines in the logs.
*
* @param skipPrevious Should it skip reading the previous log?
* @return a new iterator.
*/
public LineIterator iterator(boolean skipPrevious) throws IOException;
/**
* @return the only appender to append text to the logs.
* The same object is returned if it is invoked multiple times.
*/
public Appender appender();
/**
* Roll current to previous and delete the previous.
*
* @return true if the rolling succeeded.
* A false return is not an error; it means that the roll cannot
* be performed at the moment,
* e.g. the logs are being read.
*/
public boolean roll() throws IOException;
}
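
Putting the pieces together, a typical lifecycle of this interface looks roughly like the following non-normative sketch. createRollingLogs is the factory added to FSDatasetInterface in this commit; the block pool id and the printed messages are illustrative, and the prefix is the one BlockPoolSliceScanner uses.

    import java.io.IOException;

    class RollingLogsUsage {
      static void example(FSDatasetInterface<?> dataset) throws IOException {
        // One RollingLogs per block pool; the prefix names the log files
        // (the FSDataset implementation appends ".curr" and ".prev").
        final RollingLogs logs = dataset.createRollingLogs(
            "BP-example-1", "dncp_block_verification.log");

        // There is a single shared appender; entries are newline-prefixed.
        logs.appender().append("\nsome entry");

        // Readers may be opened concurrently; skipPrevious=true reads only
        // the current log.
        final RollingLogs.LineIterator it = logs.iterator(true);
        try {
          while (it.hasNext()) {
            System.out.println(it.next());
          }
        } finally {
          it.close();
        }

        // Roll current to previous; a false return is not an error, it means
        // the roll cannot happen right now, e.g. the logs are being read.
        if (!logs.roll()) {
          System.out.println("roll deferred: logs are being read");
        }
      }
    }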

View File: TestDatanodeBlockScanner.java

@@ -18,27 +18,28 @@
package org.apache.hadoop.hdfs;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.io.*;
import java.util.Random;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import junit.framework.TestCase;
/**
* This test verifies that block verification occurs on the datanode
*/
@@ -392,7 +393,7 @@ public class TestDatanodeBlockScanner extends TestCase {
}
private static void waitForBlockDeleted(ExtendedBlock blk, int dnIndex,
long timeout) throws IOException, TimeoutException, InterruptedException {
long timeout) throws TimeoutException, InterruptedException {
File blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk);
long failtime = System.currentTimeMillis()
+ ((timeout > 0) ? timeout : Long.MAX_VALUE);

View File: SimulatedFSDataset.java

@@ -435,7 +435,7 @@ public class SimulatedFSDataset
}
@Override // FSDatasetInterface
public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
public synchronized void unfinalizeBlock(ExtendedBlock b) {
if (isValidRbw(b)) {
blockMap.remove(b.getLocalBlock());
}
@@ -456,12 +456,12 @@
}
@Override // FSDatasetMBean
public long getCapacity() throws IOException {
public long getCapacity() {
return storage.getCapacity();
}
@Override // FSDatasetMBean
public long getDfsUsed() throws IOException {
public long getDfsUsed() {
return storage.getUsed();
}
@@ -471,7 +471,7 @@
}
@Override // FSDatasetMBean
public long getRemaining() throws IOException {
public long getRemaining() {
return storage.getFree();
}
@@ -938,13 +938,13 @@
@Override // FSDatasetInterface
public FinalizedReplica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId,
long newlength) throws IOException {
long newlength) {
return new FinalizedReplica(
oldBlock.getBlockId(), newlength, recoveryId, null, null);
}
@Override // FSDatasetInterface
public long getReplicaVisibleLength(ExtendedBlock block) throws IOException {
public long getReplicaVisibleLength(ExtendedBlock block) {
return block.getNumBytes();
}
@@ -1013,4 +1013,9 @@
public Map<String, Object> getVolumeInfoMap() {
throw new UnsupportedOperationException();
}
@Override
public RollingLogs createRollingLogs(String bpid, String prefix) {
throw new UnsupportedOperationException();
}
}