HDFS-12574. Add CryptoInputStream to WebHdfsFileSystem read call. Contributed by Rushabh S Shah.
This commit is contained in:
parent
987a8972ac
commit
eda786ea12
|
@ -41,7 +41,6 @@ import java.net.Socket;
|
|||
import java.net.SocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.UnknownHostException;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
|
@ -66,8 +65,6 @@ import org.apache.hadoop.crypto.CryptoInputStream;
|
|||
import org.apache.hadoop.crypto.CryptoOutputStream;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.BlockStorageLocation;
|
||||
import org.apache.hadoop.fs.CacheFlag;
|
||||
|
@ -980,46 +977,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|||
return volumeBlockLocations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Decrypts a EDEK by consulting the KeyProvider.
|
||||
*/
|
||||
private KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo
|
||||
feInfo) throws IOException {
|
||||
try (TraceScope ignored = tracer.newScope("decryptEDEK")) {
|
||||
KeyProvider provider = getKeyProvider();
|
||||
if (provider == null) {
|
||||
throw new IOException("No KeyProvider is configured, cannot access" +
|
||||
" an encrypted file");
|
||||
}
|
||||
EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption(
|
||||
feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(),
|
||||
feInfo.getEncryptedDataEncryptionKey());
|
||||
try {
|
||||
KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension
|
||||
.createKeyProviderCryptoExtension(provider);
|
||||
return cryptoProvider.decryptEncryptedKey(ekv);
|
||||
} catch (GeneralSecurityException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps the stream in a CryptoInputStream if the underlying file is
|
||||
* encrypted.
|
||||
*/
|
||||
public HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis)
|
||||
throws IOException {
|
||||
final FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo();
|
||||
FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo();
|
||||
if (feInfo != null) {
|
||||
// File is encrypted, wrap the stream in a crypto stream.
|
||||
// Currently only one version, so no special logic based on the version #
|
||||
HdfsKMSUtil.getCryptoProtocolVersion(feInfo);
|
||||
final CryptoCodec codec = HdfsKMSUtil.getCryptoCodec(conf, feInfo);
|
||||
final KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
|
||||
final CryptoInputStream cryptoIn =
|
||||
new CryptoInputStream(dfsis, codec, decrypted.getMaterial(),
|
||||
feInfo.getIV());
|
||||
CryptoInputStream cryptoIn;
|
||||
try (TraceScope ignored = getTracer().newScope("decryptEDEK")) {
|
||||
cryptoIn = HdfsKMSUtil.createWrappedInputStream(dfsis,
|
||||
getKeyProvider(), feInfo, getConfiguration());
|
||||
}
|
||||
return new HdfsDataInputStream(cryptoIn);
|
||||
} else {
|
||||
// No FileEncryptionInfo so no encryption.
|
||||
|
@ -1048,7 +1018,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|||
// Currently only one version, so no special logic based on the version #
|
||||
HdfsKMSUtil.getCryptoProtocolVersion(feInfo);
|
||||
final CryptoCodec codec = HdfsKMSUtil.getCryptoCodec(conf, feInfo);
|
||||
KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
|
||||
KeyVersion decrypted;
|
||||
try (TraceScope ignored = tracer.newScope("decryptEDEK")) {
|
||||
decrypted = HdfsKMSUtil.decryptEncryptedDataEncryptionKey(feInfo,
|
||||
getKeyProvider());
|
||||
}
|
||||
final CryptoOutputStream cryptoOut =
|
||||
new CryptoOutputStream(dfsos, codec,
|
||||
decrypted.getMaterial(), feInfo.getIV(), startPos);
|
||||
|
|
|
@ -20,15 +20,21 @@ package org.apache.hadoop.hdfs;
|
|||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.security.GeneralSecurityException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.crypto.CryptoCodec;
|
||||
import org.apache.hadoop.crypto.CryptoInputStream;
|
||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
|
@ -187,4 +193,39 @@ public final class HdfsKMSUtil {
|
|||
return new Text(DFS_KMS_PREFIX + namenodeUri.getScheme()
|
||||
+"://" + namenodeUri.getAuthority());
|
||||
}
|
||||
|
||||
public static CryptoInputStream createWrappedInputStream(InputStream is,
|
||||
KeyProvider keyProvider, FileEncryptionInfo fileEncryptionInfo,
|
||||
Configuration conf) throws IOException {
|
||||
// File is encrypted, wrap the stream in a crypto stream.
|
||||
// Currently only one version, so no special logic based on the version#
|
||||
HdfsKMSUtil.getCryptoProtocolVersion(fileEncryptionInfo);
|
||||
final CryptoCodec codec = HdfsKMSUtil.getCryptoCodec(
|
||||
conf, fileEncryptionInfo);
|
||||
final KeyVersion decrypted =
|
||||
decryptEncryptedDataEncryptionKey(fileEncryptionInfo, keyProvider);
|
||||
return new CryptoInputStream(is, codec, decrypted.getMaterial(),
|
||||
fileEncryptionInfo.getIV());
|
||||
}
|
||||
|
||||
/**
|
||||
* Decrypts a EDEK by consulting the KeyProvider.
|
||||
*/
|
||||
static KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo
|
||||
feInfo, KeyProvider keyProvider) throws IOException {
|
||||
if (keyProvider == null) {
|
||||
throw new IOException("No KeyProvider is configured, cannot access" +
|
||||
" an encrypted file");
|
||||
}
|
||||
EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption(
|
||||
feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(),
|
||||
feInfo.getEncryptedDataEncryptionKey());
|
||||
try {
|
||||
KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension
|
||||
.createKeyProviderCryptoExtension(keyProvider);
|
||||
return cryptoProvider.decryptEncryptedKey(ekv);
|
||||
} catch (GeneralSecurityException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ import java.net.InetSocketAddress;
|
|||
import java.net.MalformedURLException;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -51,6 +52,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import javax.ws.rs.core.HttpHeaders;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.io.input.BoundedInputStream;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -64,6 +66,7 @@ import org.apache.hadoop.fs.DelegationTokenRenewer;
|
|||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FSInputStream;
|
||||
import org.apache.hadoop.fs.FileEncryptionInfo;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
|
@ -89,6 +92,8 @@ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
|||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.FileEncryptionInfoProto;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.web.resources.*;
|
||||
import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op;
|
||||
|
@ -132,6 +137,8 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
/** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
|
||||
public static final String PATH_PREFIX = "/" + WebHdfsConstants.WEBHDFS_SCHEME
|
||||
+ "/v" + VERSION;
|
||||
public static final String EZ_HEADER = "X-Hadoop-Accept-EZ";
|
||||
public static final String FEFINFO_HEADER = "X-Hadoop-feInfo";
|
||||
|
||||
/**
|
||||
* Default connection factory may be overridden in tests to use smaller
|
||||
|
@ -621,12 +628,19 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
|
||||
private boolean checkRetry;
|
||||
private String redirectHost;
|
||||
private boolean followRedirect = true;
|
||||
|
||||
protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) {
|
||||
this.op = op;
|
||||
this.redirected = redirected;
|
||||
}
|
||||
|
||||
protected AbstractRunner(final HttpOpParam.Op op, boolean redirected,
|
||||
boolean followRedirect) {
|
||||
this(op, redirected);
|
||||
this.followRedirect = followRedirect;
|
||||
}
|
||||
|
||||
T run() throws IOException {
|
||||
UserGroupInformation connectUgi = ugi.getRealUser();
|
||||
if (connectUgi == null) {
|
||||
|
@ -693,9 +707,17 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
// See http://tinyurl.com/java7-http-keepalive
|
||||
conn.disconnect();
|
||||
}
|
||||
if (!followRedirect) {
|
||||
return conn;
|
||||
}
|
||||
}
|
||||
try {
|
||||
return connect(op, url);
|
||||
final HttpURLConnection conn = connect(op, url);
|
||||
// output streams will validate on close
|
||||
if (!op.getDoOutput()) {
|
||||
validateResponse(op, conn, false);
|
||||
}
|
||||
return conn;
|
||||
} catch (IOException ioe) {
|
||||
if (redirectHost != null) {
|
||||
if (excludeDatanodes.getValue() != null) {
|
||||
|
@ -721,6 +743,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
// The value of the header is unimportant. Only its presence matters.
|
||||
conn.setRequestProperty(restCsrfCustomHeader, "\"\"");
|
||||
}
|
||||
conn.setRequestProperty(EZ_HEADER, "true");
|
||||
switch (op.getType()) {
|
||||
// if not sending a message body for a POST or PUT operation, need
|
||||
// to ensure the server/proxy knows this
|
||||
|
@ -768,10 +791,6 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
final URL url = getUrl();
|
||||
try {
|
||||
final HttpURLConnection conn = connect(url);
|
||||
// output streams will validate on close
|
||||
if (!op.getDoOutput()) {
|
||||
validateResponse(op, conn, false);
|
||||
}
|
||||
return getResponse(conn);
|
||||
} catch (AccessControlException ace) {
|
||||
// no retries for auth failures
|
||||
|
@ -817,7 +836,6 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
a.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
|
||||
boolean isFailoverAndRetry =
|
||||
a.action == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY;
|
||||
|
||||
if (isRetry || isFailoverAndRetry) {
|
||||
LOG.info("Retrying connect to namenode: {}. Already retried {}"
|
||||
+ " time(s); retry policy is {}, delay {}ms.",
|
||||
|
@ -998,16 +1016,16 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
/**
|
||||
* Used by open() which tracks the resolved url itself
|
||||
*/
|
||||
final class URLRunner extends AbstractRunner<HttpURLConnection> {
|
||||
class URLRunner extends AbstractRunner<HttpURLConnection> {
|
||||
private final URL url;
|
||||
@Override
|
||||
protected URL getUrl() {
|
||||
protected URL getUrl() throws IOException {
|
||||
return url;
|
||||
}
|
||||
|
||||
protected URLRunner(final HttpOpParam.Op op, final URL url,
|
||||
boolean redirected) {
|
||||
super(op, redirected);
|
||||
boolean redirected, boolean followRedirect) {
|
||||
super(op, redirected, followRedirect);
|
||||
this.url = url;
|
||||
}
|
||||
|
||||
|
@ -1419,12 +1437,20 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
).run();
|
||||
}
|
||||
|
||||
@SuppressWarnings("resource")
|
||||
@Override
|
||||
public FSDataInputStream open(final Path f, final int bufferSize
|
||||
) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.OPEN);
|
||||
return new FSDataInputStream(new WebHdfsInputStream(f, bufferSize));
|
||||
WebHdfsInputStream webfsInputStream =
|
||||
new WebHdfsInputStream(f, bufferSize);
|
||||
if (webfsInputStream.getFileEncryptionInfo() == null) {
|
||||
return new FSDataInputStream(webfsInputStream);
|
||||
} else {
|
||||
return new FSDataInputStream(
|
||||
webfsInputStream.createWrappedInputStream());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1469,7 +1495,8 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
final boolean resolved) throws IOException {
|
||||
final URL offsetUrl = offset == 0L? url
|
||||
: new URL(url + "&" + new OffsetParam(offset));
|
||||
return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run();
|
||||
return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved,
|
||||
true).run();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1971,6 +1998,15 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
void setReadRunner(ReadRunner rr) {
|
||||
this.readRunner = rr;
|
||||
}
|
||||
|
||||
FileEncryptionInfo getFileEncryptionInfo() {
|
||||
return readRunner.getFileEncryptionInfo();
|
||||
}
|
||||
|
||||
InputStream createWrappedInputStream() throws IOException {
|
||||
return HdfsKMSUtil.createWrappedInputStream(
|
||||
this, getKeyProvider(), getFileEncryptionInfo(), getConf());
|
||||
}
|
||||
}
|
||||
|
||||
enum RunnerState {
|
||||
|
@ -2007,7 +2043,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
private byte[] readBuffer;
|
||||
private int readOffset;
|
||||
private int readLength;
|
||||
private RunnerState runnerState = RunnerState.DISCONNECTED;
|
||||
private RunnerState runnerState = RunnerState.SEEK;
|
||||
private URL originalUrl = null;
|
||||
private URL resolvedUrl = null;
|
||||
|
||||
|
@ -2015,6 +2051,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
private final int bufferSize;
|
||||
private long pos = 0;
|
||||
private long fileLength = 0;
|
||||
private FileEncryptionInfo feInfo = null;
|
||||
|
||||
/* The following methods are WebHdfsInputStream helpers. */
|
||||
|
||||
|
@ -2022,6 +2059,35 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
super(GetOpParam.Op.OPEN, p, new BufferSizeParam(bs));
|
||||
this.path = p;
|
||||
this.bufferSize = bs;
|
||||
getRedirectedUrl();
|
||||
}
|
||||
|
||||
private void getRedirectedUrl() throws IOException {
|
||||
URLRunner urlRunner = new URLRunner(GetOpParam.Op.OPEN, null, false,
|
||||
false) {
|
||||
@Override
|
||||
protected URL getUrl() throws IOException {
|
||||
return toUrl(op, path, new BufferSizeParam(bufferSize));
|
||||
}
|
||||
};
|
||||
HttpURLConnection conn = urlRunner.run();
|
||||
String feInfoStr = conn.getHeaderField(FEFINFO_HEADER);
|
||||
if (feInfoStr != null) {
|
||||
byte[] decodedBytes = Base64.decodeBase64(
|
||||
feInfoStr.getBytes(StandardCharsets.UTF_8));
|
||||
feInfo = PBHelperClient
|
||||
.convert(FileEncryptionInfoProto.parseFrom(decodedBytes));
|
||||
}
|
||||
String location = conn.getHeaderField("Location");
|
||||
if (location != null) {
|
||||
// This saves the location for datanode where redirect was issued.
|
||||
// Need to remove offset because seek can be called after open.
|
||||
resolvedUrl = removeOffsetParam(new URL(location));
|
||||
} else {
|
||||
// This is cached for proxies like httpfsfilesystem.
|
||||
cachedConnection = conn;
|
||||
}
|
||||
originalUrl = super.getUrl();
|
||||
}
|
||||
|
||||
int read(byte[] b, int off, int len) throws IOException {
|
||||
|
@ -2054,7 +2120,8 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
if (runnerState == RunnerState.SEEK) {
|
||||
try {
|
||||
final URL rurl = new URL(resolvedUrl + "&" + new OffsetParam(pos));
|
||||
cachedConnection = new URLRunner(GetOpParam.Op.OPEN, rurl, true).run();
|
||||
cachedConnection = new URLRunner(GetOpParam.Op.OPEN, rurl, true,
|
||||
false).run();
|
||||
} catch (IOException ioe) {
|
||||
closeInputStream(RunnerState.DISCONNECTED);
|
||||
}
|
||||
|
@ -2238,5 +2305,9 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
long getPos() {
|
||||
return pos;
|
||||
}
|
||||
|
||||
protected FileEncryptionInfo getFileEncryptionInfo() {
|
||||
return feInfo;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -102,12 +102,14 @@ public class TestWebHdfsContentLength {
|
|||
public void testGetOpWithRedirect() {
|
||||
Future<String> future1 = contentLengthFuture(redirectResponse);
|
||||
Future<String> future2 = contentLengthFuture(errResponse);
|
||||
Future<String> future3 = contentLengthFuture(errResponse);
|
||||
try {
|
||||
fs.open(p).read();
|
||||
Assert.fail();
|
||||
} catch (IOException ioe) {} // expected
|
||||
Assert.assertEquals(null, getContentLength(future1));
|
||||
Assert.assertEquals(null, getContentLength(future2));
|
||||
Assert.assertEquals(null, getContentLength(future3));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -50,12 +50,16 @@ import javax.ws.rs.core.Context;
|
|||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.StreamingOutput;
|
||||
import javax.ws.rs.core.Response.ResponseBuilder;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.FileEncryptionInfo;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
|
@ -72,6 +76,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
|
@ -115,9 +120,9 @@ public class NamenodeWebHdfsMethods {
|
|||
private Principal userPrincipal;
|
||||
private String remoteAddr;
|
||||
|
||||
private static volatile String serverDefaultsResponse = null;
|
||||
private @Context ServletContext context;
|
||||
private @Context HttpServletResponse response;
|
||||
private boolean supportEZ;
|
||||
|
||||
public NamenodeWebHdfsMethods(@Context HttpServletRequest request) {
|
||||
// the request object is a proxy to thread-locals so we have to extract
|
||||
|
@ -128,6 +133,8 @@ public class NamenodeWebHdfsMethods {
|
|||
// get the remote address, if coming in via a trusted proxy server then
|
||||
// the address with be that of the proxied client
|
||||
remoteAddr = JspHelper.getRemoteAddr(request);
|
||||
supportEZ =
|
||||
Boolean.valueOf(request.getHeader(WebHdfsFileSystem.EZ_HEADER));
|
||||
}
|
||||
|
||||
private void init(final UserGroupInformation ugi,
|
||||
|
@ -226,7 +233,7 @@ public class NamenodeWebHdfsMethods {
|
|||
static DatanodeInfo chooseDatanode(final NameNode namenode,
|
||||
final String path, final HttpOpParam.Op op, final long openOffset,
|
||||
final long blocksize, final String excludeDatanodes,
|
||||
final String remoteAddr) throws IOException {
|
||||
final String remoteAddr, final HdfsFileStatus status) throws IOException {
|
||||
FSNamesystem fsn = namenode.getNamesystem();
|
||||
if (fsn == null) {
|
||||
throw new IOException("Namesystem has not been intialized yet.");
|
||||
|
@ -263,7 +270,6 @@ public class NamenodeWebHdfsMethods {
|
|||
|| op == PostOpParam.Op.APPEND) {
|
||||
//choose a datanode containing a replica
|
||||
final NamenodeProtocols np = getRPCServer(namenode);
|
||||
final HdfsFileStatus status = np.getFileInfo(path);
|
||||
if (status == null) {
|
||||
throw new FileNotFoundException("File " + path + " not found.");
|
||||
}
|
||||
|
@ -283,7 +289,7 @@ public class NamenodeWebHdfsMethods {
|
|||
return bestNode(locations.get(0).getLocations(), excludes);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return (DatanodeDescriptor)bm.getDatanodeManager().getNetworkTopology(
|
||||
).chooseRandom(NodeBase.ROOT, excludes);
|
||||
|
@ -320,15 +326,22 @@ public class NamenodeWebHdfsMethods {
|
|||
return t;
|
||||
}
|
||||
|
||||
private URI redirectURI(final NameNode namenode,
|
||||
private URI redirectURI(ResponseBuilder rb, final NameNode namenode,
|
||||
final UserGroupInformation ugi, final DelegationParam delegation,
|
||||
final UserParam username, final DoAsParam doAsUser,
|
||||
final String path, final HttpOpParam.Op op, final long openOffset,
|
||||
final long blocksize, final String excludeDatanodes,
|
||||
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
|
||||
final DatanodeInfo dn;
|
||||
final NamenodeProtocols np = getRPCServer(namenode);
|
||||
HdfsFileStatus status = null;
|
||||
if (op == GetOpParam.Op.OPEN
|
||||
|| op == GetOpParam.Op.GETFILECHECKSUM
|
||||
|| op == PostOpParam.Op.APPEND) {
|
||||
status = np.getFileInfo(path);
|
||||
}
|
||||
dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
|
||||
excludeDatanodes, remoteAddr);
|
||||
excludeDatanodes, remoteAddr, status);
|
||||
if (dn == null) {
|
||||
throw new IOException("Failed to find datanode, suggest to check cluster"
|
||||
+ " health. excludeDatanodes=" + excludeDatanodes);
|
||||
|
@ -347,15 +360,27 @@ public class NamenodeWebHdfsMethods {
|
|||
namenode, ugi, null);
|
||||
delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
|
||||
}
|
||||
final String query = op.toQueryString() + delegationQuery
|
||||
+ "&" + new NamenodeAddressParam(namenode)
|
||||
+ Param.toSortedString("&", parameters);
|
||||
final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;
|
||||
|
||||
StringBuilder queryBuilder = new StringBuilder();
|
||||
queryBuilder.append(op.toQueryString());
|
||||
queryBuilder.append(delegationQuery);
|
||||
queryBuilder.append("&").append(new NamenodeAddressParam(namenode));
|
||||
queryBuilder.append(Param.toSortedString("&", parameters));
|
||||
|
||||
boolean prependReservedRawPath = false;
|
||||
if (op == GetOpParam.Op.OPEN && supportEZ
|
||||
&& status.getFileEncryptionInfo() != null) {
|
||||
prependReservedRawPath = true;
|
||||
rb.header(WebHdfsFileSystem.FEFINFO_HEADER,
|
||||
encodeFeInfo(status.getFileEncryptionInfo()));
|
||||
}
|
||||
final String uripath = WebHdfsFileSystem.PATH_PREFIX +
|
||||
(prependReservedRawPath ? "/.reserved/raw" + path : path);
|
||||
|
||||
int port = "http".equals(scheme) ? dn.getInfoPort() : dn
|
||||
.getInfoSecurePort();
|
||||
final URI uri = new URI(scheme, null, dn.getHostName(), port, uripath,
|
||||
query, null);
|
||||
queryBuilder.toString(), null);
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("redirectURI=" + uri);
|
||||
|
@ -556,7 +581,7 @@ public class NamenodeWebHdfsMethods {
|
|||
switch(op.getValue()) {
|
||||
case CREATE:
|
||||
{
|
||||
final URI uri = redirectURI(namenode, ugi, delegation, username,
|
||||
final URI uri = redirectURI(null, namenode, ugi, delegation, username,
|
||||
doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf),
|
||||
exclDatanodes.getValue(), permission, overwrite, bufferSize,
|
||||
replication, blockSize, createParent, createFlagParam);
|
||||
|
@ -786,7 +811,7 @@ public class NamenodeWebHdfsMethods {
|
|||
case APPEND:
|
||||
{
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final URI uri = redirectURI(namenode, ugi, delegation, username,
|
||||
final URI uri = redirectURI(null, namenode, ugi, delegation, username,
|
||||
doAsUser, fullpath, op.getValue(), -1L, -1L,
|
||||
excludeDatanodes.getValue(), bufferSize);
|
||||
if(!noredirectParam.getValue()) {
|
||||
|
@ -923,6 +948,12 @@ public class NamenodeWebHdfsMethods {
|
|||
});
|
||||
}
|
||||
|
||||
private static String encodeFeInfo(FileEncryptionInfo feInfo) {
|
||||
String encodedValue = Base64.encodeBase64String(
|
||||
PBHelperClient.convert(feInfo).toByteArray());
|
||||
return encodedValue;
|
||||
}
|
||||
|
||||
private Response get(
|
||||
final UserGroupInformation ugi,
|
||||
final DelegationParam delegation,
|
||||
|
@ -951,15 +982,17 @@ public class NamenodeWebHdfsMethods {
|
|||
case OPEN:
|
||||
{
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final URI uri = redirectURI(namenode, ugi, delegation, username,
|
||||
ResponseBuilder rb = Response.noContent();
|
||||
final URI uri = redirectURI(rb, namenode, ugi, delegation, username,
|
||||
doAsUser, fullpath, op.getValue(), offset.getValue(), -1L,
|
||||
excludeDatanodes.getValue(), offset, length, bufferSize);
|
||||
if(!noredirectParam.getValue()) {
|
||||
return Response.temporaryRedirect(uri)
|
||||
.type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
return rb.status(Status.TEMPORARY_REDIRECT).location(uri)
|
||||
.type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
} else {
|
||||
final String js = JsonUtil.toJsonString("Location", uri);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
return rb.status(Status.OK).entity(js).type(MediaType.APPLICATION_JSON)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
case GETFILEBLOCKLOCATIONS:
|
||||
|
@ -1010,8 +1043,8 @@ public class NamenodeWebHdfsMethods {
|
|||
case GETFILECHECKSUM:
|
||||
{
|
||||
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
|
||||
fullpath, op.getValue(), -1L, -1L, null);
|
||||
final URI uri = redirectURI(null, namenode, ugi, delegation, username,
|
||||
doAsUser, fullpath, op.getValue(), -1L, -1L, null);
|
||||
if(!noredirectParam.getValue()) {
|
||||
return Response.temporaryRedirect(uri)
|
||||
.type(MediaType.APPLICATION_OCTET_STREAM).build();
|
||||
|
@ -1109,9 +1142,12 @@ public class NamenodeWebHdfsMethods {
|
|||
case GETSERVERDEFAULTS: {
|
||||
// Since none of the server defaults values are hot reloaded, we can
|
||||
// cache the output of serverDefaults.
|
||||
String serverDefaultsResponse =
|
||||
(String) context.getAttribute("serverDefaults");
|
||||
if (serverDefaultsResponse == null) {
|
||||
FsServerDefaults serverDefaults = cp.getServerDefaults();
|
||||
serverDefaultsResponse = JsonUtil.toJsonString(serverDefaults);
|
||||
context.setAttribute("serverDefaults", serverDefaultsResponse);
|
||||
}
|
||||
return Response.ok(serverDefaultsResponse)
|
||||
.type(MediaType.APPLICATION_JSON).build();
|
||||
|
@ -1121,15 +1157,6 @@ public class NamenodeWebHdfsMethods {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* This is used only and only for testing.
|
||||
* Please don't use it otherwise.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static void resetServerDefaultsResponse() {
|
||||
serverDefaultsResponse = null;
|
||||
}
|
||||
|
||||
private static String getTrashRoot(String fullPath,
|
||||
Configuration conf) throws IOException {
|
||||
FileSystem fs = FileSystem.get(conf != null ? conf : new Configuration());
|
||||
|
|
|
@ -20,10 +20,14 @@ package org.apache.hadoop.hdfs;
|
|||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.io.StringReader;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -41,6 +45,7 @@ import com.google.common.collect.Lists;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.crypto.CryptoInputStream;
|
||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
|
@ -49,6 +54,7 @@ import org.apache.hadoop.crypto.key.KeyProviderFactory;
|
|||
import org.apache.hadoop.crypto.key.kms.server.EagerKeyGeneratorKeyProviderCryptoExtension;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FSTestWrapper;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
|
@ -82,6 +88,7 @@ import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
|||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
|
@ -1997,4 +2004,185 @@ public class TestEncryptionZones {
|
|||
Assert.assertEquals(tokens[1], testToken);
|
||||
Assert.assertEquals(1, creds.numberOfTokens());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a file with stable {@link DistributedFileSystem}.
|
||||
* Tests the following 2 scenarios.
|
||||
* 1. The decrypted data using {@link WebHdfsFileSystem} should be same as
|
||||
* input data.
|
||||
* 2. Gets the underlying raw encrypted stream and verifies that the
|
||||
* encrypted data is different than input data.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testWebhdfsRead() throws Exception {
|
||||
Path zonePath = new Path("/TestEncryptionZone");
|
||||
fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false);
|
||||
dfsAdmin.createEncryptionZone(zonePath, TEST_KEY, NO_TRASH);
|
||||
final Path encryptedFilePath =
|
||||
new Path("/TestEncryptionZone/encryptedFile.txt");
|
||||
final Path rawPath =
|
||||
new Path("/.reserved/raw/TestEncryptionZone/encryptedFile.txt");
|
||||
final String content = "hello world";
|
||||
// Create a file using DistributedFileSystem.
|
||||
DFSTestUtil.writeFile(fs, encryptedFilePath, content);
|
||||
final FileSystem webhdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
|
||||
WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
|
||||
// Verify whether decrypted input stream data is same as content.
|
||||
InputStream decryptedIputStream = webhdfs.open(encryptedFilePath);
|
||||
verifyStreamsSame(content, decryptedIputStream);
|
||||
|
||||
// Get the underlying stream from CryptoInputStream which should be
|
||||
// raw encrypted bytes.
|
||||
InputStream cryptoStream =
|
||||
webhdfs.open(encryptedFilePath).getWrappedStream();
|
||||
Assert.assertTrue("cryptoStream should be an instance of "
|
||||
+ "CryptoInputStream", (cryptoStream instanceof CryptoInputStream));
|
||||
InputStream encryptedStream =
|
||||
((CryptoInputStream)cryptoStream).getWrappedStream();
|
||||
// Verify that the data read from the raw input stream is different
|
||||
// from the original content. Also check it is identical to the raw
|
||||
// encrypted data from dfs.
|
||||
verifyRaw(content, encryptedStream, fs.open(rawPath));
|
||||
}
|
||||
|
||||
private void verifyStreamsSame(String content, InputStream is)
|
||||
throws IOException {
|
||||
byte[] streamBytes;
|
||||
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
|
||||
IOUtils.copyBytes(is, os, 1024, true);
|
||||
streamBytes = os.toByteArray();
|
||||
}
|
||||
Assert.assertArrayEquals(content.getBytes(), streamBytes);
|
||||
}
|
||||
|
||||
private void verifyRaw(String content, InputStream is, InputStream rawIs)
|
||||
throws IOException {
|
||||
byte[] streamBytes, rawBytes;
|
||||
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
|
||||
IOUtils.copyBytes(is, os, 1024, true);
|
||||
streamBytes = os.toByteArray();
|
||||
}
|
||||
Assert.assertFalse(Arrays.equals(content.getBytes(), streamBytes));
|
||||
|
||||
// webhdfs raw bytes should match the raw bytes from dfs.
|
||||
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
|
||||
IOUtils.copyBytes(rawIs, os, 1024, true);
|
||||
rawBytes = os.toByteArray();
|
||||
}
|
||||
Assert.assertArrayEquals(rawBytes, streamBytes);
|
||||
}
|
||||
|
||||
/* Tests that if client is old and namenode is new then the
|
||||
* data will be decrypted by datanode.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testWebhdfsReadOldBehavior() throws Exception {
|
||||
Path zonePath = new Path("/TestEncryptionZone");
|
||||
fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false);
|
||||
dfsAdmin.createEncryptionZone(zonePath, TEST_KEY, NO_TRASH);
|
||||
final Path encryptedFilePath = new Path("/TestEncryptionZone/foo");
|
||||
final String content = "hello world";
|
||||
// Create a file using DistributedFileSystem.
|
||||
DFSTestUtil.writeFile(fs, encryptedFilePath, content);
|
||||
|
||||
InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
|
||||
URL url = new URL("http", addr.getHostString(), addr.getPort(),
|
||||
WebHdfsFileSystem.PATH_PREFIX + encryptedFilePath.toString()
|
||||
+ "?op=OPEN");
|
||||
// Return a connection with client not supporting EZ.
|
||||
HttpURLConnection namenodeConnection = returnConnection(url, "GET", false);
|
||||
String location = namenodeConnection.getHeaderField("Location");
|
||||
URL datanodeURL = new URL(location);
|
||||
String path = datanodeURL.getPath();
|
||||
Assert.assertEquals(
|
||||
WebHdfsFileSystem.PATH_PREFIX + encryptedFilePath.toString(), path);
|
||||
HttpURLConnection datanodeConnection = returnConnection(datanodeURL,
|
||||
"GET", false);
|
||||
InputStream in = datanodeConnection.getInputStream();
|
||||
// Comparing with the original contents
|
||||
// and making sure they are decrypted.
|
||||
verifyStreamsSame(content, in);
|
||||
}
|
||||
|
||||
/* Tests namenode returns path starting with /.reserved/raw if client
|
||||
* supports EZ and not if otherwise
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testWebhfsEZRedirectLocation()
|
||||
throws Exception {
|
||||
Path zonePath = new Path("/TestEncryptionZone");
|
||||
fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false);
|
||||
dfsAdmin.createEncryptionZone(zonePath, TEST_KEY, NO_TRASH);
|
||||
final Path encryptedFilePath =
|
||||
new Path("/TestEncryptionZone/foo");
|
||||
final String content = "hello world";
|
||||
// Create a file using DistributedFileSystem.
|
||||
DFSTestUtil.writeFile(fs, encryptedFilePath, content);
|
||||
|
||||
InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
|
||||
URL url = new URL("http", addr.getHostString(), addr.getPort(),
|
||||
WebHdfsFileSystem.PATH_PREFIX + encryptedFilePath.toString()
|
||||
+ "?op=OPEN");
|
||||
// Return a connection with client not supporting EZ.
|
||||
HttpURLConnection namenodeConnection =
|
||||
returnConnection(url, "GET", false);
|
||||
Assert.assertNotNull(namenodeConnection.getHeaderField("Location"));
|
||||
URL datanodeUrl = new URL(namenodeConnection.getHeaderField("Location"));
|
||||
Assert.assertNotNull(datanodeUrl);
|
||||
String path = datanodeUrl.getPath();
|
||||
Assert.assertEquals(
|
||||
WebHdfsFileSystem.PATH_PREFIX + encryptedFilePath.toString(), path);
|
||||
|
||||
url = new URL("http", addr.getHostString(), addr.getPort(),
|
||||
WebHdfsFileSystem.PATH_PREFIX + encryptedFilePath.toString()
|
||||
+ "?op=OPEN");
|
||||
// Return a connection with client supporting EZ.
|
||||
namenodeConnection = returnConnection(url, "GET", true);
|
||||
Assert.assertNotNull(namenodeConnection.getHeaderField("Location"));
|
||||
datanodeUrl = new URL(namenodeConnection.getHeaderField("Location"));
|
||||
Assert.assertNotNull(datanodeUrl);
|
||||
path = datanodeUrl.getPath();
|
||||
Assert.assertEquals(WebHdfsFileSystem.PATH_PREFIX
|
||||
+ "/.reserved/raw" + encryptedFilePath.toString(), path);
|
||||
}
|
||||
|
||||
private static HttpURLConnection returnConnection(URL url,
|
||||
String httpRequestType, boolean supportEZ) throws Exception {
|
||||
HttpURLConnection conn = null;
|
||||
conn = (HttpURLConnection) url.openConnection();
|
||||
conn.setRequestMethod(httpRequestType);
|
||||
conn.setDoOutput(true);
|
||||
conn.setInstanceFollowRedirects(false);
|
||||
if (supportEZ) {
|
||||
conn.setRequestProperty(WebHdfsFileSystem.EZ_HEADER, "true");
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
|
||||
/*
|
||||
* Test seek behavior of the webhdfs input stream which reads data from
|
||||
* encryption zone.
|
||||
*/
|
||||
@Test
|
||||
public void testPread() throws Exception {
|
||||
Path zonePath = new Path("/TestEncryptionZone");
|
||||
fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false);
|
||||
dfsAdmin.createEncryptionZone(zonePath, TEST_KEY, NO_TRASH);
|
||||
final Path encryptedFilePath =
|
||||
new Path("/TestEncryptionZone/foo");
|
||||
// Create a file using DistributedFileSystem.
|
||||
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
|
||||
WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
DFSTestUtil.createFile(webfs, encryptedFilePath, 1024, (short)1, 0xFEED);
|
||||
byte[] data = DFSTestUtil.readFileBuffer(fs, encryptedFilePath);
|
||||
FSDataInputStream in = webfs.open(encryptedFilePath);
|
||||
for (int i = 0; i < 1024; i++) {
|
||||
in.seek(i);
|
||||
Assert.assertEquals((data[i] & 0XFF), in.read());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
|
|||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||
|
@ -101,7 +102,7 @@ public class TestWebHdfsDataLocality {
|
|||
//The chosen datanode must be the same as the client address
|
||||
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
||||
namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null,
|
||||
LOCALHOST);
|
||||
LOCALHOST, null);
|
||||
Assert.assertEquals(ipAddr, chosen.getIpAddr());
|
||||
}
|
||||
}
|
||||
|
@ -125,23 +126,26 @@ public class TestWebHdfsDataLocality {
|
|||
//the chosen datanode must be the same as the replica location.
|
||||
|
||||
{ //test GETFILECHECKSUM
|
||||
final HdfsFileStatus status = dfs.getClient().getFileInfo(f);
|
||||
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
||||
namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null,
|
||||
LOCALHOST);
|
||||
LOCALHOST, status);
|
||||
Assert.assertEquals(expected, chosen);
|
||||
}
|
||||
|
||||
{ //test OPEN
|
||||
final HdfsFileStatus status = dfs.getClient().getFileInfo(f);
|
||||
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
||||
namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null,
|
||||
LOCALHOST);
|
||||
LOCALHOST, status);
|
||||
Assert.assertEquals(expected, chosen);
|
||||
}
|
||||
|
||||
{ //test APPEND
|
||||
final HdfsFileStatus status = dfs.getClient().getFileInfo(f);
|
||||
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
||||
namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null,
|
||||
LOCALHOST);
|
||||
LOCALHOST, status);
|
||||
Assert.assertEquals(expected, chosen);
|
||||
}
|
||||
} finally {
|
||||
|
@ -195,9 +199,10 @@ public class TestWebHdfsDataLocality {
|
|||
for (int i = 0; i < 2; i++) {
|
||||
sb.append(locations[i].getXferAddr());
|
||||
{ // test GETFILECHECKSUM
|
||||
final HdfsFileStatus status = dfs.getClient().getFileInfo(f);
|
||||
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
||||
namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize,
|
||||
sb.toString(), LOCALHOST);
|
||||
sb.toString(), LOCALHOST, status);
|
||||
for (int j = 0; j <= i; j++) {
|
||||
Assert.assertNotEquals(locations[j].getHostName(),
|
||||
chosen.getHostName());
|
||||
|
@ -205,9 +210,10 @@ public class TestWebHdfsDataLocality {
|
|||
}
|
||||
|
||||
{ // test OPEN
|
||||
final HdfsFileStatus status = dfs.getClient().getFileInfo(f);
|
||||
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
||||
namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString(),
|
||||
LOCALHOST);
|
||||
LOCALHOST, status);
|
||||
for (int j = 0; j <= i; j++) {
|
||||
Assert.assertNotEquals(locations[j].getHostName(),
|
||||
chosen.getHostName());
|
||||
|
@ -215,9 +221,10 @@ public class TestWebHdfsDataLocality {
|
|||
}
|
||||
|
||||
{ // test APPEND
|
||||
final HdfsFileStatus status = dfs.getClient().getFileInfo(f);
|
||||
final DatanodeInfo chosen = NamenodeWebHdfsMethods
|
||||
.chooseDatanode(namenode, f, PostOpParam.Op.APPEND, -1L,
|
||||
blocksize, sb.toString(), LOCALHOST);
|
||||
blocksize, sb.toString(), LOCALHOST, status);
|
||||
for (int j = 0; j <= i; j++) {
|
||||
Assert.assertNotEquals(locations[j].getHostName(),
|
||||
chosen.getHostName());
|
||||
|
@ -238,6 +245,6 @@ public class TestWebHdfsDataLocality {
|
|||
exception.expect(IOException.class);
|
||||
exception.expectMessage("Namesystem has not been intialized yet.");
|
||||
NamenodeWebHdfsMethods.chooseDatanode(nn, "/path", PutOpParam.Op.CREATE, 0,
|
||||
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null, LOCALHOST);
|
||||
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null, LOCALHOST, null);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1635,7 +1635,6 @@ public class TestWebHDFS {
|
|||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
||||
final WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystem(
|
||||
conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
NamenodeWebHdfsMethods.resetServerDefaultsResponse();
|
||||
FSNamesystem fsnSpy =
|
||||
NameNodeAdapter.spyOnNamesystem(cluster.getNameNode());
|
||||
Mockito.when(fsnSpy.getServerDefaults()).
|
||||
|
|
|
@ -392,7 +392,7 @@ public class TestWebHdfsTokens {
|
|||
InputStream is = fs.open(p);
|
||||
is.read();
|
||||
is.close();
|
||||
verify(fs, times(2)).getDelegationToken(); // first bad, then good
|
||||
verify(fs, times(3)).getDelegationToken(); // first bad, then good
|
||||
verify(fs, times(1)).replaceExpiredDelegationToken();
|
||||
verify(fs, times(1)).getDelegationToken(null);
|
||||
verify(fs, times(1)).setDelegationToken(any(Token.class));
|
||||
|
@ -409,7 +409,7 @@ public class TestWebHdfsTokens {
|
|||
is = fs.open(p);
|
||||
is.read();
|
||||
is.close();
|
||||
verify(fs, times(2)).getDelegationToken(); // first bad, then good
|
||||
verify(fs, times(3)).getDelegationToken(); // first bad, then good
|
||||
verify(fs, times(1)).replaceExpiredDelegationToken();
|
||||
verify(fs, times(1)).getDelegationToken(null);
|
||||
verify(fs, times(1)).setDelegationToken(any(Token.class));
|
||||
|
|
Loading…
Reference in New Issue