Revert bad HDFS-3405 merge from trunk to retry

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1576155 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Wang 2014-03-11 00:40:46 +00:00
parent 1356761f0f
commit 0697a50337
13 changed files with 686 additions and 1045 deletions

File diff suppressed because it is too large Load Diff

View File

@ -462,11 +462,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// Image transfer timeout
public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout";
public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000;
// Image transfer chunksize
public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize";
public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024;
public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 10 * 60 * 1000;
//Keys with no defaults
public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";

View File

@ -42,7 +42,7 @@ import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@ -198,16 +198,15 @@ public class GetJournalEditServlet extends HttpServlet {
return;
}
editFile = elf.getFile();
ImageServlet.setVerificationHeadersForGet(response, editFile);
ImageServlet.setFileNameHeaders(response, editFile);
GetImageServlet.setVerificationHeaders(response, editFile);
GetImageServlet.setFileNameHeaders(response, editFile);
editFileIn = new FileInputStream(editFile);
}
DataTransferThrottler throttler = ImageServlet.getThrottler(conf);
DataTransferThrottler throttler = GetImageServlet.getThrottler(conf);
// send edits
TransferFsImage.copyFileToStream(response.getOutputStream(), editFile,
editFileIn, throttler);
TransferFsImage.getFileServer(response, editFile, editFileIn, throttler);
} catch (Throwable t) {
String errMsg = "getedit failed. " + StringUtils.stringifyException(t);

View File

@ -263,7 +263,8 @@ class Checkpointer extends Daemon {
}
if(cpCmd.needToReturnImage()) {
TransferFsImage.uploadImageFromStorage(backupNode.nnHttpAddress, conf,
TransferFsImage.uploadImageFromStorage(
backupNode.nnHttpAddress, getImageListenAddress(),
bnStorage, NameNodeFile.IMAGE, txid);
}

View File

@ -19,10 +19,11 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.Time.now;
import java.net.HttpURLConnection;
import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.URL;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
@ -31,6 +32,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -55,21 +57,18 @@ import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.net.InetAddresses;
/**
* This class is used in Namesystem's jetty to retrieve/upload a file
* This class is used in Namesystem's jetty to retrieve a file.
* Typically used by the Secondary NameNode to retrieve image and
* edit file for periodic checkpointing in Non-HA deployments.
* Standby NameNode uses to upload checkpoints in HA deployments.
* edit file for periodic checkpointing.
*/
@InterfaceAudience.Private
public class ImageServlet extends HttpServlet {
public static final String PATH_SPEC = "/imagetransfer";
public class GetImageServlet extends HttpServlet {
private static final long serialVersionUID = -7669068179452648952L;
private static final Log LOG = LogFactory.getLog(ImageServlet.class);
private static final Log LOG = LogFactory.getLog(GetImageServlet.class);
public final static String CONTENT_DISPOSITION = "Content-Disposition";
public final static String HADOOP_IMAGE_EDITS_HEADER = "X-Image-Edits-Name";
@ -86,7 +85,8 @@ public class ImageServlet extends HttpServlet {
@Override
public void doGet(final HttpServletRequest request,
final HttpServletResponse response) throws ServletException, IOException {
final HttpServletResponse response
) throws ServletException, IOException {
try {
final ServletContext context = getServletContext();
final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
@ -95,8 +95,27 @@ public class ImageServlet extends HttpServlet {
.getAttribute(JspHelper.CURRENT_CONF);
final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
validateRequest(context, conf, request, response, nnImage,
parsedParams.getStorageInfoString());
if (UserGroupInformation.isSecurityEnabled() &&
!isValidRequestor(context, request.getUserPrincipal().getName(), conf)) {
response.sendError(HttpServletResponse.SC_FORBIDDEN,
"Only Namenode, Secondary Namenode, and administrators may access " +
"this servlet");
LOG.warn("Received non-NN/SNN/administrator request for image or edits from "
+ request.getUserPrincipal().getName() + " at " + request.getRemoteHost());
return;
}
String myStorageInfoString = nnImage.getStorage().toColonSeparatedString();
String theirStorageInfoString = parsedParams.getStorageInfoString();
if (theirStorageInfoString != null &&
!myStorageInfoString.equals(theirStorageInfoString)) {
response.sendError(HttpServletResponse.SC_FORBIDDEN,
"This namenode has storage info " + myStorageInfoString +
" but the secondary expected " + theirStorageInfoString);
LOG.warn("Received an invalid request file transfer request " +
"from a secondary with storage info " + theirStorageInfoString);
return;
}
UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Void>() {
@Override
@ -136,6 +155,53 @@ public class ImageServlet extends HttpServlet {
long elapsed = now() - start;
metrics.addGetEdit(elapsed);
}
} else if (parsedParams.isPutImage()) {
final long txid = parsedParams.getTxId();
final NameNodeFile nnf = parsedParams.getNameNodeFile();
if (! currentlyDownloadingCheckpoints.add(txid)) {
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer is already in the process of uploading a" +
" checkpoint made at transaction ID " + txid);
return null;
}
try {
if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer already uploaded an checkpoint " +
"for txid " + txid);
return null;
}
// We may have lost our ticket since last checkpoint, log in again, just in case
if (UserGroupInformation.isSecurityEnabled()) {
UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
}
long start = now();
// issue a HTTP get request to download the new fsimage
MD5Hash downloadImageDigest = TransferFsImage
.downloadImageToStorage(parsedParams.getInfoServer(conf),
txid, nnImage.getStorage(), true);
nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
downloadImageDigest);
if (nnf == NameNodeFile.IMAGE_ROLLBACK) {
NameNodeHttpServer.getNameNodeFromContext(context)
.getNamesystem().setCreatedRollbackImages(true);
}
if (metrics != null) { // Metrics non-null only when used inside name node
long elapsed = now() - start;
metrics.addPutImage(elapsed);
}
// Now that we have a new checkpoint, we might be able to
// remove some old ones.
nnImage.purgeOldStorage(nnf);
} finally {
currentlyDownloadingCheckpoints.remove(txid);
}
}
return null;
}
@ -143,7 +209,7 @@ public class ImageServlet extends HttpServlet {
private void serveFile(File file) throws IOException {
FileInputStream fis = new FileInputStream(file);
try {
setVerificationHeadersForGet(response, file);
setVerificationHeaders(response, file);
setFileNameHeaders(response, file);
if (!file.exists()) {
// Potential race where the file was deleted while we were in the
@ -155,8 +221,8 @@ public class ImageServlet extends HttpServlet {
// detected by the client side as an inaccurate length header.
}
// send file
TransferFsImage.copyFileToStream(response.getOutputStream(),
file, fis, getThrottler(conf));
TransferFsImage.getFileServer(response, file, fis,
getThrottler(conf));
} finally {
IOUtils.closeStream(fis);
}
@ -172,35 +238,6 @@ public class ImageServlet extends HttpServlet {
}
}
private void validateRequest(ServletContext context, Configuration conf,
HttpServletRequest request, HttpServletResponse response,
FSImage nnImage, String theirStorageInfoString) throws IOException {
if (UserGroupInformation.isSecurityEnabled()
&& !isValidRequestor(context, request.getUserPrincipal().getName(),
conf)) {
String errorMsg = "Only Namenode, Secondary Namenode, and administrators may access "
+ "this servlet";
response.sendError(HttpServletResponse.SC_FORBIDDEN, errorMsg);
LOG.warn("Received non-NN/SNN/administrator request for image or edits from "
+ request.getUserPrincipal().getName()
+ " at "
+ request.getRemoteHost());
throw new IOException(errorMsg);
}
String myStorageInfoString = nnImage.getStorage().toColonSeparatedString();
if (theirStorageInfoString != null
&& !myStorageInfoString.equals(theirStorageInfoString)) {
String errorMsg = "This namenode has storage info " + myStorageInfoString
+ " but the secondary expected " + theirStorageInfoString;
response.sendError(HttpServletResponse.SC_FORBIDDEN, errorMsg);
LOG.warn("Received an invalid request file transfer request "
+ "from a secondary with storage info " + theirStorageInfoString);
throw new IOException(errorMsg);
}
}
public static void setFileNameHeaders(HttpServletResponse response,
File file) {
response.setHeader(CONTENT_DISPOSITION, "attachment; filename=" +
@ -227,40 +264,43 @@ public class ImageServlet extends HttpServlet {
@VisibleForTesting
static boolean isValidRequestor(ServletContext context, String remoteUser,
Configuration conf) throws IOException {
if (remoteUser == null) { // This really shouldn't happen...
if(remoteUser == null) { // This really shouldn't happen...
LOG.warn("Received null remoteUser while authorizing access to getImage servlet");
return false;
}
Set<String> validRequestors = new HashSet<String>();
validRequestors.add(SecurityUtil.getServerPrincipal(conf
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
NameNode.getAddress(conf).getHostName()));
validRequestors.add(SecurityUtil.getServerPrincipal(
conf.get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY),
validRequestors.add(
SecurityUtil.getServerPrincipal(conf
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), NameNode
.getAddress(conf).getHostName()));
validRequestors.add(
SecurityUtil.getServerPrincipal(conf
.get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY),
SecondaryNameNode.getHttpAddress(conf).getHostName()));
if (HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf))) {
Configuration otherNnConf = HAUtil.getConfForOtherNode(conf);
validRequestors.add(SecurityUtil.getServerPrincipal(otherNnConf
validRequestors.add(
SecurityUtil.getServerPrincipal(otherNnConf
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
NameNode.getAddress(otherNnConf).getHostName()));
}
for (String v : validRequestors) {
if (v != null && v.equals(remoteUser)) {
LOG.info("ImageServlet allowing checkpointer: " + remoteUser);
for(String v : validRequestors) {
if(v != null && v.equals(remoteUser)) {
LOG.info("GetImageServlet allowing checkpointer: " + remoteUser);
return true;
}
}
if (HttpServer2.userHasAdministratorAccess(context, remoteUser)) {
LOG.info("ImageServlet allowing administrator: " + remoteUser);
LOG.info("GetImageServlet allowing administrator: " + remoteUser);
return true;
}
LOG.info("ImageServlet rejecting: " + remoteUser);
LOG.info("GetImageServlet rejecting: " + remoteUser);
return false;
}
@ -268,8 +308,8 @@ public class ImageServlet extends HttpServlet {
* Set headers for content length, and, if available, md5.
* @throws IOException
*/
public static void setVerificationHeadersForGet(HttpServletResponse response,
File file) throws IOException {
public static void setVerificationHeaders(HttpServletResponse response, File file)
throws IOException {
response.setHeader(TransferFsImage.CONTENT_LENGTH,
String.valueOf(file.length()));
MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
@ -300,9 +340,29 @@ public class ImageServlet extends HttpServlet {
remoteStorageInfo.toColonSeparatedString();
}
static String getParamStringToPutImage(NameNodeFile nnf, long txid,
URL url, Storage storage) {
InetSocketAddress imageListenAddress = NetUtils.createSocketAddr(url
.getAuthority());
String machine = !imageListenAddress.isUnresolved()
&& imageListenAddress.getAddress().isAnyLocalAddress() ? null
: imageListenAddress.getHostName();
return "putimage=1" +
"&" + TXID_PARAM + "=" + txid +
"&" + IMAGE_FILE_TYPE + "=" + nnf.name() +
"&port=" + imageListenAddress.getPort() +
(machine != null ? "&machine=" + machine : "")
+ "&" + STORAGEINFO_PARAM + "=" +
storage.toColonSeparatedString();
}
static class GetImageParams {
private boolean isGetImage;
private boolean isGetEdit;
private boolean isPutImage;
private int remoteport;
private String machineName;
private NameNodeFile nnf;
private long startTxId, endTxId, txId;
private String storageInfoString;
@ -318,7 +378,8 @@ public class ImageServlet extends HttpServlet {
) throws IOException {
@SuppressWarnings("unchecked")
Map<String, String[]> pmap = request.getParameterMap();
isGetImage = isGetEdit = fetchLatest = false;
isGetImage = isGetEdit = isPutImage = fetchLatest = false;
remoteport = 0;
for (Map.Entry<String, String[]> entry : pmap.entrySet()) {
String key = entry.getKey();
@ -341,13 +402,30 @@ public class ImageServlet extends HttpServlet {
isGetEdit = true;
startTxId = ServletUtil.parseLongParam(request, START_TXID_PARAM);
endTxId = ServletUtil.parseLongParam(request, END_TXID_PARAM);
} else if (key.equals("putimage")) {
isPutImage = true;
txId = ServletUtil.parseLongParam(request, TXID_PARAM);
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
.valueOf(imageType);
} else if (key.equals("port")) {
remoteport = new Integer(val[0]).intValue();
} else if (key.equals("machine")) {
machineName = val[0];
} else if (key.equals(STORAGEINFO_PARAM)) {
storageInfoString = val[0];
}
}
if (machineName == null) {
machineName = request.getRemoteHost();
if (InetAddresses.isInetAddress(machineName)) {
machineName = NetUtils.getHostNameOfIP(machineName);
}
}
int numGets = (isGetImage?1:0) + (isGetEdit?1:0);
if ((numGets > 1) || (numGets == 0)) {
if ((numGets > 1) || (numGets == 0) && !isPutImage) {
throw new IOException("Illegal parameters to TransferFsImage");
}
}
@ -357,12 +435,12 @@ public class ImageServlet extends HttpServlet {
}
public long getTxId() {
Preconditions.checkState(isGetImage);
Preconditions.checkState(isGetImage || isPutImage);
return txId;
}
public NameNodeFile getNameNodeFile() {
Preconditions.checkState(isGetImage);
Preconditions.checkState(isPutImage || isGetImage);
return nnf;
}
@ -384,161 +462,20 @@ public class ImageServlet extends HttpServlet {
return isGetImage;
}
boolean isPutImage() {
return isPutImage;
}
URL getInfoServer(Configuration conf) throws IOException {
if (machineName == null || remoteport == 0) {
throw new IOException("MachineName and port undefined");
}
return new URL(DFSUtil.getHttpClientScheme(conf), machineName, remoteport, "");
}
boolean shouldFetchLatest() {
return fetchLatest;
}
}
/**
* Set headers for image length and if available, md5.
*
* @throws IOException
*/
static void setVerificationHeadersForPut(HttpURLConnection connection,
File file) throws IOException {
connection.setRequestProperty(TransferFsImage.CONTENT_LENGTH,
String.valueOf(file.length()));
MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
if (hash != null) {
connection
.setRequestProperty(TransferFsImage.MD5_HEADER, hash.toString());
}
}
/**
* Set the required parameters for uploading image
*
* @param httpMethod instance of method to set the parameters
* @param storage colon separated storageInfo string
* @param txid txid of the image
* @param imageFileSize size of the imagefile to be uploaded
* @param nnf NameNodeFile Type
* @return Returns map of parameters to be used with PUT request.
*/
static Map<String, String> getParamsForPutImage(Storage storage, long txid,
long imageFileSize, NameNodeFile nnf) {
Map<String, String> params = new HashMap<String, String>();
params.put(TXID_PARAM, Long.toString(txid));
params.put(STORAGEINFO_PARAM, storage.toColonSeparatedString());
// setting the length of the file to be uploaded in separate property as
// Content-Length only supports up to 2GB
params.put(TransferFsImage.FILE_LENGTH, Long.toString(imageFileSize));
params.put(IMAGE_FILE_TYPE, nnf.name());
return params;
}
@Override
protected void doPut(final HttpServletRequest request,
final HttpServletResponse response) throws ServletException, IOException {
try {
ServletContext context = getServletContext();
final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
final Configuration conf = (Configuration) getServletContext()
.getAttribute(JspHelper.CURRENT_CONF);
final PutImageParams parsedParams = new PutImageParams(request, response,
conf);
final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
validateRequest(context, conf, request, response, nnImage,
parsedParams.getStorageInfoString());
UserGroupInformation.getCurrentUser().doAs(
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
final long txid = parsedParams.getTxId();
final NameNodeFile nnf = parsedParams.getNameNodeFile();
if (!currentlyDownloadingCheckpoints.add(txid)) {
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer is already in the process of uploading a"
+ " checkpoint made at transaction ID " + txid);
return null;
}
try {
if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer already uploaded an checkpoint "
+ "for txid " + txid);
return null;
}
InputStream stream = request.getInputStream();
try {
long start = now();
MD5Hash downloadImageDigest = TransferFsImage
.handleUploadImageRequest(request, txid,
nnImage.getStorage(), stream,
parsedParams.getFileSize(), getThrottler(conf));
nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
downloadImageDigest);
// Metrics non-null only when used inside name node
if (metrics != null) {
long elapsed = now() - start;
metrics.addPutImage(elapsed);
}
// Now that we have a new checkpoint, we might be able to
// remove some old ones.
nnImage.purgeOldStorage(nnf);
} finally {
stream.close();
}
} finally {
currentlyDownloadingCheckpoints.remove(txid);
}
return null;
}
});
} catch (Throwable t) {
String errMsg = "PutImage failed. " + StringUtils.stringifyException(t);
response.sendError(HttpServletResponse.SC_GONE, errMsg);
throw new IOException(errMsg);
}
}
/*
* Params required to handle put image request
*/
static class PutImageParams {
private long txId = -1;
private String storageInfoString = null;
private long fileSize = 0L;
private NameNodeFile nnf;
public PutImageParams(HttpServletRequest request,
HttpServletResponse response, Configuration conf) throws IOException {
txId = ServletUtil.parseLongParam(request, TXID_PARAM);
storageInfoString = ServletUtil.getParameter(request, STORAGEINFO_PARAM);
fileSize = ServletUtil.parseLongParam(request,
TransferFsImage.FILE_LENGTH);
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
.valueOf(imageType);
if (fileSize == 0 || txId == -1 || storageInfoString == null
|| storageInfoString.isEmpty()) {
throw new IOException("Illegal parameters to TransferFsImage");
}
}
public long getTxId() {
return txId;
}
public String getStorageInfoString() {
return storageInfoString;
}
public long getFileSize() {
return fileSize;
}
public NameNodeFile getNameNodeFile() {
return nnf;
}
}
}

View File

@ -233,8 +233,8 @@ public class NameNodeHttpServer {
CancelDelegationTokenServlet.class, true);
httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,
true);
httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
ImageServlet.class, true);
httpServer.addInternalServlet("getimage", "/getimage",
GetImageServlet.class, true);
httpServer.addInternalServlet("listPaths", "/listPaths/*",
ListPathsServlet.class, false);
httpServer.addInternalServlet("data", "/data/*",

View File

@ -114,6 +114,7 @@ public class SecondaryNameNode implements Runnable {
private InetSocketAddress nameNodeAddr;
private volatile boolean shouldRun;
private HttpServer2 infoServer;
private URL imageListenURL;
private Collection<URI> checkpointDirs;
private List<URI> checkpointEditsDirs;
@ -266,11 +267,13 @@ public class SecondaryNameNode implements Runnable {
infoServer.setAttribute("secondary.name.node", this);
infoServer.setAttribute("name.system.image", checkpointImage);
infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
infoServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
ImageServlet.class, true);
infoServer.addInternalServlet("getimage", "/getimage",
GetImageServlet.class, true);
infoServer.start();
LOG.info("Web server init done");
imageListenURL = new URL(DFSUtil.getHttpClientScheme(conf) + "://"
+ NetUtils.getHostPortString(infoServer.getConnectorAddress(0)));
HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
int connIdx = 0;
@ -485,6 +488,14 @@ public class SecondaryNameNode implements Runnable {
return address.toURL();
}
/**
* Return the host:port of where this SecondaryNameNode is listening
* for image transfers
*/
private URL getImageListenAddress() {
return imageListenURL;
}
/**
* Create a new checkpoint
* @return if the image is fetched from primary or not
@ -544,8 +555,8 @@ public class SecondaryNameNode implements Runnable {
// to make this new uploaded image as the most current image.
//
long txid = checkpointImage.getLastAppliedTxId();
TransferFsImage.uploadImageFromStorage(fsName, conf, dstStorage,
NameNodeFile.IMAGE, txid);
TransferFsImage.uploadImageFromStorage(fsName, getImageListenAddress(),
dstStorage, NameNodeFile.IMAGE, txid);
// error simulation code for junit test
CheckpointFaultInjector.getInstance().afterSecondaryUploadsNewImage();

View File

@ -19,22 +19,18 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
@ -53,12 +49,10 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.util.Time;
import org.apache.http.client.utils.URIBuilder;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@ -71,12 +65,7 @@ import com.google.common.collect.Lists;
public class TransferFsImage {
public final static String CONTENT_LENGTH = "Content-Length";
public final static String FILE_LENGTH = "File-Length";
public final static String MD5_HEADER = "X-MD5-Digest";
private final static String CONTENT_TYPE = "Content-Type";
private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding";
@VisibleForTesting
static int timeout = 0;
private static URLConnectionFactory connectionFactory;
@ -93,14 +82,14 @@ public class TransferFsImage {
public static void downloadMostRecentImageToDirectory(URL infoServer,
File dir) throws IOException {
String fileId = ImageServlet.getParamStringForMostRecentImage();
String fileId = GetImageServlet.getParamStringForMostRecentImage();
getFileClient(infoServer, fileId, Lists.newArrayList(dir),
null, false);
}
public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
Storage dstStorage, boolean needDigest) throws IOException {
String fileid = ImageServlet.getParamStringForImage(null,
String fileid = GetImageServlet.getParamStringForImage(null,
imageTxId, dstStorage);
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
@ -116,30 +105,11 @@ public class TransferFsImage {
return hash;
}
static MD5Hash handleUploadImageRequest(HttpServletRequest request,
long imageTxId, Storage dstStorage, InputStream stream,
long advertisedSize, DataTransferThrottler throttler) throws IOException {
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
List<File> dstFiles = dstStorage.getFiles(NameNodeDirType.IMAGE, fileName);
if (dstFiles.isEmpty()) {
throw new IOException("No targets in destination storage!");
}
MD5Hash advertisedDigest = parseMD5Header(request);
MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true,
advertisedSize, advertisedDigest, fileName, stream, throttler);
LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size "
+ dstFiles.get(0).length() + " bytes.");
return hash;
}
static void downloadEditsToStorage(URL fsName, RemoteEditLog log,
NNStorage dstStorage) throws IOException {
assert log.getStartTxId() > 0 && log.getEndTxId() > 0 :
"bad log: " + log;
String fileid = ImageServlet.getParamStringForLog(
String fileid = GetImageServlet.getParamStringForLog(
log, dstStorage);
String finalFileName = NNStorage.getFinalizedEditsFileName(
log.getStartTxId(), log.getEndTxId());
@ -189,19 +159,22 @@ public class TransferFsImage {
* Requests that the NameNode download an image from this node.
*
* @param fsName the http address for the remote NN
* @param conf Configuration
* @param myNNAddress the host/port where the local node is running an
* HTTPServer hosting GetImageServlet
* @param storage the storage directory to transfer the image from
* @param nnf the NameNodeFile type of the image
* @param txid the transaction ID of the image to be uploaded
*/
public static void uploadImageFromStorage(URL fsName, Configuration conf,
NNStorage storage, NameNodeFile nnf, long txid) throws IOException {
public static void uploadImageFromStorage(URL fsName, URL myNNAddress,
Storage storage, NameNodeFile nnf, long txid) throws IOException {
URL url = new URL(fsName, ImageServlet.PATH_SPEC);
long startTime = Time.monotonicNow();
String fileid = GetImageServlet.getParamStringToPutImage(nnf, txid,
myNNAddress, storage);
// this doesn't directly upload an image, but rather asks the NN
// to connect back to the 2NN to download the specified image.
try {
uploadImage(url, conf, storage, nnf, txid);
} catch (HttpPutFailedException e) {
TransferFsImage.getFileClient(fsName, fileid, null, null, false);
} catch (HttpGetFailedException e) {
if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) {
// this is OK - this means that a previous attempt to upload
// this checkpoint succeeded even though we thought it failed.
@ -213,105 +186,25 @@ public class TransferFsImage {
throw e;
}
}
double xferSec = Math.max(
((float) (Time.monotonicNow() - startTime)) / 1000.0, 0.001);
LOG.info("Uploaded image with txid " + txid + " to namenode at " + fsName
+ " in " + xferSec + " seconds");
LOG.info("Uploaded image with txid " + txid + " to namenode at " +
fsName);
}
/*
* Uploads the imagefile using HTTP PUT method
*/
private static void uploadImage(URL url, Configuration conf,
NNStorage storage, NameNodeFile nnf, long txId) throws IOException {
File imageFile = storage.findImageFile(nnf, txId);
if (imageFile == null) {
throw new IOException("Could not find image with txid " + txId);
}
HttpURLConnection connection = null;
try {
URIBuilder uriBuilder = new URIBuilder(url.toURI());
// write all params for image upload request as query itself.
// Request body contains the image to be uploaded.
Map<String, String> params = ImageServlet.getParamsForPutImage(storage,
txId, imageFile.length(), nnf);
for (Entry<String, String> entry : params.entrySet()) {
uriBuilder.addParameter(entry.getKey(), entry.getValue());
}
URL urlWithParams = uriBuilder.build().toURL();
connection = (HttpURLConnection) connectionFactory.openConnection(
urlWithParams, UserGroupInformation.isSecurityEnabled());
// Set the request to PUT
connection.setRequestMethod("PUT");
connection.setDoOutput(true);
int chunkSize = conf.getInt(
DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT);
if (imageFile.length() > chunkSize) {
// using chunked streaming mode to support upload of 2GB+ files and to
// avoid internal buffering.
// this mode should be used only if more than chunkSize data is present
// to upload. otherwise upload may not happen sometimes.
connection.setChunkedStreamingMode(chunkSize);
}
setTimeout(connection);
// set headers for verification
ImageServlet.setVerificationHeadersForPut(connection, imageFile);
// Write the file to output stream.
writeFileToPutRequest(conf, connection, imageFile);
int responseCode = connection.getResponseCode();
if (responseCode != HttpURLConnection.HTTP_OK) {
throw new HttpPutFailedException(connection.getResponseMessage(),
responseCode);
}
} catch (AuthenticationException e) {
throw new IOException(e);
} catch (URISyntaxException e) {
throw new IOException(e);
} finally {
if (connection != null) {
connection.disconnect();
}
}
}
private static void writeFileToPutRequest(Configuration conf,
HttpURLConnection connection, File imageFile)
throws FileNotFoundException, IOException {
connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream");
connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary");
OutputStream output = connection.getOutputStream();
FileInputStream input = new FileInputStream(imageFile);
try {
copyFileToStream(output, imageFile, input,
ImageServlet.getThrottler(conf));
} finally {
IOUtils.closeStream(input);
IOUtils.closeStream(output);
}
}
/**
* A server-side method to respond to a getfile http request
* Copies the contents of the local file into the output stream.
*/
public static void copyFileToStream(OutputStream out, File localfile,
FileInputStream infile, DataTransferThrottler throttler)
public static void getFileServer(ServletResponse response, File localfile,
FileInputStream infile,
DataTransferThrottler throttler)
throws IOException {
byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
ServletOutputStream out = null;
try {
CheckpointFaultInjector.getInstance()
.aboutToSendFile(localfile);
out = response.getOutputStream();
if (CheckpointFaultInjector.getInstance().
shouldSendShortFile(localfile)) {
@ -357,13 +250,14 @@ public class TransferFsImage {
static MD5Hash getFileClient(URL infoServer,
String queryString, List<File> localPaths,
Storage dstStorage, boolean getChecksum) throws IOException {
URL url = new URL(infoServer, ImageServlet.PATH_SPEC + "?" + queryString);
URL url = new URL(infoServer, "/getimage?" + queryString);
LOG.info("Opening connection to " + url);
return doGetUrl(url, localPaths, dstStorage, getChecksum);
}
public static MD5Hash doGetUrl(URL url, List<File> localPaths,
Storage dstStorage, boolean getChecksum) throws IOException {
long startTime = Time.monotonicNow();
HttpURLConnection connection;
try {
connection = (HttpURLConnection)
@ -372,7 +266,16 @@ public class TransferFsImage {
throw new IOException(e);
}
setTimeout(connection);
if (timeout <= 0) {
Configuration conf = new HdfsConfiguration();
timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
}
if (timeout > 0) {
connection.setConnectTimeout(timeout);
connection.setReadTimeout(timeout);
}
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new HttpGetFailedException(
@ -390,37 +293,10 @@ public class TransferFsImage {
throw new IOException(CONTENT_LENGTH + " header is not provided " +
"by the namenode when trying to fetch " + url);
}
MD5Hash advertisedDigest = parseMD5Header(connection);
String fsImageName = connection
.getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
InputStream stream = connection.getInputStream();
return receiveFile(url.toExternalForm(), localPaths, dstStorage,
getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
null);
}
private static void setTimeout(HttpURLConnection connection) {
if (timeout <= 0) {
Configuration conf = new HdfsConfiguration();
timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
LOG.info("Image Transfer timeout configured to " + timeout
+ " milliseconds");
}
if (timeout > 0) {
connection.setConnectTimeout(timeout);
connection.setReadTimeout(timeout);
}
}
private static MD5Hash receiveFile(String url, List<File> localPaths,
Storage dstStorage, boolean getChecksum, long advertisedSize,
MD5Hash advertisedDigest, String fsImageName, InputStream stream,
DataTransferThrottler throttler) throws IOException {
long startTime = Time.monotonicNow();
if (localPaths != null) {
String fsImageName = connection.getHeaderField(
GetImageServlet.HADOOP_IMAGE_EDITS_HEADER);
// If the local paths refer to directories, use the server-provided header
// as the filename within that directory
List<File> newLocalPaths = new ArrayList<File>();
@ -437,8 +313,10 @@ public class TransferFsImage {
localPaths = newLocalPaths;
}
MD5Hash advertisedDigest = parseMD5Header(connection);
long received = 0;
InputStream stream = connection.getInputStream();
MessageDigest digester = null;
if (getChecksum) {
digester = MD5Hash.getDigester();
@ -483,9 +361,6 @@ public class TransferFsImage {
for (FileOutputStream fos : outputStreams) {
fos.write(buf, 0, num);
}
if (throttler != null) {
throttler.throttle(num);
}
}
}
finishedReceiving = true;
@ -530,11 +405,6 @@ public class TransferFsImage {
return (header != null) ? new MD5Hash(header) : null;
}
private static MD5Hash parseMD5Header(HttpServletRequest request) {
String header = request.getHeader(MD5_HEADER);
return (header != null) ? new MD5Hash(header) : null;
}
public static class HttpGetFailedException extends IOException {
private static final long serialVersionUID = 1L;
private final int responseCode;
@ -549,18 +419,4 @@ public class TransferFsImage {
}
}
public static class HttpPutFailedException extends IOException {
private static final long serialVersionUID = 1L;
private final int responseCode;
HttpPutFailedException(String msg, int responseCode) throws IOException {
super(msg);
this.responseCode = responseCode;
}
public int getResponseCode() {
return responseCode;
}
}
}

View File

@ -63,7 +63,6 @@ public class StandbyCheckpointer {
private static final Log LOG = LogFactory.getLog(StandbyCheckpointer.class);
private static final long PREVENT_AFTER_CANCEL_MS = 2*60*1000L;
private final CheckpointConf checkpointConf;
private final Configuration conf;
private final FSNamesystem namesystem;
private long lastCheckpointTime;
private final CheckpointerThread thread;
@ -81,7 +80,6 @@ public class StandbyCheckpointer {
public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
throws IOException {
this.namesystem = ns;
this.conf = conf;
this.checkpointConf = new CheckpointConf(conf);
this.thread = new CheckpointerThread();
this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
@ -195,7 +193,7 @@ public class StandbyCheckpointer {
Future<Void> upload = executor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
TransferFsImage.uploadImageFromStorage(activeNNAddress, myNNAddress,
namesystem.getFSImage().getStorage(), imageType, txid);
return null;
}

View File

@ -858,13 +858,15 @@
<property>
<name>dfs.image.transfer.timeout</name>
<value>60000</value>
<value>600000</value>
<description>
Socket timeout for image transfer in milliseconds. This timeout and the related
Timeout for image transfer in milliseconds. This timeout and the related
dfs.image.transfer.bandwidthPerSec parameter should be configured such
that normal image transfer can complete successfully.
that normal image transfer can complete within the timeout.
This timeout prevents client hangs when the sender fails during
image transfer. This is socket timeout during image tranfer.
image transfer, which is particularly important during checkpointing.
Note that this timeout applies to the entirety of image transfer, and
is not a socket timeout.
</description>
</property>
@ -881,16 +883,6 @@
</description>
</property>
<property>
<name>dfs.image.transfer.chunksize</name>
<value>65536</value>
<description>
Chunksize in bytes to upload the checkpoint.
Chunked streaming is used to avoid internal buffering of contents
of image file of huge size.
</description>
</property>
<property>
<name>dfs.namenode.support.allow.format</name>
<value>true</value>

View File

@ -31,7 +31,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
@ -565,9 +564,23 @@ public class TestCheckpoint {
}
/**
* Simulate a secondary node failure to transfer image. Uses an unchecked
* error and fail transfer before even setting the length header. This used to
* cause image truncation. Regression test for HDFS-3330.
* Simulate a secondary node failure to transfer image
* back to the name-node.
* Used to truncate primary fsimage file.
*/
@Test
public void testSecondaryFailsToReturnImage() throws IOException {
Mockito.doThrow(new IOException("If this exception is not caught by the " +
"name-node, fs image will be truncated."))
.when(faultInjector).aboutToSendFile(filePathContaining("secondary"));
doSecondaryFailsToReturnImage();
}
/**
* Similar to above test, but uses an unchecked Error, and causes it
* before even setting the length header. This used to cause image
* truncation. Regression test for HDFS-3330.
*/
@Test
public void testSecondaryFailsWithErrorBeforeSettingHeaders()
@ -1966,13 +1979,6 @@ public class TestCheckpoint {
.when(dstImage).getFiles(
Mockito.<NameNodeDirType>anyObject(), Mockito.anyString());
File mockImageFile = File.createTempFile("image", "");
FileOutputStream imageFile = new FileOutputStream(mockImageFile);
imageFile.write("data".getBytes());
imageFile.close();
Mockito.doReturn(mockImageFile).when(dstImage)
.findImageFile(Mockito.any(NameNodeFile.class), Mockito.anyLong());
Mockito.doReturn(new StorageInfo(1, 1, "X", 1, NodeType.NAME_NODE).toColonSeparatedString())
.when(dstImage).toColonSeparatedString();
@ -1993,8 +1999,8 @@ public class TestCheckpoint {
}
try {
TransferFsImage.uploadImageFromStorage(fsName, conf, dstImage,
NameNodeFile.IMAGE, 0);
TransferFsImage.uploadImageFromStorage(fsName, new URL(
"http://localhost:1234"), dstImage, NameNodeFile.IMAGE, 0);
fail("Storage info was not verified");
} catch (IOException ioe) {
String msg = StringUtils.stringifyException(ioe);

View File

@ -69,7 +69,7 @@ public class TestGetImageServlet {
Mockito.when(context.getAttribute(HttpServer2.ADMINS_ACL)).thenReturn(acls);
// Make sure that NN2 is considered a valid fsimage/edits requestor.
assertTrue(ImageServlet.isValidRequestor(context,
assertTrue(GetImageServlet.isValidRequestor(context,
"hdfs/host2@TEST-REALM.COM", conf));
// Mark atm as an admin.
@ -81,15 +81,15 @@ public class TestGetImageServlet {
}))).thenReturn(true);
// Make sure that NN2 is still considered a valid requestor.
assertTrue(ImageServlet.isValidRequestor(context,
assertTrue(GetImageServlet.isValidRequestor(context,
"hdfs/host2@TEST-REALM.COM", conf));
// Make sure an admin is considered a valid requestor.
assertTrue(ImageServlet.isValidRequestor(context,
assertTrue(GetImageServlet.isValidRequestor(context,
"atm@TEST-REALM.COM", conf));
// Make sure other users are *not* considered valid requestors.
assertFalse(ImageServlet.isValidRequestor(context,
assertFalse(GetImageServlet.isValidRequestor(context,
"todd@TEST-REALM.COM", conf));
}
}

View File

@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.URL;
@ -35,11 +34,9 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.http.HttpServerFunctionalTest;
import org.apache.hadoop.test.PathUtils;
@ -121,11 +118,10 @@ public class TestTransferFsImage {
* Test to verify the read timeout
*/
@Test(timeout = 5000)
public void testGetImageTimeout() throws Exception {
public void testImageTransferTimeout() throws Exception {
HttpServer2 testServer = HttpServerFunctionalTest.createServer("hdfs");
try {
testServer.addServlet("ImageTransfer", ImageServlet.PATH_SPEC,
TestImageTransferServlet.class);
testServer.addServlet("GetImage", "/getimage", TestGetImageServlet.class);
testServer.start();
URL serverURL = HttpServerFunctionalTest.getServerURL(testServer);
TransferFsImage.timeout = 2000;
@ -143,48 +139,7 @@ public class TestTransferFsImage {
}
}
/**
* Test to verify the timeout of Image upload
*/
@Test(timeout = 10000)
public void testImageUploadTimeout() throws Exception {
Configuration conf = new HdfsConfiguration();
NNStorage mockStorage = Mockito.mock(NNStorage.class);
HttpServer2 testServer = HttpServerFunctionalTest.createServer("hdfs");
try {
testServer.addServlet("ImageTransfer", ImageServlet.PATH_SPEC,
TestImageTransferServlet.class);
testServer.start();
URL serverURL = HttpServerFunctionalTest.getServerURL(testServer);
// set the timeout here, otherwise it will take default.
TransferFsImage.timeout = 2000;
File tmpDir = new File(new FileSystemTestHelper().getTestRootDir());
tmpDir.mkdirs();
File mockImageFile = File.createTempFile("image", "", tmpDir);
FileOutputStream imageFile = new FileOutputStream(mockImageFile);
imageFile.write("data".getBytes());
imageFile.close();
Mockito.when(
mockStorage.findImageFile(Mockito.any(NameNodeFile.class),
Mockito.anyLong())).thenReturn(mockImageFile);
Mockito.when(mockStorage.toColonSeparatedString()).thenReturn(
"storage:info:string");
try {
TransferFsImage.uploadImageFromStorage(serverURL, conf, mockStorage,
NameNodeFile.IMAGE, 1L);
fail("TransferImage Should fail with timeout");
} catch (SocketTimeoutException e) {
assertEquals("Upload should timeout", "Read timed out", e.getMessage());
}
} finally {
testServer.stop();
}
}
public static class TestImageTransferServlet extends HttpServlet {
public static class TestGetImageServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
@Override
@ -198,17 +153,5 @@ public class TestTransferFsImage {
}
}
}
@Override
protected void doPut(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
synchronized (this) {
try {
wait(5000);
} catch (InterruptedException e) {
// Ignore
}
}
}
}
}