HDFS-5031. BlockScanner scans the block multiple times. (Vinay via Arpit Agarwal)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1524553 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal 2013-09-18 20:08:50 +00:00
parent f278a491dc
commit 22b401284b
6 changed files with 80 additions and 8 deletions


@@ -303,6 +303,9 @@ Release 2.3.0 - UNRELEASED
    HDFS-5170. BlockPlacementPolicyDefault uses the wrong classname when
    alerting to enable debug logging. (Andrew Wang)

+   HDFS-5031. BlockScanner scans the block multiple times. (Vinay via Arpit
+   Agarwal)
+
Release 2.2.0 - UNRELEASED

  INCOMPATIBLE CHANGES


@@ -100,6 +100,7 @@ class BlockPoolSliceScanner {
  private long currentPeriodStart = Time.now();
  private long bytesLeft = 0; // Bytes to scan in this period
  private long totalBytesToScan = 0;
+  private boolean isNewPeriod = true;

  private final LogFileHandler verificationLog;
@@ -126,7 +127,10 @@ static class BlockScanInfo extends Block
    public int compare(BlockScanInfo left, BlockScanInfo right) {
      final long l = left.lastScanTime;
      final long r = right.lastScanTime;
-      return l < r? -1: l > r? 1: 0;
+      // Compare the blocks themselves when the scan times are equal, because
+      // TreeMap uses the comparator (if available) to check for the existence
+      // of an object; returning 0 here would make distinct blocks look equal.
+      return l < r? -1: l > r? 1: left.compareTo(right);
    }
  };
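The comment added here is the crux of the change: the scanner tracks per-block state in collections ordered by this comparator, and a TreeMap/TreeSet consults the comparator, not equals(), to decide whether an entry already exists. A minimal, self-contained sketch of that behaviour (Info and its fields are illustrative stand-ins, not the HDFS BlockScanInfo class):

import java.util.Comparator;
import java.util.TreeSet;

// Illustrative stand-in for BlockScanInfo: just a block id and a last-scan time.
public class ComparatorExistenceDemo {
  static final class Info {
    final long blockId;
    final long lastScanTime;
    Info(long blockId, long lastScanTime) {
      this.blockId = blockId;
      this.lastScanTime = lastScanTime;
    }
  }

  public static void main(String[] args) {
    // Comparator that looks only at lastScanTime, like the code before the fix.
    Comparator<Info> timeOnly = (l, r) ->
        l.lastScanTime < r.lastScanTime ? -1 : l.lastScanTime > r.lastScanTime ? 1 : 0;

    TreeSet<Info> broken = new TreeSet<>(timeOnly);
    broken.add(new Info(1L, 0L));   // a never-scanned block
    broken.add(new Info(2L, 0L));   // a different never-scanned block
    // The TreeSet asks the comparator, gets 0, and silently drops the second block.
    System.out.println(broken.size());   // 1

    // Comparator that breaks ties on the block itself, like the fixed code.
    Comparator<Info> fixedOrder = (l, r) ->
        l.lastScanTime < r.lastScanTime ? -1
            : l.lastScanTime > r.lastScanTime ? 1
            : Long.compare(l.blockId, r.blockId);
    TreeSet<Info> fixed = new TreeSet<>(fixedOrder);
    fixed.add(new Info(1L, 0L));
    fixed.add(new Info(2L, 0L));
    System.out.println(fixed.size());    // 2 -- both blocks stay tracked
  }
}

With the time-only comparator, two distinct blocks sharing a lastScanTime collapse into one entry and the tracking state for one of them is lost; breaking ties on the block itself keeps the ordering consistent with equality.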
@@ -148,8 +152,6 @@ public int hashCode() {
    public boolean equals(Object that) {
      if (this == that) {
        return true;
-      } else if (that == null || !(that instanceof BlockScanInfo)) {
-        return false;
      }
      return super.equals(that);
    }

@@ -539,10 +541,12 @@ private boolean assignInitialVerificationTimes() {
              entry.genStamp));
          if (info != null) {
            if (processedBlocks.get(entry.blockId) == null) {
+              if (isNewPeriod) {
                updateBytesLeft(-info.getNumBytes());
+              }
              processedBlocks.put(entry.blockId, 1);
            }
-            if (logIterator.isPrevious()) {
+            if (logIterator.isLastReadFromPrevious()) {
              // write the log entry to current file
              // so that the entry is preserved for later runs.
              verificationLog.append(entry.verificationTime, entry.genStamp,

@@ -557,6 +561,7 @@ private boolean assignInitialVerificationTimes() {
    } finally {
      IOUtils.closeStream(logIterator);
    }
+    isNewPeriod = false;
  }

@@ -597,6 +602,7 @@ private synchronized void startNewPeriod() {
    // reset the byte counts :
    bytesLeft = totalBytesToScan;
    currentPeriodStart = Time.now();
+    isNewPeriod = true;
  }

  private synchronized boolean workRemainingInCurrentPeriod() {
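Taken together, these hunks gate the byte accounting on the scan period: replaying the verification log deducts already-verified blocks from bytesLeft only the first time after a period starts, the flag is then cleared, and startNewPeriod() re-arms it. A minimal sketch of that guard pattern, with hypothetical names that only echo the fields above (this is not the HDFS class):

// Sketch of the isNewPeriod guard; names echo the hunks above, class is hypothetical.
class ScanPeriodBudget {
  private final long totalBytesToScan;
  private long bytesLeft;
  private boolean isNewPeriod = true;

  ScanPeriodBudget(long totalBytesToScan) {
    this.totalBytesToScan = totalBytesToScan;
    this.bytesLeft = totalBytesToScan;
  }

  // Deduct already-verified blocks from the remaining budget only on the first
  // replay after a new period; a later replay in the same period leaves it alone.
  void replayVerifiedBlocks(long[] blockSizes) {
    for (long size : blockSizes) {
      if (isNewPeriod) {
        bytesLeft -= size;
      }
    }
    isNewPeriod = false;
  }

  void startNewPeriod() {
    bytesLeft = totalBytesToScan;   // reset the byte counts, as in the last hunk
    isNewPeriod = true;
  }

  long bytesLeft() {
    return bytesLeft;
  }
}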


@@ -33,6 +33,12 @@ public interface RollingLogs {
  public interface LineIterator extends Iterator<String>, Closeable {
    /** Is the iterator iterating the previous? */
    public boolean isPrevious();
+
+    /**
+     * Is the last read entry from previous? This should be called after
+     * reading.
+     */
+    public boolean isLastReadFromPrevious();
  }

  /**


@@ -134,6 +134,7 @@ public String toString() {
   */
  private class Reader implements RollingLogs.LineIterator {
    private File file;
+    private File lastReadFile;
    private BufferedReader reader;
    private String line;
    private boolean closed = false;

@@ -149,6 +150,11 @@ public boolean isPrevious() {
      return file == prev;
    }

+    @Override
+    public boolean isLastReadFromPrevious() {
+      return lastReadFile == prev;
+    }
+
    private boolean openFile() throws IOException {
      for(int i=0; i<2; i++) {

@@ -203,6 +209,7 @@ public boolean hasNext() {
    public String next() {
      String curLine = line;
      try {
+        lastReadFile = file;
        readNext();
      } catch (IOException e) {
        DataBlockScanner.LOG.warn("Failed to read next line.", e);
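Why a second method? The Reader above reads a line ahead: by the time next() hands back the last entry of the previous log, readNext() has already rolled file over to the current log, so isPrevious() no longer describes the entry just returned, while the new lastReadFile still does. A toy model of that read-ahead behaviour (plain string lists instead of the rolling log files; names are illustrative, not the HDFS implementation):

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Toy read-ahead iterator over a "previous" and a "current" log. Names mirror
// the Reader above (file -> source, lastReadFile -> lastReadSource), but this
// is only an illustration.
public class RolloverDemo implements Iterator<String> {
  private final List<String> prev;
  private final List<String> curr;
  private List<String> source;          // log currently open
  private List<String> lastReadSource;  // log the last returned line came from
  private int index;
  private String line;                  // pre-read line

  public RolloverDemo(List<String> prev, List<String> curr) {
    this.prev = prev;
    this.curr = curr;
    this.source = prev;
    readNext();
  }

  private void readNext() {
    if (index < source.size()) {
      line = source.get(index++);
    } else if (source == prev) {
      source = curr;                    // roll over to the current log
      index = 0;
      line = index < source.size() ? source.get(index++) : null;
    } else {
      line = null;
    }
  }

  public boolean isPrevious() { return source == prev; }
  public boolean isLastReadFromPrevious() { return lastReadSource == prev; }

  @Override
  public boolean hasNext() { return line != null; }

  @Override
  public String next() {
    String out = line;
    lastReadSource = source;            // record the origin before reading ahead
    readNext();
    return out;
  }

  public static void main(String[] args) {
    RolloverDemo it =
        new RolloverDemo(Arrays.asList("prev-entry"), Arrays.asList("curr-entry"));
    it.next();                                        // returns "prev-entry"
    System.out.println(it.isPrevious());              // false: already rolled over
    System.out.println(it.isLastReadFromPrevious());  // true: entry came from prev
  }
}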


@@ -459,4 +459,43 @@ private static void testReplicaInfoParsingSingle(String subDirPath, int[] expect
    assertArrayEquals(expectedSubDirs, ReplicaInfo.parseSubDirs(testFile).subDirs);
    assertEquals(BASE_PATH, ReplicaInfo.parseSubDirs(testFile).baseDirPath);
  }
+
+  @Test
+  public void testDuplicateScans() throws Exception {
+    long startTime = Time.now();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration())
+        .numDataNodes(1).build();
+    FileSystem fs = null;
+    try {
+      fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+      int infoPort = dataNode.getInfoPort();
+      long scanTimeBefore = 0, scanTimeAfter = 0;
+      for (int i = 1; i < 10; i++) {
+        Path fileName = new Path("/test" + i);
+        DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L);
+        waitForVerification(infoPort, fs, fileName, i, startTime, TIMEOUT);
+        if (i > 1) {
+          scanTimeAfter = DataNodeTestUtils.getLatestScanTime(dataNode,
+              DFSTestUtil.getFirstBlock(fs, new Path("/test" + (i - 1))));
+          assertFalse("scan time should not be 0", scanTimeAfter == 0);
+          assertEquals("There should not be duplicate scan", scanTimeBefore,
+              scanTimeAfter);
+        }
+        scanTimeBefore = DataNodeTestUtils.getLatestScanTime(dataNode,
+            DFSTestUtil.getFirstBlock(fs, new Path("/test" + i)));
+      }
+      cluster.restartDataNode(0);
+      Thread.sleep(10000);
+      dataNode = cluster.getDataNodes().get(0);
+      scanTimeAfter = DataNodeTestUtils.getLatestScanTime(dataNode,
+          DFSTestUtil.getFirstBlock(fs, new Path("/test" + (9))));
+      assertEquals("There should not be duplicate scan", scanTimeBefore,
+          scanTimeAfter);
+    } finally {
+      IOUtils.closeStream(fs);
+      cluster.shutdown();
+    }
+  }
}


@@ -115,9 +115,20 @@ public static InterDatanodeProtocol createInterDatanodeProtocolProxy(
  }

  public static void runBlockScannerForBlock(DataNode dn, ExtendedBlock b) {
+    BlockPoolSliceScanner bpScanner = getBlockPoolScanner(dn, b);
+    bpScanner.verifyBlock(b);
+  }
+
+  private static BlockPoolSliceScanner getBlockPoolScanner(DataNode dn,
+      ExtendedBlock b) {
    DataBlockScanner scanner = dn.getBlockScanner();
    BlockPoolSliceScanner bpScanner = scanner.getBPScanner(b.getBlockPoolId());
-    bpScanner.verifyBlock(b);
+    return bpScanner;
+  }
+
+  public static long getLatestScanTime(DataNode dn, ExtendedBlock b) {
+    BlockPoolSliceScanner scanner = getBlockPoolScanner(dn, b);
+    return scanner.getLastScanTime(b.getLocalBlock());
  }

  public static void shutdownBlockScanner(DataNode dn) {