HDFS-3405. Checkpointing should use HTTP POST or PUT instead of GET-GET to send merged fsimages. Contributed by Vinayakumar B.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1576157 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Wang 2014-03-11 00:44:56 +00:00
parent 0697a50337
commit 0f7d99268c
13 changed files with 525 additions and 261 deletions

View File

@ -149,6 +149,9 @@ Release 2.4.0 - UNRELEASED
HDFS-6055. Change default configuration to limit file name length in HDFS.
(cnauroth)
HDFS-3405. Checkpointing should use HTTP POST or PUT instead of GET-GET
to send merged fsimages. (Vinayakumar B via wang)
OPTIMIZATIONS
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery

View File

@ -462,7 +462,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// Image transfer timeout
public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout";
public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 10 * 60 * 1000;
public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000;
// Image transfer chunksize
public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize";
public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024;
//Keys with no defaults
public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";

View File

@ -42,7 +42,7 @@
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@ -198,15 +198,16 @@ public void doGet(final HttpServletRequest request,
return;
}
editFile = elf.getFile();
GetImageServlet.setVerificationHeaders(response, editFile);
GetImageServlet.setFileNameHeaders(response, editFile);
ImageServlet.setVerificationHeadersForGet(response, editFile);
ImageServlet.setFileNameHeaders(response, editFile);
editFileIn = new FileInputStream(editFile);
}
DataTransferThrottler throttler = GetImageServlet.getThrottler(conf);
DataTransferThrottler throttler = ImageServlet.getThrottler(conf);
// send edits
TransferFsImage.getFileServer(response, editFile, editFileIn, throttler);
TransferFsImage.copyFileToStream(response.getOutputStream(), editFile,
editFileIn, throttler);
} catch (Throwable t) {
String errMsg = "getedit failed. " + StringUtils.stringifyException(t);

View File

@ -263,8 +263,7 @@ void doCheckpoint() throws IOException {
}
if(cpCmd.needToReturnImage()) {
TransferFsImage.uploadImageFromStorage(
backupNode.nnHttpAddress, getImageListenAddress(),
TransferFsImage.uploadImageFromStorage(backupNode.nnHttpAddress, conf,
bnStorage, NameNodeFile.IMAGE, txid);
}

View File

@ -19,11 +19,10 @@
import static org.apache.hadoop.util.Time.now;
import java.net.HttpURLConnection;
import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.URL;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
@ -32,7 +31,6 @@
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -57,18 +55,21 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.net.InetAddresses;
/**
* This class is used in Namesystem's jetty to retrieve a file.
* This class is used in Namesystem's jetty to retrieve/upload a file
* Typically used by the Secondary NameNode to retrieve image and
* edit file for periodic checkpointing.
* edit file for periodic checkpointing in Non-HA deployments.
* Standby NameNode uses to upload checkpoints in HA deployments.
*/
@InterfaceAudience.Private
public class GetImageServlet extends HttpServlet {
public class ImageServlet extends HttpServlet {
public static final String PATH_SPEC = "/imagetransfer";
private static final long serialVersionUID = -7669068179452648952L;
private static final Log LOG = LogFactory.getLog(GetImageServlet.class);
private static final Log LOG = LogFactory.getLog(ImageServlet.class);
public final static String CONTENT_DISPOSITION = "Content-Disposition";
public final static String HADOOP_IMAGE_EDITS_HEADER = "X-Image-Edits-Name";
@ -85,8 +86,7 @@ public class GetImageServlet extends HttpServlet {
@Override
public void doGet(final HttpServletRequest request,
final HttpServletResponse response
) throws ServletException, IOException {
final HttpServletResponse response) throws ServletException, IOException {
try {
final ServletContext context = getServletContext();
final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
@ -95,27 +95,8 @@ public void doGet(final HttpServletRequest request,
.getAttribute(JspHelper.CURRENT_CONF);
final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
if (UserGroupInformation.isSecurityEnabled() &&
!isValidRequestor(context, request.getUserPrincipal().getName(), conf)) {
response.sendError(HttpServletResponse.SC_FORBIDDEN,
"Only Namenode, Secondary Namenode, and administrators may access " +
"this servlet");
LOG.warn("Received non-NN/SNN/administrator request for image or edits from "
+ request.getUserPrincipal().getName() + " at " + request.getRemoteHost());
return;
}
String myStorageInfoString = nnImage.getStorage().toColonSeparatedString();
String theirStorageInfoString = parsedParams.getStorageInfoString();
if (theirStorageInfoString != null &&
!myStorageInfoString.equals(theirStorageInfoString)) {
response.sendError(HttpServletResponse.SC_FORBIDDEN,
"This namenode has storage info " + myStorageInfoString +
" but the secondary expected " + theirStorageInfoString);
LOG.warn("Received an invalid request file transfer request " +
"from a secondary with storage info " + theirStorageInfoString);
return;
}
validateRequest(context, conf, request, response, nnImage,
parsedParams.getStorageInfoString());
UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Void>() {
@Override
@ -155,53 +136,6 @@ public Void run() throws Exception {
long elapsed = now() - start;
metrics.addGetEdit(elapsed);
}
} else if (parsedParams.isPutImage()) {
final long txid = parsedParams.getTxId();
final NameNodeFile nnf = parsedParams.getNameNodeFile();
if (! currentlyDownloadingCheckpoints.add(txid)) {
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer is already in the process of uploading a" +
" checkpoint made at transaction ID " + txid);
return null;
}
try {
if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer already uploaded an checkpoint " +
"for txid " + txid);
return null;
}
// We may have lost our ticket since last checkpoint, log in again, just in case
if (UserGroupInformation.isSecurityEnabled()) {
UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
}
long start = now();
// issue a HTTP get request to download the new fsimage
MD5Hash downloadImageDigest = TransferFsImage
.downloadImageToStorage(parsedParams.getInfoServer(conf),
txid, nnImage.getStorage(), true);
nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
downloadImageDigest);
if (nnf == NameNodeFile.IMAGE_ROLLBACK) {
NameNodeHttpServer.getNameNodeFromContext(context)
.getNamesystem().setCreatedRollbackImages(true);
}
if (metrics != null) { // Metrics non-null only when used inside name node
long elapsed = now() - start;
metrics.addPutImage(elapsed);
}
// Now that we have a new checkpoint, we might be able to
// remove some old ones.
nnImage.purgeOldStorage(nnf);
} finally {
currentlyDownloadingCheckpoints.remove(txid);
}
}
return null;
}
@ -209,7 +143,7 @@ public Void run() throws Exception {
private void serveFile(File file) throws IOException {
FileInputStream fis = new FileInputStream(file);
try {
setVerificationHeaders(response, file);
setVerificationHeadersForGet(response, file);
setFileNameHeaders(response, file);
if (!file.exists()) {
// Potential race where the file was deleted while we were in the
@ -221,8 +155,8 @@ private void serveFile(File file) throws IOException {
// detected by the client side as an inaccurate length header.
}
// send file
TransferFsImage.getFileServer(response, file, fis,
getThrottler(conf));
TransferFsImage.copyFileToStream(response.getOutputStream(),
file, fis, getThrottler(conf));
} finally {
IOUtils.closeStream(fis);
}
@ -238,6 +172,35 @@ private void serveFile(File file) throws IOException {
}
}
private void validateRequest(ServletContext context, Configuration conf,
HttpServletRequest request, HttpServletResponse response,
FSImage nnImage, String theirStorageInfoString) throws IOException {
if (UserGroupInformation.isSecurityEnabled()
&& !isValidRequestor(context, request.getUserPrincipal().getName(),
conf)) {
String errorMsg = "Only Namenode, Secondary Namenode, and administrators may access "
+ "this servlet";
response.sendError(HttpServletResponse.SC_FORBIDDEN, errorMsg);
LOG.warn("Received non-NN/SNN/administrator request for image or edits from "
+ request.getUserPrincipal().getName()
+ " at "
+ request.getRemoteHost());
throw new IOException(errorMsg);
}
String myStorageInfoString = nnImage.getStorage().toColonSeparatedString();
if (theirStorageInfoString != null
&& !myStorageInfoString.equals(theirStorageInfoString)) {
String errorMsg = "This namenode has storage info " + myStorageInfoString
+ " but the secondary expected " + theirStorageInfoString;
response.sendError(HttpServletResponse.SC_FORBIDDEN, errorMsg);
LOG.warn("Received an invalid request file transfer request "
+ "from a secondary with storage info " + theirStorageInfoString);
throw new IOException(errorMsg);
}
}
public static void setFileNameHeaders(HttpServletResponse response,
File file) {
response.setHeader(CONTENT_DISPOSITION, "attachment; filename=" +
@ -264,43 +227,40 @@ public final static DataTransferThrottler getThrottler(Configuration conf) {
@VisibleForTesting
static boolean isValidRequestor(ServletContext context, String remoteUser,
Configuration conf) throws IOException {
if(remoteUser == null) { // This really shouldn't happen...
if (remoteUser == null) { // This really shouldn't happen...
LOG.warn("Received null remoteUser while authorizing access to getImage servlet");
return false;
}
Set<String> validRequestors = new HashSet<String>();
validRequestors.add(
SecurityUtil.getServerPrincipal(conf
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), NameNode
.getAddress(conf).getHostName()));
validRequestors.add(
SecurityUtil.getServerPrincipal(conf
.get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY),
validRequestors.add(SecurityUtil.getServerPrincipal(conf
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
NameNode.getAddress(conf).getHostName()));
validRequestors.add(SecurityUtil.getServerPrincipal(
conf.get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY),
SecondaryNameNode.getHttpAddress(conf).getHostName()));
if (HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf))) {
Configuration otherNnConf = HAUtil.getConfForOtherNode(conf);
validRequestors.add(
SecurityUtil.getServerPrincipal(otherNnConf
validRequestors.add(SecurityUtil.getServerPrincipal(otherNnConf
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
NameNode.getAddress(otherNnConf).getHostName()));
}
for(String v : validRequestors) {
if(v != null && v.equals(remoteUser)) {
LOG.info("GetImageServlet allowing checkpointer: " + remoteUser);
for (String v : validRequestors) {
if (v != null && v.equals(remoteUser)) {
LOG.info("ImageServlet allowing checkpointer: " + remoteUser);
return true;
}
}
if (HttpServer2.userHasAdministratorAccess(context, remoteUser)) {
LOG.info("GetImageServlet allowing administrator: " + remoteUser);
LOG.info("ImageServlet allowing administrator: " + remoteUser);
return true;
}
LOG.info("GetImageServlet rejecting: " + remoteUser);
LOG.info("ImageServlet rejecting: " + remoteUser);
return false;
}
@ -308,8 +268,8 @@ static boolean isValidRequestor(ServletContext context, String remoteUser,
* Set headers for content length, and, if available, md5.
* @throws IOException
*/
public static void setVerificationHeaders(HttpServletResponse response, File file)
throws IOException {
public static void setVerificationHeadersForGet(HttpServletResponse response,
File file) throws IOException {
response.setHeader(TransferFsImage.CONTENT_LENGTH,
String.valueOf(file.length()));
MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
@ -340,29 +300,9 @@ static String getParamStringForLog(RemoteEditLog log,
remoteStorageInfo.toColonSeparatedString();
}
static String getParamStringToPutImage(NameNodeFile nnf, long txid,
URL url, Storage storage) {
InetSocketAddress imageListenAddress = NetUtils.createSocketAddr(url
.getAuthority());
String machine = !imageListenAddress.isUnresolved()
&& imageListenAddress.getAddress().isAnyLocalAddress() ? null
: imageListenAddress.getHostName();
return "putimage=1" +
"&" + TXID_PARAM + "=" + txid +
"&" + IMAGE_FILE_TYPE + "=" + nnf.name() +
"&port=" + imageListenAddress.getPort() +
(machine != null ? "&machine=" + machine : "")
+ "&" + STORAGEINFO_PARAM + "=" +
storage.toColonSeparatedString();
}
static class GetImageParams {
private boolean isGetImage;
private boolean isGetEdit;
private boolean isPutImage;
private int remoteport;
private String machineName;
private NameNodeFile nnf;
private long startTxId, endTxId, txId;
private String storageInfoString;
@ -378,8 +318,7 @@ public GetImageParams(HttpServletRequest request,
) throws IOException {
@SuppressWarnings("unchecked")
Map<String, String[]> pmap = request.getParameterMap();
isGetImage = isGetEdit = isPutImage = fetchLatest = false;
remoteport = 0;
isGetImage = isGetEdit = fetchLatest = false;
for (Map.Entry<String, String[]> entry : pmap.entrySet()) {
String key = entry.getKey();
@ -402,30 +341,13 @@ public GetImageParams(HttpServletRequest request,
isGetEdit = true;
startTxId = ServletUtil.parseLongParam(request, START_TXID_PARAM);
endTxId = ServletUtil.parseLongParam(request, END_TXID_PARAM);
} else if (key.equals("putimage")) {
isPutImage = true;
txId = ServletUtil.parseLongParam(request, TXID_PARAM);
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
.valueOf(imageType);
} else if (key.equals("port")) {
remoteport = new Integer(val[0]).intValue();
} else if (key.equals("machine")) {
machineName = val[0];
} else if (key.equals(STORAGEINFO_PARAM)) {
storageInfoString = val[0];
}
}
if (machineName == null) {
machineName = request.getRemoteHost();
if (InetAddresses.isInetAddress(machineName)) {
machineName = NetUtils.getHostNameOfIP(machineName);
}
}
int numGets = (isGetImage?1:0) + (isGetEdit?1:0);
if ((numGets > 1) || (numGets == 0) && !isPutImage) {
if ((numGets > 1) || (numGets == 0)) {
throw new IOException("Illegal parameters to TransferFsImage");
}
}
@ -435,12 +357,12 @@ public String getStorageInfoString() {
}
public long getTxId() {
Preconditions.checkState(isGetImage || isPutImage);
Preconditions.checkState(isGetImage);
return txId;
}
public NameNodeFile getNameNodeFile() {
Preconditions.checkState(isPutImage || isGetImage);
Preconditions.checkState(isGetImage);
return nnf;
}
@ -462,20 +384,161 @@ boolean isGetImage() {
return isGetImage;
}
boolean isPutImage() {
return isPutImage;
}
URL getInfoServer(Configuration conf) throws IOException {
if (machineName == null || remoteport == 0) {
throw new IOException("MachineName and port undefined");
}
return new URL(DFSUtil.getHttpClientScheme(conf), machineName, remoteport, "");
}
boolean shouldFetchLatest() {
return fetchLatest;
}
}
/**
* Set headers for image length and if available, md5.
*
* @throws IOException
*/
static void setVerificationHeadersForPut(HttpURLConnection connection,
File file) throws IOException {
connection.setRequestProperty(TransferFsImage.CONTENT_LENGTH,
String.valueOf(file.length()));
MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
if (hash != null) {
connection
.setRequestProperty(TransferFsImage.MD5_HEADER, hash.toString());
}
}
/**
* Set the required parameters for uploading image
*
* @param httpMethod instance of method to set the parameters
* @param storage colon separated storageInfo string
* @param txid txid of the image
* @param imageFileSize size of the imagefile to be uploaded
* @param nnf NameNodeFile Type
* @return Returns map of parameters to be used with PUT request.
*/
static Map<String, String> getParamsForPutImage(Storage storage, long txid,
long imageFileSize, NameNodeFile nnf) {
Map<String, String> params = new HashMap<String, String>();
params.put(TXID_PARAM, Long.toString(txid));
params.put(STORAGEINFO_PARAM, storage.toColonSeparatedString());
// setting the length of the file to be uploaded in separate property as
// Content-Length only supports up to 2GB
params.put(TransferFsImage.FILE_LENGTH, Long.toString(imageFileSize));
params.put(IMAGE_FILE_TYPE, nnf.name());
return params;
}
@Override
protected void doPut(final HttpServletRequest request,
final HttpServletResponse response) throws ServletException, IOException {
try {
ServletContext context = getServletContext();
final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
final Configuration conf = (Configuration) getServletContext()
.getAttribute(JspHelper.CURRENT_CONF);
final PutImageParams parsedParams = new PutImageParams(request, response,
conf);
final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
validateRequest(context, conf, request, response, nnImage,
parsedParams.getStorageInfoString());
UserGroupInformation.getCurrentUser().doAs(
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
final long txid = parsedParams.getTxId();
final NameNodeFile nnf = parsedParams.getNameNodeFile();
if (!currentlyDownloadingCheckpoints.add(txid)) {
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer is already in the process of uploading a"
+ " checkpoint made at transaction ID " + txid);
return null;
}
try {
if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer already uploaded an checkpoint "
+ "for txid " + txid);
return null;
}
InputStream stream = request.getInputStream();
try {
long start = now();
MD5Hash downloadImageDigest = TransferFsImage
.handleUploadImageRequest(request, txid,
nnImage.getStorage(), stream,
parsedParams.getFileSize(), getThrottler(conf));
nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
downloadImageDigest);
// Metrics non-null only when used inside name node
if (metrics != null) {
long elapsed = now() - start;
metrics.addPutImage(elapsed);
}
// Now that we have a new checkpoint, we might be able to
// remove some old ones.
nnImage.purgeOldStorage(nnf);
} finally {
stream.close();
}
} finally {
currentlyDownloadingCheckpoints.remove(txid);
}
return null;
}
});
} catch (Throwable t) {
String errMsg = "PutImage failed. " + StringUtils.stringifyException(t);
response.sendError(HttpServletResponse.SC_GONE, errMsg);
throw new IOException(errMsg);
}
}
/*
* Params required to handle put image request
*/
static class PutImageParams {
private long txId = -1;
private String storageInfoString = null;
private long fileSize = 0L;
private NameNodeFile nnf;
public PutImageParams(HttpServletRequest request,
HttpServletResponse response, Configuration conf) throws IOException {
txId = ServletUtil.parseLongParam(request, TXID_PARAM);
storageInfoString = ServletUtil.getParameter(request, STORAGEINFO_PARAM);
fileSize = ServletUtil.parseLongParam(request,
TransferFsImage.FILE_LENGTH);
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
.valueOf(imageType);
if (fileSize == 0 || txId == -1 || storageInfoString == null
|| storageInfoString.isEmpty()) {
throw new IOException("Illegal parameters to TransferFsImage");
}
}
public long getTxId() {
return txId;
}
public String getStorageInfoString() {
return storageInfoString;
}
public long getFileSize() {
return fileSize;
}
public NameNodeFile getNameNodeFile() {
return nnf;
}
}
}

View File

@ -233,8 +233,8 @@ private static void setupServlets(HttpServer2 httpServer, Configuration conf) {
CancelDelegationTokenServlet.class, true);
httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,
true);
httpServer.addInternalServlet("getimage", "/getimage",
GetImageServlet.class, true);
httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
ImageServlet.class, true);
httpServer.addInternalServlet("listPaths", "/listPaths/*",
ListPathsServlet.class, false);
httpServer.addInternalServlet("data", "/data/*",

View File

@ -114,7 +114,6 @@ public class SecondaryNameNode implements Runnable {
private InetSocketAddress nameNodeAddr;
private volatile boolean shouldRun;
private HttpServer2 infoServer;
private URL imageListenURL;
private Collection<URI> checkpointDirs;
private List<URI> checkpointEditsDirs;
@ -267,13 +266,11 @@ private void initialize(final Configuration conf,
infoServer.setAttribute("secondary.name.node", this);
infoServer.setAttribute("name.system.image", checkpointImage);
infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
infoServer.addInternalServlet("getimage", "/getimage",
GetImageServlet.class, true);
infoServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
ImageServlet.class, true);
infoServer.start();
LOG.info("Web server init done");
imageListenURL = new URL(DFSUtil.getHttpClientScheme(conf) + "://"
+ NetUtils.getHostPortString(infoServer.getConnectorAddress(0)));
HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
int connIdx = 0;
@ -488,14 +485,6 @@ private URL getInfoServer() throws IOException {
return address.toURL();
}
/**
* Return the host:port of where this SecondaryNameNode is listening
* for image transfers
*/
private URL getImageListenAddress() {
return imageListenURL;
}
/**
* Create a new checkpoint
* @return if the image is fetched from primary or not
@ -555,8 +544,8 @@ public boolean doCheckpoint() throws IOException {
// to make this new uploaded image as the most current image.
//
long txid = checkpointImage.getLastAppliedTxId();
TransferFsImage.uploadImageFromStorage(fsName, getImageListenAddress(),
dstStorage, NameNodeFile.IMAGE, txid);
TransferFsImage.uploadImageFromStorage(fsName, conf, dstStorage,
NameNodeFile.IMAGE, txid);
// error simulation code for junit test
CheckpointFaultInjector.getInstance().afterSecondaryUploadsNewImage();

View File

@ -19,18 +19,22 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
@ -49,10 +53,12 @@
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.util.Time;
import org.apache.http.client.utils.URIBuilder;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@ -65,7 +71,12 @@
public class TransferFsImage {
public final static String CONTENT_LENGTH = "Content-Length";
public final static String FILE_LENGTH = "File-Length";
public final static String MD5_HEADER = "X-MD5-Digest";
private final static String CONTENT_TYPE = "Content-Type";
private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding";
@VisibleForTesting
static int timeout = 0;
private static URLConnectionFactory connectionFactory;
@ -82,14 +93,14 @@ public class TransferFsImage {
public static void downloadMostRecentImageToDirectory(URL infoServer,
File dir) throws IOException {
String fileId = GetImageServlet.getParamStringForMostRecentImage();
String fileId = ImageServlet.getParamStringForMostRecentImage();
getFileClient(infoServer, fileId, Lists.newArrayList(dir),
null, false);
}
public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
Storage dstStorage, boolean needDigest) throws IOException {
String fileid = GetImageServlet.getParamStringForImage(null,
String fileid = ImageServlet.getParamStringForImage(null,
imageTxId, dstStorage);
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
@ -105,11 +116,30 @@ public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
return hash;
}
static MD5Hash handleUploadImageRequest(HttpServletRequest request,
long imageTxId, Storage dstStorage, InputStream stream,
long advertisedSize, DataTransferThrottler throttler) throws IOException {
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
List<File> dstFiles = dstStorage.getFiles(NameNodeDirType.IMAGE, fileName);
if (dstFiles.isEmpty()) {
throw new IOException("No targets in destination storage!");
}
MD5Hash advertisedDigest = parseMD5Header(request);
MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true,
advertisedSize, advertisedDigest, fileName, stream, throttler);
LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size "
+ dstFiles.get(0).length() + " bytes.");
return hash;
}
static void downloadEditsToStorage(URL fsName, RemoteEditLog log,
NNStorage dstStorage) throws IOException {
assert log.getStartTxId() > 0 && log.getEndTxId() > 0 :
"bad log: " + log;
String fileid = GetImageServlet.getParamStringForLog(
String fileid = ImageServlet.getParamStringForLog(
log, dstStorage);
String finalFileName = NNStorage.getFinalizedEditsFileName(
log.getStartTxId(), log.getEndTxId());
@ -159,22 +189,19 @@ static void downloadEditsToStorage(URL fsName, RemoteEditLog log,
* Requests that the NameNode download an image from this node.
*
* @param fsName the http address for the remote NN
* @param myNNAddress the host/port where the local node is running an
* HTTPServer hosting GetImageServlet
* @param conf Configuration
* @param storage the storage directory to transfer the image from
* @param nnf the NameNodeFile type of the image
* @param txid the transaction ID of the image to be uploaded
*/
public static void uploadImageFromStorage(URL fsName, URL myNNAddress,
Storage storage, NameNodeFile nnf, long txid) throws IOException {
public static void uploadImageFromStorage(URL fsName, Configuration conf,
NNStorage storage, NameNodeFile nnf, long txid) throws IOException {
String fileid = GetImageServlet.getParamStringToPutImage(nnf, txid,
myNNAddress, storage);
// this doesn't directly upload an image, but rather asks the NN
// to connect back to the 2NN to download the specified image.
URL url = new URL(fsName, ImageServlet.PATH_SPEC);
long startTime = Time.monotonicNow();
try {
TransferFsImage.getFileClient(fsName, fileid, null, null, false);
} catch (HttpGetFailedException e) {
uploadImage(url, conf, storage, nnf, txid);
} catch (HttpPutFailedException e) {
if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) {
// this is OK - this means that a previous attempt to upload
// this checkpoint succeeded even though we thought it failed.
@ -186,25 +213,105 @@ public static void uploadImageFromStorage(URL fsName, URL myNNAddress,
throw e;
}
}
LOG.info("Uploaded image with txid " + txid + " to namenode at " +
fsName);
double xferSec = Math.max(
((float) (Time.monotonicNow() - startTime)) / 1000.0, 0.001);
LOG.info("Uploaded image with txid " + txid + " to namenode at " + fsName
+ " in " + xferSec + " seconds");
}
/*
* Uploads the imagefile using HTTP PUT method
*/
private static void uploadImage(URL url, Configuration conf,
NNStorage storage, NameNodeFile nnf, long txId) throws IOException {
File imageFile = storage.findImageFile(nnf, txId);
if (imageFile == null) {
throw new IOException("Could not find image with txid " + txId);
}
HttpURLConnection connection = null;
try {
URIBuilder uriBuilder = new URIBuilder(url.toURI());
// write all params for image upload request as query itself.
// Request body contains the image to be uploaded.
Map<String, String> params = ImageServlet.getParamsForPutImage(storage,
txId, imageFile.length(), nnf);
for (Entry<String, String> entry : params.entrySet()) {
uriBuilder.addParameter(entry.getKey(), entry.getValue());
}
URL urlWithParams = uriBuilder.build().toURL();
connection = (HttpURLConnection) connectionFactory.openConnection(
urlWithParams, UserGroupInformation.isSecurityEnabled());
// Set the request to PUT
connection.setRequestMethod("PUT");
connection.setDoOutput(true);
int chunkSize = conf.getInt(
DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT);
if (imageFile.length() > chunkSize) {
// using chunked streaming mode to support upload of 2GB+ files and to
// avoid internal buffering.
// this mode should be used only if more than chunkSize data is present
// to upload. otherwise upload may not happen sometimes.
connection.setChunkedStreamingMode(chunkSize);
}
setTimeout(connection);
// set headers for verification
ImageServlet.setVerificationHeadersForPut(connection, imageFile);
// Write the file to output stream.
writeFileToPutRequest(conf, connection, imageFile);
int responseCode = connection.getResponseCode();
if (responseCode != HttpURLConnection.HTTP_OK) {
throw new HttpPutFailedException(connection.getResponseMessage(),
responseCode);
}
} catch (AuthenticationException e) {
throw new IOException(e);
} catch (URISyntaxException e) {
throw new IOException(e);
} finally {
if (connection != null) {
connection.disconnect();
}
}
}
private static void writeFileToPutRequest(Configuration conf,
HttpURLConnection connection, File imageFile)
throws FileNotFoundException, IOException {
connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream");
connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary");
OutputStream output = connection.getOutputStream();
FileInputStream input = new FileInputStream(imageFile);
try {
copyFileToStream(output, imageFile, input,
ImageServlet.getThrottler(conf));
} finally {
IOUtils.closeStream(input);
IOUtils.closeStream(output);
}
}
/**
* A server-side method to respond to a getfile http request
* Copies the contents of the local file into the output stream.
*/
public static void getFileServer(ServletResponse response, File localfile,
FileInputStream infile,
DataTransferThrottler throttler)
public static void copyFileToStream(OutputStream out, File localfile,
FileInputStream infile, DataTransferThrottler throttler)
throws IOException {
byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
ServletOutputStream out = null;
try {
CheckpointFaultInjector.getInstance()
.aboutToSendFile(localfile);
out = response.getOutputStream();
if (CheckpointFaultInjector.getInstance().
shouldSendShortFile(localfile)) {
@ -250,14 +357,13 @@ public static void getFileServer(ServletResponse response, File localfile,
static MD5Hash getFileClient(URL infoServer,
String queryString, List<File> localPaths,
Storage dstStorage, boolean getChecksum) throws IOException {
URL url = new URL(infoServer, "/getimage?" + queryString);
URL url = new URL(infoServer, ImageServlet.PATH_SPEC + "?" + queryString);
LOG.info("Opening connection to " + url);
return doGetUrl(url, localPaths, dstStorage, getChecksum);
}
public static MD5Hash doGetUrl(URL url, List<File> localPaths,
Storage dstStorage, boolean getChecksum) throws IOException {
long startTime = Time.monotonicNow();
HttpURLConnection connection;
try {
connection = (HttpURLConnection)
@ -266,16 +372,7 @@ public static MD5Hash doGetUrl(URL url, List<File> localPaths,
throw new IOException(e);
}
if (timeout <= 0) {
Configuration conf = new HdfsConfiguration();
timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
}
if (timeout > 0) {
connection.setConnectTimeout(timeout);
connection.setReadTimeout(timeout);
}
setTimeout(connection);
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new HttpGetFailedException(
@ -293,10 +390,37 @@ public static MD5Hash doGetUrl(URL url, List<File> localPaths,
throw new IOException(CONTENT_LENGTH + " header is not provided " +
"by the namenode when trying to fetch " + url);
}
MD5Hash advertisedDigest = parseMD5Header(connection);
String fsImageName = connection
.getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
InputStream stream = connection.getInputStream();
return receiveFile(url.toExternalForm(), localPaths, dstStorage,
getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
null);
}
private static void setTimeout(HttpURLConnection connection) {
if (timeout <= 0) {
Configuration conf = new HdfsConfiguration();
timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
LOG.info("Image Transfer timeout configured to " + timeout
+ " milliseconds");
}
if (timeout > 0) {
connection.setConnectTimeout(timeout);
connection.setReadTimeout(timeout);
}
}
private static MD5Hash receiveFile(String url, List<File> localPaths,
Storage dstStorage, boolean getChecksum, long advertisedSize,
MD5Hash advertisedDigest, String fsImageName, InputStream stream,
DataTransferThrottler throttler) throws IOException {
long startTime = Time.monotonicNow();
if (localPaths != null) {
String fsImageName = connection.getHeaderField(
GetImageServlet.HADOOP_IMAGE_EDITS_HEADER);
// If the local paths refer to directories, use the server-provided header
// as the filename within that directory
List<File> newLocalPaths = new ArrayList<File>();
@ -313,10 +437,8 @@ public static MD5Hash doGetUrl(URL url, List<File> localPaths,
localPaths = newLocalPaths;
}
MD5Hash advertisedDigest = parseMD5Header(connection);
long received = 0;
InputStream stream = connection.getInputStream();
MessageDigest digester = null;
if (getChecksum) {
digester = MD5Hash.getDigester();
@ -361,6 +483,9 @@ public static MD5Hash doGetUrl(URL url, List<File> localPaths,
for (FileOutputStream fos : outputStreams) {
fos.write(buf, 0, num);
}
if (throttler != null) {
throttler.throttle(num);
}
}
}
finishedReceiving = true;
@ -405,6 +530,11 @@ private static MD5Hash parseMD5Header(HttpURLConnection connection) {
return (header != null) ? new MD5Hash(header) : null;
}
private static MD5Hash parseMD5Header(HttpServletRequest request) {
String header = request.getHeader(MD5_HEADER);
return (header != null) ? new MD5Hash(header) : null;
}
public static class HttpGetFailedException extends IOException {
private static final long serialVersionUID = 1L;
private final int responseCode;
@ -419,4 +549,18 @@ public int getResponseCode() {
}
}
public static class HttpPutFailedException extends IOException {
private static final long serialVersionUID = 1L;
private final int responseCode;
HttpPutFailedException(String msg, int responseCode) throws IOException {
super(msg);
this.responseCode = responseCode;
}
public int getResponseCode() {
return responseCode;
}
}
}

View File

@ -63,6 +63,7 @@ public class StandbyCheckpointer {
private static final Log LOG = LogFactory.getLog(StandbyCheckpointer.class);
private static final long PREVENT_AFTER_CANCEL_MS = 2*60*1000L;
private final CheckpointConf checkpointConf;
private final Configuration conf;
private final FSNamesystem namesystem;
private long lastCheckpointTime;
private final CheckpointerThread thread;
@ -80,6 +81,7 @@ public class StandbyCheckpointer {
public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
throws IOException {
this.namesystem = ns;
this.conf = conf;
this.checkpointConf = new CheckpointConf(conf);
this.thread = new CheckpointerThread();
this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
@ -193,7 +195,7 @@ private void doCheckpoint() throws InterruptedException, IOException {
Future<Void> upload = executor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
TransferFsImage.uploadImageFromStorage(activeNNAddress, myNNAddress,
TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
namesystem.getFSImage().getStorage(), imageType, txid);
return null;
}

View File

@ -858,15 +858,13 @@
<property>
<name>dfs.image.transfer.timeout</name>
<value>600000</value>
<value>60000</value>
<description>
Timeout for image transfer in milliseconds. This timeout and the related
Socket timeout for image transfer in milliseconds. This timeout and the related
dfs.image.transfer.bandwidthPerSec parameter should be configured such
that normal image transfer can complete within the timeout.
that normal image transfer can complete successfully.
This timeout prevents client hangs when the sender fails during
image transfer, which is particularly important during checkpointing.
Note that this timeout applies to the entirety of image transfer, and
is not a socket timeout.
image transfer. This is socket timeout during image tranfer.
</description>
</property>
@ -883,6 +881,16 @@
</description>
</property>
<property>
<name>dfs.image.transfer.chunksize</name>
<value>65536</value>
<description>
Chunksize in bytes to upload the checkpoint.
Chunked streaming is used to avoid internal buffering of contents
of image file of huge size.
</description>
</property>
<property>
<name>dfs.namenode.support.allow.format</name>
<value>true</value>

View File

@ -31,6 +31,7 @@
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
@ -564,23 +565,9 @@ public void testSecondaryNamenodeError3() throws IOException {
}
/**
* Simulate a secondary node failure to transfer image
* back to the name-node.
* Used to truncate primary fsimage file.
*/
@Test
public void testSecondaryFailsToReturnImage() throws IOException {
Mockito.doThrow(new IOException("If this exception is not caught by the " +
"name-node, fs image will be truncated."))
.when(faultInjector).aboutToSendFile(filePathContaining("secondary"));
doSecondaryFailsToReturnImage();
}
/**
* Similar to above test, but uses an unchecked Error, and causes it
* before even setting the length header. This used to cause image
* truncation. Regression test for HDFS-3330.
* Simulate a secondary node failure to transfer image. Uses an unchecked
* error and fail transfer before even setting the length header. This used to
* cause image truncation. Regression test for HDFS-3330.
*/
@Test
public void testSecondaryFailsWithErrorBeforeSettingHeaders()
@ -1979,6 +1966,13 @@ public void testNamespaceVerifiedOnFileTransfer() throws IOException {
.when(dstImage).getFiles(
Mockito.<NameNodeDirType>anyObject(), Mockito.anyString());
File mockImageFile = File.createTempFile("image", "");
FileOutputStream imageFile = new FileOutputStream(mockImageFile);
imageFile.write("data".getBytes());
imageFile.close();
Mockito.doReturn(mockImageFile).when(dstImage)
.findImageFile(Mockito.any(NameNodeFile.class), Mockito.anyLong());
Mockito.doReturn(new StorageInfo(1, 1, "X", 1, NodeType.NAME_NODE).toColonSeparatedString())
.when(dstImage).toColonSeparatedString();
@ -1999,8 +1993,8 @@ public void testNamespaceVerifiedOnFileTransfer() throws IOException {
}
try {
TransferFsImage.uploadImageFromStorage(fsName, new URL(
"http://localhost:1234"), dstImage, NameNodeFile.IMAGE, 0);
TransferFsImage.uploadImageFromStorage(fsName, conf, dstImage,
NameNodeFile.IMAGE, 0);
fail("Storage info was not verified");
} catch (IOException ioe) {
String msg = StringUtils.stringifyException(ioe);

View File

@ -69,7 +69,7 @@ public void testIsValidRequestor() throws IOException {
Mockito.when(context.getAttribute(HttpServer2.ADMINS_ACL)).thenReturn(acls);
// Make sure that NN2 is considered a valid fsimage/edits requestor.
assertTrue(GetImageServlet.isValidRequestor(context,
assertTrue(ImageServlet.isValidRequestor(context,
"hdfs/host2@TEST-REALM.COM", conf));
// Mark atm as an admin.
@ -81,15 +81,15 @@ public boolean matches(Object argument) {
}))).thenReturn(true);
// Make sure that NN2 is still considered a valid requestor.
assertTrue(GetImageServlet.isValidRequestor(context,
assertTrue(ImageServlet.isValidRequestor(context,
"hdfs/host2@TEST-REALM.COM", conf));
// Make sure an admin is considered a valid requestor.
assertTrue(GetImageServlet.isValidRequestor(context,
assertTrue(ImageServlet.isValidRequestor(context,
"atm@TEST-REALM.COM", conf));
// Make sure other users are *not* considered valid requestors.
assertFalse(GetImageServlet.isValidRequestor(context,
assertFalse(ImageServlet.isValidRequestor(context,
"todd@TEST-REALM.COM", conf));
}
}

View File

@ -22,6 +22,7 @@
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.URL;
@ -34,9 +35,11 @@
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.http.HttpServerFunctionalTest;
import org.apache.hadoop.test.PathUtils;
@ -118,10 +121,11 @@ public void testClientSideExceptionOnJustOneDir() throws IOException {
* Test to verify the read timeout
*/
@Test(timeout = 5000)
public void testImageTransferTimeout() throws Exception {
public void testGetImageTimeout() throws Exception {
HttpServer2 testServer = HttpServerFunctionalTest.createServer("hdfs");
try {
testServer.addServlet("GetImage", "/getimage", TestGetImageServlet.class);
testServer.addServlet("ImageTransfer", ImageServlet.PATH_SPEC,
TestImageTransferServlet.class);
testServer.start();
URL serverURL = HttpServerFunctionalTest.getServerURL(testServer);
TransferFsImage.timeout = 2000;
@ -139,7 +143,48 @@ public void testImageTransferTimeout() throws Exception {
}
}
public static class TestGetImageServlet extends HttpServlet {
/**
* Test to verify the timeout of Image upload
*/
@Test(timeout = 10000)
public void testImageUploadTimeout() throws Exception {
Configuration conf = new HdfsConfiguration();
NNStorage mockStorage = Mockito.mock(NNStorage.class);
HttpServer2 testServer = HttpServerFunctionalTest.createServer("hdfs");
try {
testServer.addServlet("ImageTransfer", ImageServlet.PATH_SPEC,
TestImageTransferServlet.class);
testServer.start();
URL serverURL = HttpServerFunctionalTest.getServerURL(testServer);
// set the timeout here, otherwise it will take default.
TransferFsImage.timeout = 2000;
File tmpDir = new File(new FileSystemTestHelper().getTestRootDir());
tmpDir.mkdirs();
File mockImageFile = File.createTempFile("image", "", tmpDir);
FileOutputStream imageFile = new FileOutputStream(mockImageFile);
imageFile.write("data".getBytes());
imageFile.close();
Mockito.when(
mockStorage.findImageFile(Mockito.any(NameNodeFile.class),
Mockito.anyLong())).thenReturn(mockImageFile);
Mockito.when(mockStorage.toColonSeparatedString()).thenReturn(
"storage:info:string");
try {
TransferFsImage.uploadImageFromStorage(serverURL, conf, mockStorage,
NameNodeFile.IMAGE, 1L);
fail("TransferImage Should fail with timeout");
} catch (SocketTimeoutException e) {
assertEquals("Upload should timeout", "Read timed out", e.getMessage());
}
} finally {
testServer.stop();
}
}
public static class TestImageTransferServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
@Override
@ -153,5 +198,17 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
}
}
}
@Override
protected void doPut(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
synchronized (this) {
try {
wait(5000);
} catch (InterruptedException e) {
// Ignore
}
}
}
}
}