HDFS-8808. dfs.image.transfer.bandwidthPerSec should not apply to -bootstrapStandby. Contributed by Zhe Zhang.
This commit is contained in:
parent
d3a34a4f38
commit
ab3c4cff4a
|
@ -1563,6 +1563,9 @@ Release 2.8.0 - UNRELEASED
|
|||
HDFS-9264. Minor cleanup of operations on FsVolumeList#volumes.
|
||||
(Walter Su via lei)
|
||||
|
||||
HDFS-8808. dfs.image.transfer.bandwidthPerSec should not apply to
|
||||
-bootstrapStandby (zhz)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
|
|
@ -520,6 +520,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
"dfs.image.transfer.bandwidthPerSec";
|
||||
public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0; //no throttling
|
||||
|
||||
public static final String DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY =
|
||||
"dfs.image.transfer-bootstrap-standby.bandwidthPerSec";
|
||||
public static final long DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_DEFAULT =
|
||||
0; //no throttling
|
||||
|
||||
// Image transfer timeout
|
||||
public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout";
|
||||
public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000;
|
||||
|
|
|
@ -222,7 +222,7 @@ class Checkpointer extends Daemon {
|
|||
"image with txid " + sig.mostRecentCheckpointTxId);
|
||||
MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
|
||||
backupNode.nnHttpAddress, sig.mostRecentCheckpointTxId, bnStorage,
|
||||
true);
|
||||
true, false);
|
||||
bnImage.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE,
|
||||
sig.mostRecentCheckpointTxId, downloadedHash);
|
||||
lastApplied = sig.mostRecentCheckpointTxId;
|
||||
|
|
|
@ -82,6 +82,7 @@ public class ImageServlet extends HttpServlet {
|
|||
private static final String STORAGEINFO_PARAM = "storageInfo";
|
||||
private static final String LATEST_FSIMAGE_VALUE = "latest";
|
||||
private static final String IMAGE_FILE_TYPE = "imageFile";
|
||||
private static final String IS_BOOTSTRAP_STANDBY = "bootstrapstandby";
|
||||
|
||||
private SortedSet<ImageUploadRequest> currentlyDownloadingCheckpoints = Collections
|
||||
.<ImageUploadRequest> synchronizedSortedSet(new TreeSet<ImageUploadRequest>());
|
||||
|
@ -157,8 +158,10 @@ public class ImageServlet extends HttpServlet {
|
|||
// detected by the client side as an inaccurate length header.
|
||||
}
|
||||
// send file
|
||||
DataTransferThrottler throttler = parsedParams.isBootstrapStandby ?
|
||||
getThrottlerForBootstrapStandby(conf) : getThrottler(conf);
|
||||
TransferFsImage.copyFileToStream(response.getOutputStream(),
|
||||
file, fis, getThrottler(conf));
|
||||
file, fis, throttler);
|
||||
} finally {
|
||||
IOUtils.closeStream(fis);
|
||||
}
|
||||
|
@ -215,8 +218,8 @@ public class ImageServlet extends HttpServlet {
|
|||
* @param conf configuration
|
||||
* @return a data transfer throttler
|
||||
*/
|
||||
public final static DataTransferThrottler getThrottler(Configuration conf) {
|
||||
long transferBandwidth =
|
||||
public static DataTransferThrottler getThrottler(Configuration conf) {
|
||||
long transferBandwidth =
|
||||
conf.getLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,
|
||||
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);
|
||||
DataTransferThrottler throttler = null;
|
||||
|
@ -225,7 +228,20 @@ public class ImageServlet extends HttpServlet {
|
|||
}
|
||||
return throttler;
|
||||
}
|
||||
|
||||
|
||||
private static DataTransferThrottler getThrottlerForBootstrapStandby(
|
||||
Configuration conf) {
|
||||
long transferBandwidth =
|
||||
conf.getLong(
|
||||
DFSConfigKeys.DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY,
|
||||
DFSConfigKeys.DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_DEFAULT);
|
||||
DataTransferThrottler throttler = null;
|
||||
if (transferBandwidth > 0) {
|
||||
throttler = new DataTransferThrottler(transferBandwidth);
|
||||
}
|
||||
return throttler;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static boolean isValidRequestor(ServletContext context, String remoteUser,
|
||||
Configuration conf) throws IOException {
|
||||
|
@ -301,13 +317,14 @@ public class ImageServlet extends HttpServlet {
|
|||
}
|
||||
|
||||
static String getParamStringForImage(NameNodeFile nnf, long txid,
|
||||
StorageInfo remoteStorageInfo) {
|
||||
StorageInfo remoteStorageInfo, boolean isBootstrapStandby) {
|
||||
final String imageType = nnf == null ? "" : "&" + IMAGE_FILE_TYPE + "="
|
||||
+ nnf.name();
|
||||
return "getimage=1&" + TXID_PARAM + "=" + txid
|
||||
+ imageType
|
||||
+ "&" + STORAGEINFO_PARAM + "=" +
|
||||
remoteStorageInfo.toColonSeparatedString();
|
||||
+ "&" + STORAGEINFO_PARAM + "="
|
||||
+ remoteStorageInfo.toColonSeparatedString() + "&"
|
||||
+ IS_BOOTSTRAP_STANDBY + "=" + isBootstrapStandby;
|
||||
}
|
||||
|
||||
static String getParamStringForLog(RemoteEditLog log,
|
||||
|
@ -325,6 +342,7 @@ public class ImageServlet extends HttpServlet {
|
|||
private long startTxId, endTxId, txId;
|
||||
private String storageInfoString;
|
||||
private boolean fetchLatest;
|
||||
private boolean isBootstrapStandby;
|
||||
|
||||
/**
|
||||
* @param request the object from which this servlet reads the url contents
|
||||
|
@ -336,7 +354,7 @@ public class ImageServlet extends HttpServlet {
|
|||
) throws IOException {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, String[]> pmap = request.getParameterMap();
|
||||
isGetImage = isGetEdit = fetchLatest = false;
|
||||
isGetImage = isGetEdit = fetchLatest = isBootstrapStandby = false;
|
||||
|
||||
for (Map.Entry<String, String[]> entry : pmap.entrySet()) {
|
||||
String key = entry.getKey();
|
||||
|
@ -348,6 +366,10 @@ public class ImageServlet extends HttpServlet {
|
|||
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
|
||||
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
|
||||
.valueOf(imageType);
|
||||
String bootstrapStandby = ServletUtil.getParameter(request,
|
||||
IS_BOOTSTRAP_STANDBY);
|
||||
isBootstrapStandby = bootstrapStandby != null &&
|
||||
Boolean.parseBoolean(bootstrapStandby);
|
||||
} catch (NumberFormatException nfe) {
|
||||
if (request.getParameter(TXID_PARAM).equals(LATEST_FSIMAGE_VALUE)) {
|
||||
fetchLatest = true;
|
||||
|
|
|
@ -421,7 +421,7 @@ public class SecondaryNameNode implements Runnable,
|
|||
LOG.info("Image has changed. Downloading updated image from NN.");
|
||||
MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
|
||||
nnHostPort, sig.mostRecentCheckpointTxId,
|
||||
dstImage.getStorage(), true);
|
||||
dstImage.getStorage(), true, false);
|
||||
dstImage.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE,
|
||||
sig.mostRecentCheckpointTxId, downloadedHash);
|
||||
}
|
||||
|
|
|
@ -128,9 +128,10 @@ public class TransferFsImage {
|
|||
}
|
||||
|
||||
public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
|
||||
Storage dstStorage, boolean needDigest) throws IOException {
|
||||
Storage dstStorage, boolean needDigest, boolean isBootstrapStandby)
|
||||
throws IOException {
|
||||
String fileid = ImageServlet.getParamStringForImage(null,
|
||||
imageTxId, dstStorage);
|
||||
imageTxId, dstStorage, isBootstrapStandby);
|
||||
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
|
||||
|
||||
List<File> dstFiles = dstStorage.getFiles(
|
||||
|
|
|
@ -333,7 +333,7 @@ public class BootstrapStandby implements Tool, Configurable {
|
|||
|
||||
// Download that checkpoint into our storage directories.
|
||||
MD5Hash hash = TransferFsImage.downloadImageToStorage(
|
||||
proxyInfo.getHttpAddress(), imageTxId, storage, true);
|
||||
proxyInfo.getHttpAddress(), imageTxId, storage, true, true);
|
||||
image.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE, imageTxId,
|
||||
hash);
|
||||
} catch (IOException ioe) {
|
||||
|
|
|
@ -1073,15 +1073,31 @@
|
|||
<name>dfs.image.transfer.bandwidthPerSec</name>
|
||||
<value>0</value>
|
||||
<description>
|
||||
Maximum bandwidth used for image transfer in bytes per second.
|
||||
Maximum bandwidth used for regular image transfers (instead of
|
||||
bootstrapping the standby namenode), in bytes per second.
|
||||
This can help keep normal namenode operations responsive during
|
||||
checkpointing. The maximum bandwidth and timeout in
|
||||
dfs.image.transfer.timeout should be set such that normal image
|
||||
transfers can complete successfully.
|
||||
A default value of 0 indicates that throttling is disabled.
|
||||
A default value of 0 indicates that throttling is disabled.
|
||||
The maximum bandwidth used for bootstrapping standby namenode is
|
||||
configured with dfs.image.transfer-bootstrap-standby.bandwidthPerSec.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.image.transfer-bootstrap-standby.bandwidthPerSec</name>
|
||||
<value>0</value>
|
||||
<description>
|
||||
Maximum bandwidth used for transferring image to bootstrap standby
|
||||
namenode, in bytes per second.
|
||||
A default value of 0 indicates that throttling is disabled. This default
|
||||
value should be used in most cases, to ensure timely HA operations.
|
||||
The maximum bandwidth used for regular image transfers is configured
|
||||
with dfs.image.transfer.bandwidthPerSec.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.image.transfer.chunksize</name>
|
||||
<value>65536</value>
|
||||
|
|
|
@ -2002,7 +2002,7 @@ public class TestCheckpoint {
|
|||
.when(dstImage).toColonSeparatedString();
|
||||
|
||||
try {
|
||||
TransferFsImage.downloadImageToStorage(fsName, 0, dstImage, false);
|
||||
TransferFsImage.downloadImageToStorage(fsName, 0, dstImage, false, false);
|
||||
fail("Storage info was not verified");
|
||||
} catch (IOException ioe) {
|
||||
String msg = StringUtils.stringifyException(ioe);
|
||||
|
|
|
@ -24,11 +24,14 @@ import static org.junit.Assert.fail;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
||||
|
@ -212,6 +215,64 @@ public class TestBootstrapStandby {
|
|||
assertSuccessfulBootstrapFromIndex(1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that bootstrapping standby NN is not limited by
|
||||
* {@link DFSConfigKeys#DFS_IMAGE_TRANSFER_RATE_KEY}, but is limited by
|
||||
* {@link DFSConfigKeys#DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY}
|
||||
* created by HDFS-8808.
|
||||
*/
|
||||
@Test
|
||||
public void testRateThrottling() throws Exception {
|
||||
cluster.getConfiguration(0).setLong(
|
||||
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, 1);
|
||||
cluster.restartNameNode(0);
|
||||
cluster.waitActive();
|
||||
nn0 = cluster.getNameNode(0);
|
||||
cluster.transitionToActive(0);
|
||||
// Each edit has at least 1 byte. So the lowRate definitely should cause
|
||||
// a timeout, if enforced. If lowRate is not enforced, any reasonable test
|
||||
// machine should at least download an image with 5 edits in 5 seconds.
|
||||
for (int i = 0; i < 5; i++) {
|
||||
nn0.getRpcServer().rollEditLog();
|
||||
}
|
||||
// A very low DFS_IMAGE_TRANSFER_RATE_KEY value won't affect bootstrapping
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
public Boolean get() {
|
||||
try {
|
||||
testSuccessfulBaseCase();
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}, 500, 5000);
|
||||
|
||||
shutdownCluster();
|
||||
setupCluster();
|
||||
cluster.getConfiguration(0).setLong(
|
||||
DFSConfigKeys.DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY, 1);
|
||||
cluster.restartNameNode(0);
|
||||
cluster.waitActive();
|
||||
nn0 = cluster.getNameNode(0);
|
||||
cluster.transitionToActive(0);
|
||||
// A very low DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY value should
|
||||
// cause timeout
|
||||
try {
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
public Boolean get() {
|
||||
try {
|
||||
testSuccessfulBaseCase();
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}, 500, 5000);
|
||||
fail("Did not timeout");
|
||||
} catch (TimeoutException e) {
|
||||
LOG.info("Encountered expected timeout.");
|
||||
}
|
||||
}
|
||||
private void removeStandbyNameDirs() {
|
||||
for (int i = 1; i < maxNNCount; i++) {
|
||||
for (URI u : cluster.getNameDirs(i)) {
|
||||
|
@ -249,4 +310,4 @@ public class TestBootstrapStandby {
|
|||
assertEquals(0, forceBootstrap(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue