HDFS-3405. Checkpointing should use HTTP POST or PUT instead of GET-GET to send merged fsimages. Contributed by Vinayakumar B.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1576154 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Wang 2014-03-11 00:34:16 +00:00
parent 5d2252b6dc
commit 1356761f0f
13 changed files with 1045 additions and 686 deletions

File diff suppressed because it is too large Load Diff

View File

@ -462,7 +462,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// Image transfer timeout
public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout";
public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 10 * 60 * 1000;
public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000;
// Image transfer chunksize
public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize";
public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024;
//Keys with no defaults
public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";

View File

@ -42,7 +42,7 @@ import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@ -198,15 +198,16 @@ public class GetJournalEditServlet extends HttpServlet {
return;
}
editFile = elf.getFile();
GetImageServlet.setVerificationHeaders(response, editFile);
GetImageServlet.setFileNameHeaders(response, editFile);
ImageServlet.setVerificationHeadersForGet(response, editFile);
ImageServlet.setFileNameHeaders(response, editFile);
editFileIn = new FileInputStream(editFile);
}
DataTransferThrottler throttler = GetImageServlet.getThrottler(conf);
DataTransferThrottler throttler = ImageServlet.getThrottler(conf);
// send edits
TransferFsImage.getFileServer(response, editFile, editFileIn, throttler);
TransferFsImage.copyFileToStream(response.getOutputStream(), editFile,
editFileIn, throttler);
} catch (Throwable t) {
String errMsg = "getedit failed. " + StringUtils.stringifyException(t);

View File

@ -263,8 +263,7 @@ class Checkpointer extends Daemon {
}
if(cpCmd.needToReturnImage()) {
TransferFsImage.uploadImageFromStorage(
backupNode.nnHttpAddress, getImageListenAddress(),
TransferFsImage.uploadImageFromStorage(backupNode.nnHttpAddress, conf,
bnStorage, NameNodeFile.IMAGE, txid);
}

View File

@ -19,11 +19,10 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.Time.now;
import java.net.HttpURLConnection;
import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.URL;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
@ -32,7 +31,6 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -57,18 +55,21 @@ import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.net.InetAddresses;
/**
* This class is used in Namesystem's jetty to retrieve a file.
* This class is used in Namesystem's jetty to retrieve/upload a file
* Typically used by the Secondary NameNode to retrieve image and
* edit file for periodic checkpointing.
* edit file for periodic checkpointing in Non-HA deployments.
* Standby NameNode uses to upload checkpoints in HA deployments.
*/
@InterfaceAudience.Private
public class GetImageServlet extends HttpServlet {
public class ImageServlet extends HttpServlet {
public static final String PATH_SPEC = "/imagetransfer";
private static final long serialVersionUID = -7669068179452648952L;
private static final Log LOG = LogFactory.getLog(GetImageServlet.class);
private static final Log LOG = LogFactory.getLog(ImageServlet.class);
public final static String CONTENT_DISPOSITION = "Content-Disposition";
public final static String HADOOP_IMAGE_EDITS_HEADER = "X-Image-Edits-Name";
@ -85,8 +86,7 @@ public class GetImageServlet extends HttpServlet {
@Override
public void doGet(final HttpServletRequest request,
final HttpServletResponse response
) throws ServletException, IOException {
final HttpServletResponse response) throws ServletException, IOException {
try {
final ServletContext context = getServletContext();
final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
@ -95,27 +95,8 @@ public class GetImageServlet extends HttpServlet {
.getAttribute(JspHelper.CURRENT_CONF);
final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
if (UserGroupInformation.isSecurityEnabled() &&
!isValidRequestor(context, request.getUserPrincipal().getName(), conf)) {
response.sendError(HttpServletResponse.SC_FORBIDDEN,
"Only Namenode, Secondary Namenode, and administrators may access " +
"this servlet");
LOG.warn("Received non-NN/SNN/administrator request for image or edits from "
+ request.getUserPrincipal().getName() + " at " + request.getRemoteHost());
return;
}
String myStorageInfoString = nnImage.getStorage().toColonSeparatedString();
String theirStorageInfoString = parsedParams.getStorageInfoString();
if (theirStorageInfoString != null &&
!myStorageInfoString.equals(theirStorageInfoString)) {
response.sendError(HttpServletResponse.SC_FORBIDDEN,
"This namenode has storage info " + myStorageInfoString +
" but the secondary expected " + theirStorageInfoString);
LOG.warn("Received an invalid request file transfer request " +
"from a secondary with storage info " + theirStorageInfoString);
return;
}
validateRequest(context, conf, request, response, nnImage,
parsedParams.getStorageInfoString());
UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Void>() {
@Override
@ -155,53 +136,6 @@ public class GetImageServlet extends HttpServlet {
long elapsed = now() - start;
metrics.addGetEdit(elapsed);
}
} else if (parsedParams.isPutImage()) {
final long txid = parsedParams.getTxId();
final NameNodeFile nnf = parsedParams.getNameNodeFile();
if (! currentlyDownloadingCheckpoints.add(txid)) {
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer is already in the process of uploading a" +
" checkpoint made at transaction ID " + txid);
return null;
}
try {
if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer already uploaded an checkpoint " +
"for txid " + txid);
return null;
}
// We may have lost our ticket since last checkpoint, log in again, just in case
if (UserGroupInformation.isSecurityEnabled()) {
UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
}
long start = now();
// issue a HTTP get request to download the new fsimage
MD5Hash downloadImageDigest = TransferFsImage
.downloadImageToStorage(parsedParams.getInfoServer(conf),
txid, nnImage.getStorage(), true);
nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
downloadImageDigest);
if (nnf == NameNodeFile.IMAGE_ROLLBACK) {
NameNodeHttpServer.getNameNodeFromContext(context)
.getNamesystem().setCreatedRollbackImages(true);
}
if (metrics != null) { // Metrics non-null only when used inside name node
long elapsed = now() - start;
metrics.addPutImage(elapsed);
}
// Now that we have a new checkpoint, we might be able to
// remove some old ones.
nnImage.purgeOldStorage(nnf);
} finally {
currentlyDownloadingCheckpoints.remove(txid);
}
}
return null;
}
@ -209,7 +143,7 @@ public class GetImageServlet extends HttpServlet {
private void serveFile(File file) throws IOException {
FileInputStream fis = new FileInputStream(file);
try {
setVerificationHeaders(response, file);
setVerificationHeadersForGet(response, file);
setFileNameHeaders(response, file);
if (!file.exists()) {
// Potential race where the file was deleted while we were in the
@ -221,8 +155,8 @@ public class GetImageServlet extends HttpServlet {
// detected by the client side as an inaccurate length header.
}
// send file
TransferFsImage.getFileServer(response, file, fis,
getThrottler(conf));
TransferFsImage.copyFileToStream(response.getOutputStream(),
file, fis, getThrottler(conf));
} finally {
IOUtils.closeStream(fis);
}
@ -238,6 +172,35 @@ public class GetImageServlet extends HttpServlet {
}
}
private void validateRequest(ServletContext context, Configuration conf,
HttpServletRequest request, HttpServletResponse response,
FSImage nnImage, String theirStorageInfoString) throws IOException {
if (UserGroupInformation.isSecurityEnabled()
&& !isValidRequestor(context, request.getUserPrincipal().getName(),
conf)) {
String errorMsg = "Only Namenode, Secondary Namenode, and administrators may access "
+ "this servlet";
response.sendError(HttpServletResponse.SC_FORBIDDEN, errorMsg);
LOG.warn("Received non-NN/SNN/administrator request for image or edits from "
+ request.getUserPrincipal().getName()
+ " at "
+ request.getRemoteHost());
throw new IOException(errorMsg);
}
String myStorageInfoString = nnImage.getStorage().toColonSeparatedString();
if (theirStorageInfoString != null
&& !myStorageInfoString.equals(theirStorageInfoString)) {
String errorMsg = "This namenode has storage info " + myStorageInfoString
+ " but the secondary expected " + theirStorageInfoString;
response.sendError(HttpServletResponse.SC_FORBIDDEN, errorMsg);
LOG.warn("Received an invalid request file transfer request "
+ "from a secondary with storage info " + theirStorageInfoString);
throw new IOException(errorMsg);
}
}
public static void setFileNameHeaders(HttpServletResponse response,
File file) {
response.setHeader(CONTENT_DISPOSITION, "attachment; filename=" +
@ -271,36 +234,33 @@ public class GetImageServlet extends HttpServlet {
Set<String> validRequestors = new HashSet<String>();
validRequestors.add(
SecurityUtil.getServerPrincipal(conf
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), NameNode
.getAddress(conf).getHostName()));
validRequestors.add(
SecurityUtil.getServerPrincipal(conf
.get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY),
validRequestors.add(SecurityUtil.getServerPrincipal(conf
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
NameNode.getAddress(conf).getHostName()));
validRequestors.add(SecurityUtil.getServerPrincipal(
conf.get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY),
SecondaryNameNode.getHttpAddress(conf).getHostName()));
if (HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf))) {
Configuration otherNnConf = HAUtil.getConfForOtherNode(conf);
validRequestors.add(
SecurityUtil.getServerPrincipal(otherNnConf
validRequestors.add(SecurityUtil.getServerPrincipal(otherNnConf
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
NameNode.getAddress(otherNnConf).getHostName()));
}
for (String v : validRequestors) {
if (v != null && v.equals(remoteUser)) {
LOG.info("GetImageServlet allowing checkpointer: " + remoteUser);
LOG.info("ImageServlet allowing checkpointer: " + remoteUser);
return true;
}
}
if (HttpServer2.userHasAdministratorAccess(context, remoteUser)) {
LOG.info("GetImageServlet allowing administrator: " + remoteUser);
LOG.info("ImageServlet allowing administrator: " + remoteUser);
return true;
}
LOG.info("GetImageServlet rejecting: " + remoteUser);
LOG.info("ImageServlet rejecting: " + remoteUser);
return false;
}
@ -308,8 +268,8 @@ public class GetImageServlet extends HttpServlet {
* Set headers for content length, and, if available, md5.
* @throws IOException
*/
public static void setVerificationHeaders(HttpServletResponse response, File file)
throws IOException {
public static void setVerificationHeadersForGet(HttpServletResponse response,
File file) throws IOException {
response.setHeader(TransferFsImage.CONTENT_LENGTH,
String.valueOf(file.length()));
MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
@ -340,29 +300,9 @@ public class GetImageServlet extends HttpServlet {
remoteStorageInfo.toColonSeparatedString();
}
static String getParamStringToPutImage(NameNodeFile nnf, long txid,
URL url, Storage storage) {
InetSocketAddress imageListenAddress = NetUtils.createSocketAddr(url
.getAuthority());
String machine = !imageListenAddress.isUnresolved()
&& imageListenAddress.getAddress().isAnyLocalAddress() ? null
: imageListenAddress.getHostName();
return "putimage=1" +
"&" + TXID_PARAM + "=" + txid +
"&" + IMAGE_FILE_TYPE + "=" + nnf.name() +
"&port=" + imageListenAddress.getPort() +
(machine != null ? "&machine=" + machine : "")
+ "&" + STORAGEINFO_PARAM + "=" +
storage.toColonSeparatedString();
}
static class GetImageParams {
private boolean isGetImage;
private boolean isGetEdit;
private boolean isPutImage;
private int remoteport;
private String machineName;
private NameNodeFile nnf;
private long startTxId, endTxId, txId;
private String storageInfoString;
@ -378,8 +318,7 @@ public class GetImageServlet extends HttpServlet {
) throws IOException {
@SuppressWarnings("unchecked")
Map<String, String[]> pmap = request.getParameterMap();
isGetImage = isGetEdit = isPutImage = fetchLatest = false;
remoteport = 0;
isGetImage = isGetEdit = fetchLatest = false;
for (Map.Entry<String, String[]> entry : pmap.entrySet()) {
String key = entry.getKey();
@ -402,30 +341,13 @@ public class GetImageServlet extends HttpServlet {
isGetEdit = true;
startTxId = ServletUtil.parseLongParam(request, START_TXID_PARAM);
endTxId = ServletUtil.parseLongParam(request, END_TXID_PARAM);
} else if (key.equals("putimage")) {
isPutImage = true;
txId = ServletUtil.parseLongParam(request, TXID_PARAM);
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
.valueOf(imageType);
} else if (key.equals("port")) {
remoteport = new Integer(val[0]).intValue();
} else if (key.equals("machine")) {
machineName = val[0];
} else if (key.equals(STORAGEINFO_PARAM)) {
storageInfoString = val[0];
}
}
if (machineName == null) {
machineName = request.getRemoteHost();
if (InetAddresses.isInetAddress(machineName)) {
machineName = NetUtils.getHostNameOfIP(machineName);
}
}
int numGets = (isGetImage?1:0) + (isGetEdit?1:0);
if ((numGets > 1) || (numGets == 0) && !isPutImage) {
if ((numGets > 1) || (numGets == 0)) {
throw new IOException("Illegal parameters to TransferFsImage");
}
}
@ -435,12 +357,12 @@ public class GetImageServlet extends HttpServlet {
}
public long getTxId() {
Preconditions.checkState(isGetImage || isPutImage);
Preconditions.checkState(isGetImage);
return txId;
}
public NameNodeFile getNameNodeFile() {
Preconditions.checkState(isPutImage || isGetImage);
Preconditions.checkState(isGetImage);
return nnf;
}
@ -462,20 +384,161 @@ public class GetImageServlet extends HttpServlet {
return isGetImage;
}
boolean isPutImage() {
return isPutImage;
}
URL getInfoServer(Configuration conf) throws IOException {
if (machineName == null || remoteport == 0) {
throw new IOException("MachineName and port undefined");
}
return new URL(DFSUtil.getHttpClientScheme(conf), machineName, remoteport, "");
}
boolean shouldFetchLatest() {
return fetchLatest;
}
}
/**
* Set headers for image length and if available, md5.
*
* @throws IOException
*/
static void setVerificationHeadersForPut(HttpURLConnection connection,
File file) throws IOException {
connection.setRequestProperty(TransferFsImage.CONTENT_LENGTH,
String.valueOf(file.length()));
MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
if (hash != null) {
connection
.setRequestProperty(TransferFsImage.MD5_HEADER, hash.toString());
}
}
/**
* Set the required parameters for uploading image
*
* @param httpMethod instance of method to set the parameters
* @param storage colon separated storageInfo string
* @param txid txid of the image
* @param imageFileSize size of the imagefile to be uploaded
* @param nnf NameNodeFile Type
* @return Returns map of parameters to be used with PUT request.
*/
static Map<String, String> getParamsForPutImage(Storage storage, long txid,
long imageFileSize, NameNodeFile nnf) {
Map<String, String> params = new HashMap<String, String>();
params.put(TXID_PARAM, Long.toString(txid));
params.put(STORAGEINFO_PARAM, storage.toColonSeparatedString());
// setting the length of the file to be uploaded in separate property as
// Content-Length only supports up to 2GB
params.put(TransferFsImage.FILE_LENGTH, Long.toString(imageFileSize));
params.put(IMAGE_FILE_TYPE, nnf.name());
return params;
}
@Override
protected void doPut(final HttpServletRequest request,
final HttpServletResponse response) throws ServletException, IOException {
try {
ServletContext context = getServletContext();
final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
final Configuration conf = (Configuration) getServletContext()
.getAttribute(JspHelper.CURRENT_CONF);
final PutImageParams parsedParams = new PutImageParams(request, response,
conf);
final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
validateRequest(context, conf, request, response, nnImage,
parsedParams.getStorageInfoString());
UserGroupInformation.getCurrentUser().doAs(
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
final long txid = parsedParams.getTxId();
final NameNodeFile nnf = parsedParams.getNameNodeFile();
if (!currentlyDownloadingCheckpoints.add(txid)) {
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer is already in the process of uploading a"
+ " checkpoint made at transaction ID " + txid);
return null;
}
try {
if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer already uploaded an checkpoint "
+ "for txid " + txid);
return null;
}
InputStream stream = request.getInputStream();
try {
long start = now();
MD5Hash downloadImageDigest = TransferFsImage
.handleUploadImageRequest(request, txid,
nnImage.getStorage(), stream,
parsedParams.getFileSize(), getThrottler(conf));
nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
downloadImageDigest);
// Metrics non-null only when used inside name node
if (metrics != null) {
long elapsed = now() - start;
metrics.addPutImage(elapsed);
}
// Now that we have a new checkpoint, we might be able to
// remove some old ones.
nnImage.purgeOldStorage(nnf);
} finally {
stream.close();
}
} finally {
currentlyDownloadingCheckpoints.remove(txid);
}
return null;
}
});
} catch (Throwable t) {
String errMsg = "PutImage failed. " + StringUtils.stringifyException(t);
response.sendError(HttpServletResponse.SC_GONE, errMsg);
throw new IOException(errMsg);
}
}
/*
* Params required to handle put image request
*/
static class PutImageParams {
private long txId = -1;
private String storageInfoString = null;
private long fileSize = 0L;
private NameNodeFile nnf;
public PutImageParams(HttpServletRequest request,
HttpServletResponse response, Configuration conf) throws IOException {
txId = ServletUtil.parseLongParam(request, TXID_PARAM);
storageInfoString = ServletUtil.getParameter(request, STORAGEINFO_PARAM);
fileSize = ServletUtil.parseLongParam(request,
TransferFsImage.FILE_LENGTH);
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
.valueOf(imageType);
if (fileSize == 0 || txId == -1 || storageInfoString == null
|| storageInfoString.isEmpty()) {
throw new IOException("Illegal parameters to TransferFsImage");
}
}
public long getTxId() {
return txId;
}
public String getStorageInfoString() {
return storageInfoString;
}
public long getFileSize() {
return fileSize;
}
public NameNodeFile getNameNodeFile() {
return nnf;
}
}
}

View File

@ -233,8 +233,8 @@ public class NameNodeHttpServer {
CancelDelegationTokenServlet.class, true);
httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,
true);
httpServer.addInternalServlet("getimage", "/getimage",
GetImageServlet.class, true);
httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
ImageServlet.class, true);
httpServer.addInternalServlet("listPaths", "/listPaths/*",
ListPathsServlet.class, false);
httpServer.addInternalServlet("data", "/data/*",

View File

@ -114,7 +114,6 @@ public class SecondaryNameNode implements Runnable {
private InetSocketAddress nameNodeAddr;
private volatile boolean shouldRun;
private HttpServer2 infoServer;
private URL imageListenURL;
private Collection<URI> checkpointDirs;
private List<URI> checkpointEditsDirs;
@ -267,13 +266,11 @@ public class SecondaryNameNode implements Runnable {
infoServer.setAttribute("secondary.name.node", this);
infoServer.setAttribute("name.system.image", checkpointImage);
infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
infoServer.addInternalServlet("getimage", "/getimage",
GetImageServlet.class, true);
infoServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
ImageServlet.class, true);
infoServer.start();
LOG.info("Web server init done");
imageListenURL = new URL(DFSUtil.getHttpClientScheme(conf) + "://"
+ NetUtils.getHostPortString(infoServer.getConnectorAddress(0)));
HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
int connIdx = 0;
@ -488,14 +485,6 @@ public class SecondaryNameNode implements Runnable {
return address.toURL();
}
/**
* Return the host:port of where this SecondaryNameNode is listening
* for image transfers
*/
private URL getImageListenAddress() {
return imageListenURL;
}
/**
* Create a new checkpoint
* @return if the image is fetched from primary or not
@ -555,8 +544,8 @@ public class SecondaryNameNode implements Runnable {
// to make this new uploaded image as the most current image.
//
long txid = checkpointImage.getLastAppliedTxId();
TransferFsImage.uploadImageFromStorage(fsName, getImageListenAddress(),
dstStorage, NameNodeFile.IMAGE, txid);
TransferFsImage.uploadImageFromStorage(fsName, conf, dstStorage,
NameNodeFile.IMAGE, txid);
// error simulation code for junit test
CheckpointFaultInjector.getInstance().afterSecondaryUploadsNewImage();

View File

@ -19,18 +19,22 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
@ -49,10 +53,12 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.util.Time;
import org.apache.http.client.utils.URIBuilder;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@ -65,7 +71,12 @@ import com.google.common.collect.Lists;
public class TransferFsImage {
public final static String CONTENT_LENGTH = "Content-Length";
public final static String FILE_LENGTH = "File-Length";
public final static String MD5_HEADER = "X-MD5-Digest";
private final static String CONTENT_TYPE = "Content-Type";
private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding";
@VisibleForTesting
static int timeout = 0;
private static URLConnectionFactory connectionFactory;
@ -82,14 +93,14 @@ public class TransferFsImage {
public static void downloadMostRecentImageToDirectory(URL infoServer,
File dir) throws IOException {
String fileId = GetImageServlet.getParamStringForMostRecentImage();
String fileId = ImageServlet.getParamStringForMostRecentImage();
getFileClient(infoServer, fileId, Lists.newArrayList(dir),
null, false);
}
public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
Storage dstStorage, boolean needDigest) throws IOException {
String fileid = GetImageServlet.getParamStringForImage(null,
String fileid = ImageServlet.getParamStringForImage(null,
imageTxId, dstStorage);
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
@ -105,11 +116,30 @@ public class TransferFsImage {
return hash;
}
static MD5Hash handleUploadImageRequest(HttpServletRequest request,
long imageTxId, Storage dstStorage, InputStream stream,
long advertisedSize, DataTransferThrottler throttler) throws IOException {
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
List<File> dstFiles = dstStorage.getFiles(NameNodeDirType.IMAGE, fileName);
if (dstFiles.isEmpty()) {
throw new IOException("No targets in destination storage!");
}
MD5Hash advertisedDigest = parseMD5Header(request);
MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true,
advertisedSize, advertisedDigest, fileName, stream, throttler);
LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size "
+ dstFiles.get(0).length() + " bytes.");
return hash;
}
static void downloadEditsToStorage(URL fsName, RemoteEditLog log,
NNStorage dstStorage) throws IOException {
assert log.getStartTxId() > 0 && log.getEndTxId() > 0 :
"bad log: " + log;
String fileid = GetImageServlet.getParamStringForLog(
String fileid = ImageServlet.getParamStringForLog(
log, dstStorage);
String finalFileName = NNStorage.getFinalizedEditsFileName(
log.getStartTxId(), log.getEndTxId());
@ -159,22 +189,19 @@ public class TransferFsImage {
* Requests that the NameNode download an image from this node.
*
* @param fsName the http address for the remote NN
* @param myNNAddress the host/port where the local node is running an
* HTTPServer hosting GetImageServlet
* @param conf Configuration
* @param storage the storage directory to transfer the image from
* @param nnf the NameNodeFile type of the image
* @param txid the transaction ID of the image to be uploaded
*/
public static void uploadImageFromStorage(URL fsName, URL myNNAddress,
Storage storage, NameNodeFile nnf, long txid) throws IOException {
public static void uploadImageFromStorage(URL fsName, Configuration conf,
NNStorage storage, NameNodeFile nnf, long txid) throws IOException {
String fileid = GetImageServlet.getParamStringToPutImage(nnf, txid,
myNNAddress, storage);
// this doesn't directly upload an image, but rather asks the NN
// to connect back to the 2NN to download the specified image.
URL url = new URL(fsName, ImageServlet.PATH_SPEC);
long startTime = Time.monotonicNow();
try {
TransferFsImage.getFileClient(fsName, fileid, null, null, false);
} catch (HttpGetFailedException e) {
uploadImage(url, conf, storage, nnf, txid);
} catch (HttpPutFailedException e) {
if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) {
// this is OK - this means that a previous attempt to upload
// this checkpoint succeeded even though we thought it failed.
@ -186,25 +213,105 @@ public class TransferFsImage {
throw e;
}
}
LOG.info("Uploaded image with txid " + txid + " to namenode at " +
fsName);
double xferSec = Math.max(
((float) (Time.monotonicNow() - startTime)) / 1000.0, 0.001);
LOG.info("Uploaded image with txid " + txid + " to namenode at " + fsName
+ " in " + xferSec + " seconds");
}
/*
* Uploads the imagefile using HTTP PUT method
*/
private static void uploadImage(URL url, Configuration conf,
NNStorage storage, NameNodeFile nnf, long txId) throws IOException {
File imageFile = storage.findImageFile(nnf, txId);
if (imageFile == null) {
throw new IOException("Could not find image with txid " + txId);
}
HttpURLConnection connection = null;
try {
URIBuilder uriBuilder = new URIBuilder(url.toURI());
// write all params for image upload request as query itself.
// Request body contains the image to be uploaded.
Map<String, String> params = ImageServlet.getParamsForPutImage(storage,
txId, imageFile.length(), nnf);
for (Entry<String, String> entry : params.entrySet()) {
uriBuilder.addParameter(entry.getKey(), entry.getValue());
}
URL urlWithParams = uriBuilder.build().toURL();
connection = (HttpURLConnection) connectionFactory.openConnection(
urlWithParams, UserGroupInformation.isSecurityEnabled());
// Set the request to PUT
connection.setRequestMethod("PUT");
connection.setDoOutput(true);
int chunkSize = conf.getInt(
DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT);
if (imageFile.length() > chunkSize) {
// using chunked streaming mode to support upload of 2GB+ files and to
// avoid internal buffering.
// this mode should be used only if more than chunkSize data is present
// to upload. otherwise upload may not happen sometimes.
connection.setChunkedStreamingMode(chunkSize);
}
setTimeout(connection);
// set headers for verification
ImageServlet.setVerificationHeadersForPut(connection, imageFile);
// Write the file to output stream.
writeFileToPutRequest(conf, connection, imageFile);
int responseCode = connection.getResponseCode();
if (responseCode != HttpURLConnection.HTTP_OK) {
throw new HttpPutFailedException(connection.getResponseMessage(),
responseCode);
}
} catch (AuthenticationException e) {
throw new IOException(e);
} catch (URISyntaxException e) {
throw new IOException(e);
} finally {
if (connection != null) {
connection.disconnect();
}
}
}
private static void writeFileToPutRequest(Configuration conf,
HttpURLConnection connection, File imageFile)
throws FileNotFoundException, IOException {
connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream");
connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary");
OutputStream output = connection.getOutputStream();
FileInputStream input = new FileInputStream(imageFile);
try {
copyFileToStream(output, imageFile, input,
ImageServlet.getThrottler(conf));
} finally {
IOUtils.closeStream(input);
IOUtils.closeStream(output);
}
}
/**
* A server-side method to respond to a getfile http request
* Copies the contents of the local file into the output stream.
*/
public static void getFileServer(ServletResponse response, File localfile,
FileInputStream infile,
DataTransferThrottler throttler)
public static void copyFileToStream(OutputStream out, File localfile,
FileInputStream infile, DataTransferThrottler throttler)
throws IOException {
byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
ServletOutputStream out = null;
try {
CheckpointFaultInjector.getInstance()
.aboutToSendFile(localfile);
out = response.getOutputStream();
if (CheckpointFaultInjector.getInstance().
shouldSendShortFile(localfile)) {
@ -250,14 +357,13 @@ public class TransferFsImage {
static MD5Hash getFileClient(URL infoServer,
String queryString, List<File> localPaths,
Storage dstStorage, boolean getChecksum) throws IOException {
URL url = new URL(infoServer, "/getimage?" + queryString);
URL url = new URL(infoServer, ImageServlet.PATH_SPEC + "?" + queryString);
LOG.info("Opening connection to " + url);
return doGetUrl(url, localPaths, dstStorage, getChecksum);
}
public static MD5Hash doGetUrl(URL url, List<File> localPaths,
Storage dstStorage, boolean getChecksum) throws IOException {
long startTime = Time.monotonicNow();
HttpURLConnection connection;
try {
connection = (HttpURLConnection)
@ -266,16 +372,7 @@ public class TransferFsImage {
throw new IOException(e);
}
if (timeout <= 0) {
Configuration conf = new HdfsConfiguration();
timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
}
if (timeout > 0) {
connection.setConnectTimeout(timeout);
connection.setReadTimeout(timeout);
}
setTimeout(connection);
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new HttpGetFailedException(
@ -293,10 +390,37 @@ public class TransferFsImage {
throw new IOException(CONTENT_LENGTH + " header is not provided " +
"by the namenode when trying to fetch " + url);
}
MD5Hash advertisedDigest = parseMD5Header(connection);
String fsImageName = connection
.getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
InputStream stream = connection.getInputStream();
return receiveFile(url.toExternalForm(), localPaths, dstStorage,
getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
null);
}
private static void setTimeout(HttpURLConnection connection) {
if (timeout <= 0) {
Configuration conf = new HdfsConfiguration();
timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
LOG.info("Image Transfer timeout configured to " + timeout
+ " milliseconds");
}
if (timeout > 0) {
connection.setConnectTimeout(timeout);
connection.setReadTimeout(timeout);
}
}
private static MD5Hash receiveFile(String url, List<File> localPaths,
Storage dstStorage, boolean getChecksum, long advertisedSize,
MD5Hash advertisedDigest, String fsImageName, InputStream stream,
DataTransferThrottler throttler) throws IOException {
long startTime = Time.monotonicNow();
if (localPaths != null) {
String fsImageName = connection.getHeaderField(
GetImageServlet.HADOOP_IMAGE_EDITS_HEADER);
// If the local paths refer to directories, use the server-provided header
// as the filename within that directory
List<File> newLocalPaths = new ArrayList<File>();
@ -313,10 +437,8 @@ public class TransferFsImage {
localPaths = newLocalPaths;
}
MD5Hash advertisedDigest = parseMD5Header(connection);
long received = 0;
InputStream stream = connection.getInputStream();
MessageDigest digester = null;
if (getChecksum) {
digester = MD5Hash.getDigester();
@ -361,6 +483,9 @@ public class TransferFsImage {
for (FileOutputStream fos : outputStreams) {
fos.write(buf, 0, num);
}
if (throttler != null) {
throttler.throttle(num);
}
}
}
finishedReceiving = true;
@ -405,6 +530,11 @@ public class TransferFsImage {
return (header != null) ? new MD5Hash(header) : null;
}
private static MD5Hash parseMD5Header(HttpServletRequest request) {
String header = request.getHeader(MD5_HEADER);
return (header != null) ? new MD5Hash(header) : null;
}
public static class HttpGetFailedException extends IOException {
private static final long serialVersionUID = 1L;
private final int responseCode;
@ -419,4 +549,18 @@ public class TransferFsImage {
}
}
public static class HttpPutFailedException extends IOException {
private static final long serialVersionUID = 1L;
private final int responseCode;
HttpPutFailedException(String msg, int responseCode) throws IOException {
super(msg);
this.responseCode = responseCode;
}
public int getResponseCode() {
return responseCode;
}
}
}

View File

@ -63,6 +63,7 @@ public class StandbyCheckpointer {
private static final Log LOG = LogFactory.getLog(StandbyCheckpointer.class);
private static final long PREVENT_AFTER_CANCEL_MS = 2*60*1000L;
private final CheckpointConf checkpointConf;
private final Configuration conf;
private final FSNamesystem namesystem;
private long lastCheckpointTime;
private final CheckpointerThread thread;
@ -80,6 +81,7 @@ public class StandbyCheckpointer {
public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
throws IOException {
this.namesystem = ns;
this.conf = conf;
this.checkpointConf = new CheckpointConf(conf);
this.thread = new CheckpointerThread();
this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
@ -193,7 +195,7 @@ public class StandbyCheckpointer {
Future<Void> upload = executor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
TransferFsImage.uploadImageFromStorage(activeNNAddress, myNNAddress,
TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
namesystem.getFSImage().getStorage(), imageType, txid);
return null;
}

View File

@ -858,15 +858,13 @@
<property>
<name>dfs.image.transfer.timeout</name>
<value>600000</value>
<value>60000</value>
<description>
Timeout for image transfer in milliseconds. This timeout and the related
Socket timeout for image transfer in milliseconds. This timeout and the related
dfs.image.transfer.bandwidthPerSec parameter should be configured such
that normal image transfer can complete within the timeout.
that normal image transfer can complete successfully.
This timeout prevents client hangs when the sender fails during
image transfer, which is particularly important during checkpointing.
Note that this timeout applies to the entirety of image transfer, and
is not a socket timeout.
image transfer. This is socket timeout during image tranfer.
</description>
</property>
@ -883,6 +881,16 @@
</description>
</property>
<property>
<name>dfs.image.transfer.chunksize</name>
<value>65536</value>
<description>
Chunksize in bytes to upload the checkpoint.
Chunked streaming is used to avoid internal buffering of contents
of image file of huge size.
</description>
</property>
<property>
<name>dfs.namenode.support.allow.format</name>
<value>true</value>

View File

@ -31,6 +31,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
@ -564,23 +565,9 @@ public class TestCheckpoint {
}
/**
* Simulate a secondary node failure to transfer image
* back to the name-node.
* Used to truncate primary fsimage file.
*/
@Test
public void testSecondaryFailsToReturnImage() throws IOException {
Mockito.doThrow(new IOException("If this exception is not caught by the " +
"name-node, fs image will be truncated."))
.when(faultInjector).aboutToSendFile(filePathContaining("secondary"));
doSecondaryFailsToReturnImage();
}
/**
* Similar to above test, but uses an unchecked Error, and causes it
* before even setting the length header. This used to cause image
* truncation. Regression test for HDFS-3330.
* Simulate a secondary node failure to transfer image. Uses an unchecked
* error and fail transfer before even setting the length header. This used to
* cause image truncation. Regression test for HDFS-3330.
*/
@Test
public void testSecondaryFailsWithErrorBeforeSettingHeaders()
@ -1979,6 +1966,13 @@ public class TestCheckpoint {
.when(dstImage).getFiles(
Mockito.<NameNodeDirType>anyObject(), Mockito.anyString());
File mockImageFile = File.createTempFile("image", "");
FileOutputStream imageFile = new FileOutputStream(mockImageFile);
imageFile.write("data".getBytes());
imageFile.close();
Mockito.doReturn(mockImageFile).when(dstImage)
.findImageFile(Mockito.any(NameNodeFile.class), Mockito.anyLong());
Mockito.doReturn(new StorageInfo(1, 1, "X", 1, NodeType.NAME_NODE).toColonSeparatedString())
.when(dstImage).toColonSeparatedString();
@ -1999,8 +1993,8 @@ public class TestCheckpoint {
}
try {
TransferFsImage.uploadImageFromStorage(fsName, new URL(
"http://localhost:1234"), dstImage, NameNodeFile.IMAGE, 0);
TransferFsImage.uploadImageFromStorage(fsName, conf, dstImage,
NameNodeFile.IMAGE, 0);
fail("Storage info was not verified");
} catch (IOException ioe) {
String msg = StringUtils.stringifyException(ioe);

View File

@ -69,7 +69,7 @@ public class TestGetImageServlet {
Mockito.when(context.getAttribute(HttpServer2.ADMINS_ACL)).thenReturn(acls);
// Make sure that NN2 is considered a valid fsimage/edits requestor.
assertTrue(GetImageServlet.isValidRequestor(context,
assertTrue(ImageServlet.isValidRequestor(context,
"hdfs/host2@TEST-REALM.COM", conf));
// Mark atm as an admin.
@ -81,15 +81,15 @@ public class TestGetImageServlet {
}))).thenReturn(true);
// Make sure that NN2 is still considered a valid requestor.
assertTrue(GetImageServlet.isValidRequestor(context,
assertTrue(ImageServlet.isValidRequestor(context,
"hdfs/host2@TEST-REALM.COM", conf));
// Make sure an admin is considered a valid requestor.
assertTrue(GetImageServlet.isValidRequestor(context,
assertTrue(ImageServlet.isValidRequestor(context,
"atm@TEST-REALM.COM", conf));
// Make sure other users are *not* considered valid requestors.
assertFalse(GetImageServlet.isValidRequestor(context,
assertFalse(ImageServlet.isValidRequestor(context,
"todd@TEST-REALM.COM", conf));
}
}

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.URL;
@ -34,9 +35,11 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.http.HttpServerFunctionalTest;
import org.apache.hadoop.test.PathUtils;
@ -118,10 +121,11 @@ public class TestTransferFsImage {
* Test to verify the read timeout
*/
@Test(timeout = 5000)
public void testImageTransferTimeout() throws Exception {
public void testGetImageTimeout() throws Exception {
HttpServer2 testServer = HttpServerFunctionalTest.createServer("hdfs");
try {
testServer.addServlet("GetImage", "/getimage", TestGetImageServlet.class);
testServer.addServlet("ImageTransfer", ImageServlet.PATH_SPEC,
TestImageTransferServlet.class);
testServer.start();
URL serverURL = HttpServerFunctionalTest.getServerURL(testServer);
TransferFsImage.timeout = 2000;
@ -139,7 +143,48 @@ public class TestTransferFsImage {
}
}
public static class TestGetImageServlet extends HttpServlet {
/**
* Test to verify the timeout of Image upload
*/
@Test(timeout = 10000)
public void testImageUploadTimeout() throws Exception {
Configuration conf = new HdfsConfiguration();
NNStorage mockStorage = Mockito.mock(NNStorage.class);
HttpServer2 testServer = HttpServerFunctionalTest.createServer("hdfs");
try {
testServer.addServlet("ImageTransfer", ImageServlet.PATH_SPEC,
TestImageTransferServlet.class);
testServer.start();
URL serverURL = HttpServerFunctionalTest.getServerURL(testServer);
// set the timeout here, otherwise it will take default.
TransferFsImage.timeout = 2000;
File tmpDir = new File(new FileSystemTestHelper().getTestRootDir());
tmpDir.mkdirs();
File mockImageFile = File.createTempFile("image", "", tmpDir);
FileOutputStream imageFile = new FileOutputStream(mockImageFile);
imageFile.write("data".getBytes());
imageFile.close();
Mockito.when(
mockStorage.findImageFile(Mockito.any(NameNodeFile.class),
Mockito.anyLong())).thenReturn(mockImageFile);
Mockito.when(mockStorage.toColonSeparatedString()).thenReturn(
"storage:info:string");
try {
TransferFsImage.uploadImageFromStorage(serverURL, conf, mockStorage,
NameNodeFile.IMAGE, 1L);
fail("TransferImage Should fail with timeout");
} catch (SocketTimeoutException e) {
assertEquals("Upload should timeout", "Read timed out", e.getMessage());
}
} finally {
testServer.stop();
}
}
public static class TestImageTransferServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
@Override
@ -153,5 +198,17 @@ public class TestTransferFsImage {
}
}
}
@Override
protected void doPut(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
synchronized (this) {
try {
wait(5000);
} catch (InterruptedException e) {
// Ignore
}
}
}
}
}