HDFS-15618. Improve datanode shutdown latency. Contributed by Ahmed Hussein.

This commit is contained in:
Kihwal Lee 2021-06-16 11:38:30 -05:00
parent e7c7fb2896
commit ab6b5681e8
8 changed files with 280 additions and 7 deletions

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@ -600,6 +602,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 21 * 24; // 3 weeks.
public static final String DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND = "dfs.block.scanner.volume.bytes.per.second";
public static final long DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT = 1048576L;
/**
* The amount of time in milliseconds that the BlockScanner times out waiting
* for the VolumeScanner thread to join during a shutdown call.
*/
public static final String DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY =
"dfs.block.scanner.volume.join.timeout.ms";
public static final long DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT =
TimeUnit.SECONDS.toMillis(5);
public static final String DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed";
public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
@ -66,6 +68,12 @@ public class BlockScanner {
*/
private Conf conf;
/**
* Timeout duration in milliseconds waiting for {@link VolumeScanner} to stop
* inside {@link #removeAllVolumeScanners}.
*/
private long joinVolumeScannersTimeOutMs;
@VisibleForTesting
void setConf(Conf conf) {
this.conf = conf;
@ -179,6 +187,9 @@ public class BlockScanner {
public BlockScanner(DataNode datanode, Configuration conf) {
this.datanode = datanode;
setJoinVolumeScannersTimeOutMs(
conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT));
this.conf = new Conf(conf);
if (isEnabled()) {
LOG.info("Initialized block scanner with targetBytesPerSec {}",
@ -198,6 +209,13 @@ public class BlockScanner {
return (conf.scanPeriodMs > 0) && (conf.targetBytesPerSec > 0);
}
/**
* Returns true if there is any scanner thread registered.
*/
public synchronized boolean hasAnyRegisteredScanner() {
return !scanners.isEmpty();
}
/**
* Set up a scanner for the given block pool and volume.
*
@ -262,7 +280,10 @@ public class BlockScanner {
/**
* Stops and removes all volume scanners.<p/>
*
* This function will block until all the volume scanners have stopped.
* This function is called on shutdown. It will return even if some of
* the scanners don't terminate in time. Since the scanners are daemon
* threads and do not alter the block content, it is safe to ignore
* such conditions on shutdown.
*/
public synchronized void removeAllVolumeScanners() {
for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
@ -270,7 +291,7 @@ public class BlockScanner {
}
for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
Uninterruptibles.joinUninterruptibly(entry.getValue(),
5, TimeUnit.MINUTES);
getJoinVolumeScannersTimeOutMs(), TimeUnit.MILLISECONDS);
}
scanners.clear();
}
@ -346,6 +367,14 @@ public class BlockScanner {
scanner.markSuspectBlock(block);
}
public long getJoinVolumeScannersTimeOutMs() {
return joinVolumeScannersTimeOutMs;
}
public void setJoinVolumeScannersTimeOutMs(long joinScannersTimeOutMs) {
this.joinVolumeScannersTimeOutMs = joinScannersTimeOutMs;
}
@InterfaceAudience.Private
public static class Servlet extends HttpServlet {
private static final long serialVersionUID = 1L;

View File

@ -1579,7 +1579,9 @@ public class DataNode extends ReconfigurableBase
// a block pool id
String bpId = bpos.getBlockPoolId();
blockScanner.disableBlockPoolId(bpId);
if (blockScanner.hasAnyRegisteredScanner()) {
blockScanner.disableBlockPoolId(bpId);
}
if (data != null) {
data.shutdownBlockPool(bpId);

View File

@ -641,12 +641,14 @@ public class VolumeScanner extends Thread {
LOG.error("{} exiting because of exception ", this, e);
}
LOG.info("{} exiting.", this);
VolumeScannerCBInjector.get().preSavingBlockIteratorTask(this);
// Save the current position of all block iterators and close them.
for (BlockIterator iter : blockIters) {
saveBlockIterator(iter);
IOUtils.cleanup(null, iter);
}
} finally {
VolumeScannerCBInjector.get().terminationCallBack(this);
// When the VolumeScanner exits, release the reference we were holding
// on the volume. This will allow the volume to be removed later.
IOUtils.cleanup(null, ref);
@ -666,6 +668,7 @@ public class VolumeScanner extends Thread {
stopping = true;
notify();
this.interrupt();
VolumeScannerCBInjector.get().shutdownCallBack(this);
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Used for injecting call backs in {@link VolumeScanner}
* and {@link BlockScanner} tests.
* Calls into this are a no-op in production code.
*/
@VisibleForTesting
@InterfaceAudience.Private
public class VolumeScannerCBInjector {
private static VolumeScannerCBInjector instance =
new VolumeScannerCBInjector();
public static VolumeScannerCBInjector get() {
return instance;
}
public static void set(VolumeScannerCBInjector injector) {
instance = injector;
}
public void preSavingBlockIteratorTask(final VolumeScanner volumeScanner) {
}
public void shutdownCallBack(final VolumeScanner volumeScanner) {
}
public void terminationCallBack(final VolumeScanner volumeScanner) {
}
}

View File

@ -1408,6 +1408,15 @@
</description>
</property>
<property>
<name>dfs.block.scanner.volume.join.timeout.ms</name>
<value>5000</value>
<description>
The amount of time in milliseconds that the BlockScanner times out waiting
for the VolumeScanner thread to join during a shutdown call.
</description>
</property>
<property>
<name>dfs.datanode.readahead.bytes</name>
<value>4194304</value>

View File

@ -21,6 +21,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
@ -62,8 +63,10 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.base.Supplier;
@ -154,6 +157,13 @@ public class MiniDFSCluster implements AutoCloseable {
= DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + ".testing";
public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY
= DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY + ".testing";
/**
* For the Junit tests, this is the default value of the The amount of time
* in milliseconds that the BlockScanner times out waiting for the
* thread to join during a shutdown call.
*/
public static final long DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC =
TimeUnit.SECONDS.toMillis(30);
// Changing this default may break some tests that assume it is 2.
private static final int DEFAULT_STORAGES_PER_DATANODE = 2;
@ -200,8 +210,7 @@ public class MiniDFSCluster implements AutoCloseable {
public Builder(Configuration conf) {
this.conf = conf;
this.storagesPerDatanode =
FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
initDefaultConfigurations();
if (null == conf.get(HDFS_MINIDFS_BASEDIR)) {
conf.set(HDFS_MINIDFS_BASEDIR,
new File(getBaseDirectory()).getAbsolutePath());
@ -210,8 +219,7 @@ public class MiniDFSCluster implements AutoCloseable {
public Builder(Configuration conf, File basedir) {
this.conf = conf;
this.storagesPerDatanode =
FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
initDefaultConfigurations();
if (null == basedir) {
throw new IllegalArgumentException(
"MiniDFSCluster base directory cannot be null");
@ -475,6 +483,19 @@ public class MiniDFSCluster implements AutoCloseable {
public MiniDFSCluster build() throws IOException {
return new MiniDFSCluster(this);
}
/**
* Initializes default values for the cluster.
*/
private void initDefaultConfigurations() {
long defaultScannerVolumeTimeOut =
conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC);
conf.setLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
defaultScannerVolumeTimeOut);
this.storagesPerDatanode =
FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
}
}
/**

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS;
@ -32,9 +33,11 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.base.Supplier;
@ -93,9 +96,19 @@ public class TestBlockScanner {
TestContext(Configuration conf, int numNameServices) throws Exception {
this.numNameServices = numNameServices;
File basedir = new File(GenericTestUtils.getRandomizedTempPath());
long volumeScannerTimeOutFromConf =
conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, -1);
long expectedVScannerTimeOut =
volumeScannerTimeOutFromConf == -1
? MiniDFSCluster.DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC
: volumeScannerTimeOutFromConf;
MiniDFSCluster.Builder bld = new MiniDFSCluster.Builder(conf, basedir).
numDataNodes(1).
storagesPerDatanode(1);
// verify that the builder was initialized to get the default
// configuration designated for Junit tests.
assertEquals(expectedVScannerTimeOut,
conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, -1));
if (numNameServices > 1) {
bld.nnTopology(MiniDFSNNTopology.
simpleFederatedTopology(numNameServices));
@ -973,4 +986,139 @@ public class TestBlockScanner {
info.blocksScanned = 0;
}
}
/**
* Test a DN does not wait for the VolumeScanners to finish before shutting
* down.
*
* @throws Exception
*/
@Test(timeout=120000)
public void testFastDatanodeShutdown() throws Exception {
// set the joinTimeOut to a value smaller than the completion time of the
// VolumeScanner.
testDatanodeShutDown(50L, 1000L, true);
}
/**
* Test a DN waits for the VolumeScanners to finish before shutting down.
*
* @throws Exception
*/
@Test(timeout=120000)
public void testSlowDatanodeShutdown() throws Exception {
// Set the joinTimeOut to a value larger than the completion time of the
// volume scanner
testDatanodeShutDown(TimeUnit.MINUTES.toMillis(5), 1000L,
false);
}
private void testDatanodeShutDown(final long joinTimeOutMS,
final long delayMS, boolean isFastShutdown) throws Exception {
VolumeScannerCBInjector prevVolumeScannerCBInject =
VolumeScannerCBInjector.get();
try {
DelayVolumeScannerResponseToInterrupt injectDelay =
new DelayVolumeScannerResponseToInterrupt(delayMS);
VolumeScannerCBInjector.set(injectDelay);
Configuration conf = new Configuration();
conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
TestScanResultHandler.class.getName());
conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
conf.setLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
joinTimeOutMS);
final TestContext ctx = new TestContext(conf, 1);
final int numExpectedBlocks = 10;
ctx.createFiles(0, numExpectedBlocks, 1);
final TestScanResultHandler.Info info =
TestScanResultHandler.getInfo(ctx.volumes.get(0));
synchronized (info) {
info.sem = new Semaphore(5);
info.shouldRun = true;
info.notify();
}
// make sure that the scanners are doing progress
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (info) {
return info.blocksScanned >= 1;
}
}
}, 3, 30000);
// mark the time where the
long startShutdownTime = Time.monotonicNow();
ctx.datanode.shutdown();
long endShutdownTime = Time.monotonicNow();
long totalTimeShutdown = endShutdownTime - startShutdownTime;
if (isFastShutdown) {
assertTrue("total shutdown time of DN must be smaller than "
+ "VolumeScanner Response time: " + totalTimeShutdown,
totalTimeShutdown < delayMS
&& totalTimeShutdown >= joinTimeOutMS);
// wait for scanners to terminate before we move to the next test.
injectDelay.waitForScanners();
return;
}
assertTrue("total shutdown time of DN must be larger than " +
"VolumeScanner Response time: " + totalTimeShutdown,
totalTimeShutdown >= delayMS
&& totalTimeShutdown < joinTimeOutMS);
} finally {
// restore the VolumeScanner callback injector.
VolumeScannerCBInjector.set(prevVolumeScannerCBInject);
}
}
private static class DelayVolumeScannerResponseToInterrupt extends
VolumeScannerCBInjector {
final private long delayAmountNS;
final private Map<VolumeScanner, Boolean> scannersToShutDown;
DelayVolumeScannerResponseToInterrupt(long delayMS) {
delayAmountNS =
TimeUnit.NANOSECONDS.convert(delayMS, TimeUnit.MILLISECONDS);
scannersToShutDown = new ConcurrentHashMap<>();
}
@Override
public void preSavingBlockIteratorTask(VolumeScanner volumeScanner) {
long remainingTimeNS = delayAmountNS;
// busy delay without sleep().
long startTime = Time.monotonicNowNanos();
long endTime = startTime + remainingTimeNS;
long currTime, waitTime = 0;
while ((currTime = Time.monotonicNowNanos()) < endTime) {
// empty loop. No need to sleep because the thread could be in an
// interrupt mode.
waitTime = currTime - startTime;
}
LOG.info("VolumeScanner {} finished delayed Task after {}",
volumeScanner.toString(),
TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.MILLISECONDS));
}
@Override
public void shutdownCallBack(VolumeScanner volumeScanner) {
scannersToShutDown.put(volumeScanner, true);
}
@Override
public void terminationCallBack(VolumeScanner volumeScanner) {
scannersToShutDown.remove(volumeScanner);
}
public void waitForScanners() throws TimeoutException,
InterruptedException {
GenericTestUtils.waitFor(
new Supplier<Boolean>() {
@Override
public Boolean get() {
return scannersToShutDown.isEmpty();
}
}, 10, 120000);
}
}
}