HDFS-3405. Checkpointing should use HTTP POST or PUT instead of GET-GET to send merged fsimages. Contributed by Vinayakumar B.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1575457 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Wang 2014-03-08 00:39:23 +00:00
parent 1f6c2b09c6
commit 0f595915a3
14 changed files with 846 additions and 582 deletions

View File

@ -122,6 +122,9 @@ Trunk (Unreleased)
HDFS-5138. Support HDFS upgrade in HA. (atm via todd)
HDFS-3405. Checkpointing should use HTTP POST or PUT instead of GET-GET
to send merged fsimages. (Vinayakumar B via wang)
OPTIMIZATIONS
BUG FIXES

View File

@ -460,7 +460,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// Image transfer timeout
public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout";
public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 10 * 60 * 1000;
public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000;
// Image transfer chunksize
public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize";
public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024;
//Keys with no defaults
public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";

View File

@ -43,7 +43,7 @@ import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@ -205,15 +205,16 @@ public class GetJournalEditServlet extends HttpServlet {
return;
}
editFile = elf.getFile();
GetImageServlet.setVerificationHeaders(response, editFile);
GetImageServlet.setFileNameHeaders(response, editFile);
ImageServlet.setVerificationHeadersForGet(response, editFile);
ImageServlet.setFileNameHeaders(response, editFile);
editFileIn = new FileInputStream(editFile);
}
DataTransferThrottler throttler = GetImageServlet.getThrottler(conf);
DataTransferThrottler throttler = ImageServlet.getThrottler(conf);
// send edits
TransferFsImage.getFileServer(response, editFile, editFileIn, throttler);
TransferFsImage.copyFileToStream(response.getOutputStream(), editFile,
editFileIn, throttler);
} catch (Throwable t) {
String errMsg = "getedit failed. " + StringUtils.stringifyException(t);

View File

@ -263,8 +263,7 @@ class Checkpointer extends Daemon {
}
if(cpCmd.needToReturnImage()) {
TransferFsImage.uploadImageFromStorage(
backupNode.nnHttpAddress, getImageListenAddress(),
TransferFsImage.uploadImageFromStorage(backupNode.nnHttpAddress, conf,
bnStorage, NameNodeFile.IMAGE, txid);
}

View File

@ -1,481 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.Time.now;
import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.URL;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ServletUtil;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.net.InetAddresses;
/**
* This class is used in Namesystem's jetty to retrieve a file.
* Typically used by the Secondary NameNode to retrieve image and
* edit file for periodic checkpointing.
*/
@InterfaceAudience.Private
public class GetImageServlet extends HttpServlet {
private static final long serialVersionUID = -7669068179452648952L;
private static final Log LOG = LogFactory.getLog(GetImageServlet.class);
public final static String CONTENT_DISPOSITION = "Content-Disposition";
public final static String HADOOP_IMAGE_EDITS_HEADER = "X-Image-Edits-Name";
private static final String TXID_PARAM = "txid";
private static final String START_TXID_PARAM = "startTxId";
private static final String END_TXID_PARAM = "endTxId";
private static final String STORAGEINFO_PARAM = "storageInfo";
private static final String LATEST_FSIMAGE_VALUE = "latest";
private static final String IMAGE_FILE_TYPE = "imageFile";
private static Set<Long> currentlyDownloadingCheckpoints =
Collections.<Long>synchronizedSet(new HashSet<Long>());
@Override
public void doGet(final HttpServletRequest request,
final HttpServletResponse response
) throws ServletException, IOException {
try {
final ServletContext context = getServletContext();
final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
final GetImageParams parsedParams = new GetImageParams(request, response);
final Configuration conf = (Configuration) context
.getAttribute(JspHelper.CURRENT_CONF);
final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
if (UserGroupInformation.isSecurityEnabled() &&
!isValidRequestor(context, request.getUserPrincipal().getName(), conf)) {
response.sendError(HttpServletResponse.SC_FORBIDDEN,
"Only Namenode, Secondary Namenode, and administrators may access " +
"this servlet");
LOG.warn("Received non-NN/SNN/administrator request for image or edits from "
+ request.getUserPrincipal().getName() + " at " + request.getRemoteHost());
return;
}
String myStorageInfoString = nnImage.getStorage().toColonSeparatedString();
String theirStorageInfoString = parsedParams.getStorageInfoString();
if (theirStorageInfoString != null &&
!myStorageInfoString.equals(theirStorageInfoString)) {
response.sendError(HttpServletResponse.SC_FORBIDDEN,
"This namenode has storage info " + myStorageInfoString +
" but the secondary expected " + theirStorageInfoString);
LOG.warn("Received an invalid request file transfer request " +
"from a secondary with storage info " + theirStorageInfoString);
return;
}
UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
if (parsedParams.isGetImage()) {
long txid = parsedParams.getTxId();
File imageFile = null;
String errorMessage = "Could not find image";
if (parsedParams.shouldFetchLatest()) {
imageFile = nnImage.getStorage().getHighestFsImageName();
} else {
errorMessage += " with txid " + txid;
imageFile = nnImage.getStorage().getFsImage(txid,
EnumSet.of(NameNodeFile.IMAGE, NameNodeFile.IMAGE_ROLLBACK));
}
if (imageFile == null) {
throw new IOException(errorMessage);
}
CheckpointFaultInjector.getInstance().beforeGetImageSetsHeaders();
long start = now();
serveFile(imageFile);
if (metrics != null) { // Metrics non-null only when used inside name node
long elapsed = now() - start;
metrics.addGetImage(elapsed);
}
} else if (parsedParams.isGetEdit()) {
long startTxId = parsedParams.getStartTxId();
long endTxId = parsedParams.getEndTxId();
File editFile = nnImage.getStorage()
.findFinalizedEditsFile(startTxId, endTxId);
long start = now();
serveFile(editFile);
if (metrics != null) { // Metrics non-null only when used inside name node
long elapsed = now() - start;
metrics.addGetEdit(elapsed);
}
} else if (parsedParams.isPutImage()) {
final long txid = parsedParams.getTxId();
final NameNodeFile nnf = parsedParams.getNameNodeFile();
if (! currentlyDownloadingCheckpoints.add(txid)) {
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer is already in the process of uploading a" +
" checkpoint made at transaction ID " + txid);
return null;
}
try {
if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer already uploaded an checkpoint " +
"for txid " + txid);
return null;
}
// We may have lost our ticket since last checkpoint, log in again, just in case
if (UserGroupInformation.isSecurityEnabled()) {
UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
}
long start = now();
// issue a HTTP get request to download the new fsimage
MD5Hash downloadImageDigest = TransferFsImage
.downloadImageToStorage(parsedParams.getInfoServer(conf),
txid, nnImage.getStorage(), true);
nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
downloadImageDigest);
if (nnf == NameNodeFile.IMAGE_ROLLBACK) {
NameNodeHttpServer.getNameNodeFromContext(context)
.getNamesystem().setCreatedRollbackImages(true);
}
if (metrics != null) { // Metrics non-null only when used inside name node
long elapsed = now() - start;
metrics.addPutImage(elapsed);
}
// Now that we have a new checkpoint, we might be able to
// remove some old ones.
nnImage.purgeOldStorage(nnf);
} finally {
currentlyDownloadingCheckpoints.remove(txid);
}
}
return null;
}
private void serveFile(File file) throws IOException {
FileInputStream fis = new FileInputStream(file);
try {
setVerificationHeaders(response, file);
setFileNameHeaders(response, file);
if (!file.exists()) {
// Potential race where the file was deleted while we were in the
// process of setting headers!
throw new FileNotFoundException(file.toString());
// It's possible the file could be deleted after this point, but
// we've already opened the 'fis' stream.
// It's also possible length could change, but this would be
// detected by the client side as an inaccurate length header.
}
// send file
TransferFsImage.getFileServer(response, file, fis,
getThrottler(conf));
} finally {
IOUtils.closeStream(fis);
}
}
});
} catch (Throwable t) {
String errMsg = "GetImage failed. " + StringUtils.stringifyException(t);
response.sendError(HttpServletResponse.SC_GONE, errMsg);
throw new IOException(errMsg);
} finally {
response.getOutputStream().close();
}
}
public static void setFileNameHeaders(HttpServletResponse response,
File file) {
response.setHeader(CONTENT_DISPOSITION, "attachment; filename=" +
file.getName());
response.setHeader(HADOOP_IMAGE_EDITS_HEADER, file.getName());
}
/**
* Construct a throttler from conf
* @param conf configuration
* @return a data transfer throttler
*/
public final static DataTransferThrottler getThrottler(Configuration conf) {
long transferBandwidth =
conf.getLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);
DataTransferThrottler throttler = null;
if (transferBandwidth > 0) {
throttler = new DataTransferThrottler(transferBandwidth);
}
return throttler;
}
@VisibleForTesting
static boolean isValidRequestor(ServletContext context, String remoteUser,
Configuration conf) throws IOException {
if(remoteUser == null) { // This really shouldn't happen...
LOG.warn("Received null remoteUser while authorizing access to getImage servlet");
return false;
}
Set<String> validRequestors = new HashSet<String>();
validRequestors.add(
SecurityUtil.getServerPrincipal(conf
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), NameNode
.getAddress(conf).getHostName()));
validRequestors.add(
SecurityUtil.getServerPrincipal(conf
.get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY),
SecondaryNameNode.getHttpAddress(conf).getHostName()));
if (HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf))) {
Configuration otherNnConf = HAUtil.getConfForOtherNode(conf);
validRequestors.add(
SecurityUtil.getServerPrincipal(otherNnConf
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
NameNode.getAddress(otherNnConf).getHostName()));
}
for(String v : validRequestors) {
if(v != null && v.equals(remoteUser)) {
LOG.info("GetImageServlet allowing checkpointer: " + remoteUser);
return true;
}
}
if (HttpServer2.userHasAdministratorAccess(context, remoteUser)) {
LOG.info("GetImageServlet allowing administrator: " + remoteUser);
return true;
}
LOG.info("GetImageServlet rejecting: " + remoteUser);
return false;
}
/**
* Set headers for content length, and, if available, md5.
* @throws IOException
*/
public static void setVerificationHeaders(HttpServletResponse response, File file)
throws IOException {
response.setHeader(TransferFsImage.CONTENT_LENGTH,
String.valueOf(file.length()));
MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
if (hash != null) {
response.setHeader(TransferFsImage.MD5_HEADER, hash.toString());
}
}
static String getParamStringForMostRecentImage() {
return "getimage=1&" + TXID_PARAM + "=" + LATEST_FSIMAGE_VALUE;
}
static String getParamStringForImage(NameNodeFile nnf, long txid,
StorageInfo remoteStorageInfo) {
final String imageType = nnf == null ? "" : "&" + IMAGE_FILE_TYPE + "="
+ nnf.name();
return "getimage=1&" + TXID_PARAM + "=" + txid
+ imageType
+ "&" + STORAGEINFO_PARAM + "=" +
remoteStorageInfo.toColonSeparatedString();
}
static String getParamStringForLog(RemoteEditLog log,
StorageInfo remoteStorageInfo) {
return "getedit=1&" + START_TXID_PARAM + "=" + log.getStartTxId()
+ "&" + END_TXID_PARAM + "=" + log.getEndTxId()
+ "&" + STORAGEINFO_PARAM + "=" +
remoteStorageInfo.toColonSeparatedString();
}
static String getParamStringToPutImage(NameNodeFile nnf, long txid,
URL url, Storage storage) {
InetSocketAddress imageListenAddress = NetUtils.createSocketAddr(url
.getAuthority());
String machine = !imageListenAddress.isUnresolved()
&& imageListenAddress.getAddress().isAnyLocalAddress() ? null
: imageListenAddress.getHostName();
return "putimage=1" +
"&" + TXID_PARAM + "=" + txid +
"&" + IMAGE_FILE_TYPE + "=" + nnf.name() +
"&port=" + imageListenAddress.getPort() +
(machine != null ? "&machine=" + machine : "")
+ "&" + STORAGEINFO_PARAM + "=" +
storage.toColonSeparatedString();
}
static class GetImageParams {
private boolean isGetImage;
private boolean isGetEdit;
private boolean isPutImage;
private int remoteport;
private String machineName;
private NameNodeFile nnf;
private long startTxId, endTxId, txId;
private String storageInfoString;
private boolean fetchLatest;
/**
* @param request the object from which this servlet reads the url contents
* @param response the object into which this servlet writes the url contents
* @throws IOException if the request is bad
*/
public GetImageParams(HttpServletRequest request,
HttpServletResponse response
) throws IOException {
@SuppressWarnings("unchecked")
Map<String, String[]> pmap = request.getParameterMap();
isGetImage = isGetEdit = isPutImage = fetchLatest = false;
remoteport = 0;
for (Map.Entry<String, String[]> entry : pmap.entrySet()) {
String key = entry.getKey();
String[] val = entry.getValue();
if (key.equals("getimage")) {
isGetImage = true;
try {
txId = ServletUtil.parseLongParam(request, TXID_PARAM);
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
.valueOf(imageType);
} catch (NumberFormatException nfe) {
if (request.getParameter(TXID_PARAM).equals(LATEST_FSIMAGE_VALUE)) {
fetchLatest = true;
} else {
throw nfe;
}
}
} else if (key.equals("getedit")) {
isGetEdit = true;
startTxId = ServletUtil.parseLongParam(request, START_TXID_PARAM);
endTxId = ServletUtil.parseLongParam(request, END_TXID_PARAM);
} else if (key.equals("putimage")) {
isPutImage = true;
txId = ServletUtil.parseLongParam(request, TXID_PARAM);
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
.valueOf(imageType);
} else if (key.equals("port")) {
remoteport = new Integer(val[0]).intValue();
} else if (key.equals("machine")) {
machineName = val[0];
} else if (key.equals(STORAGEINFO_PARAM)) {
storageInfoString = val[0];
}
}
if (machineName == null) {
machineName = request.getRemoteHost();
if (InetAddresses.isInetAddress(machineName)) {
machineName = NetUtils.getHostNameOfIP(machineName);
}
}
int numGets = (isGetImage?1:0) + (isGetEdit?1:0);
if ((numGets > 1) || (numGets == 0) && !isPutImage) {
throw new IOException("Illegal parameters to TransferFsImage");
}
}
public String getStorageInfoString() {
return storageInfoString;
}
public long getTxId() {
Preconditions.checkState(isGetImage || isPutImage);
return txId;
}
public NameNodeFile getNameNodeFile() {
Preconditions.checkState(isPutImage || isGetImage);
return nnf;
}
public long getStartTxId() {
Preconditions.checkState(isGetEdit);
return startTxId;
}
public long getEndTxId() {
Preconditions.checkState(isGetEdit);
return endTxId;
}
boolean isGetEdit() {
return isGetEdit;
}
boolean isGetImage() {
return isGetImage;
}
boolean isPutImage() {
return isPutImage;
}
URL getInfoServer(Configuration conf) throws IOException {
if (machineName == null || remoteport == 0) {
throw new IOException("MachineName and port undefined");
}
return new URL(DFSUtil.getHttpClientScheme(conf), machineName, remoteport, "");
}
boolean shouldFetchLatest() {
return fetchLatest;
}
}
}

