HDFS-11273. Move TransferFsImage#doGetUrl function to a Util class. Contributed by Hanisha Koneru.
This commit is contained in:
parent
511d39e074
commit
7ec609b289
|
@ -0,0 +1,39 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.common;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.HttpURLConnection;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class HttpGetFailedException extends IOException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
private final int responseCode;
|
||||
|
||||
public HttpGetFailedException(String msg, HttpURLConnection connection)
|
||||
throws IOException {
|
||||
super(msg);
|
||||
this.responseCode = connection.getResponseCode();
|
||||
}
|
||||
|
||||
public int getResponseCode() {
|
||||
return responseCode;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.common;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class HttpPutFailedException extends IOException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
private final int responseCode;
|
||||
|
||||
public HttpPutFailedException(String msg, int responseCode) throws IOException {
|
||||
super(msg);
|
||||
this.responseCode = responseCode;
|
||||
}
|
||||
|
||||
public int getResponseCode() {
|
||||
return responseCode;
|
||||
}
|
||||
}
|
|
@ -18,30 +18,66 @@
|
|||
package org.apache.hadoop.hdfs.server.common;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.security.DigestInputStream;
|
||||
import java.security.MessageDigest;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
|
||||
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
||||
import org.apache.hadoop.io.MD5Hash;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public final class Util {
|
||||
private final static Log LOG = LogFactory.getLog(Util.class.getName());
|
||||
|
||||
public final static String FILE_LENGTH = "File-Length";
|
||||
public final static String CONTENT_LENGTH = "Content-Length";
|
||||
public final static String MD5_HEADER = "X-MD5-Digest";
|
||||
public final static String CONTENT_TYPE = "Content-Type";
|
||||
public final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding";
|
||||
|
||||
public final static int IO_FILE_BUFFER_SIZE;
|
||||
private static final boolean isSpnegoEnabled;
|
||||
public static final URLConnectionFactory connectionFactory;
|
||||
|
||||
static {
|
||||
Configuration conf = new Configuration();
|
||||
connectionFactory = URLConnectionFactory
|
||||
.newDefaultURLConnectionFactory(conf);
|
||||
isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
|
||||
IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Interprets the passed string as a URI. In case of error it
|
||||
* assumes the specified string is a file.
|
||||
*
|
||||
* @param s the string to interpret
|
||||
* @return the resulting URI
|
||||
* @throws IOException
|
||||
*/
|
||||
public static URI stringAsURI(String s) throws IOException {
|
||||
static URI stringAsURI(String s) throws IOException {
|
||||
URI u = null;
|
||||
// try to make a URI
|
||||
try {
|
||||
|
@ -67,7 +103,6 @@ public final class Util {
|
|||
*
|
||||
* @param f the file to convert
|
||||
* @return the resulting URI
|
||||
* @throws IOException
|
||||
*/
|
||||
public static URI fileAsURI(File f) throws IOException {
|
||||
URI u = f.getCanonicalFile().toURI();
|
||||
|
@ -92,7 +127,7 @@ public final class Util {
|
|||
*/
|
||||
public static List<URI> stringCollectionAsURIs(
|
||||
Collection<String> names) {
|
||||
List<URI> uris = new ArrayList<URI>(names.size());
|
||||
List<URI> uris = new ArrayList<>(names.size());
|
||||
for(String name : names) {
|
||||
try {
|
||||
uris.add(stringAsURI(name));
|
||||
|
@ -102,4 +137,217 @@ public final class Util {
|
|||
}
|
||||
return uris;
|
||||
}
|
||||
|
||||
/**
|
||||
* Downloads the files at the specified url location into destination
|
||||
* storage.
|
||||
*/
|
||||
public static MD5Hash doGetUrl(URL url, List<File> localPaths,
|
||||
Storage dstStorage, boolean getChecksum, int timeout) throws IOException {
|
||||
HttpURLConnection connection;
|
||||
try {
|
||||
connection = (HttpURLConnection)
|
||||
connectionFactory.openConnection(url, isSpnegoEnabled);
|
||||
} catch (AuthenticationException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
||||
setTimeout(connection, timeout);
|
||||
|
||||
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
|
||||
throw new HttpGetFailedException("Image transfer servlet at " + url +
|
||||
" failed with status code " + connection.getResponseCode() +
|
||||
"\nResponse message:\n" + connection.getResponseMessage(),
|
||||
connection);
|
||||
}
|
||||
|
||||
long advertisedSize;
|
||||
String contentLength = connection.getHeaderField(CONTENT_LENGTH);
|
||||
if (contentLength != null) {
|
||||
advertisedSize = Long.parseLong(contentLength);
|
||||
} else {
|
||||
throw new IOException(CONTENT_LENGTH + " header is not provided " +
|
||||
"by the namenode when trying to fetch " + url);
|
||||
}
|
||||
MD5Hash advertisedDigest = parseMD5Header(connection);
|
||||
String fsImageName = connection
|
||||
.getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
|
||||
InputStream stream = connection.getInputStream();
|
||||
|
||||
return receiveFile(url.toExternalForm(), localPaths, dstStorage,
|
||||
getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
|
||||
null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Receives file at the url location from the input stream and puts them in
|
||||
* the specified destination storage location.
|
||||
*/
|
||||
public static MD5Hash receiveFile(String url, List<File> localPaths,
|
||||
Storage dstStorage, boolean getChecksum, long advertisedSize,
|
||||
MD5Hash advertisedDigest, String fsImageName, InputStream stream,
|
||||
DataTransferThrottler throttler) throws
|
||||
IOException {
|
||||
long startTime = Time.monotonicNow();
|
||||
Map<FileOutputStream, File> streamPathMap = new HashMap<>();
|
||||
StringBuilder xferStats = new StringBuilder();
|
||||
double xferCombined = 0;
|
||||
if (localPaths != null) {
|
||||
// If the local paths refer to directories, use the server-provided header
|
||||
// as the filename within that directory
|
||||
List<File> newLocalPaths = new ArrayList<>();
|
||||
for (File localPath : localPaths) {
|
||||
if (localPath.isDirectory()) {
|
||||
if (fsImageName == null) {
|
||||
throw new IOException("No filename header provided by server");
|
||||
}
|
||||
newLocalPaths.add(new File(localPath, fsImageName));
|
||||
} else {
|
||||
newLocalPaths.add(localPath);
|
||||
}
|
||||
}
|
||||
localPaths = newLocalPaths;
|
||||
}
|
||||
|
||||
|
||||
long received = 0;
|
||||
MessageDigest digester = null;
|
||||
if (getChecksum) {
|
||||
digester = MD5Hash.getDigester();
|
||||
stream = new DigestInputStream(stream, digester);
|
||||
}
|
||||
boolean finishedReceiving = false;
|
||||
|
||||
List<FileOutputStream> outputStreams = Lists.newArrayList();
|
||||
|
||||
try {
|
||||
if (localPaths != null) {
|
||||
for (File f : localPaths) {
|
||||
try {
|
||||
if (f.exists()) {
|
||||
LOG.warn("Overwriting existing file " + f
|
||||
+ " with file downloaded from " + url);
|
||||
}
|
||||
FileOutputStream fos = new FileOutputStream(f);
|
||||
outputStreams.add(fos);
|
||||
streamPathMap.put(fos, f);
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Unable to download file " + f, ioe);
|
||||
// This will be null if we're downloading the fsimage to a file
|
||||
// outside of an NNStorage directory.
|
||||
if (dstStorage != null &&
|
||||
(dstStorage instanceof StorageErrorReporter)) {
|
||||
((StorageErrorReporter)dstStorage).reportErrorOnFile(f);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (outputStreams.isEmpty()) {
|
||||
throw new IOException(
|
||||
"Unable to download to any storage directory");
|
||||
}
|
||||
}
|
||||
|
||||
int num = 1;
|
||||
byte[] buf = new byte[IO_FILE_BUFFER_SIZE];
|
||||
while (num > 0) {
|
||||
num = stream.read(buf);
|
||||
if (num > 0) {
|
||||
received += num;
|
||||
for (FileOutputStream fos : outputStreams) {
|
||||
fos.write(buf, 0, num);
|
||||
}
|
||||
if (throttler != null) {
|
||||
throttler.throttle(num);
|
||||
}
|
||||
}
|
||||
}
|
||||
finishedReceiving = true;
|
||||
double xferSec = Math.max(
|
||||
((float)(Time.monotonicNow() - startTime)) / 1000.0, 0.001);
|
||||
long xferKb = received / 1024;
|
||||
xferCombined += xferSec;
|
||||
xferStats.append(
|
||||
String.format(" The fsimage download took %.2fs at %.2f KB/s.",
|
||||
xferSec, xferKb / xferSec));
|
||||
} finally {
|
||||
stream.close();
|
||||
for (FileOutputStream fos : outputStreams) {
|
||||
long flushStartTime = Time.monotonicNow();
|
||||
fos.getChannel().force(true);
|
||||
fos.close();
|
||||
double writeSec = Math.max(((float)
|
||||
(flushStartTime - Time.monotonicNow())) / 1000.0, 0.001);
|
||||
xferCombined += writeSec;
|
||||
xferStats.append(String
|
||||
.format(" Synchronous (fsync) write to disk of " +
|
||||
streamPathMap.get(fos).getAbsolutePath() +
|
||||
" took %.2fs.", writeSec));
|
||||
}
|
||||
|
||||
// Something went wrong and did not finish reading.
|
||||
// Remove the temporary files.
|
||||
if (!finishedReceiving) {
|
||||
deleteTmpFiles(localPaths);
|
||||
}
|
||||
|
||||
if (finishedReceiving && received != advertisedSize) {
|
||||
// only throw this exception if we think we read all of it on our end
|
||||
// -- otherwise a client-side IOException would be masked by this
|
||||
// exception that makes it look like a server-side problem!
|
||||
deleteTmpFiles(localPaths);
|
||||
throw new IOException("File " + url + " received length " + received +
|
||||
" is not of the advertised size " +
|
||||
advertisedSize);
|
||||
}
|
||||
}
|
||||
xferStats.insert(0, String.format("Combined time for fsimage download and" +
|
||||
" fsync to all disks took %.2fs.", xferCombined));
|
||||
LOG.info(xferStats.toString());
|
||||
|
||||
if (digester != null) {
|
||||
MD5Hash computedDigest = new MD5Hash(digester.digest());
|
||||
|
||||
if (advertisedDigest != null &&
|
||||
!computedDigest.equals(advertisedDigest)) {
|
||||
deleteTmpFiles(localPaths);
|
||||
throw new IOException("File " + url + " computed digest " +
|
||||
computedDigest + " does not match advertised digest " +
|
||||
advertisedDigest);
|
||||
}
|
||||
return computedDigest;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static void deleteTmpFiles(List<File> files) {
|
||||
if (files == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.info("Deleting temporary files: " + files);
|
||||
for (File file : files) {
|
||||
if (!file.delete()) {
|
||||
LOG.warn("Deleting " + file + " has failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets a timeout value in millisecods for the Http connection.
|
||||
* @param connection the Http connection for which timeout needs to be set
|
||||
* @param timeout value to be set as timeout in milliseconds
|
||||
*/
|
||||
public static void setTimeout(HttpURLConnection connection, int timeout) {
|
||||
if (timeout > 0) {
|
||||
connection.setConnectTimeout(timeout);
|
||||
connection.setReadTimeout(timeout);
|
||||
}
|
||||
}
|
||||
|
||||
private static MD5Hash parseMD5Header(HttpURLConnection connection) {
|
||||
String header = connection.getHeaderField(MD5_HEADER);
|
||||
return (header != null) ? new MD5Hash(header) : null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,9 +36,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.protocol.LayoutFlags;
|
||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
|
||||
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException;
|
||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.common.Util;
|
||||
import static org.apache.hadoop.util.Time.monotonicNow;
|
||||
|
||||
import java.net.HttpURLConnection;
|
||||
|
@ -304,11 +305,12 @@ public class ImageServlet extends HttpServlet {
|
|||
*/
|
||||
public static void setVerificationHeadersForGet(HttpServletResponse response,
|
||||
File file) throws IOException {
|
||||
response.setHeader(TransferFsImage.CONTENT_LENGTH,
|
||||
response.setHeader(
|
||||
Util.CONTENT_LENGTH,
|
||||
String.valueOf(file.length()));
|
||||
MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
|
||||
if (hash != null) {
|
||||
response.setHeader(TransferFsImage.MD5_HEADER, hash.toString());
|
||||
response.setHeader(Util.MD5_HEADER, hash.toString());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -437,12 +439,13 @@ public class ImageServlet extends HttpServlet {
|
|||
*/
|
||||
static void setVerificationHeadersForPut(HttpURLConnection connection,
|
||||
File file) throws IOException {
|
||||
connection.setRequestProperty(TransferFsImage.CONTENT_LENGTH,
|
||||
connection.setRequestProperty(
|
||||
Util.CONTENT_LENGTH,
|
||||
String.valueOf(file.length()));
|
||||
MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
|
||||
if (hash != null) {
|
||||
connection
|
||||
.setRequestProperty(TransferFsImage.MD5_HEADER, hash.toString());
|
||||
.setRequestProperty(Util.MD5_HEADER, hash.toString());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -462,7 +465,7 @@ public class ImageServlet extends HttpServlet {
|
|||
params.put(STORAGEINFO_PARAM, storage.toColonSeparatedString());
|
||||
// setting the length of the file to be uploaded in separate property as
|
||||
// Content-Length only supports up to 2GB
|
||||
params.put(TransferFsImage.FILE_LENGTH, Long.toString(imageFileSize));
|
||||
params.put(Util.FILE_LENGTH, Long.toString(imageFileSize));
|
||||
params.put(IMAGE_FILE_TYPE, nnf.name());
|
||||
return params;
|
||||
}
|
||||
|
@ -586,7 +589,7 @@ public class ImageServlet extends HttpServlet {
|
|||
txId = ServletUtil.parseLongParam(request, TXID_PARAM);
|
||||
storageInfoString = ServletUtil.getParameter(request, STORAGEINFO_PARAM);
|
||||
fileSize = ServletUtil.parseLongParam(request,
|
||||
TransferFsImage.FILE_LENGTH);
|
||||
Util.FILE_LENGTH);
|
||||
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
|
||||
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
|
||||
.valueOf(imageType);
|
||||
|
|
|
@ -28,6 +28,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
|
||||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
|
||||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
|
||||
|
||||
import static org.apache.hadoop.util.Time.now;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
|
@ -136,6 +137,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
|||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
||||
import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
|
||||
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
|
||||
|
@ -2104,7 +2106,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
|||
} catch (FileNotFoundException e) {
|
||||
LOG.debug("Tried to read from deleted or moved edit log segment", e);
|
||||
return null;
|
||||
} catch (TransferFsImage.HttpGetFailedException e) {
|
||||
} catch (HttpGetFailedException e) {
|
||||
LOG.debug("Tried to read from deleted edit log segment", e);
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -19,18 +19,12 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.security.DigestInputStream;
|
||||
import java.security.MessageDigest;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
@ -44,17 +38,16 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.server.common.HttpPutFailedException;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
|
||||
import org.apache.hadoop.hdfs.server.common.Util;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
|
||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||
import org.apache.hadoop.hdfs.util.Canceler;
|
||||
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.MD5Hash;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -66,6 +59,9 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import com.google.common.collect.Lists;
|
||||
import org.eclipse.jetty.io.EofException;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.common.Util.IO_FILE_BUFFER_SIZE;
|
||||
import static org.apache.hadoop.hdfs.server.common.Util.connectionFactory;
|
||||
|
||||
/**
|
||||
* This class provides fetching a specified file from the NameNode.
|
||||
*/
|
||||
|
@ -82,43 +78,23 @@ public class TransferFsImage {
|
|||
private final int response;
|
||||
private final boolean shouldReThrowException;
|
||||
|
||||
private TransferResult(int response, boolean rethrow) {
|
||||
TransferResult(int response, boolean rethrow) {
|
||||
this.response = response;
|
||||
this.shouldReThrowException = rethrow;
|
||||
}
|
||||
|
||||
public static TransferResult getResultForCode(int code){
|
||||
TransferResult ret = UNEXPECTED_FAILURE;
|
||||
for(TransferResult result:TransferResult.values()){
|
||||
if(result.response == code){
|
||||
return result;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
return UNEXPECTED_FAILURE;
|
||||
}
|
||||
}
|
||||
|
||||
public final static String CONTENT_LENGTH = "Content-Length";
|
||||
public final static String FILE_LENGTH = "File-Length";
|
||||
public final static String MD5_HEADER = "X-MD5-Digest";
|
||||
|
||||
private final static String CONTENT_TYPE = "Content-Type";
|
||||
private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding";
|
||||
private final static int IO_FILE_BUFFER_SIZE;
|
||||
|
||||
@VisibleForTesting
|
||||
static int timeout = 0;
|
||||
private static final URLConnectionFactory connectionFactory;
|
||||
private static final boolean isSpnegoEnabled;
|
||||
|
||||
static {
|
||||
Configuration conf = new Configuration();
|
||||
connectionFactory = URLConnectionFactory
|
||||
.newDefaultURLConnectionFactory(conf);
|
||||
isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
|
||||
IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf);
|
||||
}
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
|
||||
|
||||
public static void downloadMostRecentImageToDirectory(URL infoServer,
|
||||
|
@ -159,7 +135,7 @@ public class TransferFsImage {
|
|||
}
|
||||
|
||||
MD5Hash advertisedDigest = parseMD5Header(request);
|
||||
MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true,
|
||||
MD5Hash hash = Util.receiveFile(fileName, dstFiles, dstStorage, true,
|
||||
advertisedSize, advertisedDigest, fileName, stream, throttler);
|
||||
LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size "
|
||||
+ dstFiles.get(0).length() + " bytes.");
|
||||
|
@ -226,8 +202,9 @@ public class TransferFsImage {
|
|||
* @param txid the transaction ID of the image to be uploaded
|
||||
* @throws IOException if there is an I/O error
|
||||
*/
|
||||
public static TransferResult uploadImageFromStorage(URL fsName, Configuration conf,
|
||||
NNStorage storage, NameNodeFile nnf, long txid) throws IOException {
|
||||
static TransferResult uploadImageFromStorage(URL fsName,
|
||||
Configuration conf, NNStorage storage, NameNodeFile nnf, long txid)
|
||||
throws IOException {
|
||||
return uploadImageFromStorage(fsName, conf, storage, nnf, txid, null);
|
||||
}
|
||||
|
||||
|
@ -323,9 +300,7 @@ public class TransferFsImage {
|
|||
responseCode, urlWithParams, connection.getResponseMessage()),
|
||||
responseCode);
|
||||
}
|
||||
} catch (AuthenticationException e) {
|
||||
throw new IOException(e);
|
||||
} catch (URISyntaxException e) {
|
||||
} catch (AuthenticationException | URISyntaxException e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
if (connection != null) {
|
||||
|
@ -336,9 +311,9 @@ public class TransferFsImage {
|
|||
|
||||
private static void writeFileToPutRequest(Configuration conf,
|
||||
HttpURLConnection connection, File imageFile, Canceler canceler)
|
||||
throws FileNotFoundException, IOException {
|
||||
connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream");
|
||||
connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary");
|
||||
throws IOException {
|
||||
connection.setRequestProperty(Util.CONTENT_TYPE, "application/octet-stream");
|
||||
connection.setRequestProperty(Util.CONTENT_TRANSFER_ENCODING, "binary");
|
||||
OutputStream output = connection.getOutputStream();
|
||||
FileInputStream input = new FileInputStream(imageFile);
|
||||
try {
|
||||
|
@ -414,7 +389,7 @@ public class TransferFsImage {
|
|||
* Copies the response from the URL to a list of local files.
|
||||
* @param dstStorage if an error occurs writing to one of the files,
|
||||
* this storage object will be notified.
|
||||
* @Return a digest of the received file if getChecksum is true
|
||||
* @return a digest of the received file if getChecksum is true
|
||||
*/
|
||||
static MD5Hash getFileClient(URL infoServer,
|
||||
String queryString, List<File> localPaths,
|
||||
|
@ -426,40 +401,12 @@ public class TransferFsImage {
|
|||
|
||||
public static MD5Hash doGetUrl(URL url, List<File> localPaths,
|
||||
Storage dstStorage, boolean getChecksum) throws IOException {
|
||||
HttpURLConnection connection;
|
||||
try {
|
||||
connection = (HttpURLConnection)
|
||||
connectionFactory.openConnection(url, isSpnegoEnabled);
|
||||
} catch (AuthenticationException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
return Util.doGetUrl(url, localPaths, dstStorage, getChecksum, timeout);
|
||||
}
|
||||
|
||||
setTimeout(connection);
|
||||
|
||||
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
|
||||
throw new HttpGetFailedException(
|
||||
"Image transfer servlet at " + url +
|
||||
" failed with status code " + connection.getResponseCode() +
|
||||
"\nResponse message:\n" + connection.getResponseMessage(),
|
||||
connection);
|
||||
}
|
||||
|
||||
long advertisedSize;
|
||||
String contentLength = connection.getHeaderField(CONTENT_LENGTH);
|
||||
if (contentLength != null) {
|
||||
advertisedSize = Long.parseLong(contentLength);
|
||||
} else {
|
||||
throw new IOException(CONTENT_LENGTH + " header is not provided " +
|
||||
"by the namenode when trying to fetch " + url);
|
||||
}
|
||||
MD5Hash advertisedDigest = parseMD5Header(connection);
|
||||
String fsImageName = connection
|
||||
.getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
|
||||
InputStream stream = connection.getInputStream();
|
||||
|
||||
return receiveFile(url.toExternalForm(), localPaths, dstStorage,
|
||||
getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
|
||||
null);
|
||||
private static MD5Hash parseMD5Header(HttpServletRequest request) {
|
||||
String header = request.getHeader(Util.MD5_HEADER);
|
||||
return (header != null) ? new MD5Hash(header) : null;
|
||||
}
|
||||
|
||||
private static void setTimeout(HttpURLConnection connection) {
|
||||
|
@ -467,204 +414,10 @@ public class TransferFsImage {
|
|||
Configuration conf = new HdfsConfiguration();
|
||||
timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
|
||||
DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
|
||||
LOG.info("Image Transfer timeout configured to " + timeout
|
||||
+ " milliseconds");
|
||||
LOG.info("Image Transfer timeout configured to " + timeout +
|
||||
" milliseconds");
|
||||
}
|
||||
|
||||
if (timeout > 0) {
|
||||
connection.setConnectTimeout(timeout);
|
||||
connection.setReadTimeout(timeout);
|
||||
}
|
||||
Util.setTimeout(connection, timeout);
|
||||
}
|
||||
|
||||
private static MD5Hash receiveFile(String url, List<File> localPaths,
|
||||
Storage dstStorage, boolean getChecksum, long advertisedSize,
|
||||
MD5Hash advertisedDigest, String fsImageName, InputStream stream,
|
||||
DataTransferThrottler throttler) throws IOException {
|
||||
long startTime = Time.monotonicNow();
|
||||
Map<FileOutputStream, File> streamPathMap = new HashMap<>();
|
||||
StringBuilder xferStats = new StringBuilder();
|
||||
double xferCombined = 0;
|
||||
if (localPaths != null) {
|
||||
// If the local paths refer to directories, use the server-provided header
|
||||
// as the filename within that directory
|
||||
List<File> newLocalPaths = new ArrayList<File>();
|
||||
for (File localPath : localPaths) {
|
||||
if (localPath.isDirectory()) {
|
||||
if (fsImageName == null) {
|
||||
throw new IOException("No filename header provided by server");
|
||||
}
|
||||
newLocalPaths.add(new File(localPath, fsImageName));
|
||||
} else {
|
||||
newLocalPaths.add(localPath);
|
||||
}
|
||||
}
|
||||
localPaths = newLocalPaths;
|
||||
}
|
||||
|
||||
|
||||
long received = 0;
|
||||
MessageDigest digester = null;
|
||||
if (getChecksum) {
|
||||
digester = MD5Hash.getDigester();
|
||||
stream = new DigestInputStream(stream, digester);
|
||||
}
|
||||
boolean finishedReceiving = false;
|
||||
|
||||
List<FileOutputStream> outputStreams = Lists.newArrayList();
|
||||
|
||||
try {
|
||||
if (localPaths != null) {
|
||||
for (File f : localPaths) {
|
||||
try {
|
||||
if (f.exists()) {
|
||||
LOG.warn("Overwriting existing file " + f
|
||||
+ " with file downloaded from " + url);
|
||||
}
|
||||
FileOutputStream fos = new FileOutputStream(f);
|
||||
outputStreams.add(fos);
|
||||
streamPathMap.put(fos, f);
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Unable to download file " + f, ioe);
|
||||
// This will be null if we're downloading the fsimage to a file
|
||||
// outside of an NNStorage directory.
|
||||
if (dstStorage != null &&
|
||||
(dstStorage instanceof StorageErrorReporter)) {
|
||||
((StorageErrorReporter)dstStorage).reportErrorOnFile(f);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (outputStreams.isEmpty()) {
|
||||
throw new IOException(
|
||||
"Unable to download to any storage directory");
|
||||
}
|
||||
}
|
||||
|
||||
int num = 1;
|
||||
byte[] buf = new byte[IO_FILE_BUFFER_SIZE];
|
||||
while (num > 0) {
|
||||
num = stream.read(buf);
|
||||
if (num > 0) {
|
||||
received += num;
|
||||
for (FileOutputStream fos : outputStreams) {
|
||||
fos.write(buf, 0, num);
|
||||
}
|
||||
if (throttler != null) {
|
||||
throttler.throttle(num);
|
||||
}
|
||||
}
|
||||
}
|
||||
finishedReceiving = true;
|
||||
double xferSec = Math.max(
|
||||
((float)(Time.monotonicNow() - startTime)) / 1000.0, 0.001);
|
||||
long xferKb = received / 1024;
|
||||
xferCombined += xferSec;
|
||||
xferStats.append(
|
||||
String.format(" The fsimage download took %.2fs at %.2f KB/s.",
|
||||
xferSec, xferKb / xferSec));
|
||||
} finally {
|
||||
stream.close();
|
||||
for (FileOutputStream fos : outputStreams) {
|
||||
long flushStartTime = Time.monotonicNow();
|
||||
fos.getChannel().force(true);
|
||||
fos.close();
|
||||
double writeSec = Math.max(((float)
|
||||
(flushStartTime - Time.monotonicNow())) / 1000.0, 0.001);
|
||||
xferCombined += writeSec;
|
||||
xferStats.append(String
|
||||
.format(" Synchronous (fsync) write to disk of " +
|
||||
streamPathMap.get(fos).getAbsolutePath() +
|
||||
" took %.2fs.", writeSec));
|
||||
}
|
||||
|
||||
// Something went wrong and did not finish reading.
|
||||
// Remove the temporary files.
|
||||
if (!finishedReceiving) {
|
||||
deleteTmpFiles(localPaths);
|
||||
}
|
||||
|
||||
if (finishedReceiving && received != advertisedSize) {
|
||||
// only throw this exception if we think we read all of it on our end
|
||||
// -- otherwise a client-side IOException would be masked by this
|
||||
// exception that makes it look like a server-side problem!
|
||||
deleteTmpFiles(localPaths);
|
||||
throw new IOException("File " + url + " received length " + received +
|
||||
" is not of the advertised size " +
|
||||
advertisedSize);
|
||||
}
|
||||
}
|
||||
xferStats.insert(
|
||||
0, String.format(
|
||||
"Combined time for fsimage download and fsync " +
|
||||
"to all disks took %.2fs.", xferCombined));
|
||||
LOG.info(xferStats.toString());
|
||||
|
||||
if (digester != null) {
|
||||
MD5Hash computedDigest = new MD5Hash(digester.digest());
|
||||
|
||||
if (advertisedDigest != null &&
|
||||
!computedDigest.equals(advertisedDigest)) {
|
||||
deleteTmpFiles(localPaths);
|
||||
throw new IOException("File " + url + " computed digest " +
|
||||
computedDigest + " does not match advertised digest " +
|
||||
advertisedDigest);
|
||||
}
|
||||
return computedDigest;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static void deleteTmpFiles(List<File> files) {
|
||||
if (files == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.info("Deleting temporary files: " + files);
|
||||
for (File file : files) {
|
||||
if (!file.delete()) {
|
||||
LOG.warn("Deleting " + file + " has failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static MD5Hash parseMD5Header(HttpURLConnection connection) {
|
||||
String header = connection.getHeaderField(MD5_HEADER);
|
||||
return (header != null) ? new MD5Hash(header) : null;
|
||||
}
|
||||
|
||||
private static MD5Hash parseMD5Header(HttpServletRequest request) {
|
||||
String header = request.getHeader(MD5_HEADER);
|
||||
return (header != null) ? new MD5Hash(header) : null;
|
||||
}
|
||||
|
||||
public static class HttpGetFailedException extends IOException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
private final int responseCode;
|
||||
|
||||
HttpGetFailedException(String msg, HttpURLConnection connection) throws IOException {
|
||||
super(msg);
|
||||
this.responseCode = connection.getResponseCode();
|
||||
}
|
||||
|
||||
public int getResponseCode() {
|
||||
return responseCode;
|
||||
}
|
||||
}
|
||||
|
||||
public static class HttpPutFailedException extends IOException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
private final int responseCode;
|
||||
|
||||
HttpPutFailedException(String msg, int responseCode) throws IOException {
|
||||
super(msg);
|
||||
this.responseCode = responseCode;
|
||||
}
|
||||
|
||||
public int getResponseCode() {
|
||||
return responseCode;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue