HDFS-8808. dfs.image.transfer.bandwidthPerSec should not apply to -bootstrapStandby. Contributed by Zhe Zhang.
Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java Change-Id: I99c314bb932b96a25f9115772267176e0b815614
This commit is contained in:
parent
235e84bf3d
commit
1c468c2346
|
@ -727,6 +727,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-9264. Minor cleanup of operations on FsVolumeList#volumes.
|
HDFS-9264. Minor cleanup of operations on FsVolumeList#volumes.
|
||||||
(Walter Su via lei)
|
(Walter Su via lei)
|
||||||
|
|
||||||
|
HDFS-8808. dfs.image.transfer.bandwidthPerSec should not apply to
|
||||||
|
-bootstrapStandby (zhz)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -507,6 +507,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
"dfs.image.transfer.bandwidthPerSec";
|
"dfs.image.transfer.bandwidthPerSec";
|
||||||
public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0; //no throttling
|
public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0; //no throttling
|
||||||
|
|
||||||
|
public static final String DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY =
|
||||||
|
"dfs.image.transfer-bootstrap-standby.bandwidthPerSec";
|
||||||
|
public static final long DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_DEFAULT =
|
||||||
|
0; //no throttling
|
||||||
|
|
||||||
// Image transfer timeout
|
// Image transfer timeout
|
||||||
public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout";
|
public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout";
|
||||||
public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000;
|
public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000;
|
||||||
|
|
|
@ -222,7 +222,7 @@ class Checkpointer extends Daemon {
|
||||||
"image with txid " + sig.mostRecentCheckpointTxId);
|
"image with txid " + sig.mostRecentCheckpointTxId);
|
||||||
MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
|
MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
|
||||||
backupNode.nnHttpAddress, sig.mostRecentCheckpointTxId, bnStorage,
|
backupNode.nnHttpAddress, sig.mostRecentCheckpointTxId, bnStorage,
|
||||||
true);
|
true, false);
|
||||||
bnImage.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE,
|
bnImage.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE,
|
||||||
sig.mostRecentCheckpointTxId, downloadedHash);
|
sig.mostRecentCheckpointTxId, downloadedHash);
|
||||||
lastApplied = sig.mostRecentCheckpointTxId;
|
lastApplied = sig.mostRecentCheckpointTxId;
|
||||||
|
|
|
@ -81,6 +81,7 @@ public class ImageServlet extends HttpServlet {
|
||||||
private static final String STORAGEINFO_PARAM = "storageInfo";
|
private static final String STORAGEINFO_PARAM = "storageInfo";
|
||||||
private static final String LATEST_FSIMAGE_VALUE = "latest";
|
private static final String LATEST_FSIMAGE_VALUE = "latest";
|
||||||
private static final String IMAGE_FILE_TYPE = "imageFile";
|
private static final String IMAGE_FILE_TYPE = "imageFile";
|
||||||
|
private static final String IS_BOOTSTRAP_STANDBY = "bootstrapstandby";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void doGet(final HttpServletRequest request,
|
public void doGet(final HttpServletRequest request,
|
||||||
|
@ -153,8 +154,10 @@ public class ImageServlet extends HttpServlet {
|
||||||
// detected by the client side as an inaccurate length header.
|
// detected by the client side as an inaccurate length header.
|
||||||
}
|
}
|
||||||
// send file
|
// send file
|
||||||
|
DataTransferThrottler throttler = parsedParams.isBootstrapStandby ?
|
||||||
|
getThrottlerForBootstrapStandby(conf) : getThrottler(conf);
|
||||||
TransferFsImage.copyFileToStream(response.getOutputStream(),
|
TransferFsImage.copyFileToStream(response.getOutputStream(),
|
||||||
file, fis, getThrottler(conf));
|
file, fis, throttler);
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.closeStream(fis);
|
IOUtils.closeStream(fis);
|
||||||
}
|
}
|
||||||
|
@ -211,7 +214,7 @@ public class ImageServlet extends HttpServlet {
|
||||||
* @param conf configuration
|
* @param conf configuration
|
||||||
* @return a data transfer throttler
|
* @return a data transfer throttler
|
||||||
*/
|
*/
|
||||||
public final static DataTransferThrottler getThrottler(Configuration conf) {
|
public static DataTransferThrottler getThrottler(Configuration conf) {
|
||||||
long transferBandwidth =
|
long transferBandwidth =
|
||||||
conf.getLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,
|
conf.getLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,
|
||||||
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);
|
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);
|
||||||
|
@ -222,6 +225,19 @@ public class ImageServlet extends HttpServlet {
|
||||||
return throttler;
|
return throttler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static DataTransferThrottler getThrottlerForBootstrapStandby(
|
||||||
|
Configuration conf) {
|
||||||
|
long transferBandwidth =
|
||||||
|
conf.getLong(
|
||||||
|
DFSConfigKeys.DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY,
|
||||||
|
DFSConfigKeys.DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_DEFAULT);
|
||||||
|
DataTransferThrottler throttler = null;
|
||||||
|
if (transferBandwidth > 0) {
|
||||||
|
throttler = new DataTransferThrottler(transferBandwidth);
|
||||||
|
}
|
||||||
|
return throttler;
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static boolean isValidRequestor(ServletContext context, String remoteUser,
|
static boolean isValidRequestor(ServletContext context, String remoteUser,
|
||||||
Configuration conf) throws IOException {
|
Configuration conf) throws IOException {
|
||||||
|
@ -295,13 +311,14 @@ public class ImageServlet extends HttpServlet {
|
||||||
}
|
}
|
||||||
|
|
||||||
static String getParamStringForImage(NameNodeFile nnf, long txid,
|
static String getParamStringForImage(NameNodeFile nnf, long txid,
|
||||||
StorageInfo remoteStorageInfo) {
|
StorageInfo remoteStorageInfo, boolean isBootstrapStandby) {
|
||||||
final String imageType = nnf == null ? "" : "&" + IMAGE_FILE_TYPE + "="
|
final String imageType = nnf == null ? "" : "&" + IMAGE_FILE_TYPE + "="
|
||||||
+ nnf.name();
|
+ nnf.name();
|
||||||
return "getimage=1&" + TXID_PARAM + "=" + txid
|
return "getimage=1&" + TXID_PARAM + "=" + txid
|
||||||
+ imageType
|
+ imageType
|
||||||
+ "&" + STORAGEINFO_PARAM + "=" +
|
+ "&" + STORAGEINFO_PARAM + "="
|
||||||
remoteStorageInfo.toColonSeparatedString();
|
+ remoteStorageInfo.toColonSeparatedString() + "&"
|
||||||
|
+ IS_BOOTSTRAP_STANDBY + "=" + isBootstrapStandby;
|
||||||
}
|
}
|
||||||
|
|
||||||
static String getParamStringForLog(RemoteEditLog log,
|
static String getParamStringForLog(RemoteEditLog log,
|
||||||
|
@ -319,6 +336,7 @@ public class ImageServlet extends HttpServlet {
|
||||||
private long startTxId, endTxId, txId;
|
private long startTxId, endTxId, txId;
|
||||||
private String storageInfoString;
|
private String storageInfoString;
|
||||||
private boolean fetchLatest;
|
private boolean fetchLatest;
|
||||||
|
private boolean isBootstrapStandby;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param request the object from which this servlet reads the url contents
|
* @param request the object from which this servlet reads the url contents
|
||||||
|
@ -330,7 +348,7 @@ public class ImageServlet extends HttpServlet {
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
Map<String, String[]> pmap = request.getParameterMap();
|
Map<String, String[]> pmap = request.getParameterMap();
|
||||||
isGetImage = isGetEdit = fetchLatest = false;
|
isGetImage = isGetEdit = fetchLatest = isBootstrapStandby = false;
|
||||||
|
|
||||||
for (Map.Entry<String, String[]> entry : pmap.entrySet()) {
|
for (Map.Entry<String, String[]> entry : pmap.entrySet()) {
|
||||||
String key = entry.getKey();
|
String key = entry.getKey();
|
||||||
|
@ -342,6 +360,10 @@ public class ImageServlet extends HttpServlet {
|
||||||
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
|
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
|
||||||
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
|
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
|
||||||
.valueOf(imageType);
|
.valueOf(imageType);
|
||||||
|
String bootstrapStandby = ServletUtil.getParameter(request,
|
||||||
|
IS_BOOTSTRAP_STANDBY);
|
||||||
|
isBootstrapStandby = bootstrapStandby != null &&
|
||||||
|
Boolean.parseBoolean(bootstrapStandby);
|
||||||
} catch (NumberFormatException nfe) {
|
} catch (NumberFormatException nfe) {
|
||||||
if (request.getParameter(TXID_PARAM).equals(LATEST_FSIMAGE_VALUE)) {
|
if (request.getParameter(TXID_PARAM).equals(LATEST_FSIMAGE_VALUE)) {
|
||||||
fetchLatest = true;
|
fetchLatest = true;
|
||||||
|
|
|
@ -421,7 +421,7 @@ public class SecondaryNameNode implements Runnable,
|
||||||
LOG.info("Image has changed. Downloading updated image from NN.");
|
LOG.info("Image has changed. Downloading updated image from NN.");
|
||||||
MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
|
MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
|
||||||
nnHostPort, sig.mostRecentCheckpointTxId,
|
nnHostPort, sig.mostRecentCheckpointTxId,
|
||||||
dstImage.getStorage(), true);
|
dstImage.getStorage(), true, false);
|
||||||
dstImage.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE,
|
dstImage.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE,
|
||||||
sig.mostRecentCheckpointTxId, downloadedHash);
|
sig.mostRecentCheckpointTxId, downloadedHash);
|
||||||
}
|
}
|
||||||
|
|
|
@ -102,9 +102,10 @@ public class TransferFsImage {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
|
public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
|
||||||
Storage dstStorage, boolean needDigest) throws IOException {
|
Storage dstStorage, boolean needDigest, boolean isBootstrapStandby)
|
||||||
|
throws IOException {
|
||||||
String fileid = ImageServlet.getParamStringForImage(null,
|
String fileid = ImageServlet.getParamStringForImage(null,
|
||||||
imageTxId, dstStorage);
|
imageTxId, dstStorage, isBootstrapStandby);
|
||||||
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
|
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
|
||||||
|
|
||||||
List<File> dstFiles = dstStorage.getFiles(
|
List<File> dstFiles = dstStorage.getFiles(
|
||||||
|
|
|
@ -317,7 +317,7 @@ public class BootstrapStandby implements Tool, Configurable {
|
||||||
|
|
||||||
// Download that checkpoint into our storage directories.
|
// Download that checkpoint into our storage directories.
|
||||||
MD5Hash hash = TransferFsImage.downloadImageToStorage(
|
MD5Hash hash = TransferFsImage.downloadImageToStorage(
|
||||||
otherHttpAddr, imageTxId, storage, true);
|
otherHttpAddr, imageTxId, storage, true, true);
|
||||||
image.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE, imageTxId,
|
image.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE, imageTxId,
|
||||||
hash);
|
hash);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
|
|
|
@ -1061,15 +1061,31 @@
|
||||||
<name>dfs.image.transfer.bandwidthPerSec</name>
|
<name>dfs.image.transfer.bandwidthPerSec</name>
|
||||||
<value>0</value>
|
<value>0</value>
|
||||||
<description>
|
<description>
|
||||||
Maximum bandwidth used for image transfer in bytes per second.
|
Maximum bandwidth used for regular image transfers (instead of
|
||||||
|
bootstrapping the standby namenode), in bytes per second.
|
||||||
This can help keep normal namenode operations responsive during
|
This can help keep normal namenode operations responsive during
|
||||||
checkpointing. The maximum bandwidth and timeout in
|
checkpointing. The maximum bandwidth and timeout in
|
||||||
dfs.image.transfer.timeout should be set such that normal image
|
dfs.image.transfer.timeout should be set such that normal image
|
||||||
transfers can complete successfully.
|
transfers can complete successfully.
|
||||||
A default value of 0 indicates that throttling is disabled.
|
A default value of 0 indicates that throttling is disabled.
|
||||||
|
The maximum bandwidth used for bootstrapping standby namenode is
|
||||||
|
configured with dfs.image.transfer-bootstrap-standby.bandwidthPerSec.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.image.transfer-bootstrap-standby.bandwidthPerSec</name>
|
||||||
|
<value>0</value>
|
||||||
|
<description>
|
||||||
|
Maximum bandwidth used for transferring image to bootstrap standby
|
||||||
|
namenode, in bytes per second.
|
||||||
|
A default value of 0 indicates that throttling is disabled. This default
|
||||||
|
value should be used in most cases, to ensure timely HA operations.
|
||||||
|
The maximum bandwidth used for regular image transfers is configured
|
||||||
|
with dfs.image.transfer.bandwidthPerSec.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.image.transfer.chunksize</name>
|
<name>dfs.image.transfer.chunksize</name>
|
||||||
<value>65536</value>
|
<value>65536</value>
|
||||||
|
|
|
@ -2004,7 +2004,7 @@ public class TestCheckpoint {
|
||||||
.when(dstImage).toColonSeparatedString();
|
.when(dstImage).toColonSeparatedString();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
TransferFsImage.downloadImageToStorage(fsName, 0, dstImage, false);
|
TransferFsImage.downloadImageToStorage(fsName, 0, dstImage, false, false);
|
||||||
fail("Storage info was not verified");
|
fail("Storage info was not verified");
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
String msg = StringUtils.stringifyException(ioe);
|
String msg = StringUtils.stringifyException(ioe);
|
||||||
|
|
|
@ -24,11 +24,14 @@ import static org.junit.Assert.fail;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
||||||
|
@ -205,6 +208,64 @@ public class TestBootstrapStandby {
|
||||||
assertEquals(0, rc);
|
assertEquals(0, rc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that bootstrapping standby NN is not limited by
|
||||||
|
* {@link DFSConfigKeys#DFS_IMAGE_TRANSFER_RATE_KEY}, but is limited by
|
||||||
|
* {@link DFSConfigKeys#DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY}
|
||||||
|
* created by HDFS-8808.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRateThrottling() throws Exception {
|
||||||
|
cluster.getConfiguration(0).setLong(
|
||||||
|
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, 1);
|
||||||
|
cluster.restartNameNode(0);
|
||||||
|
cluster.waitActive();
|
||||||
|
nn0 = cluster.getNameNode(0);
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
// Each edit has at least 1 byte. So the lowRate definitely should cause
|
||||||
|
// a timeout, if enforced. If lowRate is not enforced, any reasonable test
|
||||||
|
// machine should at least download an image with 5 edits in 5 seconds.
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
nn0.getRpcServer().rollEditLog();
|
||||||
|
}
|
||||||
|
// A very low DFS_IMAGE_TRANSFER_RATE_KEY value won't affect bootstrapping
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
public Boolean get() {
|
||||||
|
try {
|
||||||
|
testSuccessfulBaseCase();
|
||||||
|
return true;
|
||||||
|
} catch (Exception e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, 500, 5000);
|
||||||
|
|
||||||
|
shutdownCluster();
|
||||||
|
setupCluster();
|
||||||
|
cluster.getConfiguration(0).setLong(
|
||||||
|
DFSConfigKeys.DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY, 1);
|
||||||
|
cluster.restartNameNode(0);
|
||||||
|
cluster.waitActive();
|
||||||
|
nn0 = cluster.getNameNode(0);
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
// A very low DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY value should
|
||||||
|
// cause timeout
|
||||||
|
try {
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
public Boolean get() {
|
||||||
|
try {
|
||||||
|
testSuccessfulBaseCase();
|
||||||
|
return true;
|
||||||
|
} catch (Exception e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, 500, 5000);
|
||||||
|
fail("Did not timeout");
|
||||||
|
} catch (TimeoutException e) {
|
||||||
|
LOG.info("Encountered expected timeout.");
|
||||||
|
}
|
||||||
|
}
|
||||||
private void removeStandbyNameDirs() {
|
private void removeStandbyNameDirs() {
|
||||||
for (URI u : cluster.getNameDirs(1)) {
|
for (URI u : cluster.getNameDirs(1)) {
|
||||||
assertTrue(u.getScheme().equals("file"));
|
assertTrue(u.getScheme().equals("file"));
|
||||||
|
|
Loading…
Reference in New Issue