HDFS-15618. Improve datanode shutdown latency. Contributed by Ahmed Hussein.

(cherry picked from commit cf932a7e2d)
This commit is contained in:
Kihwal Lee 2020-10-22 14:59:09 -05:00
parent 0d6958962c
commit f3b2d85690
8 changed files with 273 additions and 7 deletions

View File

@ -31,6 +31,8 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReservedSpaceCalcul
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.http.HttpConfig;
import java.util.concurrent.TimeUnit;
/**
* This class contains constants for configuration keys and default values
* used in hdfs.
@ -787,6 +789,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 21 * 24; // 3 weeks.
public static final String DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND = "dfs.block.scanner.volume.bytes.per.second";
public static final long DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT = 1048576L;
/**
* The amount of time in milliseconds that the BlockScanner times out waiting
* for the VolumeScanner thread to join during a shutdown call.
*/
public static final String DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY =
"dfs.block.scanner.volume.join.timeout.ms";
public static final long DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT =
TimeUnit.SECONDS.toMillis(5);
public static final String DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed";
public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
@ -66,6 +68,12 @@ public class BlockScanner {
*/
private Conf conf;
/**
* Timeout duration in milliseconds waiting for {@link VolumeScanner} to stop
* inside {@link #removeAllVolumeScanners}.
*/
private long joinVolumeScannersTimeOutMs;
@VisibleForTesting
void setConf(Conf conf) {
this.conf = conf;
@ -179,6 +187,9 @@ public class BlockScanner {
public BlockScanner(DataNode datanode, Configuration conf) {
this.datanode = datanode;
setJoinVolumeScannersTimeOutMs(
conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT));
this.conf = new Conf(conf);
if (isEnabled()) {
LOG.info("Initialized block scanner with targetBytesPerSec {}",
@ -198,6 +209,13 @@ public class BlockScanner {
return (conf.scanPeriodMs > 0) && (conf.targetBytesPerSec > 0);
}
/**
* Returns true if there is any scanner thread registered.
*/
public synchronized boolean hasAnyRegisteredScanner() {
return !scanners.isEmpty();
}
/**
* Set up a scanner for the given block pool and volume.
*
@ -262,7 +280,10 @@ public class BlockScanner {
/**
* Stops and removes all volume scanners.<p/>
*
* This function will block until all the volume scanners have stopped.
* This function is called on shutdown. It will return even if some of
* the scanners don't terminate in time. Since the scanners are daemon
* threads and do not alter the block content, it is safe to ignore
* such conditions on shutdown.
*/
public synchronized void removeAllVolumeScanners() {
for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
@ -270,7 +291,7 @@ public class BlockScanner {
}
for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
Uninterruptibles.joinUninterruptibly(entry.getValue(),
5, TimeUnit.MINUTES);
getJoinVolumeScannersTimeOutMs(), TimeUnit.MILLISECONDS);
}
scanners.clear();
}
@ -346,6 +367,14 @@ public class BlockScanner {
scanner.markSuspectBlock(block);
}
public long getJoinVolumeScannersTimeOutMs() {
return joinVolumeScannersTimeOutMs;
}
public void setJoinVolumeScannersTimeOutMs(long joinScannersTimeOutMs) {
this.joinVolumeScannersTimeOutMs = joinScannersTimeOutMs;
}
@InterfaceAudience.Private
public static class Servlet extends HttpServlet {
private static final long serialVersionUID = 1L;

View File

@ -1641,7 +1641,9 @@ public class DataNode extends ReconfigurableBase
// a block pool id
String bpId = bpos.getBlockPoolId();
blockScanner.disableBlockPoolId(bpId);
if (blockScanner.hasAnyRegisteredScanner()) {
blockScanner.disableBlockPoolId(bpId);
}
if (data != null) {
data.shutdownBlockPool(bpId);

View File

@ -635,12 +635,14 @@ public class VolumeScanner extends Thread {
LOG.error("{} exiting because of exception ", this, e);
}
LOG.info("{} exiting.", this);
VolumeScannerCBInjector.get().preSavingBlockIteratorTask(this);
// Save the current position of all block iterators and close them.
for (BlockIterator iter : blockIters) {
saveBlockIterator(iter);
IOUtils.cleanup(null, iter);
}
} finally {
VolumeScannerCBInjector.get().terminationCallBack(this);
// When the VolumeScanner exits, release the reference we were holding
// on the volume. This will allow the volume to be removed later.
IOUtils.cleanup(null, ref);
@ -660,6 +662,7 @@ public class VolumeScanner extends Thread {
stopping = true;
notify();
this.interrupt();
VolumeScannerCBInjector.get().shutdownCallBack(this);
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Used for injecting call backs in {@link VolumeScanner}
* and {@link BlockScanner} tests.
* Calls into this are a no-op in production code.
*/
@VisibleForTesting
@InterfaceAudience.Private
public class VolumeScannerCBInjector {
private static VolumeScannerCBInjector instance =
new VolumeScannerCBInjector();
public static VolumeScannerCBInjector get() {
return instance;
}
public static void set(VolumeScannerCBInjector injector) {
instance = injector;
}
public void preSavingBlockIteratorTask(final VolumeScanner volumeScanner) {
}
public void shutdownCallBack(final VolumeScanner volumeScanner) {
}
public void terminationCallBack(final VolumeScanner volumeScanner) {
}
}

View File

@ -1446,6 +1446,15 @@
</description>
</property>
<property>
<name>dfs.block.scanner.volume.join.timeout.ms</name>
<value>5000</value>
<description>
The amount of time in milliseconds that the BlockScanner times out waiting
for the VolumeScanner thread to join during a shutdown call.
</description>
</property>
<property>
<name>dfs.datanode.readahead.bytes</name>
<value>4194304</value>

View File

@ -22,6 +22,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NO
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
@ -67,6 +68,7 @@ import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import org.apache.hadoop.hdfs.server.datanode.VolumeScanner;
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -156,6 +158,13 @@ public class MiniDFSCluster implements AutoCloseable {
= DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + ".testing";
public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY
= DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY + ".testing";
/**
* For the Junit tests, this is the default value of the The amount of time
* in milliseconds that the BlockScanner times out waiting for the
* {@link VolumeScanner} thread to join during a shutdown call.
*/
public static final long DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC =
TimeUnit.SECONDS.toMillis(30);
// Changing this default may break some tests that assume it is 2.
private static final int DEFAULT_STORAGES_PER_DATANODE = 2;
@ -202,8 +211,7 @@ public class MiniDFSCluster implements AutoCloseable {
public Builder(Configuration conf) {
this.conf = conf;
this.storagesPerDatanode =
FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
initDefaultConfigurations();
if (null == conf.get(HDFS_MINIDFS_BASEDIR)) {
conf.set(HDFS_MINIDFS_BASEDIR,
new File(getBaseDirectory()).getAbsolutePath());
@ -212,8 +220,7 @@ public class MiniDFSCluster implements AutoCloseable {
public Builder(Configuration conf, File basedir) {
this.conf = conf;
this.storagesPerDatanode =
FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
initDefaultConfigurations();
if (null == basedir) {
throw new IllegalArgumentException(
"MiniDFSCluster base directory cannot be null");
@ -477,6 +484,19 @@ public class MiniDFSCluster implements AutoCloseable {
public MiniDFSCluster build() throws IOException {
return new MiniDFSCluster(this);
}
/**
* Initializes default values for the cluster.
*/
private void initDefaultConfigurations() {
long defaultScannerVolumeTimeOut =
conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC);
conf.setLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
defaultScannerVolumeTimeOut);
this.storagesPerDatanode =
FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
}
}
/**

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS;
@ -36,6 +37,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
@ -94,9 +96,19 @@ public class TestBlockScanner {
TestContext(Configuration conf, int numNameServices) throws Exception {
this.numNameServices = numNameServices;
File basedir = new File(GenericTestUtils.getRandomizedTempPath());
long volumeScannerTimeOutFromConf =
conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, -1);
long expectedVScannerTimeOut =
volumeScannerTimeOutFromConf == -1
? MiniDFSCluster.DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC
: volumeScannerTimeOutFromConf;
MiniDFSCluster.Builder bld = new MiniDFSCluster.Builder(conf, basedir).
numDataNodes(1).
storagesPerDatanode(1);
// verify that the builder was initialized to get the default
// configuration designated for Junit tests.
assertEquals(expectedVScannerTimeOut,
conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, -1));
if (numNameServices > 1) {
bld.nnTopology(MiniDFSNNTopology.
simpleFederatedTopology(numNameServices));
@ -974,4 +986,134 @@ public class TestBlockScanner {
info.blocksScanned = 0;
}
}
/**
* Test a DN does not wait for the VolumeScanners to finish before shutting
* down.
*
* @throws Exception
*/
@Test(timeout=120000)
public void testFastDatanodeShutdown() throws Exception {
// set the joinTimeOut to a value smaller than the completion time of the
// VolumeScanner.
testDatanodeShutDown(50L, 1000L, true);
}
/**
* Test a DN waits for the VolumeScanners to finish before shutting down.
*
* @throws Exception
*/
@Test(timeout=120000)
public void testSlowDatanodeShutdown() throws Exception {
// Set the joinTimeOut to a value larger than the completion time of the
// volume scanner
testDatanodeShutDown(TimeUnit.MINUTES.toMillis(5), 1000L,
false);
}
private void testDatanodeShutDown(final long joinTimeOutMS,
final long delayMS, boolean isFastShutdown) throws Exception {
VolumeScannerCBInjector prevVolumeScannerCBInject =
VolumeScannerCBInjector.get();
try {
DelayVolumeScannerResponseToInterrupt injectDelay =
new DelayVolumeScannerResponseToInterrupt(delayMS);
VolumeScannerCBInjector.set(injectDelay);
Configuration conf = new Configuration();
conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
TestScanResultHandler.class.getName());
conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
conf.setLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
joinTimeOutMS);
final TestContext ctx = new TestContext(conf, 1);
final int numExpectedBlocks = 10;
ctx.createFiles(0, numExpectedBlocks, 1);
final TestScanResultHandler.Info info =
TestScanResultHandler.getInfo(ctx.volumes.get(0));
synchronized (info) {
info.sem = new Semaphore(5);
info.shouldRun = true;
info.notify();
}
// make sure that the scanners are doing progress
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (info) {
return info.blocksScanned >= 1;
}
}
}, 3, 30000);
// mark the time where the
long startShutdownTime = Time.monotonicNow();
ctx.datanode.shutdown();
long endShutdownTime = Time.monotonicNow();
long totalTimeShutdown = endShutdownTime - startShutdownTime;
if (isFastShutdown) {
assertTrue("total shutdown time of DN must be smaller than "
+ "VolumeScanner Response time: " + totalTimeShutdown,
totalTimeShutdown < delayMS
&& totalTimeShutdown >= joinTimeOutMS);
// wait for scanners to terminate before we move to the next test.
injectDelay.waitForScanners();
return;
}
assertTrue("total shutdown time of DN must be larger than " +
"VolumeScanner Response time: " + totalTimeShutdown,
totalTimeShutdown >= delayMS
&& totalTimeShutdown < joinTimeOutMS);
} finally {
// restore the VolumeScanner callback injector.
VolumeScannerCBInjector.set(prevVolumeScannerCBInject);
}
}
private static class DelayVolumeScannerResponseToInterrupt extends
VolumeScannerCBInjector {
final private long delayAmountNS;
final private Set<VolumeScanner> scannersToShutDown;
DelayVolumeScannerResponseToInterrupt(long delayMS) {
delayAmountNS =
TimeUnit.NANOSECONDS.convert(delayMS, TimeUnit.MILLISECONDS);
scannersToShutDown = ConcurrentHashMap.newKeySet();
}
@Override
public void preSavingBlockIteratorTask(VolumeScanner volumeScanner) {
long remainingTimeNS = delayAmountNS;
// busy delay without sleep().
long startTime = Time.monotonicNowNanos();
long endTime = startTime + remainingTimeNS;
long currTime, waitTime = 0;
while ((currTime = Time.monotonicNowNanos()) < endTime) {
// empty loop. No need to sleep because the thread could be in an
// interrupt mode.
waitTime = currTime - startTime;
}
LOG.info("VolumeScanner {} finished delayed Task after {}",
volumeScanner.toString(),
TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.MILLISECONDS));
}
@Override
public void shutdownCallBack(VolumeScanner volumeScanner) {
scannersToShutDown.add(volumeScanner);
}
@Override
public void terminationCallBack(VolumeScanner volumeScanner) {
scannersToShutDown.remove(volumeScanner);
}
public void waitForScanners() throws TimeoutException,
InterruptedException {
GenericTestUtils.waitFor(
() -> scannersToShutDown.isEmpty(), 10, 120000);
}
}
}