diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 2e02d7b7527..0b1649c6ff3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -122,6 +122,9 @@ Trunk (Unreleased) HDFS-5138. Support HDFS upgrade in HA. (atm via todd) + HDFS-3405. Checkpointing should use HTTP POST or PUT instead of GET-GET + to send merged fsimages. (Vinayakumar B via wang) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 8dc17fc3711..378c2cd0ff0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -460,7 +460,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { // Image transfer timeout public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout"; - public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 10 * 60 * 1000; + public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000; + + // Image transfer chunksize + public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize"; + public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024; //Keys with no defaults public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java index 25716706932..e9387d7a0e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; -import org.apache.hadoop.hdfs.server.namenode.GetImageServlet; +import org.apache.hadoop.hdfs.server.namenode.ImageServlet; import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode; import org.apache.hadoop.hdfs.server.namenode.TransferFsImage; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; @@ -205,15 +205,16 @@ public void doGet(final HttpServletRequest request, return; } editFile = elf.getFile(); - GetImageServlet.setVerificationHeaders(response, editFile); - GetImageServlet.setFileNameHeaders(response, editFile); + ImageServlet.setVerificationHeadersForGet(response, editFile); + ImageServlet.setFileNameHeaders(response, editFile); editFileIn = new FileInputStream(editFile); } - DataTransferThrottler throttler = GetImageServlet.getThrottler(conf); + DataTransferThrottler throttler = ImageServlet.getThrottler(conf); // send edits - TransferFsImage.getFileServer(response, editFile, editFileIn, throttler); + TransferFsImage.copyFileToStream(response.getOutputStream(), editFile, + editFileIn, throttler); } catch (Throwable t) { String errMsg = "getedit failed. 
" + StringUtils.stringifyException(t); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java index ca2d07015f9..c66574cdcaa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java @@ -263,8 +263,7 @@ void doCheckpoint() throws IOException { } if(cpCmd.needToReturnImage()) { - TransferFsImage.uploadImageFromStorage( - backupNode.nnHttpAddress, getImageListenAddress(), + TransferFsImage.uploadImageFromStorage(backupNode.nnHttpAddress, conf, bnStorage, NameNodeFile.IMAGE, txid); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java index 3c234434653..e69de29bb2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java @@ -1,481 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.namenode; - -import static org.apache.hadoop.util.Time.now; - -import java.security.PrivilegedExceptionAction; -import java.util.*; -import java.io.*; -import java.net.InetSocketAddress; -import java.net.URL; - -import javax.servlet.ServletContext; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.HAUtil; -import org.apache.hadoop.hdfs.server.common.JspHelper; -import org.apache.hadoop.hdfs.server.common.Storage; -import org.apache.hadoop.hdfs.server.common.StorageInfo; -import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; -import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; -import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; -import org.apache.hadoop.hdfs.util.DataTransferThrottler; -import org.apache.hadoop.hdfs.util.MD5FileUtils; -import org.apache.hadoop.http.HttpServer2; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.MD5Hash; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.ServletUtil; -import org.apache.hadoop.util.StringUtils; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.net.InetAddresses; - -/** - * This class is used in Namesystem's jetty to retrieve a file. - * Typically used by the Secondary NameNode to retrieve image and - * edit file for periodic checkpointing. 
- */ -@InterfaceAudience.Private -public class GetImageServlet extends HttpServlet { - private static final long serialVersionUID = -7669068179452648952L; - - private static final Log LOG = LogFactory.getLog(GetImageServlet.class); - - public final static String CONTENT_DISPOSITION = "Content-Disposition"; - public final static String HADOOP_IMAGE_EDITS_HEADER = "X-Image-Edits-Name"; - - private static final String TXID_PARAM = "txid"; - private static final String START_TXID_PARAM = "startTxId"; - private static final String END_TXID_PARAM = "endTxId"; - private static final String STORAGEINFO_PARAM = "storageInfo"; - private static final String LATEST_FSIMAGE_VALUE = "latest"; - private static final String IMAGE_FILE_TYPE = "imageFile"; - - private static Set currentlyDownloadingCheckpoints = - Collections.synchronizedSet(new HashSet()); - - @Override - public void doGet(final HttpServletRequest request, - final HttpServletResponse response - ) throws ServletException, IOException { - try { - final ServletContext context = getServletContext(); - final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context); - final GetImageParams parsedParams = new GetImageParams(request, response); - final Configuration conf = (Configuration) context - .getAttribute(JspHelper.CURRENT_CONF); - final NameNodeMetrics metrics = NameNode.getNameNodeMetrics(); - - if (UserGroupInformation.isSecurityEnabled() && - !isValidRequestor(context, request.getUserPrincipal().getName(), conf)) { - response.sendError(HttpServletResponse.SC_FORBIDDEN, - "Only Namenode, Secondary Namenode, and administrators may access " + - "this servlet"); - LOG.warn("Received non-NN/SNN/administrator request for image or edits from " - + request.getUserPrincipal().getName() + " at " + request.getRemoteHost()); - return; - } - - String myStorageInfoString = nnImage.getStorage().toColonSeparatedString(); - String theirStorageInfoString = parsedParams.getStorageInfoString(); - if (theirStorageInfoString != null && - !myStorageInfoString.equals(theirStorageInfoString)) { - response.sendError(HttpServletResponse.SC_FORBIDDEN, - "This namenode has storage info " + myStorageInfoString + - " but the secondary expected " + theirStorageInfoString); - LOG.warn("Received an invalid request file transfer request " + - "from a secondary with storage info " + theirStorageInfoString); - return; - } - - UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction() { - @Override - public Void run() throws Exception { - if (parsedParams.isGetImage()) { - long txid = parsedParams.getTxId(); - File imageFile = null; - String errorMessage = "Could not find image"; - if (parsedParams.shouldFetchLatest()) { - imageFile = nnImage.getStorage().getHighestFsImageName(); - } else { - errorMessage += " with txid " + txid; - imageFile = nnImage.getStorage().getFsImage(txid, - EnumSet.of(NameNodeFile.IMAGE, NameNodeFile.IMAGE_ROLLBACK)); - } - if (imageFile == null) { - throw new IOException(errorMessage); - } - CheckpointFaultInjector.getInstance().beforeGetImageSetsHeaders(); - long start = now(); - serveFile(imageFile); - - if (metrics != null) { // Metrics non-null only when used inside name node - long elapsed = now() - start; - metrics.addGetImage(elapsed); - } - } else if (parsedParams.isGetEdit()) { - long startTxId = parsedParams.getStartTxId(); - long endTxId = parsedParams.getEndTxId(); - - File editFile = nnImage.getStorage() - .findFinalizedEditsFile(startTxId, endTxId); - long start = now(); - serveFile(editFile); - - if 
(metrics != null) { // Metrics non-null only when used inside name node - long elapsed = now() - start; - metrics.addGetEdit(elapsed); - } - } else if (parsedParams.isPutImage()) { - final long txid = parsedParams.getTxId(); - final NameNodeFile nnf = parsedParams.getNameNodeFile(); - - if (! currentlyDownloadingCheckpoints.add(txid)) { - response.sendError(HttpServletResponse.SC_CONFLICT, - "Another checkpointer is already in the process of uploading a" + - " checkpoint made at transaction ID " + txid); - return null; - } - - try { - if (nnImage.getStorage().findImageFile(nnf, txid) != null) { - response.sendError(HttpServletResponse.SC_CONFLICT, - "Another checkpointer already uploaded an checkpoint " + - "for txid " + txid); - return null; - } - - // We may have lost our ticket since last checkpoint, log in again, just in case - if (UserGroupInformation.isSecurityEnabled()) { - UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab(); - } - - long start = now(); - // issue a HTTP get request to download the new fsimage - MD5Hash downloadImageDigest = TransferFsImage - .downloadImageToStorage(parsedParams.getInfoServer(conf), - txid, nnImage.getStorage(), true); - nnImage.saveDigestAndRenameCheckpointImage(nnf, txid, - downloadImageDigest); - if (nnf == NameNodeFile.IMAGE_ROLLBACK) { - NameNodeHttpServer.getNameNodeFromContext(context) - .getNamesystem().setCreatedRollbackImages(true); - } - - if (metrics != null) { // Metrics non-null only when used inside name node - long elapsed = now() - start; - metrics.addPutImage(elapsed); - } - - // Now that we have a new checkpoint, we might be able to - // remove some old ones. - nnImage.purgeOldStorage(nnf); - } finally { - currentlyDownloadingCheckpoints.remove(txid); - } - } - return null; - } - - private void serveFile(File file) throws IOException { - FileInputStream fis = new FileInputStream(file); - try { - setVerificationHeaders(response, file); - setFileNameHeaders(response, file); - if (!file.exists()) { - // Potential race where the file was deleted while we were in the - // process of setting headers! - throw new FileNotFoundException(file.toString()); - // It's possible the file could be deleted after this point, but - // we've already opened the 'fis' stream. - // It's also possible length could change, but this would be - // detected by the client side as an inaccurate length header. - } - // send file - TransferFsImage.getFileServer(response, file, fis, - getThrottler(conf)); - } finally { - IOUtils.closeStream(fis); - } - } - }); - - } catch (Throwable t) { - String errMsg = "GetImage failed. 
" + StringUtils.stringifyException(t); - response.sendError(HttpServletResponse.SC_GONE, errMsg); - throw new IOException(errMsg); - } finally { - response.getOutputStream().close(); - } - } - - public static void setFileNameHeaders(HttpServletResponse response, - File file) { - response.setHeader(CONTENT_DISPOSITION, "attachment; filename=" + - file.getName()); - response.setHeader(HADOOP_IMAGE_EDITS_HEADER, file.getName()); - } - - /** - * Construct a throttler from conf - * @param conf configuration - * @return a data transfer throttler - */ - public final static DataTransferThrottler getThrottler(Configuration conf) { - long transferBandwidth = - conf.getLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, - DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT); - DataTransferThrottler throttler = null; - if (transferBandwidth > 0) { - throttler = new DataTransferThrottler(transferBandwidth); - } - return throttler; - } - - @VisibleForTesting - static boolean isValidRequestor(ServletContext context, String remoteUser, - Configuration conf) throws IOException { - if(remoteUser == null) { // This really shouldn't happen... - LOG.warn("Received null remoteUser while authorizing access to getImage servlet"); - return false; - } - - Set validRequestors = new HashSet(); - - validRequestors.add( - SecurityUtil.getServerPrincipal(conf - .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), NameNode - .getAddress(conf).getHostName())); - validRequestors.add( - SecurityUtil.getServerPrincipal(conf - .get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY), - SecondaryNameNode.getHttpAddress(conf).getHostName())); - - if (HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf))) { - Configuration otherNnConf = HAUtil.getConfForOtherNode(conf); - validRequestors.add( - SecurityUtil.getServerPrincipal(otherNnConf - .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), - NameNode.getAddress(otherNnConf).getHostName())); - } - - for(String v : validRequestors) { - if(v != null && v.equals(remoteUser)) { - LOG.info("GetImageServlet allowing checkpointer: " + remoteUser); - return true; - } - } - - if (HttpServer2.userHasAdministratorAccess(context, remoteUser)) { - LOG.info("GetImageServlet allowing administrator: " + remoteUser); - return true; - } - - LOG.info("GetImageServlet rejecting: " + remoteUser); - return false; - } - - /** - * Set headers for content length, and, if available, md5. - * @throws IOException - */ - public static void setVerificationHeaders(HttpServletResponse response, File file) - throws IOException { - response.setHeader(TransferFsImage.CONTENT_LENGTH, - String.valueOf(file.length())); - MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file); - if (hash != null) { - response.setHeader(TransferFsImage.MD5_HEADER, hash.toString()); - } - } - - static String getParamStringForMostRecentImage() { - return "getimage=1&" + TXID_PARAM + "=" + LATEST_FSIMAGE_VALUE; - } - - static String getParamStringForImage(NameNodeFile nnf, long txid, - StorageInfo remoteStorageInfo) { - final String imageType = nnf == null ? 
"" : "&" + IMAGE_FILE_TYPE + "=" - + nnf.name(); - return "getimage=1&" + TXID_PARAM + "=" + txid - + imageType - + "&" + STORAGEINFO_PARAM + "=" + - remoteStorageInfo.toColonSeparatedString(); - } - - static String getParamStringForLog(RemoteEditLog log, - StorageInfo remoteStorageInfo) { - return "getedit=1&" + START_TXID_PARAM + "=" + log.getStartTxId() - + "&" + END_TXID_PARAM + "=" + log.getEndTxId() - + "&" + STORAGEINFO_PARAM + "=" + - remoteStorageInfo.toColonSeparatedString(); - } - - static String getParamStringToPutImage(NameNodeFile nnf, long txid, - URL url, Storage storage) { - InetSocketAddress imageListenAddress = NetUtils.createSocketAddr(url - .getAuthority()); - String machine = !imageListenAddress.isUnresolved() - && imageListenAddress.getAddress().isAnyLocalAddress() ? null - : imageListenAddress.getHostName(); - return "putimage=1" + - "&" + TXID_PARAM + "=" + txid + - "&" + IMAGE_FILE_TYPE + "=" + nnf.name() + - "&port=" + imageListenAddress.getPort() + - (machine != null ? "&machine=" + machine : "") - + "&" + STORAGEINFO_PARAM + "=" + - storage.toColonSeparatedString(); - } - - - static class GetImageParams { - private boolean isGetImage; - private boolean isGetEdit; - private boolean isPutImage; - private int remoteport; - private String machineName; - private NameNodeFile nnf; - private long startTxId, endTxId, txId; - private String storageInfoString; - private boolean fetchLatest; - - /** - * @param request the object from which this servlet reads the url contents - * @param response the object into which this servlet writes the url contents - * @throws IOException if the request is bad - */ - public GetImageParams(HttpServletRequest request, - HttpServletResponse response - ) throws IOException { - @SuppressWarnings("unchecked") - Map pmap = request.getParameterMap(); - isGetImage = isGetEdit = isPutImage = fetchLatest = false; - remoteport = 0; - - for (Map.Entry entry : pmap.entrySet()) { - String key = entry.getKey(); - String[] val = entry.getValue(); - if (key.equals("getimage")) { - isGetImage = true; - try { - txId = ServletUtil.parseLongParam(request, TXID_PARAM); - String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE); - nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile - .valueOf(imageType); - } catch (NumberFormatException nfe) { - if (request.getParameter(TXID_PARAM).equals(LATEST_FSIMAGE_VALUE)) { - fetchLatest = true; - } else { - throw nfe; - } - } - } else if (key.equals("getedit")) { - isGetEdit = true; - startTxId = ServletUtil.parseLongParam(request, START_TXID_PARAM); - endTxId = ServletUtil.parseLongParam(request, END_TXID_PARAM); - } else if (key.equals("putimage")) { - isPutImage = true; - txId = ServletUtil.parseLongParam(request, TXID_PARAM); - String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE); - nnf = imageType == null ? 
NameNodeFile.IMAGE : NameNodeFile - .valueOf(imageType); - } else if (key.equals("port")) { - remoteport = new Integer(val[0]).intValue(); - } else if (key.equals("machine")) { - machineName = val[0]; - } else if (key.equals(STORAGEINFO_PARAM)) { - storageInfoString = val[0]; - } - } - - if (machineName == null) { - machineName = request.getRemoteHost(); - if (InetAddresses.isInetAddress(machineName)) { - machineName = NetUtils.getHostNameOfIP(machineName); - } - } - - int numGets = (isGetImage?1:0) + (isGetEdit?1:0); - if ((numGets > 1) || (numGets == 0) && !isPutImage) { - throw new IOException("Illegal parameters to TransferFsImage"); - } - } - - public String getStorageInfoString() { - return storageInfoString; - } - - public long getTxId() { - Preconditions.checkState(isGetImage || isPutImage); - return txId; - } - - public NameNodeFile getNameNodeFile() { - Preconditions.checkState(isPutImage || isGetImage); - return nnf; - } - - public long getStartTxId() { - Preconditions.checkState(isGetEdit); - return startTxId; - } - - public long getEndTxId() { - Preconditions.checkState(isGetEdit); - return endTxId; - } - - boolean isGetEdit() { - return isGetEdit; - } - - boolean isGetImage() { - return isGetImage; - } - - boolean isPutImage() { - return isPutImage; - } - - URL getInfoServer(Configuration conf) throws IOException { - if (machineName == null || remoteport == 0) { - throw new IOException("MachineName and port undefined"); - } - return new URL(DFSUtil.getHttpClientScheme(conf), machineName, remoteport, ""); - } - - boolean shouldFetchLatest() { - return fetchLatest; - } - - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java new file mode 100644 index 00000000000..b1f127de0e1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java @@ -0,0 +1,544 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import static org.apache.hadoop.util.Time.now; + +import java.net.HttpURLConnection; +import java.security.PrivilegedExceptionAction; +import java.util.*; +import java.io.*; + +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HAUtil; +import org.apache.hadoop.hdfs.server.common.JspHelper; +import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.common.StorageInfo; +import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; +import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; +import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.hdfs.util.MD5FileUtils; +import org.apache.hadoop.http.HttpServer2; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.MD5Hash; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ServletUtil; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * This class is used in Namesystem's jetty to retrieve or upload a file. + * It is typically used by the Secondary NameNode to retrieve the image and + * edit files for periodic checkpointing in non-HA deployments, and by the + * Standby NameNode to upload checkpoints in HA deployments.
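+ * + * (Overview of the transfer protocol after this change: image and edit + * downloads remain HTTP GETs parameterized by txid / startTxId / endTxId and + * storageInfo, while checkpoint uploads now arrive as HTTP PUTs that carry + * the image bytes in the request body; see doPut below.)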
+ */ +@InterfaceAudience.Private +public class ImageServlet extends HttpServlet { + + public static final String PATH_SPEC = "/imagetransfer"; + + private static final long serialVersionUID = -7669068179452648952L; + + private static final Log LOG = LogFactory.getLog(ImageServlet.class); + + public final static String CONTENT_DISPOSITION = "Content-Disposition"; + public final static String HADOOP_IMAGE_EDITS_HEADER = "X-Image-Edits-Name"; + + private static final String TXID_PARAM = "txid"; + private static final String START_TXID_PARAM = "startTxId"; + private static final String END_TXID_PARAM = "endTxId"; + private static final String STORAGEINFO_PARAM = "storageInfo"; + private static final String LATEST_FSIMAGE_VALUE = "latest"; + private static final String IMAGE_FILE_TYPE = "imageFile"; + + private static Set currentlyDownloadingCheckpoints = + Collections.synchronizedSet(new HashSet()); + + @Override + public void doGet(final HttpServletRequest request, + final HttpServletResponse response) throws ServletException, IOException { + try { + final ServletContext context = getServletContext(); + final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context); + final GetImageParams parsedParams = new GetImageParams(request, response); + final Configuration conf = (Configuration) context + .getAttribute(JspHelper.CURRENT_CONF); + final NameNodeMetrics metrics = NameNode.getNameNodeMetrics(); + + validateRequest(context, conf, request, response, nnImage, + parsedParams.getStorageInfoString()); + + UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + if (parsedParams.isGetImage()) { + long txid = parsedParams.getTxId(); + File imageFile = null; + String errorMessage = "Could not find image"; + if (parsedParams.shouldFetchLatest()) { + imageFile = nnImage.getStorage().getHighestFsImageName(); + } else { + errorMessage += " with txid " + txid; + imageFile = nnImage.getStorage().getFsImage(txid, + EnumSet.of(NameNodeFile.IMAGE, NameNodeFile.IMAGE_ROLLBACK)); + } + if (imageFile == null) { + throw new IOException(errorMessage); + } + CheckpointFaultInjector.getInstance().beforeGetImageSetsHeaders(); + long start = now(); + serveFile(imageFile); + + if (metrics != null) { // Metrics non-null only when used inside name node + long elapsed = now() - start; + metrics.addGetImage(elapsed); + } + } else if (parsedParams.isGetEdit()) { + long startTxId = parsedParams.getStartTxId(); + long endTxId = parsedParams.getEndTxId(); + + File editFile = nnImage.getStorage() + .findFinalizedEditsFile(startTxId, endTxId); + long start = now(); + serveFile(editFile); + + if (metrics != null) { // Metrics non-null only when used inside name node + long elapsed = now() - start; + metrics.addGetEdit(elapsed); + } + } + return null; + } + + private void serveFile(File file) throws IOException { + FileInputStream fis = new FileInputStream(file); + try { + setVerificationHeadersForGet(response, file); + setFileNameHeaders(response, file); + if (!file.exists()) { + // Potential race where the file was deleted while we were in the + // process of setting headers! + throw new FileNotFoundException(file.toString()); + // It's possible the file could be deleted after this point, but + // we've already opened the 'fis' stream. + // It's also possible length could change, but this would be + // detected by the client side as an inaccurate length header. 
+ } + // send file + TransferFsImage.copyFileToStream(response.getOutputStream(), + file, fis, getThrottler(conf)); + } finally { + IOUtils.closeStream(fis); + } + } + }); + + } catch (Throwable t) { + String errMsg = "GetImage failed. " + StringUtils.stringifyException(t); + response.sendError(HttpServletResponse.SC_GONE, errMsg); + throw new IOException(errMsg); + } finally { + response.getOutputStream().close(); + } + } + + private void validateRequest(ServletContext context, Configuration conf, + HttpServletRequest request, HttpServletResponse response, + FSImage nnImage, String theirStorageInfoString) throws IOException { + + if (UserGroupInformation.isSecurityEnabled() + && !isValidRequestor(context, request.getUserPrincipal().getName(), + conf)) { + String errorMsg = "Only Namenode, Secondary Namenode, and administrators may access " + + "this servlet"; + response.sendError(HttpServletResponse.SC_FORBIDDEN, errorMsg); + LOG.warn("Received non-NN/SNN/administrator request for image or edits from " + + request.getUserPrincipal().getName() + + " at " + + request.getRemoteHost()); + throw new IOException(errorMsg); + } + + String myStorageInfoString = nnImage.getStorage().toColonSeparatedString(); + if (theirStorageInfoString != null + && !myStorageInfoString.equals(theirStorageInfoString)) { + String errorMsg = "This namenode has storage info " + myStorageInfoString + + " but the secondary expected " + theirStorageInfoString; + response.sendError(HttpServletResponse.SC_FORBIDDEN, errorMsg); + LOG.warn("Received an invalid file transfer request " + + "from a secondary with storage info " + theirStorageInfoString); + throw new IOException(errorMsg); + } + } + + public static void setFileNameHeaders(HttpServletResponse response, + File file) { + response.setHeader(CONTENT_DISPOSITION, "attachment; filename=" + + file.getName()); + response.setHeader(HADOOP_IMAGE_EDITS_HEADER, file.getName()); + } + + /** + * Construct a throttler from conf + * @param conf configuration + * @return a data transfer throttler + */ + public final static DataTransferThrottler getThrottler(Configuration conf) { + long transferBandwidth = + conf.getLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, + DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT); + DataTransferThrottler throttler = null; + if (transferBandwidth > 0) { + throttler = new DataTransferThrottler(transferBandwidth); + } + return throttler; + } + + @VisibleForTesting + static boolean isValidRequestor(ServletContext context, String remoteUser, + Configuration conf) throws IOException { + if (remoteUser == null) { // This really shouldn't happen... 
+ LOG.warn("Received null remoteUser while authorizing access to getImage servlet"); + return false; + } + + Set validRequestors = new HashSet(); + + validRequestors.add(SecurityUtil.getServerPrincipal(conf + .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), + NameNode.getAddress(conf).getHostName())); + validRequestors.add(SecurityUtil.getServerPrincipal( + conf.get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY), + SecondaryNameNode.getHttpAddress(conf).getHostName())); + + if (HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf))) { + Configuration otherNnConf = HAUtil.getConfForOtherNode(conf); + validRequestors.add(SecurityUtil.getServerPrincipal(otherNnConf + .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), + NameNode.getAddress(otherNnConf).getHostName())); + } + + for (String v : validRequestors) { + if (v != null && v.equals(remoteUser)) { + LOG.info("ImageServlet allowing checkpointer: " + remoteUser); + return true; + } + } + + if (HttpServer2.userHasAdministratorAccess(context, remoteUser)) { + LOG.info("ImageServlet allowing administrator: " + remoteUser); + return true; + } + + LOG.info("ImageServlet rejecting: " + remoteUser); + return false; + } + + /** + * Set headers for content length, and, if available, md5. + * @throws IOException + */ + public static void setVerificationHeadersForGet(HttpServletResponse response, + File file) throws IOException { + response.setHeader(TransferFsImage.CONTENT_LENGTH, + String.valueOf(file.length())); + MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file); + if (hash != null) { + response.setHeader(TransferFsImage.MD5_HEADER, hash.toString()); + } + } + + static String getParamStringForMostRecentImage() { + return "getimage=1&" + TXID_PARAM + "=" + LATEST_FSIMAGE_VALUE; + } + + static String getParamStringForImage(NameNodeFile nnf, long txid, + StorageInfo remoteStorageInfo) { + final String imageType = nnf == null ? "" : "&" + IMAGE_FILE_TYPE + "=" + + nnf.name(); + return "getimage=1&" + TXID_PARAM + "=" + txid + + imageType + + "&" + STORAGEINFO_PARAM + "=" + + remoteStorageInfo.toColonSeparatedString(); + } + + static String getParamStringForLog(RemoteEditLog log, + StorageInfo remoteStorageInfo) { + return "getedit=1&" + START_TXID_PARAM + "=" + log.getStartTxId() + + "&" + END_TXID_PARAM + "=" + log.getEndTxId() + + "&" + STORAGEINFO_PARAM + "=" + + remoteStorageInfo.toColonSeparatedString(); + } + + static class GetImageParams { + private boolean isGetImage; + private boolean isGetEdit; + private NameNodeFile nnf; + private long startTxId, endTxId, txId; + private String storageInfoString; + private boolean fetchLatest; + + /** + * @param request the object from which this servlet reads the url contents + * @param response the object into which this servlet writes the url contents + * @throws IOException if the request is bad + */ + public GetImageParams(HttpServletRequest request, + HttpServletResponse response + ) throws IOException { + @SuppressWarnings("unchecked") + Map pmap = request.getParameterMap(); + isGetImage = isGetEdit = fetchLatest = false; + + for (Map.Entry entry : pmap.entrySet()) { + String key = entry.getKey(); + String[] val = entry.getValue(); + if (key.equals("getimage")) { + isGetImage = true; + try { + txId = ServletUtil.parseLongParam(request, TXID_PARAM); + String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE); + nnf = imageType == null ? 
NameNodeFile.IMAGE : NameNodeFile + .valueOf(imageType); + } catch (NumberFormatException nfe) { + if (request.getParameter(TXID_PARAM).equals(LATEST_FSIMAGE_VALUE)) { + fetchLatest = true; + } else { + throw nfe; + } + } + } else if (key.equals("getedit")) { + isGetEdit = true; + startTxId = ServletUtil.parseLongParam(request, START_TXID_PARAM); + endTxId = ServletUtil.parseLongParam(request, END_TXID_PARAM); + } else if (key.equals(STORAGEINFO_PARAM)) { + storageInfoString = val[0]; + } + } + + int numGets = (isGetImage?1:0) + (isGetEdit?1:0); + if ((numGets > 1) || (numGets == 0)) { + throw new IOException("Illegal parameters to TransferFsImage"); + } + } + + public String getStorageInfoString() { + return storageInfoString; + } + + public long getTxId() { + Preconditions.checkState(isGetImage); + return txId; + } + + public NameNodeFile getNameNodeFile() { + Preconditions.checkState(isGetImage); + return nnf; + } + + public long getStartTxId() { + Preconditions.checkState(isGetEdit); + return startTxId; + } + + public long getEndTxId() { + Preconditions.checkState(isGetEdit); + return endTxId; + } + + boolean isGetEdit() { + return isGetEdit; + } + + boolean isGetImage() { + return isGetImage; + } + + boolean shouldFetchLatest() { + return fetchLatest; + } + + } + + /** + * Set headers for image length and, if available, md5. + * + * @throws IOException + */ + static void setVerificationHeadersForPut(HttpURLConnection connection, + File file) throws IOException { + connection.setRequestProperty(TransferFsImage.CONTENT_LENGTH, + String.valueOf(file.length())); + MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file); + if (hash != null) { + connection + .setRequestProperty(TransferFsImage.MD5_HEADER, hash.toString()); + } + } + + /** + * Set the required parameters for uploading an image. + * + * @param storage Storage from which the colon-separated storageInfo string is derived + * @param txid txid of the image + * @param imageFileSize size of the image file to be uploaded + * @param nnf the NameNodeFile type of the image + * @return a map of parameters to be used with the PUT request. 
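+ * + * For example (illustrative values only), an upload of the fsimage for + * txid 42 could carry query parameters like + * txid=42&storageInfo=...&File-Length=157286400&imageFile=IMAGE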
+ */ + static Map getParamsForPutImage(Storage storage, long txid, + long imageFileSize, NameNodeFile nnf) { + Map params = new HashMap(); + params.put(TXID_PARAM, Long.toString(txid)); + params.put(STORAGEINFO_PARAM, storage.toColonSeparatedString()); + // setting the length of the file to be uploaded in a separate property, as + // Content-Length only supports up to 2GB + params.put(TransferFsImage.FILE_LENGTH, Long.toString(imageFileSize)); + params.put(IMAGE_FILE_TYPE, nnf.name()); + return params; + } + + @Override + protected void doPut(final HttpServletRequest request, + final HttpServletResponse response) throws ServletException, IOException { + try { + ServletContext context = getServletContext(); + final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context); + final Configuration conf = (Configuration) getServletContext() + .getAttribute(JspHelper.CURRENT_CONF); + final PutImageParams parsedParams = new PutImageParams(request, response, + conf); + final NameNodeMetrics metrics = NameNode.getNameNodeMetrics(); + + validateRequest(context, conf, request, response, nnImage, + parsedParams.getStorageInfoString()); + + UserGroupInformation.getCurrentUser().doAs( + new PrivilegedExceptionAction() { + + @Override + public Void run() throws Exception { + + final long txid = parsedParams.getTxId(); + + final NameNodeFile nnf = parsedParams.getNameNodeFile(); + + if (!currentlyDownloadingCheckpoints.add(txid)) { + response.sendError(HttpServletResponse.SC_CONFLICT, + "Another checkpointer is already in the process of uploading a" + + " checkpoint made at transaction ID " + txid); + return null; + } + try { + if (nnImage.getStorage().findImageFile(nnf, txid) != null) { + response.sendError(HttpServletResponse.SC_CONFLICT, + "Another checkpointer already uploaded a checkpoint " + + "for txid " + txid); + return null; + } + + InputStream stream = request.getInputStream(); + try { + long start = now(); + MD5Hash downloadImageDigest = TransferFsImage + .handleUploadImageRequest(request, txid, + nnImage.getStorage(), stream, + parsedParams.getFileSize(), getThrottler(conf)); + nnImage.saveDigestAndRenameCheckpointImage(nnf, txid, + downloadImageDigest); + // Metrics non-null only when used inside name node + if (metrics != null) { + long elapsed = now() - start; + metrics.addPutImage(elapsed); + } + // Now that we have a new checkpoint, we might be able to + // remove some old ones. + nnImage.purgeOldStorage(nnf); + } finally { + stream.close(); + } + } finally { + currentlyDownloadingCheckpoints.remove(txid); + } + return null; + } + + }); + } catch (Throwable t) { + String errMsg = "PutImage failed. " + StringUtils.stringifyException(t); + response.sendError(HttpServletResponse.SC_GONE, errMsg); + throw new IOException(errMsg); + } + } + + /* + * Params required to handle a put image request + */ + static class PutImageParams { + private long txId = -1; + private String storageInfoString = null; + private long fileSize = 0L; + private NameNodeFile nnf; + + public PutImageParams(HttpServletRequest request, + HttpServletResponse response, Configuration conf) throws IOException { + txId = ServletUtil.parseLongParam(request, TXID_PARAM); + storageInfoString = ServletUtil.getParameter(request, STORAGEINFO_PARAM); + fileSize = ServletUtil.parseLongParam(request, + TransferFsImage.FILE_LENGTH); + String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE); + nnf = imageType == null ? 
NameNodeFile.IMAGE : NameNodeFile + .valueOf(imageType); + if (fileSize == 0 || txId == -1 || storageInfoString == null + || storageInfoString.isEmpty()) { + throw new IOException("Illegal parameters to TransferFsImage"); + } + } + + public long getTxId() { + return txId; + } + + public String getStorageInfoString() { + return storageInfoString; + } + + public long getFileSize() { + return fileSize; + } + + public NameNodeFile getNameNodeFile() { + return nnf; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java index 43952be5b61..89772005b59 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java @@ -233,8 +233,8 @@ private static void setupServlets(HttpServer2 httpServer, Configuration conf) { CancelDelegationTokenServlet.class, true); httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class, true); - httpServer.addInternalServlet("getimage", "/getimage", - GetImageServlet.class, true); + httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC, + ImageServlet.class, true); httpServer.addInternalServlet("listPaths", "/listPaths/*", ListPathsServlet.class, false); httpServer.addInternalServlet("data", "/data/*", diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java index 1b5bf07ed68..a35d362a0d3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java @@ -114,7 +114,6 @@ public class SecondaryNameNode implements Runnable { private InetSocketAddress nameNodeAddr; private volatile boolean shouldRun; private HttpServer2 infoServer; - private URL imageListenURL; private Collection checkpointDirs; private List checkpointEditsDirs; @@ -267,13 +266,11 @@ private void initialize(final Configuration conf, infoServer.setAttribute("secondary.name.node", this); infoServer.setAttribute("name.system.image", checkpointImage); infoServer.setAttribute(JspHelper.CURRENT_CONF, conf); - infoServer.addInternalServlet("getimage", "/getimage", - GetImageServlet.class, true); + infoServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC, + ImageServlet.class, true); infoServer.start(); LOG.info("Web server init done"); - imageListenURL = new URL(DFSUtil.getHttpClientScheme(conf) + "://" - + NetUtils.getHostPortString(infoServer.getConnectorAddress(0))); HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf); int connIdx = 0; @@ -487,14 +484,6 @@ private URL getInfoServer() throws IOException { LOG.debug("Will connect to NameNode at " + address); return address.toURL(); } - - /** - * Return the host:port of where this SecondaryNameNode is listening - * for image transfers - */ - private URL getImageListenAddress() { - return imageListenURL; - } /** * Create a new checkpoint @@ -555,8 +544,8 @@ public boolean doCheckpoint() throws IOException { // to make this new uploaded image as the most current image. 
// long txid = checkpointImage.getLastAppliedTxId(); - TransferFsImage.uploadImageFromStorage(fsName, getImageListenAddress(), - dstStorage, NameNodeFile.IMAGE, txid); + TransferFsImage.uploadImageFromStorage(fsName, conf, dstStorage, + NameNodeFile.IMAGE, txid); // error simulation code for junit test CheckpointFaultInjector.getInstance().afterSecondaryUploadsNewImage(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java index ed0922f069f..07870199d99 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java @@ -19,18 +19,22 @@ import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.HttpURLConnection; +import java.net.URISyntaxException; import java.net.URL; import java.security.DigestInputStream; import java.security.MessageDigest; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; -import javax.servlet.ServletOutputStream; -import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.commons.logging.Log; @@ -49,10 +53,12 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.hdfs.web.URLConnectionFactory; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.util.Time; +import org.apache.http.client.utils.URIBuilder; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; @@ -65,7 +71,12 @@ public class TransferFsImage { public final static String CONTENT_LENGTH = "Content-Length"; + public final static String FILE_LENGTH = "File-Length"; public final static String MD5_HEADER = "X-MD5-Digest"; + + private final static String CONTENT_TYPE = "Content-Type"; + private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding"; + @VisibleForTesting static int timeout = 0; private static URLConnectionFactory connectionFactory; @@ -82,14 +93,14 @@ public class TransferFsImage { public static void downloadMostRecentImageToDirectory(URL infoServer, File dir) throws IOException { - String fileId = GetImageServlet.getParamStringForMostRecentImage(); + String fileId = ImageServlet.getParamStringForMostRecentImage(); getFileClient(infoServer, fileId, Lists.newArrayList(dir), null, false); } public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId, Storage dstStorage, boolean needDigest) throws IOException { - String fileid = GetImageServlet.getParamStringForImage(null, + String fileid = ImageServlet.getParamStringForImage(null, imageTxId, dstStorage); String fileName = NNStorage.getCheckpointImageFileName(imageTxId); @@ -104,12 +115,31 @@ public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId, dstFiles.get(0).length() + " bytes."); return hash; } - + + static MD5Hash 
handleUploadImageRequest(HttpServletRequest request, + long imageTxId, Storage dstStorage, InputStream stream, + long advertisedSize, DataTransferThrottler throttler) throws IOException { + + String fileName = NNStorage.getCheckpointImageFileName(imageTxId); + + List dstFiles = dstStorage.getFiles(NameNodeDirType.IMAGE, fileName); + if (dstFiles.isEmpty()) { + throw new IOException("No targets in destination storage!"); + } + + MD5Hash advertisedDigest = parseMD5Header(request); + MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true, + advertisedSize, advertisedDigest, fileName, stream, throttler); + LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size " + + dstFiles.get(0).length() + " bytes."); + return hash; + } + static void downloadEditsToStorage(URL fsName, RemoteEditLog log, NNStorage dstStorage) throws IOException { assert log.getStartTxId() > 0 && log.getEndTxId() > 0 : "bad log: " + log; - String fileid = GetImageServlet.getParamStringForLog( + String fileid = ImageServlet.getParamStringForLog( log, dstStorage); String finalFileName = NNStorage.getFinalizedEditsFileName( log.getStartTxId(), log.getEndTxId()); @@ -159,22 +189,19 @@ static void downloadEditsToStorage(URL fsName, RemoteEditLog log, * Requests that the NameNode download an image from this node. * * @param fsName the http address for the remote NN - * @param myNNAddress the host/port where the local node is running an - * HTTPServer hosting GetImageServlet + * @param conf Configuration * @param storage the storage directory to transfer the image from * @param nnf the NameNodeFile type of the image * @param txid the transaction ID of the image to be uploaded */ - public static void uploadImageFromStorage(URL fsName, URL myNNAddress, - Storage storage, NameNodeFile nnf, long txid) throws IOException { + public static void uploadImageFromStorage(URL fsName, Configuration conf, + NNStorage storage, NameNodeFile nnf, long txid) throws IOException { - String fileid = GetImageServlet.getParamStringToPutImage(nnf, txid, - myNNAddress, storage); - // this doesn't directly upload an image, but rather asks the NN - // to connect back to the 2NN to download the specified image. + URL url = new URL(fsName, ImageServlet.PATH_SPEC); + long startTime = Time.monotonicNow(); try { - TransferFsImage.getFileClient(fsName, fileid, null, null, false); - } catch (HttpGetFailedException e) { + uploadImage(url, conf, storage, nnf, txid); + } catch (HttpPutFailedException e) { if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) { // this is OK - this means that a previous attempt to upload // this checkpoint succeeded even though we thought it failed. 
@@ -186,25 +213,105 @@ public static void uploadImageFromStorage(URL fsName, URL myNNAddress, throw e; } } - LOG.info("Uploaded image with txid " + txid + " to namenode at " + - fsName); + double xferSec = Math.max( + ((float) (Time.monotonicNow() - startTime)) / 1000.0, 0.001); + LOG.info("Uploaded image with txid " + txid + " to namenode at " + fsName + + " in " + xferSec + " seconds"); + } + + /* + * Uploads the image file using the HTTP PUT method. + */ + private static void uploadImage(URL url, Configuration conf, + NNStorage storage, NameNodeFile nnf, long txId) throws IOException { + + File imageFile = storage.findImageFile(nnf, txId); + if (imageFile == null) { + throw new IOException("Could not find image with txid " + txId); + } + + HttpURLConnection connection = null; + try { + URIBuilder uriBuilder = new URIBuilder(url.toURI()); + + // Write all params for the image upload request as the query string; + // the request body contains the image to be uploaded. + Map params = ImageServlet.getParamsForPutImage(storage, + txId, imageFile.length(), nnf); + for (Entry entry : params.entrySet()) { + uriBuilder.addParameter(entry.getKey(), entry.getValue()); + } + + URL urlWithParams = uriBuilder.build().toURL(); + connection = (HttpURLConnection) connectionFactory.openConnection( + urlWithParams, UserGroupInformation.isSecurityEnabled()); + // Set the request to PUT + connection.setRequestMethod("PUT"); + connection.setDoOutput(true); + + + int chunkSize = conf.getInt( + DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY, + DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT); + if (imageFile.length() > chunkSize) { + // Use chunked streaming mode to support uploads of 2GB+ files and to + // avoid internal buffering. + // This mode should be used only when more than chunkSize bytes are + // to be uploaded; otherwise the upload may sometimes fail. + connection.setChunkedStreamingMode(chunkSize); + } + + setTimeout(connection); + + // set headers for verification + ImageServlet.setVerificationHeadersForPut(connection, imageFile); + + // Write the file to the output stream. + writeFileToPutRequest(conf, connection, imageFile); + + int responseCode = connection.getResponseCode(); + if (responseCode != HttpURLConnection.HTTP_OK) { + throw new HttpPutFailedException(connection.getResponseMessage(), + responseCode); + } + } catch (AuthenticationException e) { + throw new IOException(e); + } catch (URISyntaxException e) { + throw new IOException(e); + } finally { + if (connection != null) { + connection.disconnect(); + } + } + } + + private static void writeFileToPutRequest(Configuration conf, + HttpURLConnection connection, File imageFile) + throws FileNotFoundException, IOException { + connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream"); + connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary"); + OutputStream output = connection.getOutputStream(); + FileInputStream input = new FileInputStream(imageFile); + try { + copyFileToStream(output, imageFile, input, + ImageServlet.getThrottler(conf)); + } finally { + IOUtils.closeStream(input); + IOUtils.closeStream(output); + } } - /** * A server-side method to respond to a getfile http request * Copies the contents of the local file into the output stream. 
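+ * After this change it is shared by both transfer directions: ImageServlet + * and GetJournalEditServlet use it to serve GET downloads, and + * writeFileToPutRequest() uses it to stream the image body of a PUT upload.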
*/ - public static void getFileServer(ServletResponse response, File localfile, - FileInputStream infile, - DataTransferThrottler throttler) + public static void copyFileToStream(OutputStream out, File localfile, + FileInputStream infile, DataTransferThrottler throttler) throws IOException { byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE]; - ServletOutputStream out = null; try { CheckpointFaultInjector.getInstance() .aboutToSendFile(localfile); - out = response.getOutputStream(); if (CheckpointFaultInjector.getInstance(). shouldSendShortFile(localfile)) { @@ -250,14 +357,13 @@ public static void getFileServer(ServletResponse response, File localfile, static MD5Hash getFileClient(URL infoServer, String queryString, List localPaths, Storage dstStorage, boolean getChecksum) throws IOException { - URL url = new URL(infoServer, "/getimage?" + queryString); + URL url = new URL(infoServer, ImageServlet.PATH_SPEC + "?" + queryString); LOG.info("Opening connection to " + url); return doGetUrl(url, localPaths, dstStorage, getChecksum); } public static MD5Hash doGetUrl(URL url, List localPaths, Storage dstStorage, boolean getChecksum) throws IOException { - long startTime = Time.monotonicNow(); HttpURLConnection connection; try { connection = (HttpURLConnection) @@ -266,16 +372,7 @@ public static MD5Hash doGetUrl(URL url, List localPaths, throw new IOException(e); } - if (timeout <= 0) { - Configuration conf = new HdfsConfiguration(); - timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY, - DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT); - } - - if (timeout > 0) { - connection.setConnectTimeout(timeout); - connection.setReadTimeout(timeout); - } + setTimeout(connection); if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) { throw new HttpGetFailedException( @@ -293,10 +390,37 @@ public static MD5Hash doGetUrl(URL url, List localPaths, throw new IOException(CONTENT_LENGTH + " header is not provided " + "by the namenode when trying to fetch " + url); } - + MD5Hash advertisedDigest = parseMD5Header(connection); + String fsImageName = connection + .getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER); + InputStream stream = connection.getInputStream(); + + return receiveFile(url.toExternalForm(), localPaths, dstStorage, + getChecksum, advertisedSize, advertisedDigest, fsImageName, stream, + null); + } + + private static void setTimeout(HttpURLConnection connection) { + if (timeout <= 0) { + Configuration conf = new HdfsConfiguration(); + timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY, + DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT); + LOG.info("Image Transfer timeout configured to " + timeout + + " milliseconds"); + } + + if (timeout > 0) { + connection.setConnectTimeout(timeout); + connection.setReadTimeout(timeout); + } + } + + private static MD5Hash receiveFile(String url, List localPaths, + Storage dstStorage, boolean getChecksum, long advertisedSize, + MD5Hash advertisedDigest, String fsImageName, InputStream stream, + DataTransferThrottler throttler) throws IOException { + long startTime = Time.monotonicNow(); if (localPaths != null) { - String fsImageName = connection.getHeaderField( - GetImageServlet.HADOOP_IMAGE_EDITS_HEADER); // If the local paths refer to directories, use the server-provided header // as the filename within that directory List newLocalPaths = new ArrayList(); @@ -313,10 +437,8 @@ public static MD5Hash doGetUrl(URL url, List localPaths, localPaths = newLocalPaths; } - MD5Hash advertisedDigest = 
parseMD5Header(connection); long received = 0; - InputStream stream = connection.getInputStream(); MessageDigest digester = null; if (getChecksum) { digester = MD5Hash.getDigester(); @@ -361,6 +483,9 @@ public static MD5Hash doGetUrl(URL url, List localPaths, for (FileOutputStream fos : outputStreams) { fos.write(buf, 0, num); } + if (throttler != null) { + throttler.throttle(num); + } } } finishedReceiving = true; @@ -404,7 +529,12 @@ private static MD5Hash parseMD5Header(HttpURLConnection connection) { String header = connection.getHeaderField(MD5_HEADER); return (header != null) ? new MD5Hash(header) : null; } - + + private static MD5Hash parseMD5Header(HttpServletRequest request) { + String header = request.getHeader(MD5_HEADER); + return (header != null) ? new MD5Hash(header) : null; + } + public static class HttpGetFailedException extends IOException { private static final long serialVersionUID = 1L; private final int responseCode; @@ -419,4 +549,18 @@ public int getResponseCode() { } } + public static class HttpPutFailedException extends IOException { + private static final long serialVersionUID = 1L; + private final int responseCode; + + HttpPutFailedException(String msg, int responseCode) throws IOException { + super(msg); + this.responseCode = responseCode; + } + + public int getResponseCode() { + return responseCode; + } + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java index 7aa6077a8b7..a80c8774720 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java @@ -63,6 +63,7 @@ public class StandbyCheckpointer { private static final Log LOG = LogFactory.getLog(StandbyCheckpointer.class); private static final long PREVENT_AFTER_CANCEL_MS = 2*60*1000L; private final CheckpointConf checkpointConf; + private final Configuration conf; private final FSNamesystem namesystem; private long lastCheckpointTime; private final CheckpointerThread thread; @@ -80,6 +81,7 @@ public class StandbyCheckpointer { public StandbyCheckpointer(Configuration conf, FSNamesystem ns) throws IOException { this.namesystem = ns; + this.conf = conf; this.checkpointConf = new CheckpointConf(conf); this.thread = new CheckpointerThread(); this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true) @@ -193,7 +195,7 @@ private void doCheckpoint() throws InterruptedException, IOException { Future upload = executor.submit(new Callable() { @Override public Void call() throws IOException { - TransferFsImage.uploadImageFromStorage(activeNNAddress, myNNAddress, + TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem.getFSImage().getStorage(), imageType, txid); return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 90e72357bfe..ca5e4b89a14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -858,15 +858,13 @@ dfs.image.transfer.timeout - 600000 + 60000 - Timeout for image transfer in milliseconds. This timeout and the related + Socket timeout for image transfer in milliseconds. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 90e72357bfe..ca5e4b89a14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -858,15 +858,13 @@
 <property>
   <name>dfs.image.transfer.timeout</name>
-  <value>600000</value>
+  <value>60000</value>
   <description>
-        Timeout for image transfer in milliseconds. This timeout and the related
+        Socket timeout for image transfer in milliseconds. This timeout and the related
         dfs.image.transfer.bandwidthPerSec parameter should be configured such
-        that normal image transfer can complete within the timeout.
+        that normal image transfer can complete successfully.
         This timeout prevents client hangs when the sender fails during
-        image transfer, which is particularly important during checkpointing.
-        Note that this timeout applies to the entirety of image transfer, and
-        is not a socket timeout.
+        image transfer. Note that this is a socket timeout, not a total-transfer limit.
   </description>
 </property>
 
@@ -883,6 +881,16 @@
 </property>
 
+<property>
+  <name>dfs.image.transfer.chunksize</name>
+  <value>65536</value>
+  <description>
+        Chunk size in bytes used to upload the checkpoint.
+        Chunked streaming is used to avoid internally buffering the entire
+        contents of a potentially huge image file.
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.support.allow.format</name>
   <value>true</value>
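On why the chunksize property avoids buffering: by default, HttpURLConnection holds an entire PUT or POST body in memory so that it can emit a Content-Length header up front, which is untenable for a multi-gigabyte fsimage. Enabling chunked streaming sends the body as fixed-size chunks as they are written. A minimal client-side sketch under that assumption follows; it is not the actual TransferFsImage upload code, and the endpoint URL is invented for the example.

// Illustrative sketch only: a chunked-streaming PUT client. The endpoint,
// class name, and argument handling are invented for this example; the
// actual upload lives in TransferFsImage and targets ImageServlet.PATH_SPEC.
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class ChunkedUploadSketch {
  public static void main(String[] args) throws IOException {
    URL url = new URL("http://nn.example.com:50070/imagetransfer");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("PUT");
    conn.setDoOutput(true);
    // The key call: send the body as 64 KB chunks (dfs.image.transfer.chunksize)
    // instead of buffering the whole fsimage to compute a Content-Length.
    conn.setChunkedStreamingMode(64 * 1024);

    byte[] buf = new byte[64 * 1024];
    try (FileInputStream in = new FileInputStream(args[0]); // path to the image
        OutputStream out = conn.getOutputStream()) {
      int n;
      while ((n = in.read(buf)) > 0) {
        out.write(buf, 0, n); // each full buffer goes out as one HTTP chunk
      }
    }
    if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
      // Analogous to the patch's HttpPutFailedException on a non-200 reply.
      throw new IOException("Image upload failed: HTTP " + conn.getResponseCode());
    }
  }
}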
*/ @Test public void testSecondaryFailsWithErrorBeforeSettingHeaders() @@ -1975,7 +1962,14 @@ public void testNamespaceVerifiedOnFileTransfer() throws IOException { Mockito.doReturn(Lists.newArrayList(new File("/wont-be-written"))) .when(dstImage).getFiles( Mockito.anyObject(), Mockito.anyString()); - + + File mockImageFile = File.createTempFile("image", ""); + FileOutputStream imageFile = new FileOutputStream(mockImageFile); + imageFile.write("data".getBytes()); + imageFile.close(); + Mockito.doReturn(mockImageFile).when(dstImage) + .findImageFile(Mockito.any(NameNodeFile.class), Mockito.anyLong()); + Mockito.doReturn(new StorageInfo(1, 1, "X", 1, NodeType.NAME_NODE).toColonSeparatedString()) .when(dstImage).toColonSeparatedString(); @@ -1996,8 +1990,8 @@ public void testNamespaceVerifiedOnFileTransfer() throws IOException { } try { - TransferFsImage.uploadImageFromStorage(fsName, new URL( - "http://localhost:1234"), dstImage, NameNodeFile.IMAGE, 0); + TransferFsImage.uploadImageFromStorage(fsName, conf, dstImage, + NameNodeFile.IMAGE, 0); fail("Storage info was not verified"); } catch (IOException ioe) { String msg = StringUtils.stringifyException(ioe); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetImageServlet.java index bffa54f6f29..5249d9fe3e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetImageServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetImageServlet.java @@ -69,7 +69,7 @@ public void testIsValidRequestor() throws IOException { Mockito.when(context.getAttribute(HttpServer2.ADMINS_ACL)).thenReturn(acls); // Make sure that NN2 is considered a valid fsimage/edits requestor. - assertTrue(GetImageServlet.isValidRequestor(context, + assertTrue(ImageServlet.isValidRequestor(context, "hdfs/host2@TEST-REALM.COM", conf)); // Mark atm as an admin. @@ -81,15 +81,15 @@ public boolean matches(Object argument) { }))).thenReturn(true); // Make sure that NN2 is still considered a valid requestor. - assertTrue(GetImageServlet.isValidRequestor(context, + assertTrue(ImageServlet.isValidRequestor(context, "hdfs/host2@TEST-REALM.COM", conf)); // Make sure an admin is considered a valid requestor. - assertTrue(GetImageServlet.isValidRequestor(context, + assertTrue(ImageServlet.isValidRequestor(context, "atm@TEST-REALM.COM", conf)); // Make sure other users are *not* considered valid requestors. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java
index 14d4441b4c2..fd03759c8fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.fail;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.net.URL;
@@ -34,9 +35,11 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.http.HttpServerFunctionalTest;
 import org.apache.hadoop.test.PathUtils;
@@ -118,10 +121,11 @@ public void testClientSideExceptionOnJustOneDir() throws IOException {
    * Test to verify the read timeout
    */
   @Test(timeout = 5000)
-  public void testImageTransferTimeout() throws Exception {
+  public void testGetImageTimeout() throws Exception {
     HttpServer2 testServer = HttpServerFunctionalTest.createServer("hdfs");
     try {
-      testServer.addServlet("GetImage", "/getimage", TestGetImageServlet.class);
+      testServer.addServlet("ImageTransfer", ImageServlet.PATH_SPEC,
+          TestImageTransferServlet.class);
       testServer.start();
       URL serverURL = HttpServerFunctionalTest.getServerURL(testServer);
       TransferFsImage.timeout = 2000;
@@ -139,7 +143,48 @@ public void testImageTransferTimeout() throws Exception {
     }
   }
 
-  public static class TestGetImageServlet extends HttpServlet {
+  /**
+   * Test to verify the timeout of image upload
+   */
+  @Test(timeout = 10000)
+  public void testImageUploadTimeout() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    NNStorage mockStorage = Mockito.mock(NNStorage.class);
+    HttpServer2 testServer = HttpServerFunctionalTest.createServer("hdfs");
+    try {
+      testServer.addServlet("ImageTransfer", ImageServlet.PATH_SPEC,
+          TestImageTransferServlet.class);
+      testServer.start();
+      URL serverURL = HttpServerFunctionalTest.getServerURL(testServer);
+      // Set the timeout here; otherwise it will take the default.
+      TransferFsImage.timeout = 2000;
+
+      File tmpDir = new File(new FileSystemTestHelper().getTestRootDir());
+      tmpDir.mkdirs();
+
+      File mockImageFile = File.createTempFile("image", "", tmpDir);
+      FileOutputStream imageFile = new FileOutputStream(mockImageFile);
+      imageFile.write("data".getBytes());
+      imageFile.close();
+      Mockito.when(
+          mockStorage.findImageFile(Mockito.any(NameNodeFile.class),
+              Mockito.anyLong())).thenReturn(mockImageFile);
+      Mockito.when(mockStorage.toColonSeparatedString()).thenReturn(
+          "storage:info:string");
+
+      try {
+        TransferFsImage.uploadImageFromStorage(serverURL, conf, mockStorage,
+            NameNodeFile.IMAGE, 1L);
+        fail("TransferImage Should fail with timeout");
+      } catch (SocketTimeoutException e) {
+        assertEquals("Upload should timeout", "Read timed out", e.getMessage());
+      }
+    } finally {
+      testServer.stop();
+    }
+  }
+
+  public static class TestImageTransferServlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
 
     @Override
@@ -153,5 +198,17 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
         }
       }
     }
+
+    @Override
+    protected void doPut(HttpServletRequest req, HttpServletResponse resp)
+        throws ServletException, IOException {
+      synchronized (this) {
+        try {
+          wait(5000);
+        } catch (InterruptedException e) {
+          // Ignore
+        }
+      }
+    }
   }
 }
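Both timeout tests above rely on the same trick: the stub servlet blocks in wait(5000), which is longer than the client-side socket timeout of 2000 ms, so the client's blocked read fails first, for the GET download and the PUT upload alike. The pattern can be reproduced with nothing but the JDK, as in the following sketch; the path and class name are illustrative.

// Condensed, JDK-only reproduction of the mechanism behind
// testGetImageTimeout and testImageUploadTimeout: the server stalls for
// 5 s, the client's read timeout is 2 s, so the client fails first.
import com.sun.net.httpserver.HttpServer;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URL;

public class ReadTimeoutDemo {
  public static void main(String[] args) throws Exception {
    HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
    server.createContext("/imagetransfer", exchange -> {
      try {
        Thread.sleep(5000); // stall longer than the client is willing to wait
      } catch (InterruptedException ignored) {
      }
      exchange.sendResponseHeaders(200, -1); // -1: no response body
      exchange.close();
    });
    server.start();
    try {
      URL url = new URL("http://127.0.0.1:"
          + server.getAddress().getPort() + "/imagetransfer");
      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
      conn.setReadTimeout(2000); // analogous to TransferFsImage.timeout = 2000
      conn.getResponseCode();    // blocks on the stalled response
      throw new AssertionError("expected a read timeout");
    } catch (SocketTimeoutException e) {
      System.out.println("Timed out as expected: " + e.getMessage());
    } finally {
      server.stop(0);
    }
  }
}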