diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 4b8c27b563d..52647999270 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -846,6 +846,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 21 * 24; // 3 weeks.
   public static final String DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND = "dfs.block.scanner.volume.bytes.per.second";
   public static final long DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT = 1048576L;
+  /**
+   * The amount of time in milliseconds that the BlockScanner times out waiting
+   * for the VolumeScanner thread to join during a shutdown call.
+   */
+  public static final String DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY =
+      "dfs.block.scanner.volume.join.timeout.ms";
+  public static final long DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT =
+      TimeUnit.SECONDS.toMillis(5);
   public static final String DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED =
       "dfs.block.scanner.skip.recent.accessed";
   public static final boolean DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED_DEFAULT =
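Note: the new key is consumed through the standard Configuration lookup pattern used for all DFSConfigKeys constants. A minimal sketch of that lookup follows; the wrapper class and main method are illustrative only and not part of the patch.

```java
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;

import org.apache.hadoop.conf.Configuration;

// Hypothetical example class, not part of the patch.
public class JoinTimeoutExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Returns 5000 unless dfs.block.scanner.volume.join.timeout.ms is
    // overridden in hdfs-site.xml or set programmatically.
    long joinTimeoutMs = conf.getLong(
        DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
        DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT);
    System.out.println("VolumeScanner join timeout: " + joinTimeoutMs + " ms");
  }
}
```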
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
index 82efcf86432..dc619f24e71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
@@ -22,6 +22,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
 
@@ -68,6 +70,12 @@ public class BlockScanner {
    */
   private Conf conf;
 
+  /**
+   * Timeout duration in milliseconds waiting for {@link VolumeScanner} to stop
+   * inside {@link #removeAllVolumeScanners}.
+   */
+  private long joinVolumeScannersTimeOutMs;
+
   @VisibleForTesting
   void setConf(Conf conf) {
     this.conf = conf;
   }
@@ -185,6 +193,9 @@ public class BlockScanner {
 
   public BlockScanner(DataNode datanode, Configuration conf) {
     this.datanode = datanode;
+    setJoinVolumeScannersTimeOutMs(
+        conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
+            DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT));
     this.conf = new Conf(conf);
     if (isEnabled()) {
       LOG.info("Initialized block scanner with targetBytesPerSec {}",
@@ -204,6 +215,13 @@ public class BlockScanner {
     return (conf.scanPeriodMs > 0) && (conf.targetBytesPerSec > 0);
   }
 
+  /**
+   * Returns true if there is any scanner thread registered.
+   */
+  public synchronized boolean hasAnyRegisteredScanner() {
+    return !scanners.isEmpty();
+  }
+
   /**
    * Set up a scanner for the given block pool and volume.
    *
@@ -268,7 +286,10 @@
   /**
    * Stops and removes all volume scanners.
    *
-   * This function will block until all the volume scanners have stopped.
+   * This function is called on shutdown. It will return even if some of
+   * the scanners don't terminate in time. Since the scanners are daemon
+   * threads and do not alter the block content, it is safe to ignore
+   * such conditions on shutdown.
    */
   public synchronized void removeAllVolumeScanners() {
     for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
@@ -276,7 +297,7 @@
     }
     for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
       Uninterruptibles.joinUninterruptibly(entry.getValue(),
-          5, TimeUnit.MINUTES);
+          getJoinVolumeScannersTimeOutMs(), TimeUnit.MILLISECONDS);
     }
     scanners.clear();
   }
@@ -352,6 +373,14 @@
     scanner.markSuspectBlock(block);
   }
 
+  public long getJoinVolumeScannersTimeOutMs() {
+    return joinVolumeScannersTimeOutMs;
+  }
+
+  public void setJoinVolumeScannersTimeOutMs(long joinScannersTimeOutMs) {
+    this.joinVolumeScannersTimeOutMs = joinScannersTimeOutMs;
+  }
+
   @InterfaceAudience.Private
   public static class Servlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
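Note: the change to removeAllVolumeScanners() replaces a hard-coded five-minute join with a configurable bounded join, so one stuck scanner cannot block DataNode shutdown. A standalone sketch of the same "signal all, then join each with a timeout" pattern is below; it uses Guava's Uninterruptibles, which BlockScanner already depends on, but the BoundedJoinSketch class itself is hypothetical.

```java
import com.google.common.util.concurrent.Uninterruptibles;

import java.util.List;
import java.util.concurrent.TimeUnit;

// Illustrative sketch, not BlockScanner itself.
public class BoundedJoinSketch {
  public static void stopAll(List<Thread> workers, long joinTimeoutMs) {
    // First ask every worker to stop, so the joins below can overlap.
    for (Thread t : workers) {
      t.interrupt();
    }
    for (Thread t : workers) {
      // Waits at most joinTimeoutMs and then returns even if t is still
      // alive, which is acceptable for daemon threads during shutdown.
      Uninterruptibles.joinUninterruptibly(t, joinTimeoutMs,
          TimeUnit.MILLISECONDS);
    }
  }
}
```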
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 380343ded5e..1ddd05d3106 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1661,7 +1661,9 @@ public class DataNode extends ReconfigurableBase
     // a block pool id
     String bpId = bpos.getBlockPoolId();
 
-    blockScanner.disableBlockPoolId(bpId);
+    if (blockScanner.hasAnyRegisteredScanner()) {
+      blockScanner.disableBlockPoolId(bpId);
+    }
 
     if (data != null) {
       data.shutdownBlockPool(bpId);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
index 5e3d523cde3..4728583afc5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -670,12 +670,14 @@ public class VolumeScanner extends Thread {
       LOG.error("{} exiting because of exception ", this, e);
     }
     LOG.info("{} exiting.", this);
+    VolumeScannerCBInjector.get().preSavingBlockIteratorTask(this);
     // Save the current position of all block iterators and close them.
     for (BlockIterator iter : blockIters) {
       saveBlockIterator(iter);
       IOUtils.cleanup(null, iter);
     }
   } finally {
+    VolumeScannerCBInjector.get().terminationCallBack(this);
     // When the VolumeScanner exits, release the reference we were holding
     // on the volume. This will allow the volume to be removed later.
     IOUtils.cleanup(null, ref);
@@ -695,6 +697,7 @@ public class VolumeScanner extends Thread {
     stopping = true;
     notify();
     this.interrupt();
+    VolumeScannerCBInjector.get().shutdownCallBack(this);
   }
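Note: VolumeScanner's stop protocol, which the injected callbacks instrument, is the classic "set a flag, notify(), interrupt(), clean up in finally" pattern. A reduced, hypothetical worker illustrating that protocol follows; it is a sketch of the pattern, not the real VolumeScanner.

```java
// Illustrative reduction of the stop protocol shown in the hunks above.
public class StoppableWorker extends Thread {
  private boolean stopping = false;

  @Override
  public void run() {
    try {
      synchronized (this) {
        while (!stopping) {
          try {
            wait(1000); // stands in for one unit of scanning work
          } catch (InterruptedException e) {
            // Fall through and re-check 'stopping'.
          }
        }
      }
    } finally {
      // Cleanup always runs, mirroring VolumeScanner's finally block.
      System.out.println(getName() + " cleaned up");
    }
  }

  public synchronized void shutdown() {
    stopping = true;
    notify();      // wake a wait()ing worker
    interrupt();   // wake a worker blocked elsewhere
  }
}
```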
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScannerCBInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScannerCBInjector.java
new file mode 100644
index 00000000000..5798bd18f80
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScannerCBInjector.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Used for injecting call backs in {@link VolumeScanner}
+ * and {@link BlockScanner} tests.
+ * Calls into this are a no-op in production code.
+ */
+@VisibleForTesting
+@InterfaceAudience.Private
+public class VolumeScannerCBInjector {
+  private static VolumeScannerCBInjector instance =
+      new VolumeScannerCBInjector();
+
+  public static VolumeScannerCBInjector get() {
+    return instance;
+  }
+
+  public static void set(VolumeScannerCBInjector injector) {
+    instance = injector;
+  }
+
+  public void preSavingBlockIteratorTask(final VolumeScanner volumeScanner) {
+  }
+
+  public void shutdownCallBack(final VolumeScanner volumeScanner) {
+  }
+
+  public void terminationCallBack(final VolumeScanner volumeScanner) {
+  }
+}
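Note: a test uses this hook by subclassing the injector, swapping the singleton in, and restoring the previous instance in a finally block, exactly the shape TestBlockScanner follows further down. A hypothetical sketch (CountingInjector and withInjector are illustrative names, not part of the patch):

```java
import org.apache.hadoop.hdfs.server.datanode.VolumeScanner;
import org.apache.hadoop.hdfs.server.datanode.VolumeScannerCBInjector;

// Illustrative usage sketch for the injection point above.
public class InjectorUsageSketch {
  static class CountingInjector extends VolumeScannerCBInjector {
    // Single test-thread writer assumed; volatile is enough for visibility.
    volatile int shutdowns = 0;

    @Override
    public void shutdownCallBack(VolumeScanner volumeScanner) {
      shutdowns++;
    }
  }

  public static void withInjector(Runnable testBody) {
    VolumeScannerCBInjector previous = VolumeScannerCBInjector.get();
    try {
      VolumeScannerCBInjector.set(new CountingInjector());
      testBody.run();
    } finally {
      // Always restore the default so later tests are unaffected.
      VolumeScannerCBInjector.set(previous);
    }
  }
}
```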
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 00d5dfd15ae..62b9c5ccfef 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1596,6 +1596,15 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.block.scanner.volume.join.timeout.ms</name>
+  <value>5000</value>
+  <description>
+    The amount of time in milliseconds that the BlockScanner times out waiting
+    for the VolumeScanner thread to join during a shutdown call.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.readahead.bytes</name>
   <value>4194304</value>
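Note: deployments can override the shipped 5000 ms default in hdfs-site.xml; the programmatic equivalent looks like the sketch below (the class and method names are illustrative).

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

// Hypothetical helper showing the programmatic override.
public class OverrideTimeoutExample {
  public static Configuration withShortJoinTimeout() {
    Configuration conf = new Configuration();
    // 500 ms instead of the 5000 ms shipped default.
    conf.setLong(
        DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, 500L);
    return conf;
  }
}
```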
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index a2a90669bbd..ff378d53263 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NO
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
@@ -80,6 +81,7 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.InMemoryLevelDBAliasMapClient;
+import org.apache.hadoop.hdfs.server.datanode.VolumeScanner;
 import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
@@ -171,6 +173,13 @@ public class MiniDFSCluster implements AutoCloseable {
       = DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + ".testing";
   public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY
       = DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY + ".testing";
+  /**
+   * For the JUnit tests, this is the default value of the amount of time
+   * in milliseconds that the BlockScanner times out waiting for the
+   * {@link VolumeScanner} thread to join during a shutdown call.
+   */
+  public static final long DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC =
+      TimeUnit.SECONDS.toMillis(30);
 
   // Changing this default may break some tests that assume it is 2.
   private static final int DEFAULT_STORAGES_PER_DATANODE = 2;
@@ -217,8 +226,7 @@ public class MiniDFSCluster implements AutoCloseable {
 
     public Builder(Configuration conf) {
       this.conf = conf;
-      this.storagesPerDatanode =
-          FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
+      initDefaultConfigurations();
       if (null == conf.get(HDFS_MINIDFS_BASEDIR)) {
         conf.set(HDFS_MINIDFS_BASEDIR,
             new File(getBaseDirectory()).getAbsolutePath());
@@ -227,8 +235,7 @@ public class MiniDFSCluster implements AutoCloseable {
 
     public Builder(Configuration conf, File basedir) {
       this.conf = conf;
-      this.storagesPerDatanode =
-          FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
+      initDefaultConfigurations();
       if (null == basedir) {
         throw new IllegalArgumentException(
             "MiniDFSCluster base directory cannot be null");
@@ -492,6 +499,19 @@ public class MiniDFSCluster implements AutoCloseable {
     public MiniDFSCluster build() throws IOException {
       return new MiniDFSCluster(this);
     }
+
+    /**
+     * Initializes default values for the cluster.
+     */
+    private void initDefaultConfigurations() {
+      long defaultScannerVolumeTimeOut =
+          conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
+              DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC);
+      conf.setLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
+          defaultScannerVolumeTimeOut);
+      this.storagesPerDatanode =
+          FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
+    }
   }
 
   /**
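Note: because initDefaultConfigurations() writes the resolved value back into the Configuration, a test can either accept the 30-second JUnit default or set its own value before invoking the builder. A sketch under that assumption (the surrounding method is illustrative):

```java
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

// Hypothetical test helper, not part of the patch.
public class MiniClusterTimeoutSketch {
  public static void run() throws Exception {
    Configuration conf = new HdfsConfiguration();
    // Optional override; without it the builder pins the key to
    // MiniDFSCluster.DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC (30 s).
    conf.setLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, 1000L);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(1)
        .build();
    try {
      cluster.waitActive();
    } finally {
      cluster.shutdown();
    }
  }
}
```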
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
index 75d1c4475ed..b34b7df0a92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
 import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS;
@@ -38,6 +39,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 
@@ -96,9 +98,19 @@ public class TestBlockScanner {
     TestContext(Configuration conf, int numNameServices) throws Exception {
       this.numNameServices = numNameServices;
       File basedir = new File(GenericTestUtils.getRandomizedTempPath());
+      long volumeScannerTimeOutFromConf =
+          conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, -1);
+      long expectedVScannerTimeOut =
+          volumeScannerTimeOutFromConf == -1
+              ? MiniDFSCluster.DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC
+              : volumeScannerTimeOutFromConf;
       MiniDFSCluster.Builder bld = new MiniDFSCluster.Builder(conf, basedir).
           numDataNodes(1).
           storagesPerDatanode(1);
+      // Verify that the builder was initialized to get the default
+      // configuration designated for JUnit tests.
+      assertEquals(expectedVScannerTimeOut,
+          conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, -1));
       if (numNameServices > 1) {
         bld.nnTopology(MiniDFSNNTopology.
               simpleFederatedTopology(numNameServices));
@@ -1012,4 +1024,134 @@ public class TestBlockScanner {
         0, info.blocksScanned);
     ctx.close();
   }
+
+  /**
+   * Test that a DN does not wait for the VolumeScanners to finish before
+   * shutting down.
+   *
+   * @throws Exception
+   */
+  @Test(timeout=120000)
+  public void testFastDatanodeShutdown() throws Exception {
+    // Set the joinTimeOut to a value smaller than the completion time of the
+    // VolumeScanner.
+    testDatanodeShutDown(50L, 1000L, true);
+  }
+
+  /**
+   * Test that a DN waits for the VolumeScanners to finish before shutting
+   * down.
+   *
+   * @throws Exception
+   */
+  @Test(timeout=120000)
+  public void testSlowDatanodeShutdown() throws Exception {
+    // Set the joinTimeOut to a value larger than the completion time of the
+    // VolumeScanner.
+    testDatanodeShutDown(TimeUnit.MINUTES.toMillis(5), 1000L,
+        false);
+  }
+
+  private void testDatanodeShutDown(final long joinTimeOutMS,
+      final long delayMS, boolean isFastShutdown) throws Exception {
+    VolumeScannerCBInjector prevVolumeScannerCBInject =
+        VolumeScannerCBInjector.get();
+    try {
+      DelayVolumeScannerResponseToInterrupt injectDelay =
+          new DelayVolumeScannerResponseToInterrupt(delayMS);
+      VolumeScannerCBInjector.set(injectDelay);
+      Configuration conf = new Configuration();
+      conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+      conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+          TestScanResultHandler.class.getName());
+      conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
+      conf.setLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
+          joinTimeOutMS);
+      final TestContext ctx = new TestContext(conf, 1);
+      final int numExpectedBlocks = 10;
+      ctx.createFiles(0, numExpectedBlocks, 1);
+      final TestScanResultHandler.Info info =
+          TestScanResultHandler.getInfo(ctx.volumes.get(0));
+      synchronized (info) {
+        info.sem = new Semaphore(5);
+        info.shouldRun = true;
+        info.notify();
+      }
+      // Make sure that the scanners are making progress.
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          synchronized (info) {
+            return info.blocksScanned >= 1;
+          }
+        }
+      }, 3, 30000);
+      // Mark the time at which the shutdown starts.
+      long startShutdownTime = Time.monotonicNow();
+      ctx.datanode.shutdown();
+      long endShutdownTime = Time.monotonicNow();
+      long totalTimeShutdown = endShutdownTime - startShutdownTime;
+
+      if (isFastShutdown) {
+        assertTrue("total shutdown time of DN must be smaller than "
+                + "VolumeScanner Response time: " + totalTimeShutdown,
+            totalTimeShutdown < delayMS
+                && totalTimeShutdown >= joinTimeOutMS);
+        // Wait for scanners to terminate before we move to the next test.
+        injectDelay.waitForScanners();
+        return;
+      }
+      assertTrue("total shutdown time of DN must be larger than "
+              + "VolumeScanner Response time: " + totalTimeShutdown,
+          totalTimeShutdown >= delayMS
+              && totalTimeShutdown < joinTimeOutMS);
+    } finally {
+      // Restore the VolumeScanner callback injector.
+      VolumeScannerCBInjector.set(prevVolumeScannerCBInject);
+    }
+  }
+
+  private static class DelayVolumeScannerResponseToInterrupt extends
+      VolumeScannerCBInjector {
+    private final long delayAmountNS;
+    private final Set<VolumeScanner> scannersToShutDown;
+
+    DelayVolumeScannerResponseToInterrupt(long delayMS) {
+      delayAmountNS =
+          TimeUnit.NANOSECONDS.convert(delayMS, TimeUnit.MILLISECONDS);
+      scannersToShutDown = ConcurrentHashMap.newKeySet();
+    }
+
+    @Override
+    public void preSavingBlockIteratorTask(VolumeScanner volumeScanner) {
+      long remainingTimeNS = delayAmountNS;
+      // Busy delay without sleep().
+      long startTime = Time.monotonicNowNanos();
+      long endTime = startTime + remainingTimeNS;
+      long currTime, waitTime = 0;
+      while ((currTime = Time.monotonicNowNanos()) < endTime) {
+        // Empty loop. No sleep() here, because the thread could already be
+        // in an interrupted state.
+        waitTime = currTime - startTime;
+      }
+      LOG.info("VolumeScanner {} finished delayed task after {} ms",
+          volumeScanner.toString(),
+          TimeUnit.MILLISECONDS.convert(waitTime, TimeUnit.NANOSECONDS));
+    }
+
+    @Override
+    public void shutdownCallBack(VolumeScanner volumeScanner) {
+      scannersToShutDown.add(volumeScanner);
+    }
+
+    @Override
+    public void terminationCallBack(VolumeScanner volumeScanner) {
+      scannersToShutDown.remove(volumeScanner);
+    }
+
+    public void waitForScanners() throws TimeoutException,
+        InterruptedException {
+      GenericTestUtils.waitFor(
+          () -> scannersToShutDown.isEmpty(), 10, 120000);
+    }
+  }
 }
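Note: one detail worth calling out in the delay injector above is the TimeUnit conversion. TimeUnit.convert() is invoked on the target unit, with the source value and unit as arguments, which is easy to invert by accident. A tiny self-contained check:

```java
import java.util.concurrent.TimeUnit;

// Standalone illustration of the convert() direction; not part of the patch.
public class ConvertDirectionExample {
  public static void main(String[] args) {
    long waitTimeNanos = 2_000_000_000L; // 2 seconds, in nanoseconds
    // Receiver is the TARGET unit; arguments are the SOURCE value and unit.
    long ms = TimeUnit.MILLISECONDS.convert(waitTimeNanos,
        TimeUnit.NANOSECONDS);
    System.out.println(ms); // prints 2000
    // The inverted call, NANOSECONDS.convert(waitTimeNanos, MILLISECONDS),
    // would treat the nano value as milliseconds and scale it up by 10^6.
  }
}
```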