From 7ec609b28989303fe0cc36812f225028b0251b32 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Mon, 9 Jan 2017 18:00:14 -0800 Subject: [PATCH] HDFS-11273. Move TransferFsImage#doGetUrl function to a Util class. Contributed by Hanisha Koneru. --- .../server/common/HttpGetFailedException.java | 39 +++ .../server/common/HttpPutFailedException.java | 37 +++ .../hadoop/hdfs/server/common/Util.java | 258 ++++++++++++++- .../namenode/EditLogFileInputStream.java | 3 +- .../hdfs/server/namenode/ImageServlet.java | 15 +- .../server/namenode/NameNodeRpcServer.java | 4 +- .../hdfs/server/namenode/TransferFsImage.java | 295 ++---------------- 7 files changed, 366 insertions(+), 285 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpGetFailedException.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpPutFailedException.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpGetFailedException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpGetFailedException.java new file mode 100644 index 00000000000..f36aca3466a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpGetFailedException.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.common; + +import org.apache.hadoop.classification.InterfaceAudience; + +import java.io.IOException; +import java.net.HttpURLConnection; + +@InterfaceAudience.Private +public class HttpGetFailedException extends IOException { + private static final long serialVersionUID = 1L; + private final int responseCode; + + public HttpGetFailedException(String msg, HttpURLConnection connection) + throws IOException { + super(msg); + this.responseCode = connection.getResponseCode(); + } + + public int getResponseCode() { + return responseCode; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpPutFailedException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpPutFailedException.java new file mode 100644 index 00000000000..5fad77df93e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpPutFailedException.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.common; + +import org.apache.hadoop.classification.InterfaceAudience; + +import java.io.IOException; + +@InterfaceAudience.Private +public class HttpPutFailedException extends IOException { + private static final long serialVersionUID = 1L; + private final int responseCode; + + public HttpPutFailedException(String msg, int responseCode) throws IOException { + super(msg); + this.responseCode = responseCode; + } + + public int getResponseCode() { + return responseCode; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java index 6ec686fef5c..f08c3faa708 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java @@ -18,30 +18,66 @@ package org.apache.hadoop.hdfs.server.common; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; import java.net.URI; import java.net.URISyntaxException; +import java.net.URL; +import java.security.DigestInputStream; +import java.security.MessageDigest; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.server.namenode.ImageServlet; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.io.MD5Hash; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.hdfs.web.URLConnectionFactory; + @InterfaceAudience.Private public final class Util { private final static Log LOG = LogFactory.getLog(Util.class.getName()); + public final static String FILE_LENGTH = "File-Length"; + public final static String CONTENT_LENGTH = "Content-Length"; + public final static String MD5_HEADER = "X-MD5-Digest"; + public final static String CONTENT_TYPE = "Content-Type"; + public final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding"; + + public final static int IO_FILE_BUFFER_SIZE; + private static final boolean isSpnegoEnabled; + public static final URLConnectionFactory connectionFactory; + + static { + Configuration conf = new Configuration(); + connectionFactory = URLConnectionFactory + .newDefaultURLConnectionFactory(conf); + isSpnegoEnabled = UserGroupInformation.isSecurityEnabled(); + IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf); + } + /** * Interprets the passed string as a URI. In case of error it * assumes the specified string is a file. * * @param s the string to interpret - * @return the resulting URI - * @throws IOException + * @return the resulting URI */ - public static URI stringAsURI(String s) throws IOException { + static URI stringAsURI(String s) throws IOException { URI u = null; // try to make a URI try { @@ -67,7 +103,6 @@ public static URI stringAsURI(String s) throws IOException { * * @param f the file to convert * @return the resulting URI - * @throws IOException */ public static URI fileAsURI(File f) throws IOException { URI u = f.getCanonicalFile().toURI(); @@ -92,7 +127,7 @@ public static URI fileAsURI(File f) throws IOException { */ public static List stringCollectionAsURIs( Collection names) { - List uris = new ArrayList(names.size()); + List uris = new ArrayList<>(names.size()); for(String name : names) { try { uris.add(stringAsURI(name)); @@ -102,4 +137,217 @@ public static List stringCollectionAsURIs( } return uris; } + + /** + * Downloads the files at the specified url location into destination + * storage. + */ + public static MD5Hash doGetUrl(URL url, List localPaths, + Storage dstStorage, boolean getChecksum, int timeout) throws IOException { + HttpURLConnection connection; + try { + connection = (HttpURLConnection) + connectionFactory.openConnection(url, isSpnegoEnabled); + } catch (AuthenticationException e) { + throw new IOException(e); + } + + setTimeout(connection, timeout); + + if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) { + throw new HttpGetFailedException("Image transfer servlet at " + url + + " failed with status code " + connection.getResponseCode() + + "\nResponse message:\n" + connection.getResponseMessage(), + connection); + } + + long advertisedSize; + String contentLength = connection.getHeaderField(CONTENT_LENGTH); + if (contentLength != null) { + advertisedSize = Long.parseLong(contentLength); + } else { + throw new IOException(CONTENT_LENGTH + " header is not provided " + + "by the namenode when trying to fetch " + url); + } + MD5Hash advertisedDigest = parseMD5Header(connection); + String fsImageName = connection + .getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER); + InputStream stream = connection.getInputStream(); + + return receiveFile(url.toExternalForm(), localPaths, dstStorage, + getChecksum, advertisedSize, advertisedDigest, fsImageName, stream, + null); + } + + /** + * Receives file at the url location from the input stream and puts them in + * the specified destination storage location. + */ + public static MD5Hash receiveFile(String url, List localPaths, + Storage dstStorage, boolean getChecksum, long advertisedSize, + MD5Hash advertisedDigest, String fsImageName, InputStream stream, + DataTransferThrottler throttler) throws + IOException { + long startTime = Time.monotonicNow(); + Map streamPathMap = new HashMap<>(); + StringBuilder xferStats = new StringBuilder(); + double xferCombined = 0; + if (localPaths != null) { + // If the local paths refer to directories, use the server-provided header + // as the filename within that directory + List newLocalPaths = new ArrayList<>(); + for (File localPath : localPaths) { + if (localPath.isDirectory()) { + if (fsImageName == null) { + throw new IOException("No filename header provided by server"); + } + newLocalPaths.add(new File(localPath, fsImageName)); + } else { + newLocalPaths.add(localPath); + } + } + localPaths = newLocalPaths; + } + + + long received = 0; + MessageDigest digester = null; + if (getChecksum) { + digester = MD5Hash.getDigester(); + stream = new DigestInputStream(stream, digester); + } + boolean finishedReceiving = false; + + List outputStreams = Lists.newArrayList(); + + try { + if (localPaths != null) { + for (File f : localPaths) { + try { + if (f.exists()) { + LOG.warn("Overwriting existing file " + f + + " with file downloaded from " + url); + } + FileOutputStream fos = new FileOutputStream(f); + outputStreams.add(fos); + streamPathMap.put(fos, f); + } catch (IOException ioe) { + LOG.warn("Unable to download file " + f, ioe); + // This will be null if we're downloading the fsimage to a file + // outside of an NNStorage directory. + if (dstStorage != null && + (dstStorage instanceof StorageErrorReporter)) { + ((StorageErrorReporter)dstStorage).reportErrorOnFile(f); + } + } + } + + if (outputStreams.isEmpty()) { + throw new IOException( + "Unable to download to any storage directory"); + } + } + + int num = 1; + byte[] buf = new byte[IO_FILE_BUFFER_SIZE]; + while (num > 0) { + num = stream.read(buf); + if (num > 0) { + received += num; + for (FileOutputStream fos : outputStreams) { + fos.write(buf, 0, num); + } + if (throttler != null) { + throttler.throttle(num); + } + } + } + finishedReceiving = true; + double xferSec = Math.max( + ((float)(Time.monotonicNow() - startTime)) / 1000.0, 0.001); + long xferKb = received / 1024; + xferCombined += xferSec; + xferStats.append( + String.format(" The fsimage download took %.2fs at %.2f KB/s.", + xferSec, xferKb / xferSec)); + } finally { + stream.close(); + for (FileOutputStream fos : outputStreams) { + long flushStartTime = Time.monotonicNow(); + fos.getChannel().force(true); + fos.close(); + double writeSec = Math.max(((float) + (flushStartTime - Time.monotonicNow())) / 1000.0, 0.001); + xferCombined += writeSec; + xferStats.append(String + .format(" Synchronous (fsync) write to disk of " + + streamPathMap.get(fos).getAbsolutePath() + + " took %.2fs.", writeSec)); + } + + // Something went wrong and did not finish reading. + // Remove the temporary files. + if (!finishedReceiving) { + deleteTmpFiles(localPaths); + } + + if (finishedReceiving && received != advertisedSize) { + // only throw this exception if we think we read all of it on our end + // -- otherwise a client-side IOException would be masked by this + // exception that makes it look like a server-side problem! + deleteTmpFiles(localPaths); + throw new IOException("File " + url + " received length " + received + + " is not of the advertised size " + + advertisedSize); + } + } + xferStats.insert(0, String.format("Combined time for fsimage download and" + + " fsync to all disks took %.2fs.", xferCombined)); + LOG.info(xferStats.toString()); + + if (digester != null) { + MD5Hash computedDigest = new MD5Hash(digester.digest()); + + if (advertisedDigest != null && + !computedDigest.equals(advertisedDigest)) { + deleteTmpFiles(localPaths); + throw new IOException("File " + url + " computed digest " + + computedDigest + " does not match advertised digest " + + advertisedDigest); + } + return computedDigest; + } else { + return null; + } + } + + private static void deleteTmpFiles(List files) { + if (files == null) { + return; + } + + LOG.info("Deleting temporary files: " + files); + for (File file : files) { + if (!file.delete()) { + LOG.warn("Deleting " + file + " has failed"); + } + } + } + + /** + * Sets a timeout value in millisecods for the Http connection. + * @param connection the Http connection for which timeout needs to be set + * @param timeout value to be set as timeout in milliseconds + */ + public static void setTimeout(HttpURLConnection connection, int timeout) { + if (timeout > 0) { + connection.setConnectTimeout(timeout); + connection.setReadTimeout(timeout); + } + } + + private static MD5Hash parseMD5Header(HttpURLConnection connection) { + String header = connection.getHeaderField(MD5_HEADER); + return (header != null) ? new MD5Hash(header) : null; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java index 48df8d67159..36c2232d3b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java @@ -36,9 +36,8 @@ import org.apache.hadoop.hdfs.protocol.LayoutFlags; import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.common.HttpGetFailedException; import org.apache.hadoop.hdfs.server.common.Storage; -import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation; -import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException; import org.apache.hadoop.hdfs.web.URLConnectionFactory; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.SecurityUtil; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java index 4e4028dcfaa..7a26df9ef03 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import org.apache.hadoop.hdfs.server.common.Util; import static org.apache.hadoop.util.Time.monotonicNow; import java.net.HttpURLConnection; @@ -304,11 +305,12 @@ static boolean isValidRequestor(ServletContext context, String remoteUser, */ public static void setVerificationHeadersForGet(HttpServletResponse response, File file) throws IOException { - response.setHeader(TransferFsImage.CONTENT_LENGTH, + response.setHeader( + Util.CONTENT_LENGTH, String.valueOf(file.length())); MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file); if (hash != null) { - response.setHeader(TransferFsImage.MD5_HEADER, hash.toString()); + response.setHeader(Util.MD5_HEADER, hash.toString()); } } @@ -437,12 +439,13 @@ boolean shouldFetchLatest() { */ static void setVerificationHeadersForPut(HttpURLConnection connection, File file) throws IOException { - connection.setRequestProperty(TransferFsImage.CONTENT_LENGTH, + connection.setRequestProperty( + Util.CONTENT_LENGTH, String.valueOf(file.length())); MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file); if (hash != null) { connection - .setRequestProperty(TransferFsImage.MD5_HEADER, hash.toString()); + .setRequestProperty(Util.MD5_HEADER, hash.toString()); } } @@ -462,7 +465,7 @@ static Map getParamsForPutImage(Storage storage, long txid, params.put(STORAGEINFO_PARAM, storage.toColonSeparatedString()); // setting the length of the file to be uploaded in separate property as // Content-Length only supports up to 2GB - params.put(TransferFsImage.FILE_LENGTH, Long.toString(imageFileSize)); + params.put(Util.FILE_LENGTH, Long.toString(imageFileSize)); params.put(IMAGE_FILE_TYPE, nnf.name()); return params; } @@ -586,7 +589,7 @@ public PutImageParams(HttpServletRequest request, txId = ServletUtil.parseLongParam(request, TXID_PARAM); storageInfoString = ServletUtil.getParameter(request, STORAGEINFO_PARAM); fileSize = ServletUtil.parseLongParam(request, - TransferFsImage.FILE_LENGTH); + Util.FILE_LENGTH); String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE); nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile .valueOf(imageType); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 0fc3e60f7ee..735b2c06206 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -28,6 +28,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH; + import static org.apache.hadoop.util.Time.now; import java.io.FileNotFoundException; @@ -136,6 +137,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; +import org.apache.hadoop.hdfs.server.common.HttpGetFailedException; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; @@ -2104,7 +2106,7 @@ private static FSEditLogOp readOp(EditLogInputStream elis) } catch (FileNotFoundException e) { LOG.debug("Tried to read from deleted or moved edit log segment", e); return null; - } catch (TransferFsImage.HttpGetFailedException e) { + } catch (HttpGetFailedException e) { LOG.debug("Tried to read from deleted edit log segment", e); return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java index e4b95ee0fec..58213530052 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java @@ -19,18 +19,12 @@ import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.URISyntaxException; import java.net.URL; -import java.security.DigestInputStream; -import java.security.MessageDigest; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -44,17 +38,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.common.HttpPutFailedException; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; -import org.apache.hadoop.hdfs.server.common.StorageErrorReporter; +import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.util.Canceler; import org.apache.hadoop.hdfs.util.DataTransferThrottler; -import org.apache.hadoop.hdfs.web.URLConnectionFactory; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.security.UserGroupInformation; @@ -66,6 +59,9 @@ import com.google.common.collect.Lists; import org.eclipse.jetty.io.EofException; +import static org.apache.hadoop.hdfs.server.common.Util.IO_FILE_BUFFER_SIZE; +import static org.apache.hadoop.hdfs.server.common.Util.connectionFactory; + /** * This class provides fetching a specified file from the NameNode. */ @@ -82,43 +78,23 @@ public enum TransferResult{ private final int response; private final boolean shouldReThrowException; - private TransferResult(int response, boolean rethrow) { + TransferResult(int response, boolean rethrow) { this.response = response; this.shouldReThrowException = rethrow; } public static TransferResult getResultForCode(int code){ - TransferResult ret = UNEXPECTED_FAILURE; for(TransferResult result:TransferResult.values()){ if(result.response == code){ return result; } } - return ret; + return UNEXPECTED_FAILURE; } } - public final static String CONTENT_LENGTH = "Content-Length"; - public final static String FILE_LENGTH = "File-Length"; - public final static String MD5_HEADER = "X-MD5-Digest"; - - private final static String CONTENT_TYPE = "Content-Type"; - private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding"; - private final static int IO_FILE_BUFFER_SIZE; - @VisibleForTesting static int timeout = 0; - private static final URLConnectionFactory connectionFactory; - private static final boolean isSpnegoEnabled; - - static { - Configuration conf = new Configuration(); - connectionFactory = URLConnectionFactory - .newDefaultURLConnectionFactory(conf); - isSpnegoEnabled = UserGroupInformation.isSecurityEnabled(); - IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf); - } - private static final Log LOG = LogFactory.getLog(TransferFsImage.class); public static void downloadMostRecentImageToDirectory(URL infoServer, @@ -159,7 +135,7 @@ static MD5Hash handleUploadImageRequest(HttpServletRequest request, } MD5Hash advertisedDigest = parseMD5Header(request); - MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true, + MD5Hash hash = Util.receiveFile(fileName, dstFiles, dstStorage, true, advertisedSize, advertisedDigest, fileName, stream, throttler); LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size " + dstFiles.get(0).length() + " bytes."); @@ -226,8 +202,9 @@ static void downloadEditsToStorage(URL fsName, RemoteEditLog log, * @param txid the transaction ID of the image to be uploaded * @throws IOException if there is an I/O error */ - public static TransferResult uploadImageFromStorage(URL fsName, Configuration conf, - NNStorage storage, NameNodeFile nnf, long txid) throws IOException { + static TransferResult uploadImageFromStorage(URL fsName, + Configuration conf, NNStorage storage, NameNodeFile nnf, long txid) + throws IOException { return uploadImageFromStorage(fsName, conf, storage, nnf, txid, null); } @@ -323,9 +300,7 @@ private static void uploadImage(URL url, Configuration conf, responseCode, urlWithParams, connection.getResponseMessage()), responseCode); } - } catch (AuthenticationException e) { - throw new IOException(e); - } catch (URISyntaxException e) { + } catch (AuthenticationException | URISyntaxException e) { throw new IOException(e); } finally { if (connection != null) { @@ -336,9 +311,9 @@ private static void uploadImage(URL url, Configuration conf, private static void writeFileToPutRequest(Configuration conf, HttpURLConnection connection, File imageFile, Canceler canceler) - throws FileNotFoundException, IOException { - connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream"); - connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary"); + throws IOException { + connection.setRequestProperty(Util.CONTENT_TYPE, "application/octet-stream"); + connection.setRequestProperty(Util.CONTENT_TRANSFER_ENCODING, "binary"); OutputStream output = connection.getOutputStream(); FileInputStream input = new FileInputStream(imageFile); try { @@ -414,7 +389,7 @@ private static void copyFileToStream(OutputStream out, File localfile, * Copies the response from the URL to a list of local files. * @param dstStorage if an error occurs writing to one of the files, * this storage object will be notified. - * @Return a digest of the received file if getChecksum is true + * @return a digest of the received file if getChecksum is true */ static MD5Hash getFileClient(URL infoServer, String queryString, List localPaths, @@ -426,40 +401,12 @@ static MD5Hash getFileClient(URL infoServer, public static MD5Hash doGetUrl(URL url, List localPaths, Storage dstStorage, boolean getChecksum) throws IOException { - HttpURLConnection connection; - try { - connection = (HttpURLConnection) - connectionFactory.openConnection(url, isSpnegoEnabled); - } catch (AuthenticationException e) { - throw new IOException(e); - } + return Util.doGetUrl(url, localPaths, dstStorage, getChecksum, timeout); + } - setTimeout(connection); - - if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) { - throw new HttpGetFailedException( - "Image transfer servlet at " + url + - " failed with status code " + connection.getResponseCode() + - "\nResponse message:\n" + connection.getResponseMessage(), - connection); - } - - long advertisedSize; - String contentLength = connection.getHeaderField(CONTENT_LENGTH); - if (contentLength != null) { - advertisedSize = Long.parseLong(contentLength); - } else { - throw new IOException(CONTENT_LENGTH + " header is not provided " + - "by the namenode when trying to fetch " + url); - } - MD5Hash advertisedDigest = parseMD5Header(connection); - String fsImageName = connection - .getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER); - InputStream stream = connection.getInputStream(); - - return receiveFile(url.toExternalForm(), localPaths, dstStorage, - getChecksum, advertisedSize, advertisedDigest, fsImageName, stream, - null); + private static MD5Hash parseMD5Header(HttpServletRequest request) { + String header = request.getHeader(Util.MD5_HEADER); + return (header != null) ? new MD5Hash(header) : null; } private static void setTimeout(HttpURLConnection connection) { @@ -467,204 +414,10 @@ private static void setTimeout(HttpURLConnection connection) { Configuration conf = new HdfsConfiguration(); timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY, DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT); - LOG.info("Image Transfer timeout configured to " + timeout - + " milliseconds"); + LOG.info("Image Transfer timeout configured to " + timeout + + " milliseconds"); } - if (timeout > 0) { - connection.setConnectTimeout(timeout); - connection.setReadTimeout(timeout); - } + Util.setTimeout(connection, timeout); } - - private static MD5Hash receiveFile(String url, List localPaths, - Storage dstStorage, boolean getChecksum, long advertisedSize, - MD5Hash advertisedDigest, String fsImageName, InputStream stream, - DataTransferThrottler throttler) throws IOException { - long startTime = Time.monotonicNow(); - Map streamPathMap = new HashMap<>(); - StringBuilder xferStats = new StringBuilder(); - double xferCombined = 0; - if (localPaths != null) { - // If the local paths refer to directories, use the server-provided header - // as the filename within that directory - List newLocalPaths = new ArrayList(); - for (File localPath : localPaths) { - if (localPath.isDirectory()) { - if (fsImageName == null) { - throw new IOException("No filename header provided by server"); - } - newLocalPaths.add(new File(localPath, fsImageName)); - } else { - newLocalPaths.add(localPath); - } - } - localPaths = newLocalPaths; - } - - - long received = 0; - MessageDigest digester = null; - if (getChecksum) { - digester = MD5Hash.getDigester(); - stream = new DigestInputStream(stream, digester); - } - boolean finishedReceiving = false; - - List outputStreams = Lists.newArrayList(); - - try { - if (localPaths != null) { - for (File f : localPaths) { - try { - if (f.exists()) { - LOG.warn("Overwriting existing file " + f - + " with file downloaded from " + url); - } - FileOutputStream fos = new FileOutputStream(f); - outputStreams.add(fos); - streamPathMap.put(fos, f); - } catch (IOException ioe) { - LOG.warn("Unable to download file " + f, ioe); - // This will be null if we're downloading the fsimage to a file - // outside of an NNStorage directory. - if (dstStorage != null && - (dstStorage instanceof StorageErrorReporter)) { - ((StorageErrorReporter)dstStorage).reportErrorOnFile(f); - } - } - } - - if (outputStreams.isEmpty()) { - throw new IOException( - "Unable to download to any storage directory"); - } - } - - int num = 1; - byte[] buf = new byte[IO_FILE_BUFFER_SIZE]; - while (num > 0) { - num = stream.read(buf); - if (num > 0) { - received += num; - for (FileOutputStream fos : outputStreams) { - fos.write(buf, 0, num); - } - if (throttler != null) { - throttler.throttle(num); - } - } - } - finishedReceiving = true; - double xferSec = Math.max( - ((float)(Time.monotonicNow() - startTime)) / 1000.0, 0.001); - long xferKb = received / 1024; - xferCombined += xferSec; - xferStats.append( - String.format(" The fsimage download took %.2fs at %.2f KB/s.", - xferSec, xferKb / xferSec)); - } finally { - stream.close(); - for (FileOutputStream fos : outputStreams) { - long flushStartTime = Time.monotonicNow(); - fos.getChannel().force(true); - fos.close(); - double writeSec = Math.max(((float) - (flushStartTime - Time.monotonicNow())) / 1000.0, 0.001); - xferCombined += writeSec; - xferStats.append(String - .format(" Synchronous (fsync) write to disk of " + - streamPathMap.get(fos).getAbsolutePath() + - " took %.2fs.", writeSec)); - } - - // Something went wrong and did not finish reading. - // Remove the temporary files. - if (!finishedReceiving) { - deleteTmpFiles(localPaths); - } - - if (finishedReceiving && received != advertisedSize) { - // only throw this exception if we think we read all of it on our end - // -- otherwise a client-side IOException would be masked by this - // exception that makes it look like a server-side problem! - deleteTmpFiles(localPaths); - throw new IOException("File " + url + " received length " + received + - " is not of the advertised size " + - advertisedSize); - } - } - xferStats.insert( - 0, String.format( - "Combined time for fsimage download and fsync " + - "to all disks took %.2fs.", xferCombined)); - LOG.info(xferStats.toString()); - - if (digester != null) { - MD5Hash computedDigest = new MD5Hash(digester.digest()); - - if (advertisedDigest != null && - !computedDigest.equals(advertisedDigest)) { - deleteTmpFiles(localPaths); - throw new IOException("File " + url + " computed digest " + - computedDigest + " does not match advertised digest " + - advertisedDigest); - } - return computedDigest; - } else { - return null; - } - } - - private static void deleteTmpFiles(List files) { - if (files == null) { - return; - } - - LOG.info("Deleting temporary files: " + files); - for (File file : files) { - if (!file.delete()) { - LOG.warn("Deleting " + file + " has failed"); - } - } - } - - private static MD5Hash parseMD5Header(HttpURLConnection connection) { - String header = connection.getHeaderField(MD5_HEADER); - return (header != null) ? new MD5Hash(header) : null; - } - - private static MD5Hash parseMD5Header(HttpServletRequest request) { - String header = request.getHeader(MD5_HEADER); - return (header != null) ? new MD5Hash(header) : null; - } - - public static class HttpGetFailedException extends IOException { - private static final long serialVersionUID = 1L; - private final int responseCode; - - HttpGetFailedException(String msg, HttpURLConnection connection) throws IOException { - super(msg); - this.responseCode = connection.getResponseCode(); - } - - public int getResponseCode() { - return responseCode; - } - } - - public static class HttpPutFailedException extends IOException { - private static final long serialVersionUID = 1L; - private final int responseCode; - - HttpPutFailedException(String msg, int responseCode) throws IOException { - super(msg); - this.responseCode = responseCode; - } - - public int getResponseCode() { - return responseCode; - } - } - }