HDFS-12574. Add CryptoInputStream to WebHdfsFileSystem read call. Contributed by Rushabh S Shah

(cherry picked from commit fde95d463c)
This commit is contained in:
Kihwal Lee 2018-01-29 17:25:30 -06:00
parent 95a96b13e2
commit 673200ac1e
9 changed files with 403 additions and 90 deletions

View File

@ -38,7 +38,6 @@ import java.net.Socket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.URI; import java.net.URI;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.security.GeneralSecurityException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
@ -62,8 +61,6 @@ import org.apache.hadoop.crypto.CryptoInputStream;
import org.apache.hadoop.crypto.CryptoOutputStream; import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary;
@ -908,46 +905,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
} }
} }
/**
* Decrypts a EDEK by consulting the KeyProvider.
*/
private KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo
feInfo) throws IOException {
try (TraceScope ignored = tracer.newScope("decryptEDEK")) {
KeyProvider provider = getKeyProvider();
if (provider == null) {
throw new IOException("No KeyProvider is configured, cannot access" +
" an encrypted file");
}
EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption(
feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(),
feInfo.getEncryptedDataEncryptionKey());
try {
KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension
.createKeyProviderCryptoExtension(provider);
return cryptoProvider.decryptEncryptedKey(ekv);
} catch (GeneralSecurityException e) {
throw new IOException(e);
}
}
}
/** /**
* Wraps the stream in a CryptoInputStream if the underlying file is * Wraps the stream in a CryptoInputStream if the underlying file is
* encrypted. * encrypted.
*/ */
public HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis) public HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis)
throws IOException { throws IOException {
final FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo(); FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo();
if (feInfo != null) { if (feInfo != null) {
// File is encrypted, wrap the stream in a crypto stream. CryptoInputStream cryptoIn;
// Currently only one version, so no special logic based on the version # try (TraceScope ignored = getTracer().newScope("decryptEDEK")) {
HdfsKMSUtil.getCryptoProtocolVersion(feInfo); cryptoIn = HdfsKMSUtil.createWrappedInputStream(dfsis,
final CryptoCodec codec = HdfsKMSUtil.getCryptoCodec(conf, feInfo); getKeyProvider(), feInfo, getConfiguration());
final KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo); }
final CryptoInputStream cryptoIn =
new CryptoInputStream(dfsis, codec, decrypted.getMaterial(),
feInfo.getIV());
return new HdfsDataInputStream(cryptoIn); return new HdfsDataInputStream(cryptoIn);
} else { } else {
// No FileEncryptionInfo so no encryption. // No FileEncryptionInfo so no encryption.
@ -976,7 +946,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
// Currently only one version, so no special logic based on the version # // Currently only one version, so no special logic based on the version #
HdfsKMSUtil.getCryptoProtocolVersion(feInfo); HdfsKMSUtil.getCryptoProtocolVersion(feInfo);
final CryptoCodec codec = HdfsKMSUtil.getCryptoCodec(conf, feInfo); final CryptoCodec codec = HdfsKMSUtil.getCryptoCodec(conf, feInfo);
KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo); KeyVersion decrypted;
try (TraceScope ignored = tracer.newScope("decryptEDEK")) {
decrypted = HdfsKMSUtil.decryptEncryptedDataEncryptionKey(feInfo,
getKeyProvider());
}
final CryptoOutputStream cryptoOut = final CryptoOutputStream cryptoOut =
new CryptoOutputStream(dfsos, codec, new CryptoOutputStream(dfsos, codec,
decrypted.getMaterial(), feInfo.getIV(), startPos); decrypted.getMaterial(), feInfo.getIV(), startPos);

View File

@ -20,15 +20,21 @@ package org.apache.hadoop.hdfs;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.net.URI; import java.net.URI;
import java.security.GeneralSecurityException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherSuite; import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoCodec; import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.CryptoInputStream;
import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension; import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer; import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@ -187,4 +193,39 @@ public final class HdfsKMSUtil {
return new Text(DFS_KMS_PREFIX + namenodeUri.getScheme() return new Text(DFS_KMS_PREFIX + namenodeUri.getScheme()
+"://" + namenodeUri.getAuthority()); +"://" + namenodeUri.getAuthority());
} }
public static CryptoInputStream createWrappedInputStream(InputStream is,
KeyProvider keyProvider, FileEncryptionInfo fileEncryptionInfo,
Configuration conf) throws IOException {
// File is encrypted, wrap the stream in a crypto stream.
// Currently only one version, so no special logic based on the version#
HdfsKMSUtil.getCryptoProtocolVersion(fileEncryptionInfo);
final CryptoCodec codec = HdfsKMSUtil.getCryptoCodec(
conf, fileEncryptionInfo);
final KeyVersion decrypted =
decryptEncryptedDataEncryptionKey(fileEncryptionInfo, keyProvider);
return new CryptoInputStream(is, codec, decrypted.getMaterial(),
fileEncryptionInfo.getIV());
}
/**
* Decrypts a EDEK by consulting the KeyProvider.
*/
static KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo
feInfo, KeyProvider keyProvider) throws IOException {
if (keyProvider == null) {
throw new IOException("No KeyProvider is configured, cannot access" +
" an encrypted file");
}
EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption(
feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(),
feInfo.getEncryptedDataEncryptionKey());
try {
KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension
.createKeyProviderCryptoExtension(keyProvider);
return cryptoProvider.decryptEncryptedKey(ekv);
} catch (GeneralSecurityException e) {
throw new IOException(e);
}
}
} }

View File

@ -37,8 +37,11 @@ import java.net.InetSocketAddress;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URI; import java.net.URI;
import java.net.URL; import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Base64;
import java.util.Base64.Decoder;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashSet; import java.util.HashSet;
@ -66,6 +69,7 @@ import org.apache.hadoop.fs.DelegationTokenRenewer;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsServerDefaults;
@ -92,6 +96,8 @@ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.FileEncryptionInfoProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.web.resources.*; import org.apache.hadoop.hdfs.web.resources.*;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op; import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op;
@ -133,6 +139,8 @@ public class WebHdfsFileSystem extends FileSystem
/** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */ /** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
public static final String PATH_PREFIX = "/" + WebHdfsConstants.WEBHDFS_SCHEME public static final String PATH_PREFIX = "/" + WebHdfsConstants.WEBHDFS_SCHEME
+ "/v" + VERSION; + "/v" + VERSION;
public static final String EZ_HEADER = "X-Hadoop-Accept-EZ";
public static final String FEFINFO_HEADER = "X-Hadoop-feInfo";
/** /**
* Default connection factory may be overridden in tests to use smaller * Default connection factory may be overridden in tests to use smaller
@ -613,12 +621,19 @@ public class WebHdfsFileSystem extends FileSystem
private boolean checkRetry; private boolean checkRetry;
private String redirectHost; private String redirectHost;
private boolean followRedirect = true;
protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) { protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) {
this.op = op; this.op = op;
this.redirected = redirected; this.redirected = redirected;
} }
protected AbstractRunner(final HttpOpParam.Op op, boolean redirected,
boolean followRedirect) {
this(op, redirected);
this.followRedirect = followRedirect;
}
T run() throws IOException { T run() throws IOException {
UserGroupInformation connectUgi = ugi.getRealUser(); UserGroupInformation connectUgi = ugi.getRealUser();
if (connectUgi == null) { if (connectUgi == null) {
@ -685,9 +700,17 @@ public class WebHdfsFileSystem extends FileSystem
// See http://tinyurl.com/java7-http-keepalive // See http://tinyurl.com/java7-http-keepalive
conn.disconnect(); conn.disconnect();
} }
if (!followRedirect) {
return conn;
}
} }
try { try {
return connect(op, url); final HttpURLConnection conn = connect(op, url);
// output streams will validate on close
if (!op.getDoOutput()) {
validateResponse(op, conn, false);
}
return conn;
} catch (IOException ioe) { } catch (IOException ioe) {
if (redirectHost != null) { if (redirectHost != null) {
if (excludeDatanodes.getValue() != null) { if (excludeDatanodes.getValue() != null) {
@ -713,6 +736,7 @@ public class WebHdfsFileSystem extends FileSystem
// The value of the header is unimportant. Only its presence matters. // The value of the header is unimportant. Only its presence matters.
conn.setRequestProperty(restCsrfCustomHeader, "\"\""); conn.setRequestProperty(restCsrfCustomHeader, "\"\"");
} }
conn.setRequestProperty(EZ_HEADER, "true");
switch (op.getType()) { switch (op.getType()) {
// if not sending a message body for a POST or PUT operation, need // if not sending a message body for a POST or PUT operation, need
// to ensure the server/proxy knows this // to ensure the server/proxy knows this
@ -760,10 +784,6 @@ public class WebHdfsFileSystem extends FileSystem
final URL url = getUrl(); final URL url = getUrl();
try { try {
final HttpURLConnection conn = connect(url); final HttpURLConnection conn = connect(url);
// output streams will validate on close
if (!op.getDoOutput()) {
validateResponse(op, conn, false);
}
return getResponse(conn); return getResponse(conn);
} catch (AccessControlException ace) { } catch (AccessControlException ace) {
// no retries for auth failures // no retries for auth failures
@ -808,7 +828,6 @@ public class WebHdfsFileSystem extends FileSystem
a.action == RetryPolicy.RetryAction.RetryDecision.RETRY; a.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
boolean isFailoverAndRetry = boolean isFailoverAndRetry =
a.action == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY; a.action == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY;
if (isRetry || isFailoverAndRetry) { if (isRetry || isFailoverAndRetry) {
LOG.info("Retrying connect to namenode: {}. Already retried {}" LOG.info("Retrying connect to namenode: {}. Already retried {}"
+ " time(s); retry policy is {}, delay {}ms.", + " time(s); retry policy is {}, delay {}ms.",
@ -989,16 +1008,16 @@ public class WebHdfsFileSystem extends FileSystem
/** /**
* Used by open() which tracks the resolved url itself * Used by open() which tracks the resolved url itself
*/ */
final class URLRunner extends AbstractRunner<HttpURLConnection> { class URLRunner extends AbstractRunner<HttpURLConnection> {
private final URL url; private final URL url;
@Override @Override
protected URL getUrl() { protected URL getUrl() throws IOException {
return url; return url;
} }
protected URLRunner(final HttpOpParam.Op op, final URL url, protected URLRunner(final HttpOpParam.Op op, final URL url,
boolean redirected) { boolean redirected, boolean followRedirect) {
super(op, redirected); super(op, redirected, followRedirect);
this.url = url; this.url = url;
} }
@ -1411,12 +1430,20 @@ public class WebHdfsFileSystem extends FileSystem
).run(); ).run();
} }
@SuppressWarnings("resource")
@Override @Override
public FSDataInputStream open(final Path f, final int bufferSize public FSDataInputStream open(final Path f, final int bufferSize
) throws IOException { ) throws IOException {
statistics.incrementReadOps(1); statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.OPEN); storageStatistics.incrementOpCounter(OpType.OPEN);
return new FSDataInputStream(new WebHdfsInputStream(f, bufferSize)); WebHdfsInputStream webfsInputStream =
new WebHdfsInputStream(f, bufferSize);
if (webfsInputStream.getFileEncryptionInfo() == null) {
return new FSDataInputStream(webfsInputStream);
} else {
return new FSDataInputStream(
webfsInputStream.createWrappedInputStream());
}
} }
@Override @Override
@ -1461,7 +1488,8 @@ public class WebHdfsFileSystem extends FileSystem
final boolean resolved) throws IOException { final boolean resolved) throws IOException {
final URL offsetUrl = offset == 0L? url final URL offsetUrl = offset == 0L? url
: new URL(url + "&" + new OffsetParam(offset)); : new URL(url + "&" + new OffsetParam(offset));
return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run(); return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved,
true).run();
} }
} }
@ -1911,6 +1939,15 @@ public class WebHdfsFileSystem extends FileSystem
void setReadRunner(ReadRunner rr) { void setReadRunner(ReadRunner rr) {
this.readRunner = rr; this.readRunner = rr;
} }
FileEncryptionInfo getFileEncryptionInfo() {
return readRunner.getFileEncryptionInfo();
}
InputStream createWrappedInputStream() throws IOException {
return HdfsKMSUtil.createWrappedInputStream(
this, getKeyProvider(), getFileEncryptionInfo(), getConf());
}
} }
enum RunnerState { enum RunnerState {
@ -1947,7 +1984,7 @@ public class WebHdfsFileSystem extends FileSystem
private byte[] readBuffer; private byte[] readBuffer;
private int readOffset; private int readOffset;
private int readLength; private int readLength;
private RunnerState runnerState = RunnerState.DISCONNECTED; private RunnerState runnerState = RunnerState.SEEK;
private URL originalUrl = null; private URL originalUrl = null;
private URL resolvedUrl = null; private URL resolvedUrl = null;
@ -1955,6 +1992,7 @@ public class WebHdfsFileSystem extends FileSystem
private final int bufferSize; private final int bufferSize;
private long pos = 0; private long pos = 0;
private long fileLength = 0; private long fileLength = 0;
private FileEncryptionInfo feInfo = null;
/* The following methods are WebHdfsInputStream helpers. */ /* The following methods are WebHdfsInputStream helpers. */
@ -1962,6 +2000,36 @@ public class WebHdfsFileSystem extends FileSystem
super(GetOpParam.Op.OPEN, p, new BufferSizeParam(bs)); super(GetOpParam.Op.OPEN, p, new BufferSizeParam(bs));
this.path = p; this.path = p;
this.bufferSize = bs; this.bufferSize = bs;
getRedirectedUrl();
}
private void getRedirectedUrl() throws IOException {
URLRunner urlRunner = new URLRunner(GetOpParam.Op.OPEN, null, false,
false) {
@Override
protected URL getUrl() throws IOException {
return toUrl(op, path, new BufferSizeParam(bufferSize));
}
};
HttpURLConnection conn = urlRunner.run();
String feInfoStr = conn.getHeaderField(FEFINFO_HEADER);
if (feInfoStr != null) {
Decoder decoder = Base64.getDecoder();
byte[] decodedBytes = decoder.decode(
feInfoStr.getBytes(StandardCharsets.UTF_8));
feInfo = PBHelperClient
.convert(FileEncryptionInfoProto.parseFrom(decodedBytes));
}
String location = conn.getHeaderField("Location");
if (location != null) {
// This saves the location for datanode where redirect was issued.
// Need to remove offset because seek can be called after open.
resolvedUrl = removeOffsetParam(new URL(location));
} else {
// This is cached for proxies like httpfsfilesystem.
cachedConnection = conn;
}
originalUrl = super.getUrl();
} }
int read(byte[] b, int off, int len) throws IOException { int read(byte[] b, int off, int len) throws IOException {
@ -1994,7 +2062,8 @@ public class WebHdfsFileSystem extends FileSystem
if (runnerState == RunnerState.SEEK) { if (runnerState == RunnerState.SEEK) {
try { try {
final URL rurl = new URL(resolvedUrl + "&" + new OffsetParam(pos)); final URL rurl = new URL(resolvedUrl + "&" + new OffsetParam(pos));
cachedConnection = new URLRunner(GetOpParam.Op.OPEN, rurl, true).run(); cachedConnection = new URLRunner(GetOpParam.Op.OPEN, rurl, true,
false).run();
} catch (IOException ioe) { } catch (IOException ioe) {
closeInputStream(RunnerState.DISCONNECTED); closeInputStream(RunnerState.DISCONNECTED);
} }
@ -2178,5 +2247,9 @@ public class WebHdfsFileSystem extends FileSystem
long getPos() { long getPos() {
return pos; return pos;
} }
protected FileEncryptionInfo getFileEncryptionInfo() {
return feInfo;
}
} }
} }