View File

@ -0,0 +1,544 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.Time.now;
import java.net.HttpURLConnection;
import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.io.*;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ServletUtil;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* This class is used in Namesystem's jetty to retrieve/upload a file
* Typically used by the Secondary NameNode to retrieve image and
* edit file for periodic checkpointing in Non-HA deployments.
* Standby NameNode uses to upload checkpoints in HA deployments.
*/
@InterfaceAudience.Private
public class ImageServlet extends HttpServlet {
public static final String PATH_SPEC = "/imagetransfer";
private static final long serialVersionUID = -7669068179452648952L;
private static final Log LOG = LogFactory.getLog(ImageServlet.class);
public final static String CONTENT_DISPOSITION = "Content-Disposition";
public final static String HADOOP_IMAGE_EDITS_HEADER = "X-Image-Edits-Name";
private static final String TXID_PARAM = "txid";
private static final String START_TXID_PARAM = "startTxId";
private static final String END_TXID_PARAM = "endTxId";
private static final String STORAGEINFO_PARAM = "storageInfo";
private static final String LATEST_FSIMAGE_VALUE = "latest";
private static final String IMAGE_FILE_TYPE = "imageFile";
private static Set<Long> currentlyDownloadingCheckpoints =
Collections.<Long>synchronizedSet(new HashSet<Long>());
@Override
public void doGet(final HttpServletRequest request,
final HttpServletResponse response) throws ServletException, IOException {
try {
final ServletContext context = getServletContext();
final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
final GetImageParams parsedParams = new GetImageParams(request, response);
final Configuration conf = (Configuration) context
.getAttribute(JspHelper.CURRENT_CONF);
final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
validateRequest(context, conf, request, response, nnImage,
parsedParams.getStorageInfoString());
UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
if (parsedParams.isGetImage()) {
long txid = parsedParams.getTxId();
File imageFile = null;
String errorMessage = "Could not find image";
if (parsedParams.shouldFetchLatest()) {
imageFile = nnImage.getStorage().getHighestFsImageName();
} else {
errorMessage += " with txid " + txid;
imageFile = nnImage.getStorage().getFsImage(txid,
EnumSet.of(NameNodeFile.IMAGE, NameNodeFile.IMAGE_ROLLBACK));
}
if (imageFile == null) {
throw new IOException(errorMessage);
}
CheckpointFaultInjector.getInstance().beforeGetImageSetsHeaders();
long start = now();
serveFile(imageFile);
if (metrics != null) { // Metrics non-null only when used inside name node
long elapsed = now() - start;
metrics.addGetImage(elapsed);
}
} else if (parsedParams.isGetEdit()) {
long startTxId = parsedParams.getStartTxId();
long endTxId = parsedParams.getEndTxId();
File editFile = nnImage.getStorage()
.findFinalizedEditsFile(startTxId, endTxId);
long start = now();
serveFile(editFile);
if (metrics != null) { // Metrics non-null only when used inside name node
long elapsed = now() - start;
metrics.addGetEdit(elapsed);
}
}
return null;
}
private void serveFile(File file) throws IOException {
FileInputStream fis = new FileInputStream(file);
try {
setVerificationHeadersForGet(response, file);
setFileNameHeaders(response, file);
if (!file.exists()) {
// Potential race where the file was deleted while we were in the
// process of setting headers!
throw new FileNotFoundException(file.toString());
// It's possible the file could be deleted after this point, but
// we've already opened the 'fis' stream.
// It's also possible length could change, but this would be
// detected by the client side as an inaccurate length header.
}
// send file
TransferFsImage.copyFileToStream(response.getOutputStream(),
file, fis, getThrottler(conf));
} finally {
IOUtils.closeStream(fis);
}
}
});
} catch (Throwable t) {
String errMsg = "GetImage failed. " + StringUtils.stringifyException(t);
response.sendError(HttpServletResponse.SC_GONE, errMsg);
throw new IOException(errMsg);
} finally {
response.getOutputStream().close();
}
}
private void validateRequest(ServletContext context, Configuration conf,
HttpServletRequest request, HttpServletResponse response,
FSImage nnImage, String theirStorageInfoString) throws IOException {
if (UserGroupInformation.isSecurityEnabled()
&& !isValidRequestor(context, request.getUserPrincipal().getName(),
conf)) {
String errorMsg = "Only Namenode, Secondary Namenode, and administrators may access "
+ "this servlet";
response.sendError(HttpServletResponse.SC_FORBIDDEN, errorMsg);
LOG.warn("Received non-NN/SNN/administrator request for image or edits from "
+ request.getUserPrincipal().getName()
+ " at "
+ request.getRemoteHost());
throw new IOException(errorMsg);
}
String myStorageInfoString = nnImage.getStorage().toColonSeparatedString();
if (theirStorageInfoString != null
&& !myStorageInfoString.equals(theirStorageInfoString)) {
String errorMsg = "This namenode has storage info " + myStorageInfoString
+ " but the secondary expected " + theirStorageInfoString;
response.sendError(HttpServletResponse.SC_FORBIDDEN, errorMsg);
LOG.warn("Received an invalid request file transfer request "
+ "from a secondary with storage info " + theirStorageInfoString);
throw new IOException(errorMsg);
}
}
public static void setFileNameHeaders(HttpServletResponse response,
File file) {
response.setHeader(CONTENT_DISPOSITION, "attachment; filename=" +
file.getName());
response.setHeader(HADOOP_IMAGE_EDITS_HEADER, file.getName());
}
/**
* Construct a throttler from conf
* @param conf configuration
* @return a data transfer throttler
*/
public final static DataTransferThrottler getThrottler(Configuration conf) {
long transferBandwidth =
conf.getLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);
DataTransferThrottler throttler = null;
if (transferBandwidth > 0) {
throttler = new DataTransferThrottler(transferBandwidth);
}
return throttler;
}
@VisibleForTesting
static boolean isValidRequestor(ServletContext context, String remoteUser,
Configuration conf) throws IOException {
if (remoteUser == null) { // This really shouldn't happen...
LOG.warn("Received null remoteUser while authorizing access to getImage servlet");
return false;
}
Set<String> validRequestors = new HashSet<String>();
validRequestors.add(SecurityUtil.getServerPrincipal(conf
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
NameNode.getAddress(conf).getHostName()));
validRequestors.add(SecurityUtil.getServerPrincipal(
conf.get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY),
SecondaryNameNode.getHttpAddress(conf).getHostName()));
if (HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf))) {
Configuration otherNnConf = HAUtil.getConfForOtherNode(conf);
validRequestors.add(SecurityUtil.getServerPrincipal(otherNnConf
.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
NameNode.getAddress(otherNnConf).getHostName()));
}
for (String v : validRequestors) {
if (v != null && v.equals(remoteUser)) {
LOG.info("ImageServlet allowing checkpointer: " + remoteUser);
return true;
}
}
if (HttpServer2.userHasAdministratorAccess(context, remoteUser)) {
LOG.info("ImageServlet allowing administrator: " + remoteUser);
return true;
}
LOG.info("ImageServlet rejecting: " + remoteUser);
return false;
}
/**
* Set headers for content length, and, if available, md5.
* @throws IOException
*/
public static void setVerificationHeadersForGet(HttpServletResponse response,
File file) throws IOException {
response.setHeader(TransferFsImage.CONTENT_LENGTH,
String.valueOf(file.length()));
MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
if (hash != null) {
response.setHeader(TransferFsImage.MD5_HEADER, hash.toString());
}
}
static String getParamStringForMostRecentImage() {
return "getimage=1&" + TXID_PARAM + "=" + LATEST_FSIMAGE_VALUE;
}
static String getParamStringForImage(NameNodeFile nnf, long txid,
StorageInfo remoteStorageInfo) {
final String imageType = nnf == null ? "" : "&" + IMAGE_FILE_TYPE + "="
+ nnf.name();
return "getimage=1&" + TXID_PARAM + "=" + txid
+ imageType
+ "&" + STORAGEINFO_PARAM + "=" +
remoteStorageInfo.toColonSeparatedString();
}
static String getParamStringForLog(RemoteEditLog log,
StorageInfo remoteStorageInfo) {
return "getedit=1&" + START_TXID_PARAM + "=" + log.getStartTxId()
+ "&" + END_TXID_PARAM + "=" + log.getEndTxId()
+ "&" + STORAGEINFO_PARAM + "=" +
remoteStorageInfo.toColonSeparatedString();
}
static class GetImageParams {
private boolean isGetImage;
private boolean isGetEdit;
private NameNodeFile nnf;
private long startTxId, endTxId, txId;
private String storageInfoString;
private boolean fetchLatest;
/**
* @param request the object from which this servlet reads the url contents
* @param response the object into which this servlet writes the url contents
* @throws IOException if the request is bad
*/
public GetImageParams(HttpServletRequest request,
HttpServletResponse response
) throws IOException {
@SuppressWarnings("unchecked")
Map<String, String[]> pmap = request.getParameterMap();
isGetImage = isGetEdit = fetchLatest = false;
for (Map.Entry<String, String[]> entry : pmap.entrySet()) {
String key = entry.getKey();
String[] val = entry.getValue();
if (key.equals("getimage")) {
isGetImage = true;
try {
txId = ServletUtil.parseLongParam(request, TXID_PARAM);
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
.valueOf(imageType);
} catch (NumberFormatException nfe) {
if (request.getParameter(TXID_PARAM).equals(LATEST_FSIMAGE_VALUE)) {
fetchLatest = true;
} else {
throw nfe;
}
}
} else if (key.equals("getedit")) {
isGetEdit = true;
startTxId = ServletUtil.parseLongParam(request, START_TXID_PARAM);
endTxId = ServletUtil.parseLongParam(request, END_TXID_PARAM);
} else if (key.equals(STORAGEINFO_PARAM)) {
storageInfoString = val[0];
}
}
int numGets = (isGetImage?1:0) + (isGetEdit?1:0);
if ((numGets > 1) || (numGets == 0)) {
throw new IOException("Illegal parameters to TransferFsImage");
}
}
public String getStorageInfoString() {
return storageInfoString;
}
public long getTxId() {
Preconditions.checkState(isGetImage);
return txId;
}
public NameNodeFile getNameNodeFile() {
Preconditions.checkState(isGetImage);
return nnf;
}
public long getStartTxId() {
Preconditions.checkState(isGetEdit);
return startTxId;
}
public long getEndTxId() {
Preconditions.checkState(isGetEdit);
return endTxId;
}
boolean isGetEdit() {
return isGetEdit;
}
boolean isGetImage() {
return isGetImage;
}
boolean shouldFetchLatest() {
return fetchLatest;
}
}
/**
* Set headers for image length and if available, md5.
*
* @throws IOException
*/
static void setVerificationHeadersForPut(HttpURLConnection connection,
File file) throws IOException {
connection.setRequestProperty(TransferFsImage.CONTENT_LENGTH,
String.valueOf(file.length()));
MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
if (hash != null) {
connection
.setRequestProperty(TransferFsImage.MD5_HEADER, hash.toString());
}
}
/**
* Set the required parameters for uploading image
*
* @param httpMethod instance of method to set the parameters
* @param storage colon separated storageInfo string
* @param txid txid of the image
* @param imageFileSize size of the imagefile to be uploaded
* @param nnf NameNodeFile Type
* @return Returns map of parameters to be used with PUT request.
*/
static Map<String, String> getParamsForPutImage(Storage storage, long txid,
long imageFileSize, NameNodeFile nnf) {
Map<String, String> params = new HashMap<String, String>();
params.put(TXID_PARAM, Long.toString(txid));
params.put(STORAGEINFO_PARAM, storage.toColonSeparatedString());
// setting the length of the file to be uploaded in separate property as
// Content-Length only supports up to 2GB
params.put(TransferFsImage.FILE_LENGTH, Long.toString(imageFileSize));
params.put(IMAGE_FILE_TYPE, nnf.name());
return params;
}
@Override
protected void doPut(final HttpServletRequest request,
final HttpServletResponse response) throws ServletException, IOException {
try {
ServletContext context = getServletContext();
final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
final Configuration conf = (Configuration) getServletContext()
.getAttribute(JspHelper.CURRENT_CONF);
final PutImageParams parsedParams = new PutImageParams(request, response,
conf);
final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
validateRequest(context, conf, request, response, nnImage,
parsedParams.getStorageInfoString());
UserGroupInformation.getCurrentUser().doAs(
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
final long txid = parsedParams.getTxId();
final NameNodeFile nnf = parsedParams.getNameNodeFile();
if (!currentlyDownloadingCheckpoints.add(txid)) {
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer is already in the process of uploading a"
+ " checkpoint made at transaction ID " + txid);
return null;
}
try {
if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer already uploaded an checkpoint "
+ "for txid " + txid);
return null;
}
InputStream stream = request.getInputStream();
try {
long start = now();
MD5Hash downloadImageDigest = TransferFsImage
.handleUploadImageRequest(request, txid,
nnImage.getStorage(), stream,
parsedParams.getFileSize(), getThrottler(conf));
nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
downloadImageDigest);
// Metrics non-null only when used inside name node
if (metrics != null) {
long elapsed = now() - start;
metrics.addPutImage(elapsed);
}
// Now that we have a new checkpoint, we might be able to
// remove some old ones.
nnImage.purgeOldStorage(nnf);
} finally {
stream.close();
}
} finally {
currentlyDownloadingCheckpoints.remove(txid);
}
return null;
}
});
} catch (Throwable t) {
String errMsg = "PutImage failed. " + StringUtils.stringifyException(t);
response.sendError(HttpServletResponse.SC_GONE, errMsg);
throw new IOException(errMsg);
}
}
/*
* Params required to handle put image request
*/
static class PutImageParams {
private long txId = -1;
private String storageInfoString = null;
private long fileSize = 0L;
private NameNodeFile nnf;
public PutImageParams(HttpServletRequest request,
HttpServletResponse response, Configuration conf) throws IOException {
txId = ServletUtil.parseLongParam(request, TXID_PARAM);
storageInfoString = ServletUtil.getParameter(request, STORAGEINFO_PARAM);
fileSize = ServletUtil.parseLongParam(request,
TransferFsImage.FILE_LENGTH);
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
.valueOf(imageType);
if (fileSize == 0 || txId == -1 || storageInfoString == null
|| storageInfoString.isEmpty()) {
throw new IOException("Illegal parameters to TransferFsImage");
}
}
public long getTxId() {
return txId;
}
public String getStorageInfoString() {
return storageInfoString;
}
public long getFileSize() {
return fileSize;
}
public NameNodeFile getNameNodeFile() {
return nnf;
}
}
}

View File

@ -233,8 +233,8 @@ public class NameNodeHttpServer {
CancelDelegationTokenServlet.class, true);
httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,
true);
httpServer.addInternalServlet("getimage", "/getimage",
GetImageServlet.class, true);
httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
ImageServlet.class, true);
httpServer.addInternalServlet("listPaths", "/listPaths/*",
ListPathsServlet.class, false);
httpServer.addInternalServlet("data", "/data/*",

View File

@ -114,7 +114,6 @@ public class SecondaryNameNode implements Runnable {
private InetSocketAddress nameNodeAddr;
private volatile boolean shouldRun;
private HttpServer2 infoServer;
private URL imageListenURL;
private Collection<URI> checkpointDirs;
private List<URI> checkpointEditsDirs;
@ -267,13 +266,11 @@ public class SecondaryNameNode implements Runnable {
infoServer.setAttribute("secondary.name.node", this);
infoServer.setAttribute("name.system.image", checkpointImage);
infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
infoServer.addInternalServlet("getimage", "/getimage",
GetImageServlet.class, true);
infoServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
ImageServlet.class, true);
infoServer.start();
LOG.info("Web server init done");
imageListenURL = new URL(DFSUtil.getHttpClientScheme(conf) + "://"
+ NetUtils.getHostPortString(infoServer.getConnectorAddress(0)));
HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
int connIdx = 0;
@ -488,14 +485,6 @@ public class SecondaryNameNode implements Runnable {
return address.toURL();
}
/**
* Return the host:port of where this SecondaryNameNode is listening
* for image transfers
*/
private URL getImageListenAddress() {
return imageListenURL;
}
/**
* Create a new checkpoint
* @return if the image is fetched from primary or not
@ -555,8 +544,8 @@ public class SecondaryNameNode implements Runnable {
// to make this new uploaded image as the most current image.
//
long txid = checkpointImage.getLastAppliedTxId();
TransferFsImage.uploadImageFromStorage(fsName, getImageListenAddress(),
dstStorage, NameNodeFile.IMAGE, txid);
TransferFsImage.uploadImageFromStorage(fsName, conf, dstStorage,
NameNodeFile.IMAGE, txid);
// error simulation code for junit test
CheckpointFaultInjector.getInstance().afterSecondaryUploadsNewImage();

View File

@ -19,18 +19,22 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
@ -49,10 +53,12 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.util.Time;
import org.apache.http.client.utils.URIBuilder;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@ -65,7 +71,12 @@ import com.google.common.collect.Lists;
public class TransferFsImage {
public final static String CONTENT_LENGTH = "Content-Length";
public final static String FILE_LENGTH = "File-Length";
public final static String MD5_HEADER = "X-MD5-Digest";
private final static String CONTENT_TYPE = "Content-Type";
private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding";
@VisibleForTesting
static int timeout = 0;
private static URLConnectionFactory connectionFactory;
@ -82,14 +93,14 @@ public class TransferFsImage {
public static void downloadMostRecentImageToDirectory(URL infoServer,
File dir) throws IOException {
String fileId = GetImageServlet.getParamStringForMostRecentImage();
String fileId = ImageServlet.getParamStringForMostRecentImage();
getFileClient(infoServer, fileId, Lists.newArrayList(dir),
null, false);
}
public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
Storage dstStorage, boolean needDigest) throws IOException {
String fileid = GetImageServlet.getParamStringForImage(null,
String fileid = ImageServlet.getParamStringForImage(null,
imageTxId, dstStorage);
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
@ -105,11 +116,30 @@ public class TransferFsImage {
return hash;
}
static MD5Hash handleUploadImageRequest(HttpServletRequest request,
long imageTxId, Storage dstStorage, InputStream stream,
long advertisedSize, DataTransferThrottler throttler) throws IOException {
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
List<File> dstFiles = dstStorage.getFiles(NameNodeDirType.IMAGE, fileName);
if (dstFiles.isEmpty()) {
throw new IOException("No targets in destination storage!");
}
MD5Hash advertisedDigest = parseMD5Header(request);
MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true,
advertisedSize, advertisedDigest, fileName, stream, throttler);
LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size "
+ dstFiles.get(0).length() + " bytes.");
return hash;
}
static void downloadEditsToStorage(URL fsName, RemoteEditLog log,
NNStorage dstStorage) throws IOException {
assert log.getStartTxId() > 0 && log.getEndTxId() > 0 :
"bad log: " + log;
String fileid = GetImageServlet.getParamStringForLog(
String fileid = ImageServlet.getParamStringForLog(
log, dstStorage);
String finalFileName = NNStorage.getFinalizedEditsFileName(
log.getStartTxId(), log.getEndTxId());
@ -159,22 +189,19 @@ public class TransferFsImage {
* Requests that the NameNode download an image from this node.
*
* @param fsName the http address for the remote NN
* @param myNNAddress the host/port where the local node is running an
* HTTPServer hosting GetImageServlet
* @param conf Configuration
* @param storage the storage directory to transfer the image from
* @param nnf the NameNodeFile type of the image
* @param txid the transaction ID of the image to be uploaded
*/
public static void uploadImageFromStorage(URL fsName, URL myNNAddress,
Storage storage, NameNodeFile nnf, long txid) throws IOException {
public static void uploadImageFromStorage(URL fsName, Configuration conf,
NNStorage storage, NameNodeFile nnf, long txid) throws IOException {
String fileid = GetImageServlet.getParamStringToPutImage(nnf, txid,
myNNAddress, storage);
// this doesn't directly upload an image, but rather asks the NN
// to connect back to the 2NN to download the specified image.
URL url = new URL(fsName, ImageServlet.PATH_SPEC);
long startTime = Time.monotonicNow();
try {
TransferFsImage.getFileClient(fsName, fileid, null, null, false);
} catch (HttpGetFailedException e) {
uploadImage(url, conf, storage, nnf, txid);
} catch (HttpPutFailedException e) {
if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) {
// this is OK - this means that a previous attempt to upload
// this checkpoint succeeded even though we thought it failed.
@ -186,25 +213,105 @@ public class TransferFsImage {
throw e;
}
}
LOG.info("Uploaded image with txid " + txid + " to namenode at " +
fsName);
double xferSec = Math.max(
((float) (Time.monotonicNow() - startTime)) / 1000.0, 0.001);
LOG.info("Uploaded image with txid " + txid + " to namenode at " + fsName
+ " in " + xferSec + " seconds");
}
/*
* Uploads the imagefile using HTTP PUT method
*/
private static void uploadImage(URL url, Configuration conf,
NNStorage storage, NameNodeFile nnf, long txId) throws IOException {
File imageFile = storage.findImageFile(nnf, txId);
if (imageFile == null) {
throw new IOException("Could not find image with txid " + txId);
}
HttpURLConnection connection = null;
try {
URIBuilder uriBuilder = new URIBuilder(url.toURI());
// write all params for image upload request as query itself.
// Request body contains the image to be uploaded.
Map<String, String> params = ImageServlet.getParamsForPutImage(storage,
txId, imageFile.length(), nnf);
for (Entry<String, String> entry : params.entrySet()) {
uriBuilder.addParameter(entry.getKey(), entry.getValue());
}
URL urlWithParams = uriBuilder.build().toURL();
connection = (HttpURLConnection) connectionFactory.openConnection(
urlWithParams, UserGroupInformation.isSecurityEnabled());
// Set the request to PUT
connection.setRequestMethod("PUT");
connection.setDoOutput(true);
int chunkSize = conf.getInt(
DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT);
if (imageFile.length() > chunkSize) {
// using chunked streaming mode to support upload of 2GB+ files and to
// avoid internal buffering.
// this mode should be used only if more than chunkSize data is present
// to upload. otherwise upload may not happen sometimes.
connection.setChunkedStreamingMode(chunkSize);
}
setTimeout(connection);
// set headers for verification
ImageServlet.setVerificationHeadersForPut(connection, imageFile);
// Write the file to output stream.
writeFileToPutRequest(conf, connection, imageFile);
int responseCode = connection.getResponseCode();
if (responseCode != HttpURLConnection.HTTP_OK) {
throw new HttpPutFailedException(connection.getResponseMessage(),
responseCode);
}
} catch (AuthenticationException e) {
throw new IOException(e);
} catch (URISyntaxException e) {
throw new IOException(e);
} finally {
if (connection != null) {
connection.disconnect();
}
}
}
private static void writeFileToPutRequest(Configuration conf,
HttpURLConnection connection, File imageFile)
throws FileNotFoundException, IOException {
connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream");
connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary");
OutputStream output = connection.getOutputStream();
FileInputStream input = new FileInputStream(imageFile);
try {
copyFileToStream(output, imageFile, input,
ImageServlet.getThrottler(conf));
} finally {
IOUtils.closeStream(input);
IOUtils.closeStream(output);
}
}
/**
* A server-side method to respond to a getfile http request
* Copies the contents of the local file into the output stream.
*/
public static void getFileServer(ServletResponse response, File localfile,
FileInputStream infile,
DataTransferThrottler throttler)
public static void copyFileToStream(OutputStream out, File localfile,
FileInputStream infile, DataTransferThrottler throttler)
throws IOException {
byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
ServletOutputStream out = null;
try {
CheckpointFaultInjector.getInstance()
.aboutToSendFile(localfile);
out = response.getOutputStream();
if (CheckpointFaultInjector.getInstance().
shouldSendShortFile(localfile)) {
@ -250,14 +357,13 @@ public class TransferFsImage {
static MD5Hash getFileClient(URL infoServer,
String queryString, List<File> localPaths,
Storage dstStorage, boolean getChecksum) throws IOException {
URL url = new URL(infoServer, "/getimage?" + queryString);
URL url = new URL(infoServer, ImageServlet.PATH_SPEC + "?" + queryString);
LOG.info("Opening connection to " + url);
return doGetUrl(url, localPaths, dstStorage, getChecksum);
}
public static MD5Hash doGetUrl(URL url, List<File> localPaths,
Storage dstStorage, boolean getChecksum) throws IOException {
long startTime = Time.monotonicNow();
HttpURLConnection connection;
try {
connection = (HttpURLConnection)
@ -266,16 +372,7 @@ public class TransferFsImage {
throw new IOException(e);
}
if (timeout <= 0) {
Configuration conf = new HdfsConfiguration();
timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
}
if (timeout > 0) {
connection.setConnectTimeout(timeout);
connection.setReadTimeout(timeout);
}
setTimeout(connection);
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new HttpGetFailedException(
@ -293,10 +390,37 @@ public class TransferFsImage {
throw new IOException(CONTENT_LENGTH + " header is not provided " +
"by the namenode when trying to fetch " + url);
}
MD5Hash advertisedDigest = parseMD5Header(connection);
String fsImageName = connection
.getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
InputStream stream = connection.getInputStream();
return receiveFile(url.toExternalForm(), localPaths, dstStorage,
getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
null);
}
private static void setTimeout(HttpURLConnection connection) {
if (timeout <= 0) {
Configuration conf = new HdfsConfiguration();
timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
LOG.info("Image Transfer timeout configured to " + timeout
+ " milliseconds");
}
if (timeout > 0) {
connection.setConnectTimeout(timeout);
connection.setReadTimeout(timeout);
}
}
private static MD5Hash receiveFile(String url, List<File> localPaths,
Storage dstStorage, boolean getChecksum, long advertisedSize,
MD5Hash advertisedDigest, String fsImageName, InputStream stream,
DataTransferThrottler throttler) throws IOException {
long startTime = Time.monotonicNow();
if (localPaths != null) {
String fsImageName = connection.getHeaderField(
GetImageServlet.HADOOP_IMAGE_EDITS_HEADER);
// If the local paths refer to directories, use the server-provided header
// as the filename within that directory
List<File> newLocalPaths = new ArrayList<File>();
@ -313,10 +437,8 @@ public class TransferFsImage {
localPaths = newLocalPaths;
}
MD5Hash advertisedDigest = parseMD5Header(connection);
long received = 0;
InputStream stream = connection.getInputStream();
MessageDigest digester = null;
if (getChecksum) {
digester = MD5Hash.getDigester();
@ -361,6 +483,9 @@ public class TransferFsImage {
for (FileOutputStream fos : outputStreams) {
fos.write(buf, 0, num);
}
if (throttler != null) {
throttler.throttle(num);
}
}
}
finishedReceiving = true;
@ -405,6 +530,11 @@ public class TransferFsImage {
return (header != null) ? new MD5Hash(header) : null;
}
private static MD5Hash parseMD5Header(HttpServletRequest request) {
String header = request.getHeader(MD5_HEADER);
return (header != null) ? new MD5Hash(header) : null;
}
public static class HttpGetFailedException extends IOException {
private static final long serialVersionUID = 1L;
private final int responseCode;
@ -419,4 +549,18 @@ public class TransferFsImage {
}
}
public static class HttpPutFailedException extends IOException {
private static final long serialVersionUID = 1L;
private final int responseCode;
HttpPutFailedException(String msg, int responseCode) throws IOException {
super(msg);
this.responseCode = responseCode;
}
public int getResponseCode() {
return responseCode;
}
}
}

View File

@ -63,6 +63,7 @@ public class StandbyCheckpointer {
private static final Log LOG = LogFactory.getLog(StandbyCheckpointer.class);
private static final long PREVENT_AFTER_CANCEL_MS = 2*60*1000L;
private final CheckpointConf checkpointConf;
private final Configuration conf;
private final FSNamesystem namesystem;
private long lastCheckpointTime;
private final CheckpointerThread thread;
@ -80,6 +81,7 @@ public class StandbyCheckpointer {
public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
throws IOException {
this.namesystem = ns;
this.conf = conf;
this.checkpointConf = new CheckpointConf(conf);
this.thread = new CheckpointerThread();
this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
@ -193,7 +195,7 @@ public class StandbyCheckpointer {
Future<Void> upload = executor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
TransferFsImage.uploadImageFromStorage(activeNNAddress, myNNAddress,
TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
namesystem.getFSImage().getStorage(), imageType, txid);
return null;
}

View File

@ -858,15 +858,13 @@
<property>
<name>dfs.image.transfer.timeout</name>
<value>600000</value>
<value>60000</value>
<description>
Timeout for image transfer in milliseconds. This timeout and the related
Socket timeout for image transfer in milliseconds. This timeout and the related
dfs.image.transfer.bandwidthPerSec parameter should be configured such
that normal image transfer can complete within the timeout.
that normal image transfer can complete successfully.
This timeout prevents client hangs when the sender fails during
image transfer, which is particularly important during checkpointing.
Note that this timeout applies to the entirety of image transfer, and
is not a socket timeout.
image transfer. This is socket timeout during image tranfer.
</description>
</property>
@ -883,6 +881,16 @@
</description>
</property>
<property>
<name>dfs.image.transfer.chunksize</name>
<value>65536</value>
<description>
Chunksize in bytes to upload the checkpoint.
Chunked streaming is used to avoid internal buffering of contents
of image file of huge size.
</description>
</property>
<property>
<name>dfs.namenode.support.allow.format</name>
<value>true</value>

View File

@ -31,6 +31,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
@ -554,23 +555,9 @@ public class TestCheckpoint {
}
/**
* Simulate a secondary node failure to transfer image
* back to the name-node.
* Used to truncate primary fsimage file.
*/
@Test
public void testSecondaryFailsToReturnImage() throws IOException {
Mockito.doThrow(new IOException("If this exception is not caught by the " +
"name-node, fs image will be truncated."))
.when(faultInjector).aboutToSendFile(filePathContaining("secondary"));
doSecondaryFailsToReturnImage();
}
/**
* Similar to above test, but uses an unchecked Error, and causes it
* before even setting the length header. This used to cause image
* truncation. Regression test for HDFS-3330.
* Simulate a secondary node failure to transfer image. Uses an unchecked
* error and fail transfer before even setting the length header. This used to
* cause image truncation. Regression test for HDFS-3330.
*/
@Test
public void testSecondaryFailsWithErrorBeforeSettingHeaders()
@ -1976,6 +1963,13 @@ public class TestCheckpoint {
.when(dstImage).getFiles(
Mockito.<NameNodeDirType>anyObject(), Mockito.anyString());
File mockImageFile = File.createTempFile("image", "");
FileOutputStream imageFile = new FileOutputStream(mockImageFile);
imageFile.write("data".getBytes());
imageFile.close();
Mockito.doReturn(mockImageFile).when(dstImage)
.findImageFile(Mockito.any(NameNodeFile.class), Mockito.anyLong());
Mockito.doReturn(new StorageInfo(1, 1, "X", 1, NodeType.NAME_NODE).toColonSeparatedString())
.when(dstImage).toColonSeparatedString();
@ -1996,8 +1990,8 @@ public class TestCheckpoint {
}
try {
TransferFsImage.uploadImageFromStorage(fsName, new URL(
"http://localhost:1234"), dstImage, NameNodeFile.IMAGE, 0);
TransferFsImage.uploadImageFromStorage(fsName, conf, dstImage,
NameNodeFile.IMAGE, 0);
fail("Storage info was not verified");
} catch (IOException ioe) {
String msg = StringUtils.stringifyException(ioe);

View File

@ -69,7 +69,7 @@ public class TestGetImageServlet {
Mockito.when(context.getAttribute(HttpServer2.ADMINS_ACL)).thenReturn(acls);
// Make sure that NN2 is considered a valid fsimage/edits requestor.
assertTrue(GetImageServlet.isValidRequestor(context,
assertTrue(ImageServlet.isValidRequestor(context,
"hdfs/host2@TEST-REALM.COM", conf));
// Mark atm as an admin.
@ -81,15 +81,15 @@ public class TestGetImageServlet {
}))).thenReturn(true);
// Make sure that NN2 is still considered a valid requestor.
assertTrue(GetImageServlet.isValidRequestor(context,
assertTrue(ImageServlet.isValidRequestor(context,
"hdfs/host2@TEST-REALM.COM", conf));
// Make sure an admin is considered a valid requestor.
assertTrue(GetImageServlet.isValidRequestor(context,
assertTrue(ImageServlet.isValidRequestor(context,
"atm@TEST-REALM.COM", conf));
// Make sure other users are *not* considered valid requestors.
assertFalse(GetImageServlet.isValidRequestor(context,
assertFalse(ImageServlet.isValidRequestor(context,
"todd@TEST-REALM.COM", conf));
}
}

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.URL;
@ -34,9 +35,11 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.http.HttpServerFunctionalTest;
import org.apache.hadoop.test.PathUtils;
@ -118,10 +121,11 @@ public class TestTransferFsImage {
* Test to verify the read timeout
*/
@Test(timeout = 5000)
public void testImageTransferTimeout() throws Exception {
public void testGetImageTimeout() throws Exception {
HttpServer2 testServer = HttpServerFunctionalTest.createServer("hdfs");
try {
testServer.addServlet("GetImage", "/getimage", TestGetImageServlet.class);
testServer.addServlet("ImageTransfer", ImageServlet.PATH_SPEC,
TestImageTransferServlet.class);
testServer.start();
URL serverURL = HttpServerFunctionalTest.getServerURL(testServer);
TransferFsImage.timeout = 2000;
@ -139,7 +143,48 @@ public class TestTransferFsImage {
}
}
public static class TestGetImageServlet extends HttpServlet {
/**
* Test to verify the timeout of Image upload
*/
@Test(timeout = 10000)
public void testImageUploadTimeout() throws Exception {
Configuration conf = new HdfsConfiguration();
NNStorage mockStorage = Mockito.mock(NNStorage.class);
HttpServer2 testServer = HttpServerFunctionalTest.createServer("hdfs");
try {
testServer.addServlet("ImageTransfer", ImageServlet.PATH_SPEC,
TestImageTransferServlet.class);
testServer.start();
URL serverURL = HttpServerFunctionalTest.getServerURL(testServer);
// set the timeout here, otherwise it will take default.
TransferFsImage.timeout = 2000;
File tmpDir = new File(new FileSystemTestHelper().getTestRootDir());
tmpDir.mkdirs();
File mockImageFile = File.createTempFile("image", "", tmpDir);
FileOutputStream imageFile = new FileOutputStream(mockImageFile);
imageFile.write("data".getBytes());
imageFile.close();
Mockito.when(
mockStorage.findImageFile(Mockito.any(NameNodeFile.class),
Mockito.anyLong())).thenReturn(mockImageFile);
Mockito.when(mockStorage.toColonSeparatedString()).thenReturn(
"storage:info:string");
try {
TransferFsImage.uploadImageFromStorage(serverURL, conf, mockStorage,
NameNodeFile.IMAGE, 1L);
fail("TransferImage Should fail with timeout");
} catch (SocketTimeoutException e) {
assertEquals("Upload should timeout", "Read timed out", e.getMessage());
}
} finally {
testServer.stop();
}
}
public static class TestImageTransferServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
@Override
@ -153,5 +198,17 @@ public class TestTransferFsImage {
}
}
}
@Override
protected void doPut(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
synchronized (this) {
try {
wait(5000);
} catch (InterruptedException e) {
// Ignore
}
}
}
}
}