diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index f0c6dd14e5d..860939e805a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -648,6 +648,9 @@ Release 2.7.0 - UNRELEASED HDFS-7721. The HDFS BlockScanner may run fast during the first hour (cmccabe) + HDFS-7686. Re-add rapid rescan of possibly corrupt block feature to the + block scanner (cmccabe) + BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS HDFS-7720. Quota by Storage Type API, tools and ClientNameNode diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java index 7429fff9adb..b0248c50d9c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java @@ -29,6 +29,7 @@ import java.util.TreeMap; import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.ScanResultHandler; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; @@ -278,6 +279,37 @@ public class BlockScanner { } } + /** + * Mark a block as "suspect." + * + * This means that we should try to rescan it soon. Note that the + * VolumeScanner keeps a list of recently suspicious blocks, which + * it uses to avoid rescanning the same block over and over in a short + * time frame. + * + * @param storageId The ID of the storage where the block replica + * is being stored. + * @param block The block's ID and block pool id. + */ + synchronized void markSuspectBlock(String storageId, ExtendedBlock block) { + if (!isEnabled()) { + LOG.info("Not scanning suspicious block {} on {}, because the block " + + "scanner is disabled.", block, storageId); + return; + } + VolumeScanner scanner = scanners.get(storageId); + if (scanner == null) { + // This could happen if the volume is in the process of being removed. + // The removal process shuts down the VolumeScanner, but the volume + // object stays around as long as there are references to it (which + // should not be that long.) + LOG.info("Not scanning suspicious block {} on {}, because there is no " + + "volume scanner for that storageId.", block, storageId); + return; + } + scanner.markSuspectBlock(block); + } + @InterfaceAudience.Private public static class Servlet extends HttpServlet { private static final long serialVersionUID = 1L; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index c016e62e7f2..f4cde11678a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -601,6 +601,9 @@ class BlockSender implements java.io.Closeable { if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) { LOG.error("BlockSender.sendChunks() exception: ", e); } + datanode.getBlockScanner().markSuspectBlock( + volumeRef.getVolume().getStorageID(), + block); } throw ioeToSocketException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java index ce0a8755f2f..615abe91f0e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java @@ -22,12 +22,15 @@ import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf; @@ -116,6 +119,21 @@ public class VolumeScanner extends Thread { private final List blockIters = new LinkedList(); + /** + * Blocks which are suspect. + * The scanner prioritizes scanning these blocks. + */ + private final LinkedHashSet suspectBlocks = + new LinkedHashSet(); + + /** + * Blocks which were suspect which we have scanned. + * This is used to avoid scanning the same suspect block over and over. + */ + private final Cache recentSuspectBlocks = + CacheBuilder.newBuilder().maximumSize(1000) + .expireAfterAccess(10, TimeUnit.MINUTES).build(); + /** * The current block iterator, or null if there is none. */ @@ -458,10 +476,13 @@ public class VolumeScanner extends Thread { /** * Run an iteration of the VolumeScanner loop. * + * @param suspectBlock A suspect block which we should scan, or null to + * scan the next regularly scheduled block. + * * @return The number of milliseconds to delay before running the loop * again, or 0 to re-run the loop immediately. */ - private long runLoop() { + private long runLoop(ExtendedBlock suspectBlock) { long bytesScanned = -1; boolean scanError = false; ExtendedBlock block = null; @@ -477,40 +498,43 @@ public class VolumeScanner extends Thread { } // Find a usable block pool to scan. - if ((curBlockIter == null) || curBlockIter.atEnd()) { - long timeout = findNextUsableBlockIter(); - if (timeout > 0) { - LOG.trace("{}: no block pools are ready to scan yet. Waiting " + - "{} ms.", this, timeout); - synchronized (stats) { - stats.nextBlockPoolScanStartMs = Time.monotonicNow() + timeout; + if (suspectBlock != null) { + block = suspectBlock; + } else { + if ((curBlockIter == null) || curBlockIter.atEnd()) { + long timeout = findNextUsableBlockIter(); + if (timeout > 0) { + LOG.trace("{}: no block pools are ready to scan yet. Waiting " + + "{} ms.", this, timeout); + synchronized (stats) { + stats.nextBlockPoolScanStartMs = Time.monotonicNow() + timeout; + } + return timeout; } - return timeout; + synchronized (stats) { + stats.scansSinceRestart++; + stats.blocksScannedInCurrentPeriod = 0; + stats.nextBlockPoolScanStartMs = -1; + } + return 0L; } - synchronized (stats) { - stats.scansSinceRestart++; - stats.blocksScannedInCurrentPeriod = 0; - stats.nextBlockPoolScanStartMs = -1; + try { + block = curBlockIter.nextBlock(); + } catch (IOException e) { + // There was an error listing the next block in the volume. This is a + // serious issue. + LOG.warn("{}: nextBlock error on {}", this, curBlockIter); + // On the next loop iteration, curBlockIter#eof will be set to true, and + // we will pick a different block iterator. + return 0L; + } + if (block == null) { + // The BlockIterator is at EOF. + LOG.info("{}: finished scanning block pool {}", + this, curBlockIter.getBlockPoolId()); + saveBlockIterator(curBlockIter); + return 0; } - return 0L; - } - - try { - block = curBlockIter.nextBlock(); - } catch (IOException e) { - // There was an error listing the next block in the volume. This is a - // serious issue. - LOG.warn("{}: nextBlock error on {}", this, curBlockIter); - // On the next loop iteration, curBlockIter#eof will be set to true, and - // we will pick a different block iterator. - return 0L; - } - if (block == null) { - // The BlockIterator is at EOF. - LOG.info("{}: finished scanning block pool {}", - this, curBlockIter.getBlockPoolId()); - saveBlockIterator(curBlockIter); - return 0; } long saveDelta = monotonicMs - curBlockIter.getLastSavedMs(); if (saveDelta >= conf.cursorSaveMs) { @@ -529,7 +553,7 @@ public class VolumeScanner extends Thread { } finally { synchronized (stats) { stats.bytesScannedInPastHour = scannedBytesSum; - if (bytesScanned >= 0) { + if (bytesScanned > 0) { stats.blocksScannedInCurrentPeriod++; stats.blocksScannedSinceRestart++; } @@ -551,6 +575,20 @@ public class VolumeScanner extends Thread { } } + /** + * If there are elements in the suspectBlocks list, removes + * and returns the first one. Otherwise, returns null. + */ + private synchronized ExtendedBlock popNextSuspectBlock() { + Iterator iter = suspectBlocks.iterator(); + if (!iter.hasNext()) { + return null; + } + ExtendedBlock block = iter.next(); + iter.remove(); + return block; + } + @Override public void run() { // Record the minute on which the scanner started. @@ -563,7 +601,9 @@ public class VolumeScanner extends Thread { try { long timeout = 0; while (true) { - // Take the lock to check if we should stop. + ExtendedBlock suspectBlock = null; + // Take the lock to check if we should stop, and access the + // suspect block list. synchronized (this) { if (stopping) { break; @@ -574,8 +614,9 @@ public class VolumeScanner extends Thread { break; } } + suspectBlock = popNextSuspectBlock(); } - timeout = runLoop(); + timeout = runLoop(suspectBlock); } } catch (InterruptedException e) { // We are exiting because of an InterruptedException, @@ -612,6 +653,30 @@ public class VolumeScanner extends Thread { this.interrupt(); } + + public synchronized void markSuspectBlock(ExtendedBlock block) { + if (stopping) { + LOG.info("{}: Not scheduling suspect block {} for " + + "rescanning, because this volume scanner is stopping.", this, block); + return; + } + Boolean recent = recentSuspectBlocks.getIfPresent(block); + if (recent != null) { + LOG.info("{}: Not scheduling suspect block {} for " + + "rescanning, because we rescanned it recently.", this, block); + return; + } + if (suspectBlocks.contains(block)) { + LOG.info("{}: suspect block {} is already queued for " + + "rescanning.", this, block); + return; + } + suspectBlocks.add(block); + recentSuspectBlocks.put(block, true); + LOG.info("{}: Scheduling suspect block {} for rescanning.", this, block); + notify(); // wake scanner thread. + } + /** * Allow the scanner to scan the given block pool. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java index b727263e2a9..735e9a12b0b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java @@ -268,6 +268,20 @@ public class TestBlockScanner { final Set goodBlocks = new HashSet(); long blocksScanned = 0; Semaphore sem = null; + + @Override + public String toString() { + final StringBuilder bld = new StringBuilder(); + bld.append("ScanResultHandler.Info{"); + bld.append("shouldRun=").append(shouldRun).append(", "); + bld.append("blocksScanned=").append(blocksScanned).append(", "); + bld.append("sem#availablePermits=").append(sem.availablePermits()). + append(", "); + bld.append("badBlocks=").append(badBlocks).append(", "); + bld.append("goodBlocks=").append(goodBlocks); + bld.append("}"); + return bld.toString(); + } } private VolumeScanner scanner; @@ -681,4 +695,121 @@ public class TestBlockScanner { Assert.assertFalse(VolumeScanner. calculateShouldScan("test", 100000L, 365000000L, 0, 60)); } + + /** + * Test that we can mark certain blocks as suspect, and get them quickly + * rescanned that way. See HDFS-7686 and HDFS-7548. + */ + @Test(timeout=120000) + public void testMarkSuspectBlock() throws Exception { + Configuration conf = new Configuration(); + // Set a really long scan period. + conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L); + conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER, + TestScanResultHandler.class.getName()); + conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L); + final TestContext ctx = new TestContext(conf, 1); + final int NUM_EXPECTED_BLOCKS = 10; + ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 1); + final TestScanResultHandler.Info info = + TestScanResultHandler.getInfo(ctx.volumes.get(0)); + String storageID = ctx.datanode.getFSDataset(). + getVolumes().get(0).getStorageID(); + synchronized (info) { + info.sem = new Semaphore(4); + info.shouldRun = true; + info.notify(); + } + // Scan the first 4 blocks + LOG.info("Waiting for the first 4 blocks to be scanned."); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + synchronized (info) { + if (info.blocksScanned >= 4) { + LOG.info("info = {}. blockScanned has now reached 4.", info); + return true; + } else { + LOG.info("info = {}. Waiting for blockScanned to reach 4.", info); + return false; + } + } + } + }, 50, 30000); + // We should have scanned 4 blocks + synchronized (info) { + assertEquals("Expected 4 good blocks.", 4, info.goodBlocks.size()); + info.goodBlocks.clear(); + assertEquals("Expected 4 blocksScanned", 4, info.blocksScanned); + assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size()); + info.blocksScanned = 0; + } + ExtendedBlock first = ctx.getFileBlock(0, 0); + ctx.datanode.getBlockScanner().markSuspectBlock(storageID, first); + + // When we increment the semaphore, the TestScanResultHandler will finish + // adding the block that it was scanning previously (the 5th block). + // We increment the semaphore twice so that the handler will also + // get a chance to see the suspect block which we just requested the + // VolumeScanner to process. + info.sem.release(2); + + LOG.info("Waiting for 2 more blocks to be scanned."); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + synchronized (info) { + if (info.blocksScanned >= 2) { + LOG.info("info = {}. blockScanned has now reached 2.", info); + return true; + } else { + LOG.info("info = {}. Waiting for blockScanned to reach 2.", info); + return false; + } + } + } + }, 50, 30000); + + synchronized (info) { + assertTrue("Expected block " + first + " to have been scanned.", + info.goodBlocks.contains(first)); + assertEquals(2, info.goodBlocks.size()); + info.goodBlocks.clear(); + assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size()); + assertEquals(2, info.blocksScanned); + info.blocksScanned = 0; + } + + // Re-mark the same block as suspect. + ctx.datanode.getBlockScanner().markSuspectBlock(storageID, first); + info.sem.release(10); + + LOG.info("Waiting for 5 more blocks to be scanned."); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + synchronized (info) { + if (info.blocksScanned >= 5) { + LOG.info("info = {}. blockScanned has now reached 5.", info); + return true; + } else { + LOG.info("info = {}. Waiting for blockScanned to reach 5.", info); + return false; + } + } + } + }, 50, 30000); + synchronized (info) { + assertEquals(5, info.goodBlocks.size()); + assertEquals(0, info.badBlocks.size()); + assertEquals(5, info.blocksScanned); + // We should not have rescanned the "suspect block", + // because it was recently rescanned by the suspect block system. + // This is a test of the "suspect block" rate limiting. + Assert.assertFalse("We should not " + + "have rescanned block " + first + ", because it should have been " + + "in recentSuspectBlocks.", info.goodBlocks.contains(first)); + info.blocksScanned = 0; + } + } }