View File

@ -102,12 +102,14 @@ public class TestWebHdfsContentLength {
public void testGetOpWithRedirect() { public void testGetOpWithRedirect() {
Future<String> future1 = contentLengthFuture(redirectResponse); Future<String> future1 = contentLengthFuture(redirectResponse);
Future<String> future2 = contentLengthFuture(errResponse); Future<String> future2 = contentLengthFuture(errResponse);
Future<String> future3 = contentLengthFuture(errResponse);
try { try {
fs.open(p).read(); fs.open(p).read();
Assert.fail(); Assert.fail();
} catch (IOException ioe) {} // expected } catch (IOException ioe) {} // expected
Assert.assertEquals(null, getContentLength(future1)); Assert.assertEquals(null, getContentLength(future1));
Assert.assertEquals(null, getContentLength(future2)); Assert.assertEquals(null, getContentLength(future2));
Assert.assertEquals(null, getContentLength(future3));
} }
@Test @Test

View File

@ -28,6 +28,8 @@ import java.net.URISyntaxException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.security.Principal; import java.security.Principal;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.Base64;
import java.util.Base64.Encoder;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -50,11 +52,14 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsServerDefaults;
@ -73,6 +78,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@ -117,9 +123,9 @@ public class NamenodeWebHdfsMethods {
private Principal userPrincipal; private Principal userPrincipal;
private String remoteAddr; private String remoteAddr;
private static volatile String serverDefaultsResponse = null;
private @Context ServletContext context; private @Context ServletContext context;
private @Context HttpServletResponse response; private @Context HttpServletResponse response;
private boolean supportEZ;
public NamenodeWebHdfsMethods(@Context HttpServletRequest request) { public NamenodeWebHdfsMethods(@Context HttpServletRequest request) {
// the request object is a proxy to thread-locals so we have to extract // the request object is a proxy to thread-locals so we have to extract
@ -130,6 +136,8 @@ public class NamenodeWebHdfsMethods {
// get the remote address, if coming in via a trusted proxy server then // get the remote address, if coming in via a trusted proxy server then
// the address with be that of the proxied client // the address with be that of the proxied client
remoteAddr = JspHelper.getRemoteAddr(request); remoteAddr = JspHelper.getRemoteAddr(request);
supportEZ =
Boolean.valueOf(request.getHeader(WebHdfsFileSystem.EZ_HEADER));
} }
private void init(final UserGroupInformation ugi, private void init(final UserGroupInformation ugi,
@ -228,7 +236,7 @@ public class NamenodeWebHdfsMethods {
static DatanodeInfo chooseDatanode(final NameNode namenode, static DatanodeInfo chooseDatanode(final NameNode namenode,
final String path, final HttpOpParam.Op op, final long openOffset, final String path, final HttpOpParam.Op op, final long openOffset,
final long blocksize, final String excludeDatanodes, final long blocksize, final String excludeDatanodes,
final String remoteAddr) throws IOException { final String remoteAddr, final HdfsFileStatus status) throws IOException {
FSNamesystem fsn = namenode.getNamesystem(); FSNamesystem fsn = namenode.getNamesystem();
if (fsn == null) { if (fsn == null) {
throw new IOException("Namesystem has not been intialized yet."); throw new IOException("Namesystem has not been intialized yet.");
@ -265,7 +273,6 @@ public class NamenodeWebHdfsMethods {
|| op == PostOpParam.Op.APPEND) { || op == PostOpParam.Op.APPEND) {
//choose a datanode containing a replica //choose a datanode containing a replica
final NamenodeProtocols np = getRPCServer(namenode); final NamenodeProtocols np = getRPCServer(namenode);
final HdfsFileStatus status = np.getFileInfo(path);
if (status == null) { if (status == null) {
throw new FileNotFoundException("File " + path + " not found."); throw new FileNotFoundException("File " + path + " not found.");
} }
@ -322,15 +329,22 @@ public class NamenodeWebHdfsMethods {
return t; return t;
} }
private URI redirectURI(final NameNode namenode, private URI redirectURI(ResponseBuilder rb, final NameNode namenode,
final UserGroupInformation ugi, final DelegationParam delegation, final UserGroupInformation ugi, final DelegationParam delegation,
final UserParam username, final DoAsParam doAsUser, final UserParam username, final DoAsParam doAsUser,
final String path, final HttpOpParam.Op op, final long openOffset, final String path, final HttpOpParam.Op op, final long openOffset,
final long blocksize, final String excludeDatanodes, final long blocksize, final String excludeDatanodes,
final Param<?, ?>... parameters) throws URISyntaxException, IOException { final Param<?, ?>... parameters) throws URISyntaxException, IOException {
final DatanodeInfo dn; final DatanodeInfo dn;
final NamenodeProtocols np = getRPCServer(namenode);
HdfsFileStatus status = null;
if (op == GetOpParam.Op.OPEN
|| op == GetOpParam.Op.GETFILECHECKSUM
|| op == PostOpParam.Op.APPEND) {
status = np.getFileInfo(path);
}
dn = chooseDatanode(namenode, path, op, openOffset, blocksize, dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
excludeDatanodes, remoteAddr); excludeDatanodes, remoteAddr, status);
if (dn == null) { if (dn == null) {
throw new IOException("Failed to find datanode, suggest to check cluster" throw new IOException("Failed to find datanode, suggest to check cluster"
+ " health. excludeDatanodes=" + excludeDatanodes); + " health. excludeDatanodes=" + excludeDatanodes);
@ -349,15 +363,27 @@ public class NamenodeWebHdfsMethods {
namenode, ugi, null); namenode, ugi, null);
delegationQuery = "&" + new DelegationParam(t.encodeToUrlString()); delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
} }
final String query = op.toQueryString() + delegationQuery
+ "&" + new NamenodeAddressParam(namenode) StringBuilder queryBuilder = new StringBuilder();
+ Param.toSortedString("&", parameters); queryBuilder.append(op.toQueryString());
final String uripath = WebHdfsFileSystem.PATH_PREFIX + path; queryBuilder.append(delegationQuery);
queryBuilder.append("&").append(new NamenodeAddressParam(namenode));
queryBuilder.append(Param.toSortedString("&", parameters));
boolean prependReservedRawPath = false;
if (op == GetOpParam.Op.OPEN && supportEZ
&& status.getFileEncryptionInfo() != null) {
prependReservedRawPath = true;
rb.header(WebHdfsFileSystem.FEFINFO_HEADER,
encodeFeInfo(status.getFileEncryptionInfo()));
}
final String uripath = WebHdfsFileSystem.PATH_PREFIX +
(prependReservedRawPath ? "/.reserved/raw" + path : path);
int port = "http".equals(scheme) ? dn.getInfoPort() : dn int port = "http".equals(scheme) ? dn.getInfoPort() : dn
.getInfoSecurePort(); .getInfoSecurePort();
final URI uri = new URI(scheme, null, dn.getHostName(), port, uripath, final URI uri = new URI(scheme, null, dn.getHostName(), port, uripath,
query, null); queryBuilder.toString(), null);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("redirectURI=" + uri); LOG.trace("redirectURI=" + uri);
@ -581,7 +607,7 @@ public class NamenodeWebHdfsMethods {
switch(op.getValue()) { switch(op.getValue()) {
case CREATE: case CREATE:
{ {
final URI uri = redirectURI(namenode, ugi, delegation, username, final URI uri = redirectURI(null, namenode, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf), doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf),
exclDatanodes.getValue(), permission, unmaskedPermission, exclDatanodes.getValue(), permission, unmaskedPermission,
overwrite, bufferSize, replication, blockSize, createParent, overwrite, bufferSize, replication, blockSize, createParent,
@ -830,7 +856,7 @@ public class NamenodeWebHdfsMethods {
case APPEND: case APPEND:
{ {
final NameNode namenode = (NameNode)context.getAttribute("name.node"); final NameNode namenode = (NameNode)context.getAttribute("name.node");
final URI uri = redirectURI(namenode, ugi, delegation, username, final URI uri = redirectURI(null, namenode, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), -1L, -1L, doAsUser, fullpath, op.getValue(), -1L, -1L,
excludeDatanodes.getValue(), bufferSize); excludeDatanodes.getValue(), bufferSize);
if(!noredirectParam.getValue()) { if(!noredirectParam.getValue()) {
@ -967,6 +993,13 @@ public class NamenodeWebHdfsMethods {
}); });
} }
private static String encodeFeInfo(FileEncryptionInfo feInfo) {
Encoder encoder = Base64.getEncoder();
String encodedValue = encoder
.encodeToString(PBHelperClient.convert(feInfo).toByteArray());
return encodedValue;
}
private Response get( private Response get(
final UserGroupInformation ugi, final UserGroupInformation ugi,
final DelegationParam delegation, final DelegationParam delegation,
@ -995,15 +1028,17 @@ public class NamenodeWebHdfsMethods {
case OPEN: case OPEN:
{ {
final NameNode namenode = (NameNode)context.getAttribute("name.node"); final NameNode namenode = (NameNode)context.getAttribute("name.node");
final URI uri = redirectURI(namenode, ugi, delegation, username, ResponseBuilder rb = Response.noContent();
final URI uri = redirectURI(rb, namenode, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), offset.getValue(), -1L, doAsUser, fullpath, op.getValue(), offset.getValue(), -1L,
excludeDatanodes.getValue(), offset, length, bufferSize); excludeDatanodes.getValue(), offset, length, bufferSize);
if(!noredirectParam.getValue()) { if(!noredirectParam.getValue()) {
return Response.temporaryRedirect(uri) return rb.status(Status.TEMPORARY_REDIRECT).location(uri)
.type(MediaType.APPLICATION_OCTET_STREAM).build(); .type(MediaType.APPLICATION_OCTET_STREAM).build();
} else { } else {
final String js = JsonUtil.toJsonString("Location", uri); final String js = JsonUtil.toJsonString("Location", uri);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); return rb.status(Status.OK).entity(js).type(MediaType.APPLICATION_JSON)
.build();
} }
} }
case GET_BLOCK_LOCATIONS: case GET_BLOCK_LOCATIONS:
@ -1039,8 +1074,8 @@ public class NamenodeWebHdfsMethods {
case GETFILECHECKSUM: case GETFILECHECKSUM:
{ {
final NameNode namenode = (NameNode)context.getAttribute("name.node"); final NameNode namenode = (NameNode)context.getAttribute("name.node");
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser, final URI uri = redirectURI(null, namenode, ugi, delegation, username,
fullpath, op.getValue(), -1L, -1L, null); doAsUser, fullpath, op.getValue(), -1L, -1L, null);
if(!noredirectParam.getValue()) { if(!noredirectParam.getValue()) {
return Response.temporaryRedirect(uri) return Response.temporaryRedirect(uri)
.type(MediaType.APPLICATION_OCTET_STREAM).build(); .type(MediaType.APPLICATION_OCTET_STREAM).build();
@ -1140,9 +1175,12 @@ public class NamenodeWebHdfsMethods {
case GETSERVERDEFAULTS: { case GETSERVERDEFAULTS: {
// Since none of the server defaults values are hot reloaded, we can // Since none of the server defaults values are hot reloaded, we can
// cache the output of serverDefaults. // cache the output of serverDefaults.
String serverDefaultsResponse =
(String) context.getAttribute("serverDefaults");
if (serverDefaultsResponse == null) { if (serverDefaultsResponse == null) {
FsServerDefaults serverDefaults = cp.getServerDefaults(); FsServerDefaults serverDefaults = cp.getServerDefaults();
serverDefaultsResponse = JsonUtil.toJsonString(serverDefaults); serverDefaultsResponse = JsonUtil.toJsonString(serverDefaults);
context.setAttribute("serverDefaults", serverDefaultsResponse);
} }
return Response.ok(serverDefaultsResponse) return Response.ok(serverDefaultsResponse)
.type(MediaType.APPLICATION_JSON).build(); .type(MediaType.APPLICATION_JSON).build();
@ -1152,15 +1190,6 @@ public class NamenodeWebHdfsMethods {
} }
} }
/*
* This is used only and only for testing.
* Please don't use it otherwise.
*/
@VisibleForTesting
public static void resetServerDefaultsResponse() {
serverDefaultsResponse = null;
}
private static String getTrashRoot(String fullPath, private static String getTrashRoot(String fullPath,
Configuration conf) throws IOException { Configuration conf) throws IOException {
FileSystem fs = FileSystem.get(conf != null ? conf : new Configuration()); FileSystem fs = FileSystem.get(conf != null ? conf : new Configuration());

View File

@ -20,10 +20,14 @@ package org.apache.hadoop.hdfs;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream; import java.io.PrintStream;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.io.StringReader; import java.io.StringReader;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.net.URL;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -41,12 +45,14 @@ import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherSuite; import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoInputStream;
import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider; import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory; import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSTestWrapper; import org.apache.hadoop.fs.FSTestWrapper;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
@ -80,6 +86,7 @@ import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil; import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
@ -1985,4 +1992,185 @@ public class TestEncryptionZones {
Assert.assertEquals(tokens[1], testToken); Assert.assertEquals(tokens[1], testToken);
Assert.assertEquals(1, creds.numberOfTokens()); Assert.assertEquals(1, creds.numberOfTokens());
} }
/**
* Creates a file with stable {@link DistributedFileSystem}.
* Tests the following 2 scenarios.
* 1. The decrypted data using {@link WebHdfsFileSystem} should be same as
* input data.
* 2. Gets the underlying raw encrypted stream and verifies that the
* encrypted data is different than input data.
* @throws Exception
*/
@Test
public void testWebhdfsRead() throws Exception {
Path zonePath = new Path("/TestEncryptionZone");
fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false);
dfsAdmin.createEncryptionZone(zonePath, TEST_KEY, NO_TRASH);
final Path encryptedFilePath =
new Path("/TestEncryptionZone/encryptedFile.txt");
final Path rawPath =
new Path("/.reserved/raw/TestEncryptionZone/encryptedFile.txt");
final String content = "hello world";
// Create a file using DistributedFileSystem.
DFSTestUtil.writeFile(fs, encryptedFilePath, content);
final FileSystem webhdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
// Verify whether decrypted input stream data is same as content.
InputStream decryptedIputStream = webhdfs.open(encryptedFilePath);
verifyStreamsSame(content, decryptedIputStream);
// Get the underlying stream from CryptoInputStream which should be
// raw encrypted bytes.
InputStream cryptoStream =
webhdfs.open(encryptedFilePath).getWrappedStream();
Assert.assertTrue("cryptoStream should be an instance of "
+ "CryptoInputStream", (cryptoStream instanceof CryptoInputStream));
InputStream encryptedStream =
((CryptoInputStream)cryptoStream).getWrappedStream();
// Verify that the data read from the raw input stream is different
// from the original content. Also check it is identical to the raw
// encrypted data from dfs.
verifyRaw(content, encryptedStream, fs.open(rawPath));
}
private void verifyStreamsSame(String content, InputStream is)
throws IOException {
byte[] streamBytes;
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
IOUtils.copyBytes(is, os, 1024, true);
streamBytes = os.toByteArray();
}
Assert.assertArrayEquals(content.getBytes(), streamBytes);
}
private void verifyRaw(String content, InputStream is, InputStream rawIs)
throws IOException {
byte[] streamBytes, rawBytes;
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
IOUtils.copyBytes(is, os, 1024, true);
streamBytes = os.toByteArray();
}
Assert.assertFalse(Arrays.equals(content.getBytes(), streamBytes));
// webhdfs raw bytes should match the raw bytes from dfs.
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
IOUtils.copyBytes(rawIs, os, 1024, true);
rawBytes = os.toByteArray();
}
Assert.assertArrayEquals(rawBytes, streamBytes);
}
/* Tests that if client is old and namenode is new then the
* data will be decrypted by datanode.
* @throws Exception
*/
@Test
public void testWebhdfsReadOldBehavior() throws Exception {
Path zonePath = new Path("/TestEncryptionZone");
fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false);
dfsAdmin.createEncryptionZone(zonePath, TEST_KEY, NO_TRASH);
final Path encryptedFilePath = new Path("/TestEncryptionZone/foo");
final String content = "hello world";
// Create a file using DistributedFileSystem.
DFSTestUtil.writeFile(fs, encryptedFilePath, content);
InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
URL url = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + encryptedFilePath.toString()
+ "?op=OPEN");
// Return a connection with client not supporting EZ.
HttpURLConnection namenodeConnection = returnConnection(url, "GET", false);
String location = namenodeConnection.getHeaderField("Location");
URL datanodeURL = new URL(location);
String path = datanodeURL.getPath();
Assert.assertEquals(
WebHdfsFileSystem.PATH_PREFIX + encryptedFilePath.toString(), path);
HttpURLConnection datanodeConnection = returnConnection(datanodeURL,
"GET", false);
InputStream in = datanodeConnection.getInputStream();
// Comparing with the original contents
// and making sure they are decrypted.
verifyStreamsSame(content, in);
}
/* Tests namenode returns path starting with /.reserved/raw if client
* supports EZ and not if otherwise
* @throws Exception
*/
@Test
public void testWebhfsEZRedirectLocation()
throws Exception {
Path zonePath = new Path("/TestEncryptionZone");
fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false);
dfsAdmin.createEncryptionZone(zonePath, TEST_KEY, NO_TRASH);
final Path encryptedFilePath =
new Path("/TestEncryptionZone/foo");
final String content = "hello world";
// Create a file using DistributedFileSystem.
DFSTestUtil.writeFile(fs, encryptedFilePath, content);
InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
URL url = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + encryptedFilePath.toString()
+ "?op=OPEN");
// Return a connection with client not supporting EZ.
HttpURLConnection namenodeConnection =
returnConnection(url, "GET", false);
Assert.assertNotNull(namenodeConnection.getHeaderField("Location"));
URL datanodeUrl = new URL(namenodeConnection.getHeaderField("Location"));
Assert.assertNotNull(datanodeUrl);
String path = datanodeUrl.getPath();
Assert.assertEquals(
WebHdfsFileSystem.PATH_PREFIX + encryptedFilePath.toString(), path);
url = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + encryptedFilePath.toString()
+ "?op=OPEN");
// Return a connection with client supporting EZ.
namenodeConnection = returnConnection(url, "GET", true);
Assert.assertNotNull(namenodeConnection.getHeaderField("Location"));
datanodeUrl = new URL(namenodeConnection.getHeaderField("Location"));
Assert.assertNotNull(datanodeUrl);
path = datanodeUrl.getPath();
Assert.assertEquals(WebHdfsFileSystem.PATH_PREFIX
+ "/.reserved/raw" + encryptedFilePath.toString(), path);
}
private static HttpURLConnection returnConnection(URL url,
String httpRequestType, boolean supportEZ) throws Exception {
HttpURLConnection conn = null;
conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod(httpRequestType);
conn.setDoOutput(true);
conn.setInstanceFollowRedirects(false);
if (supportEZ) {
conn.setRequestProperty(WebHdfsFileSystem.EZ_HEADER, "true");
}
return conn;
}
/*
* Test seek behavior of the webhdfs input stream which reads data from
* encryption zone.
*/
@Test
public void testPread() throws Exception {
Path zonePath = new Path("/TestEncryptionZone");
fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false);
dfsAdmin.createEncryptionZone(zonePath, TEST_KEY, NO_TRASH);
final Path encryptedFilePath =
new Path("/TestEncryptionZone/foo");
// Create a file using DistributedFileSystem.
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
DFSTestUtil.createFile(webfs, encryptedFilePath, 1024, (short)1, 0xFEED);
byte[] data = DFSTestUtil.readFileAsBytes(fs, encryptedFilePath);
FSDataInputStream in = webfs.open(encryptedFilePath);
for (int i = 0; i < 1024; i++) {
in.seek(i);
Assert.assertEquals((data[i] & 0XFF), in.read());
}
}
} }

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@ -101,7 +102,7 @@ public class TestWebHdfsDataLocality {
//The chosen datanode must be the same as the client address //The chosen datanode must be the same as the client address
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null, namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null,
LOCALHOST); LOCALHOST, null);
Assert.assertEquals(ipAddr, chosen.getIpAddr()); Assert.assertEquals(ipAddr, chosen.getIpAddr());
} }
} }
@ -125,23 +126,26 @@ public class TestWebHdfsDataLocality {
//the chosen datanode must be the same as the replica location. //the chosen datanode must be the same as the replica location.
{ //test GETFILECHECKSUM { //test GETFILECHECKSUM
final HdfsFileStatus status = dfs.getClient().getFileInfo(f);
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null, namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null,
LOCALHOST); LOCALHOST, status);
Assert.assertEquals(expected, chosen); Assert.assertEquals(expected, chosen);
} }
{ //test OPEN { //test OPEN
final HdfsFileStatus status = dfs.getClient().getFileInfo(f);
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null, namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null,
LOCALHOST); LOCALHOST, status);
Assert.assertEquals(expected, chosen); Assert.assertEquals(expected, chosen);
} }
{ //test APPEND { //test APPEND
final HdfsFileStatus status = dfs.getClient().getFileInfo(f);
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null, namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null,
LOCALHOST); LOCALHOST, status);
Assert.assertEquals(expected, chosen); Assert.assertEquals(expected, chosen);
} }
} finally { } finally {
@ -195,9 +199,10 @@ public class TestWebHdfsDataLocality {
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
sb.append(locations[i].getXferAddr()); sb.append(locations[i].getXferAddr());
{ // test GETFILECHECKSUM { // test GETFILECHECKSUM
final HdfsFileStatus status = dfs.getClient().getFileInfo(f);
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize,
sb.toString(), LOCALHOST); sb.toString(), LOCALHOST, status);
for (int j = 0; j <= i; j++) { for (int j = 0; j <= i; j++) {
Assert.assertNotEquals(locations[j].getHostName(), Assert.assertNotEquals(locations[j].getHostName(),
chosen.getHostName()); chosen.getHostName());
@ -205,9 +210,10 @@ public class TestWebHdfsDataLocality {
} }
{ // test OPEN { // test OPEN
final HdfsFileStatus status = dfs.getClient().getFileInfo(f);
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString(), namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString(),
LOCALHOST); LOCALHOST, status);
for (int j = 0; j <= i; j++) { for (int j = 0; j <= i; j++) {
Assert.assertNotEquals(locations[j].getHostName(), Assert.assertNotEquals(locations[j].getHostName(),
chosen.getHostName()); chosen.getHostName());
@ -215,9 +221,10 @@ public class TestWebHdfsDataLocality {
} }
{ // test APPEND { // test APPEND
final HdfsFileStatus status = dfs.getClient().getFileInfo(f);
final DatanodeInfo chosen = NamenodeWebHdfsMethods final DatanodeInfo chosen = NamenodeWebHdfsMethods
.chooseDatanode(namenode, f, PostOpParam.Op.APPEND, -1L, .chooseDatanode(namenode, f, PostOpParam.Op.APPEND, -1L,
blocksize, sb.toString(), LOCALHOST); blocksize, sb.toString(), LOCALHOST, status);
for (int j = 0; j <= i; j++) { for (int j = 0; j <= i; j++) {
Assert.assertNotEquals(locations[j].getHostName(), Assert.assertNotEquals(locations[j].getHostName(),
chosen.getHostName()); chosen.getHostName());
@ -238,6 +245,6 @@ public class TestWebHdfsDataLocality {
exception.expect(IOException.class); exception.expect(IOException.class);
exception.expectMessage("Namesystem has not been intialized yet."); exception.expectMessage("Namesystem has not been intialized yet.");
NamenodeWebHdfsMethods.chooseDatanode(nn, "/path", PutOpParam.Op.CREATE, 0, NamenodeWebHdfsMethods.chooseDatanode(nn, "/path", PutOpParam.Op.CREATE, 0,
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null, LOCALHOST); DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null, LOCALHOST, null);
} }
} }

View File

@ -1435,7 +1435,6 @@ public class TestWebHDFS {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
final WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystem( final WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystem(
conf, WebHdfsConstants.WEBHDFS_SCHEME); conf, WebHdfsConstants.WEBHDFS_SCHEME);
NamenodeWebHdfsMethods.resetServerDefaultsResponse();
FSNamesystem fsnSpy = FSNamesystem fsnSpy =
NameNodeAdapter.spyOnNamesystem(cluster.getNameNode()); NameNodeAdapter.spyOnNamesystem(cluster.getNameNode());
Mockito.when(fsnSpy.getServerDefaults()). Mockito.when(fsnSpy.getServerDefaults()).

View File

@ -385,7 +385,7 @@ public class TestWebHdfsTokens {
InputStream is = fs.open(p); InputStream is = fs.open(p);
is.read(); is.read();
is.close(); is.close();
verify(fs, times(2)).getDelegationToken(); // first bad, then good verify(fs, times(3)).getDelegationToken(); // first bad, then good
verify(fs, times(1)).replaceExpiredDelegationToken(); verify(fs, times(1)).replaceExpiredDelegationToken();
verify(fs, times(1)).getDelegationToken(null); verify(fs, times(1)).getDelegationToken(null);
verify(fs, times(1)).setDelegationToken(any()); verify(fs, times(1)).setDelegationToken(any());
@ -402,7 +402,7 @@ public class TestWebHdfsTokens {
is = fs.open(p); is = fs.open(p);
is.read(); is.read();
is.close(); is.close();
verify(fs, times(2)).getDelegationToken(); // first bad, then good verify(fs, times(3)).getDelegationToken(); // first bad, then good
verify(fs, times(1)).replaceExpiredDelegationToken(); verify(fs, times(1)).replaceExpiredDelegationToken();
verify(fs, times(1)).getDelegationToken(null); verify(fs, times(1)).getDelegationToken(null);
verify(fs, times(1)).setDelegationToken(any()); verify(fs, times(1)).setDelegationToken(any());