HDFS-14696. Backport HDFS-11273 to branch-2 (Move TransferFsImage#doGetUrl function to a Util class) (#1251) Contributed by Siyao Meng.

This commit is contained in:
Siyao Meng 2019-08-08 14:55:41 -07:00 committed by Wei-Chiu Chuang
parent ce12c8fc54
commit 15062b6d28
7 changed files with 374 additions and 281 deletions

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common;
import org.apache.hadoop.classification.InterfaceAudience;
import java.io.IOException;
import java.net.HttpURLConnection;
/**
* The exception is thrown when HTTP GET operation has failed.
*
*/
@InterfaceAudience.Private
public class HttpGetFailedException extends IOException {
private static final long serialVersionUID = 1L;
private final int responseCode;
public HttpGetFailedException(String msg, HttpURLConnection connection)
throws IOException {
super(msg);
this.responseCode = connection.getResponseCode();
}
public int getResponseCode() {
return responseCode;
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common;
import org.apache.hadoop.classification.InterfaceAudience;
import java.io.IOException;
/**
* The exception is thrown when HTTP PUT operation has failed.
*
*/
@InterfaceAudience.Private
public class HttpPutFailedException extends IOException {
private static final long serialVersionUID = 1L;
private final int responseCode;
public HttpPutFailedException(String msg, int responseCode)
throws IOException {
super(msg);
this.responseCode = responseCode;
}
public int getResponseCode() {
return responseCode;
}
}

View File

@ -23,26 +23,64 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
@InterfaceAudience.Private
public final class Util {
private final static Log LOG = LogFactory.getLog(Util.class.getName());
/* Required headers for FSImage transfer. */
public final static String FILE_LENGTH = "File-Length";
public final static String CONTENT_LENGTH = "Content-Length";
public final static String MD5_HEADER = "X-MD5-Digest";
public final static String CONTENT_TYPE = "Content-Type";
public final static String CONTENT_TRANSFER_ENCODING =
"Content-Transfer-Encoding";
public final static int IO_FILE_BUFFER_SIZE;
private static final boolean isSpnegoEnabled;
public static final URLConnectionFactory connectionFactory;
static {
Configuration conf = new Configuration();
connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf);
isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf);
}
/**
* Interprets the passed string as a URI. In case of error it
* assumes the specified string is a file.
*
* @param s the string to interpret
* @return the resulting URI
* @throws IOException
* @return the resulting URI
*/
public static URI stringAsURI(String s) throws IOException {
static URI stringAsURI(String s) throws IOException {
URI u = null;
// try to make a URI
try {
@ -67,7 +105,6 @@ public final class Util {
*
* @param f the file to convert
* @return the resulting URI
* @throws IOException
*/
public static URI fileAsURI(File f) throws IOException {
URI u = f.getCanonicalFile().toURI();
@ -92,7 +129,7 @@ public final class Util {
*/
public static List<URI> stringCollectionAsURIs(
Collection<String> names) {
List<URI> uris = new ArrayList<URI>(names.size());
List<URI> uris = new ArrayList<>(names.size());
for(String name : names) {
try {
uris.add(stringAsURI(name));
@ -103,7 +140,6 @@ public final class Util {
return uris;
}
public static boolean isDiskStatsEnabled(int fileIOSamplingPercentage) {
final boolean isEnabled;
if (fileIOSamplingPercentage <= 0) {
@ -120,4 +156,217 @@ public final class Util {
return isEnabled;
}
/**
* Downloads the files at the specified url location into destination
* storage.
*/
public static MD5Hash doGetUrl(URL url, List<File> localPaths,
Storage dstStorage, boolean getChecksum, int timeout) throws IOException {
HttpURLConnection connection;
try {
connection = (HttpURLConnection)
connectionFactory.openConnection(url, isSpnegoEnabled);
} catch (AuthenticationException e) {
throw new IOException(e);
}
setTimeout(connection, timeout);
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new HttpGetFailedException("Image transfer servlet at " + url +
" failed with status code " + connection.getResponseCode() +
"\nResponse message:\n" + connection.getResponseMessage(),
connection);
}
long advertisedSize;
String contentLength = connection.getHeaderField(CONTENT_LENGTH);
if (contentLength != null) {
advertisedSize = Long.parseLong(contentLength);
} else {
throw new IOException(CONTENT_LENGTH + " header is not provided " +
"by the namenode when trying to fetch " + url);
}
MD5Hash advertisedDigest = parseMD5Header(connection);
String fsImageName = connection
.getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
InputStream stream = connection.getInputStream();
return receiveFile(url.toExternalForm(), localPaths, dstStorage,
getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
null);
}
/**
* Receives file at the url location from the input stream and puts them in
* the specified destination storage location.
*/
public static MD5Hash receiveFile(String url, List<File> localPaths,
Storage dstStorage, boolean getChecksum, long advertisedSize,
MD5Hash advertisedDigest, String fsImageName, InputStream stream,
DataTransferThrottler throttler) throws
IOException {
long startTime = Time.monotonicNow();
Map<FileOutputStream, File> streamPathMap = new HashMap<>();
StringBuilder xferStats = new StringBuilder();
double xferCombined = 0;
if (localPaths != null) {
// If the local paths refer to directories, use the server-provided header
// as the filename within that directory
List<File> newLocalPaths = new ArrayList<>();
for (File localPath : localPaths) {
if (localPath.isDirectory()) {
if (fsImageName == null) {
throw new IOException("No filename header provided by server");
}
newLocalPaths.add(new File(localPath, fsImageName));
} else {
newLocalPaths.add(localPath);
}
}
localPaths = newLocalPaths;
}
long received = 0;
MessageDigest digester = null;
if (getChecksum) {
digester = MD5Hash.getDigester();
stream = new DigestInputStream(stream, digester);
}
boolean finishedReceiving = false;
List<FileOutputStream> outputStreams = Lists.newArrayList();
try {
if (localPaths != null) {
for (File f : localPaths) {
try {
if (f.exists()) {
LOG.warn("Overwriting existing file " + f
+ " with file downloaded from " + url);
}
FileOutputStream fos = new FileOutputStream(f);
outputStreams.add(fos);
streamPathMap.put(fos, f);
} catch (IOException ioe) {
LOG.warn("Unable to download file " + f, ioe);
// This will be null if we're downloading the fsimage to a file
// outside of an NNStorage directory.
if (dstStorage != null &&
(dstStorage instanceof StorageErrorReporter)) {
((StorageErrorReporter)dstStorage).reportErrorOnFile(f);
}
}
}
if (outputStreams.isEmpty()) {
throw new IOException(
"Unable to download to any storage directory");
}
}
int num = 1;
byte[] buf = new byte[IO_FILE_BUFFER_SIZE];
while (num > 0) {
num = stream.read(buf);
if (num > 0) {
received += num;
for (FileOutputStream fos : outputStreams) {
fos.write(buf, 0, num);
}
if (throttler != null) {
throttler.throttle(num);
}
}
}
finishedReceiving = true;
double xferSec = Math.max(
((float)(Time.monotonicNow() - startTime)) / 1000.0, 0.001);
long xferKb = received / 1024;
xferCombined += xferSec;
xferStats.append(
String.format(" The fsimage download took %.2fs at %.2f KB/s.",
xferSec, xferKb / xferSec));
} finally {
stream.close();
for (FileOutputStream fos : outputStreams) {
long flushStartTime = Time.monotonicNow();
fos.getChannel().force(true);
fos.close();
double writeSec = Math.max(((float)
(flushStartTime - Time.monotonicNow())) / 1000.0, 0.001);
xferCombined += writeSec;
xferStats.append(String
.format(" Synchronous (fsync) write to disk of " +
streamPathMap.get(fos).getAbsolutePath() +
" took %.2fs.", writeSec));
}
// Something went wrong and did not finish reading.
// Remove the temporary files.
if (!finishedReceiving) {
deleteTmpFiles(localPaths);
}
if (finishedReceiving && received != advertisedSize) {
// only throw this exception if we think we read all of it on our end
// -- otherwise a client-side IOException would be masked by this
// exception that makes it look like a server-side problem!
deleteTmpFiles(localPaths);
throw new IOException("File " + url + " received length " + received +
" is not of the advertised size " +
advertisedSize);
}
}
xferStats.insert(0, String.format("Combined time for fsimage download and" +
" fsync to all disks took %.2fs.", xferCombined));
LOG.info(xferStats.toString());
if (digester != null) {
MD5Hash computedDigest = new MD5Hash(digester.digest());
if (advertisedDigest != null &&
!computedDigest.equals(advertisedDigest)) {
deleteTmpFiles(localPaths);
throw new IOException("File " + url + " computed digest " +
computedDigest + " does not match advertised digest " +
advertisedDigest);
}
return computedDigest;
} else {
return null;
}
}
private static void deleteTmpFiles(List<File> files) {
if (files == null) {
return;
}
LOG.info("Deleting temporary files: " + files);
for (File file : files) {
if (!file.delete()) {
LOG.warn("Deleting " + file + " has failed");
}
}
}
/**
* Sets a timeout value in millisecods for the Http connection.
* @param connection the Http connection for which timeout needs to be set
* @param timeout value to be set as timeout in milliseconds
*/
public static void setTimeout(HttpURLConnection connection, int timeout) {
if (timeout > 0) {
connection.setConnectTimeout(timeout);
connection.setReadTimeout(timeout);
}
}
private static MD5Hash parseMD5Header(HttpURLConnection connection) {
String header = connection.getHeaderField(MD5_HEADER);
return (header != null) ? new MD5Hash(header) : null;
}
}

View File

@ -36,9 +36,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.LayoutFlags;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.hdfs.server.common.Util;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.net.HttpURLConnection;
@ -304,11 +305,12 @@ public class ImageServlet extends HttpServlet {
*/
public static void setVerificationHeadersForGet(HttpServletResponse response,
File file) throws IOException {
response.setHeader(TransferFsImage.CONTENT_LENGTH,
response.setHeader(
Util.CONTENT_LENGTH,
String.valueOf(file.length()));
MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
if (hash != null) {
response.setHeader(TransferFsImage.MD5_HEADER, hash.toString());
response.setHeader(Util.MD5_HEADER, hash.toString());
}
}
@ -437,12 +439,13 @@ public class ImageServlet extends HttpServlet {
*/
static void setVerificationHeadersForPut(HttpURLConnection connection,
File file) throws IOException {
connection.setRequestProperty(TransferFsImage.CONTENT_LENGTH,
connection.setRequestProperty(
Util.CONTENT_LENGTH,
String.valueOf(file.length()));
MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
if (hash != null) {
connection
.setRequestProperty(TransferFsImage.MD5_HEADER, hash.toString());
.setRequestProperty(Util.MD5_HEADER, hash.toString());
}
}
@ -462,7 +465,7 @@ public class ImageServlet extends HttpServlet {
params.put(STORAGEINFO_PARAM, storage.toColonSeparatedString());
// setting the length of the file to be uploaded in separate property as
// Content-Length only supports up to 2GB
params.put(TransferFsImage.FILE_LENGTH, Long.toString(imageFileSize));
params.put(Util.FILE_LENGTH, Long.toString(imageFileSize));
params.put(IMAGE_FILE_TYPE, nnf.name());
return params;
}
@ -586,7 +589,7 @@ public class ImageServlet extends HttpServlet {
txId = ServletUtil.parseLongParam(request, TXID_PARAM);
storageInfoString = ServletUtil.getParameter(request, STORAGEINFO_PARAM);
fileSize = ServletUtil.parseLongParam(request,
TransferFsImage.FILE_LENGTH);
Util.FILE_LENGTH);
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
.valueOf(imageType);

View File

@ -28,6 +28,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
import static org.apache.hadoop.util.Time.now;
import java.io.FileNotFoundException;
@ -137,6 +138,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
@ -2117,7 +2119,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
} catch (FileNotFoundException e) {
LOG.debug("Tried to read from deleted or moved edit log segment", e);
return null;
} catch (TransferFsImage.HttpGetFailedException e) {
} catch (HttpGetFailedException e) {
LOG.debug("Tried to read from deleted edit log segment", e);
return null;
}

View File

@ -19,18 +19,12 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -44,17 +38,16 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.common.HttpPutFailedException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.UserGroupInformation;
@ -66,6 +59,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.mortbay.jetty.EofException;
import static org.apache.hadoop.hdfs.server.common.Util.IO_FILE_BUFFER_SIZE;
import static org.apache.hadoop.hdfs.server.common.Util.connectionFactory;
/**
* This class provides fetching a specified file from the NameNode.
*/
@ -98,27 +94,8 @@ public class TransferFsImage {
}
}
public final static String CONTENT_LENGTH = "Content-Length";
public final static String FILE_LENGTH = "File-Length";
public final static String MD5_HEADER = "X-MD5-Digest";
private final static String CONTENT_TYPE = "Content-Type";
private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding";
private final static int IO_FILE_BUFFER_SIZE;
@VisibleForTesting
static int timeout = 0;
private static final URLConnectionFactory connectionFactory;
private static final boolean isSpnegoEnabled;
static {
Configuration conf = new Configuration();
connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf);
isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf);
}
private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
public static void downloadMostRecentImageToDirectory(URL infoServer,
@ -159,7 +136,7 @@ public class TransferFsImage {
}
MD5Hash advertisedDigest = parseMD5Header(request);
MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true,
MD5Hash hash = Util.receiveFile(fileName, dstFiles, dstStorage, true,
advertisedSize, advertisedDigest, fileName, stream, throttler);
LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size "
+ dstFiles.get(0).length() + " bytes.");
@ -323,9 +300,7 @@ public class TransferFsImage {
responseCode, urlWithParams, connection.getResponseMessage()),
responseCode);
}
} catch (AuthenticationException e) {
throw new IOException(e);
} catch (URISyntaxException e) {
} catch (AuthenticationException | URISyntaxException e) {
throw new IOException(e);
} finally {
if (connection != null) {
@ -336,9 +311,10 @@ public class TransferFsImage {
private static void writeFileToPutRequest(Configuration conf,
HttpURLConnection connection, File imageFile, Canceler canceler)
throws FileNotFoundException, IOException {
connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream");
connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary");
throws IOException {
connection.setRequestProperty(Util.CONTENT_TYPE,
"application/octet-stream");
connection.setRequestProperty(Util.CONTENT_TRANSFER_ENCODING, "binary");
OutputStream output = connection.getOutputStream();
FileInputStream input = new FileInputStream(imageFile);
try {
@ -431,7 +407,7 @@ public class TransferFsImage {
* Copies the response from the URL to a list of local files.
* @param dstStorage if an error occurs writing to one of the files,
* this storage object will be notified.
* @Return a digest of the received file if getChecksum is true
* @return a digest of the received file if getChecksum is true
*/
static MD5Hash getFileClient(URL infoServer,
String queryString, List<File> localPaths,
@ -443,40 +419,12 @@ public class TransferFsImage {
public static MD5Hash doGetUrl(URL url, List<File> localPaths,
Storage dstStorage, boolean getChecksum) throws IOException {
HttpURLConnection connection;
try {
connection = (HttpURLConnection)
connectionFactory.openConnection(url, isSpnegoEnabled);
} catch (AuthenticationException e) {
throw new IOException(e);
}
return Util.doGetUrl(url, localPaths, dstStorage, getChecksum, timeout);
}
setTimeout(connection);
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new HttpGetFailedException(
"Image transfer servlet at " + url +
" failed with status code " + connection.getResponseCode() +
"\nResponse message:\n" + connection.getResponseMessage(),
connection);
}
long advertisedSize;
String contentLength = connection.getHeaderField(CONTENT_LENGTH);
if (contentLength != null) {
advertisedSize = Long.parseLong(contentLength);
} else {
throw new IOException(CONTENT_LENGTH + " header is not provided " +
"by the namenode when trying to fetch " + url);
}
MD5Hash advertisedDigest = parseMD5Header(connection);
String fsImageName = connection
.getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
InputStream stream = connection.getInputStream();
return receiveFile(url.toExternalForm(), localPaths, dstStorage,
getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
null);
private static MD5Hash parseMD5Header(HttpServletRequest request) {
String header = request.getHeader(Util.MD5_HEADER);
return (header != null) ? new MD5Hash(header) : null;
}
private static void setTimeout(HttpURLConnection connection) {
@ -484,204 +432,10 @@ public class TransferFsImage {
Configuration conf = new HdfsConfiguration();
timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
LOG.info("Image Transfer timeout configured to " + timeout
+ " milliseconds");
LOG.info("Image Transfer timeout configured to " + timeout +
" milliseconds");
}
if (timeout > 0) {
connection.setConnectTimeout(timeout);
connection.setReadTimeout(timeout);
}
Util.setTimeout(connection, timeout);
}
private static MD5Hash receiveFile(String url, List<File> localPaths,
Storage dstStorage, boolean getChecksum, long advertisedSize,
MD5Hash advertisedDigest, String fsImageName, InputStream stream,
DataTransferThrottler throttler) throws IOException {
long startTime = Time.monotonicNow();
Map<FileOutputStream, File> streamPathMap = new HashMap<>();
StringBuilder xferStats = new StringBuilder();
double xferCombined = 0;
if (localPaths != null) {
// If the local paths refer to directories, use the server-provided header
// as the filename within that directory
List<File> newLocalPaths = new ArrayList<File>();
for (File localPath : localPaths) {
if (localPath.isDirectory()) {
if (fsImageName == null) {
throw new IOException("No filename header provided by server");
}
newLocalPaths.add(new File(localPath, fsImageName));
} else {
newLocalPaths.add(localPath);
}
}
localPaths = newLocalPaths;
}
long received = 0;
MessageDigest digester = null;
if (getChecksum) {
digester = MD5Hash.getDigester();
stream = new DigestInputStream(stream, digester);
}
boolean finishedReceiving = false;
int num = 1;
List<FileOutputStream> outputStreams = Lists.newArrayList();
try {
if (localPaths != null) {
for (File f : localPaths) {
try {
if (f.exists()) {
LOG.warn("Overwriting existing file " + f
+ " with file downloaded from " + url);
}
FileOutputStream fos = new FileOutputStream(f);
outputStreams.add(fos);
streamPathMap.put(fos, f);
} catch (IOException ioe) {
LOG.warn("Unable to download file " + f, ioe);
// This will be null if we're downloading the fsimage to a file
// outside of an NNStorage directory.
if (dstStorage != null &&
(dstStorage instanceof StorageErrorReporter)) {
((StorageErrorReporter)dstStorage).reportErrorOnFile(f);
}
}
}
if (outputStreams.isEmpty()) {
throw new IOException(
"Unable to download to any storage directory");
}
}
byte[] buf = new byte[IO_FILE_BUFFER_SIZE];
while (num > 0) {
num = stream.read(buf);
if (num > 0) {
received += num;
for (FileOutputStream fos : outputStreams) {
fos.write(buf, 0, num);
}
if (throttler != null) {
throttler.throttle(num);
}
}
}
finishedReceiving = true;
double xferSec = Math.max(
((float)(Time.monotonicNow() - startTime)) / 1000.0, 0.001);
long xferKb = received / 1024;
xferCombined += xferSec;
xferStats.append(
String.format(" The fsimage download took %.2fs at %.2f KB/s.",
xferSec, xferKb / xferSec));
} finally {
stream.close();
for (FileOutputStream fos : outputStreams) {
long flushStartTime = Time.monotonicNow();
fos.getChannel().force(true);
fos.close();
double writeSec = Math.max(((float)
(flushStartTime - Time.monotonicNow())) / 1000.0, 0.001);
xferCombined += writeSec;
xferStats.append(String
.format(" Synchronous (fsync) write to disk of " +
streamPathMap.get(fos).getAbsolutePath() +
" took %.2fs.", writeSec));
}
// Something went wrong and did not finish reading.
// Remove the temporary files.
if (!finishedReceiving) {
deleteTmpFiles(localPaths);
}
if (finishedReceiving && received != advertisedSize) {
// only throw this exception if we think we read all of it on our end
// -- otherwise a client-side IOException would be masked by this
// exception that makes it look like a server-side problem!
deleteTmpFiles(localPaths);
throw new IOException("File " + url + " received length " + received +
" is not of the advertised size " + advertisedSize +
". Fsimage name: " + fsImageName + " lastReceived: " + num);
}
}
xferStats.insert(
0, String.format(
"Combined time for fsimage download and fsync " +
"to all disks took %.2fs.", xferCombined));
LOG.info(xferStats.toString());
if (digester != null) {
MD5Hash computedDigest = new MD5Hash(digester.digest());
if (advertisedDigest != null &&
!computedDigest.equals(advertisedDigest)) {
deleteTmpFiles(localPaths);
throw new IOException("File " + url + " computed digest " +
computedDigest + " does not match advertised digest " +
advertisedDigest);
}
return computedDigest;
} else {
return null;
}
}
private static void deleteTmpFiles(List<File> files) {
if (files == null) {
return;
}
LOG.info("Deleting temporary files: " + files);
for (File file : files) {
if (!file.delete()) {
LOG.warn("Deleting " + file + " has failed");
}
}
}
private static MD5Hash parseMD5Header(HttpURLConnection connection) {
String header = connection.getHeaderField(MD5_HEADER);
return (header != null) ? new MD5Hash(header) : null;
}
private static MD5Hash parseMD5Header(HttpServletRequest request) {
String header = request.getHeader(MD5_HEADER);
return (header != null) ? new MD5Hash(header) : null;
}
public static class HttpGetFailedException extends IOException {
private static final long serialVersionUID = 1L;
private final int responseCode;
HttpGetFailedException(String msg, HttpURLConnection connection) throws IOException {
super(msg);
this.responseCode = connection.getResponseCode();
}
public int getResponseCode() {
return responseCode;
}
}
public static class HttpPutFailedException extends IOException {
private static final long serialVersionUID = 1L;
private final int responseCode;
HttpPutFailedException(String msg, int responseCode) throws IOException {
super(msg);
this.responseCode = responseCode;
}
public int getResponseCode() {
return responseCode;
}
}
}