HDFS-7548. Corrupt block reporting delayed until datablock scanner thread detects it. Contributed by Rushabh Shah.
This commit is contained in:
parent
925c9fed33
commit
c0af72c7f7
|
@ -756,6 +756,9 @@ Release 2.7.0 - UNRELEASED
|
|||
HDFS-7610. Fix removal of dynamically added DN volumes (Lei (Eddy) Xu via
|
||||
Colin P. McCabe)
|
||||
|
||||
HDFS-7548. Corrupt block reporting delayed until datablock scanner thread
|
||||
detects it (Rushabh Shah via kihwal)
|
||||
|
||||
Release 2.6.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -105,6 +105,7 @@ class BlockPoolSliceScanner {
|
|||
private long bytesLeft = 0; // Bytes to scan in this period
|
||||
private long totalBytesToScan = 0;
|
||||
private boolean isNewPeriod = true;
|
||||
private int lastScanTimeDifference = 5*60*1000;
|
||||
|
||||
private final LogFileHandler verificationLog;
|
||||
|
||||
|
@ -112,6 +113,7 @@ class BlockPoolSliceScanner {
|
|||
200, MAX_SCAN_RATE);
|
||||
|
||||
private static enum ScanType {
|
||||
IMMEDIATE_SCAN,
|
||||
VERIFICATION_SCAN, // scanned as part of periodic verfication
|
||||
NONE,
|
||||
}
|
||||
|
@ -129,12 +131,17 @@ class BlockPoolSliceScanner {
|
|||
|
||||
@Override
|
||||
public int compare(BlockScanInfo left, BlockScanInfo right) {
|
||||
final ScanType leftNextScanType = left.nextScanType;
|
||||
final ScanType rightNextScanType = right.nextScanType;
|
||||
final long l = left.lastScanTime;
|
||||
final long r = right.lastScanTime;
|
||||
// Compare by nextScanType if they are same then compare by
|
||||
// lastScanTimes
|
||||
// compare blocks itself if scantimes are same to avoid.
|
||||
// because TreeMap uses comparator if available to check existence of
|
||||
// the object.
|
||||
return l < r? -1: l > r? 1: left.compareTo(right);
|
||||
int compareByNextScanType = leftNextScanType.compareTo(rightNextScanType);
|
||||
return compareByNextScanType < 0? -1: compareByNextScanType > 0? 1: l < r? -1: l > r? 1: left.compareTo(right);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -142,6 +149,7 @@ class BlockPoolSliceScanner {
|
|||
ScanType lastScanType = ScanType.NONE;
|
||||
boolean lastScanOk = true;
|
||||
private LinkedElement next;
|
||||
ScanType nextScanType = ScanType.VERIFICATION_SCAN;
|
||||
|
||||
BlockScanInfo(Block block) {
|
||||
super(block);
|
||||
|
@ -265,10 +273,12 @@ class BlockPoolSliceScanner {
|
|||
private synchronized void updateBlockInfo(LogEntry e) {
|
||||
BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp));
|
||||
|
||||
if(info != null && e.verificationTime > 0 &&
|
||||
if (info != null && e.verificationTime > 0 &&
|
||||
info.lastScanTime < e.verificationTime) {
|
||||
delBlockInfo(info);
|
||||
info.lastScanTime = e.verificationTime;
|
||||
if (info.nextScanType != ScanType.IMMEDIATE_SCAN) {
|
||||
info.lastScanTime = e.verificationTime;
|
||||
}
|
||||
info.lastScanType = ScanType.VERIFICATION_SCAN;
|
||||
addBlockInfo(info, false);
|
||||
}
|
||||
|
@ -285,9 +295,23 @@ class BlockPoolSliceScanner {
|
|||
DFSUtil.getRandom().nextInt(periodInt);
|
||||
}
|
||||
|
||||
/** Adds block to list of blocks */
|
||||
synchronized void addBlock(ExtendedBlock block) {
|
||||
/** Adds block to list of blocks
|
||||
* @param scanNow - true if we want to make that particular block a high
|
||||
* priority one to scan immediately
|
||||
**/
|
||||
synchronized void addBlock(ExtendedBlock block, boolean scanNow) {
|
||||
BlockScanInfo info = blockMap.get(block.getLocalBlock());
|
||||
long lastScanTime = 0;
|
||||
if (info != null) {
|
||||
lastScanTime = info.lastScanTime;
|
||||
}
|
||||
// If the particular block is scanned in last 5 minutes, the no need to
|
||||
// verify that block again
|
||||
if (scanNow && Time.monotonicNow() - lastScanTime <
|
||||
lastScanTimeDifference) {
|
||||
return;
|
||||
}
|
||||
|
||||
if ( info != null ) {
|
||||
LOG.warn("Adding an already existing block " + block);
|
||||
delBlockInfo(info);
|
||||
|
@ -295,6 +319,12 @@ class BlockPoolSliceScanner {
|
|||
|
||||
info = new BlockScanInfo(block.getLocalBlock());
|
||||
info.lastScanTime = getNewBlockScanTime();
|
||||
if (scanNow) {
|
||||
// Create a new BlockScanInfo object and set the lastScanTime to 0
|
||||
// which will make it the high priority block
|
||||
LOG.info("Adding block for immediate verification " + block);
|
||||
info.nextScanType = ScanType.IMMEDIATE_SCAN;
|
||||
}
|
||||
|
||||
addBlockInfo(info, true);
|
||||
adjustThrottler();
|
||||
|
@ -340,6 +370,7 @@ class BlockPoolSliceScanner {
|
|||
info.lastScanType = type;
|
||||
info.lastScanTime = now;
|
||||
info.lastScanOk = scanOk;
|
||||
info.nextScanType = ScanType.VERIFICATION_SCAN;
|
||||
addBlockInfo(info, false);
|
||||
|
||||
// Don't update meta data if the verification failed.
|
||||
|
@ -363,6 +394,11 @@ class BlockPoolSliceScanner {
|
|||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
synchronized void setLastScanTimeDifference(int lastScanTimeDifference) {
|
||||
this.lastScanTimeDifference = lastScanTimeDifference;
|
||||
}
|
||||
|
||||
static private class LogEntry {
|
||||
|
||||
long blockId = -1;
|
||||
|
@ -502,6 +538,9 @@ class BlockPoolSliceScanner {
|
|||
|
||||
private synchronized boolean isFirstBlockProcessed() {
|
||||
if (!blockInfoSet.isEmpty()) {
|
||||
if (blockInfoSet.first().nextScanType == ScanType.IMMEDIATE_SCAN) {
|
||||
return false;
|
||||
}
|
||||
long blockId = blockInfoSet.first().getBlockId();
|
||||
if ((processedBlocks.get(blockId) != null)
|
||||
&& (processedBlocks.get(blockId) == 1)) {
|
||||
|
|
|
@ -600,6 +600,9 @@ class BlockSender implements java.io.Closeable {
|
|||
String ioem = e.getMessage();
|
||||
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
|
||||
LOG.error("BlockSender.sendChunks() exception: ", e);
|
||||
//Something might be wrong with the block. Make this block the high
|
||||
//priority block for verification.
|
||||
datanode.blockScanner.addBlock(block, true);
|
||||
}
|
||||
}
|
||||
throw ioeToSocketException(e);
|
||||
|
|
|
@ -186,10 +186,10 @@ public class DataBlockScanner implements Runnable {
|
|||
new String[blockPoolScannerMap.keySet().size()]);
|
||||
}
|
||||
|
||||
public void addBlock(ExtendedBlock block) {
|
||||
public void addBlock(ExtendedBlock block, boolean scanNow) {
|
||||
BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId());
|
||||
if (bpScanner != null) {
|
||||
bpScanner.addBlock(block);
|
||||
bpScanner.addBlock(block, scanNow);
|
||||
} else {
|
||||
LOG.warn("No block pool scanner found for block pool id: "
|
||||
+ block.getBlockPoolId());
|
||||
|
@ -293,6 +293,17 @@ public class DataBlockScanner implements Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setLastScanTimeDifference(ExtendedBlock block, int lastScanTimeDifference) {
|
||||
BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId());
|
||||
if (bpScanner != null) {
|
||||
bpScanner.setLastScanTimeDifference(lastScanTimeDifference);
|
||||
} else {
|
||||
LOG.warn("No block pool scanner found for block pool id: "
|
||||
+ block.getBlockPoolId());
|
||||
}
|
||||
}
|
||||
|
||||
public void start() {
|
||||
blockScannerThread = new Thread(this);
|
||||
blockScannerThread.setDaemon(true);
|
||||
|
|
|
@ -2170,7 +2170,7 @@ public class DataNode extends ReconfigurableBase
|
|||
}
|
||||
FsVolumeSpi volume = getFSDataset().getVolume(block);
|
||||
if (blockScanner != null && !volume.isTransientStorage()) {
|
||||
blockScanner.addBlock(block);
|
||||
blockScanner.addBlock(block, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -771,7 +771,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
// Replace the old block if any to reschedule the scanning.
|
||||
datanode.getBlockScanner().addBlock(block);
|
||||
datanode.getBlockScanner().addBlock(block, false);
|
||||
return replicaInfo;
|
||||
}
|
||||
|
||||
|
@ -2035,7 +2035,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
final DataBlockScanner blockScanner = datanode.getBlockScanner();
|
||||
if (!vol.isTransientStorage()) {
|
||||
if (blockScanner != null) {
|
||||
blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
|
||||
blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo), false);
|
||||
}
|
||||
} else {
|
||||
ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
@ -28,7 +27,10 @@ import java.io.IOException;
|
|||
import java.io.RandomAccessFile;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URL;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
@ -42,6 +44,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
|
@ -490,4 +493,59 @@ public class TestDatanodeBlockScanner {
|
|||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test verifies whether block is added to the first location of
|
||||
* BlockPoolSliceScanner#blockInfoSet
|
||||
*/
|
||||
@Test
|
||||
public void testAddBlockInfoToFirstLocation() throws Exception {
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration())
|
||||
.numDataNodes(1).build();
|
||||
FileSystem fs = null;
|
||||
try {
|
||||
fs = cluster.getFileSystem();
|
||||
DataNode dataNode = cluster.getDataNodes().get(0);
|
||||
// Creating a bunch of blocks
|
||||
for (int i = 1; i < 10; i++) {
|
||||
Path fileName = new Path("/test" + i);
|
||||
DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L);
|
||||
}
|
||||
// Get block of the first file created (file1)
|
||||
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/test1"));
|
||||
dataNode.getBlockScanner().setLastScanTimeDifference(block, 0);
|
||||
// Let it sleep for more than 5 seconds so that BlockPoolSliceScanner can
|
||||
// scan the first set of blocks
|
||||
Thread.sleep(10000);
|
||||
Long scanTime1Fortest1Block = DataNodeTestUtils.getLatestScanTime(
|
||||
dataNode, block);
|
||||
// Create another set of blocks
|
||||
for (int i = 10; i < 20; i++) {
|
||||
Path fileName = new Path("/test" + i);
|
||||
DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L);
|
||||
}
|
||||
dataNode.getBlockScanner().addBlock(block, true);
|
||||
// Sleep so that BlockPoolSliceScanner can scan the second set of blocks
|
||||
// and one block which we scheduled to rescan
|
||||
Thread.sleep(10000);
|
||||
// Get the lastScanTime of all of the second set of blocks
|
||||
Set<Long> lastScanTimeSet = new HashSet<Long>();
|
||||
for (int i = 10; i < 20; i++) {
|
||||
long lastScanTime = DataNodeTestUtils.getLatestScanTime(dataNode,
|
||||
DFSTestUtil.getFirstBlock(fs, new Path("/test" + i)));
|
||||
lastScanTimeSet.add(lastScanTime);
|
||||
}
|
||||
Long scanTime2Fortest1Block = DataNodeTestUtils.getLatestScanTime(
|
||||
dataNode, DFSTestUtil.getFirstBlock(fs, new Path("/test1")));
|
||||
Long minimumLastScanTime = Collections.min(lastScanTimeSet);
|
||||
assertTrue("The second scanTime for test1 block should be greater than "
|
||||
+ "first scanTime", scanTime2Fortest1Block > scanTime1Fortest1Block);
|
||||
assertTrue("The second scanTime for test1 block should be less than or"
|
||||
+ " equal to minimum of the lastScanTime of second set of blocks",
|
||||
scanTime2Fortest1Block <= minimumLastScanTime);
|
||||
} finally {
|
||||
IOUtils.closeStream(fs);
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue