HDFS-7548. Corrupt block reporting delayed until datablock scanner thread detects it. Contributed by Rushabh Shah.

This commit is contained in:
Kihwal Lee 2015-01-21 14:41:31 -06:00
parent 2a69775610
commit 95858db0c1
7 changed files with 125 additions and 11 deletions

View File

@ -472,6 +472,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7610. Fix removal of dynamically added DN volumes (Lei (Eddy) Xu via HDFS-7610. Fix removal of dynamically added DN volumes (Lei (Eddy) Xu via
Colin P. McCabe) Colin P. McCabe)
HDFS-7548. Corrupt block reporting delayed until datablock scanner thread
detects it (Rushabh Shah via kihwal)
Release 2.6.1 - UNRELEASED Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -105,6 +105,7 @@ class BlockPoolSliceScanner {
private long bytesLeft = 0; // Bytes to scan in this period private long bytesLeft = 0; // Bytes to scan in this period
private long totalBytesToScan = 0; private long totalBytesToScan = 0;
private boolean isNewPeriod = true; private boolean isNewPeriod = true;
private int lastScanTimeDifference = 5*60*1000;
private final LogFileHandler verificationLog; private final LogFileHandler verificationLog;
@ -112,6 +113,7 @@ class BlockPoolSliceScanner {
200, MAX_SCAN_RATE); 200, MAX_SCAN_RATE);
private static enum ScanType { private static enum ScanType {
IMMEDIATE_SCAN,
VERIFICATION_SCAN, // scanned as part of periodic verfication VERIFICATION_SCAN, // scanned as part of periodic verfication
NONE, NONE,
} }
@ -129,12 +131,17 @@ class BlockPoolSliceScanner {
@Override @Override
public int compare(BlockScanInfo left, BlockScanInfo right) { public int compare(BlockScanInfo left, BlockScanInfo right) {
final ScanType leftNextScanType = left.nextScanType;
final ScanType rightNextScanType = right.nextScanType;
final long l = left.lastScanTime; final long l = left.lastScanTime;
final long r = right.lastScanTime; final long r = right.lastScanTime;
// Compare by nextScanType if they are same then compare by
// lastScanTimes
// compare blocks itself if scantimes are same to avoid. // compare blocks itself if scantimes are same to avoid.
// because TreeMap uses comparator if available to check existence of // because TreeMap uses comparator if available to check existence of
// the object. // the object.
return l < r? -1: l > r? 1: left.compareTo(right); int compareByNextScanType = leftNextScanType.compareTo(rightNextScanType);
return compareByNextScanType < 0? -1: compareByNextScanType > 0? 1: l < r? -1: l > r? 1: left.compareTo(right);
} }
}; };
@ -142,6 +149,7 @@ class BlockPoolSliceScanner {
ScanType lastScanType = ScanType.NONE; ScanType lastScanType = ScanType.NONE;
boolean lastScanOk = true; boolean lastScanOk = true;
private LinkedElement next; private LinkedElement next;
ScanType nextScanType = ScanType.VERIFICATION_SCAN;
BlockScanInfo(Block block) { BlockScanInfo(Block block) {
super(block); super(block);
@ -268,7 +276,9 @@ class BlockPoolSliceScanner {
if (info != null && e.verificationTime > 0 && if (info != null && e.verificationTime > 0 &&
info.lastScanTime < e.verificationTime) { info.lastScanTime < e.verificationTime) {
delBlockInfo(info); delBlockInfo(info);
if (info.nextScanType != ScanType.IMMEDIATE_SCAN) {
info.lastScanTime = e.verificationTime; info.lastScanTime = e.verificationTime;
}
info.lastScanType = ScanType.VERIFICATION_SCAN; info.lastScanType = ScanType.VERIFICATION_SCAN;
addBlockInfo(info, false); addBlockInfo(info, false);
} }
@ -285,9 +295,23 @@ class BlockPoolSliceScanner {
DFSUtil.getRandom().nextInt(periodInt); DFSUtil.getRandom().nextInt(periodInt);
} }
/** Adds block to list of blocks */ /** Adds block to list of blocks
synchronized void addBlock(ExtendedBlock block) { * @param scanNow - true if we want to make that particular block a high
* priority one to scan immediately
**/
synchronized void addBlock(ExtendedBlock block, boolean scanNow) {
BlockScanInfo info = blockMap.get(block.getLocalBlock()); BlockScanInfo info = blockMap.get(block.getLocalBlock());
long lastScanTime = 0;
if (info != null) {
lastScanTime = info.lastScanTime;
}
// If the particular block is scanned in last 5 minutes, the no need to
// verify that block again
if (scanNow && Time.monotonicNow() - lastScanTime <
lastScanTimeDifference) {
return;
}
if ( info != null ) { if ( info != null ) {
LOG.warn("Adding an already existing block " + block); LOG.warn("Adding an already existing block " + block);
delBlockInfo(info); delBlockInfo(info);
@ -295,6 +319,12 @@ class BlockPoolSliceScanner {
info = new BlockScanInfo(block.getLocalBlock()); info = new BlockScanInfo(block.getLocalBlock());
info.lastScanTime = getNewBlockScanTime(); info.lastScanTime = getNewBlockScanTime();
if (scanNow) {
// Create a new BlockScanInfo object and set the lastScanTime to 0
// which will make it the high priority block
LOG.info("Adding block for immediate verification " + block);
info.nextScanType = ScanType.IMMEDIATE_SCAN;
}
addBlockInfo(info, true); addBlockInfo(info, true);
adjustThrottler(); adjustThrottler();
@ -340,6 +370,7 @@ class BlockPoolSliceScanner {
info.lastScanType = type; info.lastScanType = type;
info.lastScanTime = now; info.lastScanTime = now;
info.lastScanOk = scanOk; info.lastScanOk = scanOk;
info.nextScanType = ScanType.VERIFICATION_SCAN;
addBlockInfo(info, false); addBlockInfo(info, false);
// Don't update meta data if the verification failed. // Don't update meta data if the verification failed.
@ -363,6 +394,11 @@ class BlockPoolSliceScanner {
} }
} }
@VisibleForTesting
synchronized void setLastScanTimeDifference(int lastScanTimeDifference) {
this.lastScanTimeDifference = lastScanTimeDifference;
}
static private class LogEntry { static private class LogEntry {
long blockId = -1; long blockId = -1;
@ -502,6 +538,9 @@ class BlockPoolSliceScanner {
private synchronized boolean isFirstBlockProcessed() { private synchronized boolean isFirstBlockProcessed() {
if (!blockInfoSet.isEmpty()) { if (!blockInfoSet.isEmpty()) {
if (blockInfoSet.first().nextScanType == ScanType.IMMEDIATE_SCAN) {
return false;
}
long blockId = blockInfoSet.first().getBlockId(); long blockId = blockInfoSet.first().getBlockId();
if ((processedBlocks.get(blockId) != null) if ((processedBlocks.get(blockId) != null)
&& (processedBlocks.get(blockId) == 1)) { && (processedBlocks.get(blockId) == 1)) {

View File

@ -600,6 +600,9 @@ class BlockSender implements java.io.Closeable {
String ioem = e.getMessage(); String ioem = e.getMessage();
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) { if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
LOG.error("BlockSender.sendChunks() exception: ", e); LOG.error("BlockSender.sendChunks() exception: ", e);
//Something might be wrong with the block. Make this block the high
//priority block for verification.
datanode.blockScanner.addBlock(block, true);
} }
} }
throw ioeToSocketException(e); throw ioeToSocketException(e);

View File

@ -186,10 +186,10 @@ public class DataBlockScanner implements Runnable {
new String[blockPoolScannerMap.keySet().size()]); new String[blockPoolScannerMap.keySet().size()]);
} }
public void addBlock(ExtendedBlock block) { public void addBlock(ExtendedBlock block, boolean scanNow) {
BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId()); BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId());
if (bpScanner != null) { if (bpScanner != null) {
bpScanner.addBlock(block); bpScanner.addBlock(block, scanNow);
} else { } else {
LOG.warn("No block pool scanner found for block pool id: " LOG.warn("No block pool scanner found for block pool id: "
+ block.getBlockPoolId()); + block.getBlockPoolId());
@ -293,6 +293,17 @@ public class DataBlockScanner implements Runnable {
} }
} }
@VisibleForTesting
public void setLastScanTimeDifference(ExtendedBlock block, int lastScanTimeDifference) {
BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId());
if (bpScanner != null) {
bpScanner.setLastScanTimeDifference(lastScanTimeDifference);
} else {
LOG.warn("No block pool scanner found for block pool id: "
+ block.getBlockPoolId());
}
}
public void start() { public void start() {
blockScannerThread = new Thread(this); blockScannerThread = new Thread(this);
blockScannerThread.setDaemon(true); blockScannerThread.setDaemon(true);

View File

@ -2177,7 +2177,7 @@ public class DataNode extends ReconfigurableBase
} }
FsVolumeSpi volume = getFSDataset().getVolume(block); FsVolumeSpi volume = getFSDataset().getVolume(block);
if (blockScanner != null && !volume.isTransientStorage()) { if (blockScanner != null && !volume.isTransientStorage()) {
blockScanner.addBlock(block); blockScanner.addBlock(block, false);
} }
} }

View File

@ -766,7 +766,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
// Replace the old block if any to reschedule the scanning. // Replace the old block if any to reschedule the scanning.
datanode.getBlockScanner().addBlock(block); datanode.getBlockScanner().addBlock(block, false);
return replicaInfo; return replicaInfo;
} }
@ -2030,7 +2030,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final DataBlockScanner blockScanner = datanode.getBlockScanner(); final DataBlockScanner blockScanner = datanode.getBlockScanner();
if (!vol.isTransientStorage()) { if (!vol.isTransientStorage()) {
if (blockScanner != null) { if (blockScanner != null) {
blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo)); blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo), false);
} }
} else { } else {
ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol); ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -28,7 +27,10 @@ import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URL; import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -42,6 +44,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
@ -490,4 +493,59 @@ public class TestDatanodeBlockScanner {
cluster.shutdown(); cluster.shutdown();
} }
} }
/**
* This test verifies whether block is added to the first location of
* BlockPoolSliceScanner#blockInfoSet
*/
@Test
public void testAddBlockInfoToFirstLocation() throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration())
.numDataNodes(1).build();
FileSystem fs = null;
try {
fs = cluster.getFileSystem();
DataNode dataNode = cluster.getDataNodes().get(0);
// Creating a bunch of blocks
for (int i = 1; i < 10; i++) {
Path fileName = new Path("/test" + i);
DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L);
}
// Get block of the first file created (file1)
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/test1"));
dataNode.getBlockScanner().setLastScanTimeDifference(block, 0);
// Let it sleep for more than 5 seconds so that BlockPoolSliceScanner can
// scan the first set of blocks
Thread.sleep(10000);
Long scanTime1Fortest1Block = DataNodeTestUtils.getLatestScanTime(
dataNode, block);
// Create another set of blocks
for (int i = 10; i < 20; i++) {
Path fileName = new Path("/test" + i);
DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L);
}
dataNode.getBlockScanner().addBlock(block, true);
// Sleep so that BlockPoolSliceScanner can scan the second set of blocks
// and one block which we scheduled to rescan
Thread.sleep(10000);
// Get the lastScanTime of all of the second set of blocks
Set<Long> lastScanTimeSet = new HashSet<Long>();
for (int i = 10; i < 20; i++) {
long lastScanTime = DataNodeTestUtils.getLatestScanTime(dataNode,
DFSTestUtil.getFirstBlock(fs, new Path("/test" + i)));
lastScanTimeSet.add(lastScanTime);
}
Long scanTime2Fortest1Block = DataNodeTestUtils.getLatestScanTime(
dataNode, DFSTestUtil.getFirstBlock(fs, new Path("/test1")));
Long minimumLastScanTime = Collections.min(lastScanTimeSet);
assertTrue("The second scanTime for test1 block should be greater than "
+ "first scanTime", scanTime2Fortest1Block > scanTime1Fortest1Block);
assertTrue("The second scanTime for test1 block should be less than or"
+ " equal to minimum of the lastScanTime of second set of blocks",
scanTime2Fortest1Block <= minimumLastScanTime);
} finally {
IOUtils.closeStream(fs);
cluster.shutdown();
}
}
} }