Merging r1542123 through r1543110 from trunk to branch HDFS-2832

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1543115 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal 2013-11-18 19:22:42 +00:00
commit bd5ab41a90
38 changed files with 662 additions and 836 deletions


@ -437,6 +437,8 @@ Release 2.3.0 - UNRELEASED
HADOOP-10094. NPE in GenericOptionsParser#preProcessForWindows().
(Enis Soztutar via cnauroth)
HADOOP-10100. MiniKDC shouldn't use apacheds-all artifact. (rkanter via tucu)
Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES


@ -85,6 +85,7 @@ import org.mortbay.jetty.webapp.WebAppContext;
import org.mortbay.thread.QueuedThreadPool;
import org.mortbay.util.MultiException;
import com.google.common.base.Preconditions;
import com.sun.jersey.spi.container.servlet.ServletContainer;
/**
@ -716,6 +717,19 @@ public class HttpServer implements FilterContainer {
return webServer.getConnectors()[0].getLocalPort();
}
/**
* Get the port that corresponds to a particular connector. In the case of
* HDFS, the second connector corresponds to the HTTPS connector.
*
* @return the corresponding port for the connector, or -1 if there's no such
* connector.
*/
public int getConnectorPort(int index) {
Preconditions.checkArgument(index >= 0);
return index < webServer.getConnectors().length ?
webServer.getConnectors()[index].getLocalPort() : -1;
}
/**
* Set the min, max number of worker threads (simultaneous connections).
*/
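Aside (not part of the patch): a minimal sketch of how the new getConnectorPort(int) accessor can be consumed to discover the HTTPS port, along the lines of the NameNodeHttpServer change later in this commit. The helper class and method name below are illustrative only, and the Hadoop 2.x package org.apache.hadoop.http.HttpServer is assumed.

    import java.net.InetSocketAddress;
    import org.apache.hadoop.http.HttpServer;

    class HttpsPortProbe {
      /**
       * Returns the HTTPS address of a started HttpServer, or null when no
       * second (SSL) connector is configured, i.e. getConnectorPort(1) == -1.
       */
      static InetSocketAddress httpsAddressOf(HttpServer server, InetSocketAddress bind) {
        int port = server.getConnectorPort(1); // connector 1 is the HTTPS connector in HDFS
        return port == -1 ? null : new InetSocketAddress(bind.getAddress(), port);
      }
    }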


@ -37,9 +37,109 @@
</dependency>
<dependency>
<groupId>org.apache.directory.server</groupId>
<artifactId>apacheds-all</artifactId>
<artifactId>apacheds-core-api</artifactId>
<version>2.0.0-M15</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.apache.directory.api</groupId>
<artifactId>api-ldap-schema-data</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.directory.server</groupId>
<artifactId>apacheds-interceptor-kerberos</artifactId>
<version>2.0.0-M15</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.apache.directory.api</groupId>
<artifactId>api-ldap-schema-data</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.directory.server</groupId>
<artifactId>apacheds-protocol-shared</artifactId>
<version>2.0.0-M15</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.directory.server</groupId>
<artifactId>apacheds-protocol-kerberos</artifactId>
<version>2.0.0-M15</version>
<scope>compile</scope>
<exclusions>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.directory.server</groupId>
<artifactId>apacheds-ldif-partition</artifactId>
<version>2.0.0-M15</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.apache.directory.api</groupId>
<artifactId>api-ldap-schema-data</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.directory.server</groupId>
<artifactId>apacheds-mavibot-partition</artifactId>
<version>2.0.0-M15</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.apache.directory.api</groupId>
<artifactId>api-ldap-schema-data</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.directory.api</groupId>
<artifactId>api-all</artifactId>
<version>1.0.0-M20</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>xml-apis</groupId>
<artifactId>xml-apis</artifactId>
</exclusion>
<exclusion>
<groupId>xpp3</groupId>
<artifactId>xpp3</artifactId>
</exclusion>
<exclusion>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.directory.server</groupId>
<artifactId>apacheds-jdbm-partition</artifactId>
<version>2.0.0-M15</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.apache.directory.api</groupId>
<artifactId>api-ldap-schema-data</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.directory.server</groupId>
<artifactId>apacheds-protocol-ldap</artifactId>
<version>2.0.0-M15</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.apache.directory.api</groupId>
<artifactId>api-ldap-schema-data</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@ -50,6 +150,12 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>


@ -726,6 +726,7 @@ class OpenFileCtx {
try {
// Sync file data and length
fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
ret = COMMIT_STATUS.COMMIT_FINISHED; // Remove COMMIT_DO_SYNC status
// Nothing to do for metadata since attr related change is pass-through
} catch (ClosedChannelException cce) {
if (pendingWrites.isEmpty()) {
@ -747,7 +748,8 @@ class OpenFileCtx {
* return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
* COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR
*/
private synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
@VisibleForTesting
synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
if (!activeState) {
if (pendingWrites.isEmpty()) {


@ -150,10 +150,16 @@ public class TestWrites {
// Test request with non zero commit offset
ctx.setActiveStatusForTest(true);
Mockito.when(fos.getPos()).thenReturn((long) 10);
COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr);
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
// Do_SYNC state will be updated to FINISHED after data sync
ret = ctx.checkCommit(dfsClient, 5, null, 1, attr);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_DO_SYNC);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
status = ctx.checkCommitInternal(10, null, 1, attr);
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
ret = ctx.checkCommit(dfsClient, 10, null, 1, attr);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_DO_SYNC);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
.getPendingCommitsForTest();


@ -494,6 +494,8 @@ Release 2.3.0 - UNRELEASED
HDFS-5506. Use URLConnectionFactory in DelegationTokenFetcher. (Haohui Mai
via jing9)
HDFS-5489. Use TokenAspect in WebHDFSFileSystem. (Haohui Mai via jing9)
OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
@ -562,6 +564,8 @@ Release 2.3.0 - UNRELEASED
HDFS-5438. Flaws in block report processing can cause data loss. (kihwal)
HDFS-5502. Fix HTTPS support in HsftpFileSystem. (Haohui Mai via jing9)
Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES
@ -659,6 +663,12 @@ Release 2.2.1 - UNRELEASED
HDFS-5469. Add configuration property for the sub-directory export path
(brandonli)
HDFS-5519. COMMIT handler should update the commit status after sync
(brandonli)
HDFS-5372. In FSNamesystem, hasReadLock() returns false if the current thread
holds the write lock (VinayaKumar B via umamahesh)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES


@ -816,7 +816,7 @@ public class BlockManager {
final boolean isFileUnderConstruction, final long offset,
final long length, final boolean needBlockToken, final boolean inSnapshot)
throws IOException {
assert namesystem.hasReadOrWriteLock();
assert namesystem.hasReadLock();
if (blocks == null) {
return null;
} else if (blocks.length == 0) {


@ -237,13 +237,13 @@ public final class CacheManager {
}
public TreeMap<Long, PathBasedCacheEntry> getEntriesById() {
assert namesystem.hasReadOrWriteLock();
assert namesystem.hasReadLock();
return entriesById;
}
@VisibleForTesting
public GSet<CachedBlock, CachedBlock> getCachedBlocks() {
assert namesystem.hasReadOrWriteLock();
assert namesystem.hasReadLock();
return cachedBlocks;
}
@ -450,7 +450,7 @@ public final class CacheManager {
listPathBasedCacheDirectives(long prevId,
PathBasedCacheDirective filter,
FSPermissionChecker pc) throws IOException {
assert namesystem.hasReadOrWriteLock();
assert namesystem.hasReadLock();
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
String filterPath = null;
if (filter.getId() != null) {
@ -607,7 +607,7 @@ public final class CacheManager {
public BatchedListEntries<CachePoolInfo>
listCachePools(FSPermissionChecker pc, String prevKey) {
assert namesystem.hasReadOrWriteLock();
assert namesystem.hasReadLock();
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
ArrayList<CachePoolInfo> results =
new ArrayList<CachePoolInfo>(NUM_PRE_ALLOCATED_ENTRIES);


@ -1290,11 +1290,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
@Override
public boolean hasReadLock() {
return this.fsLock.getReadHoldCount() > 0;
return this.fsLock.getReadHoldCount() > 0 || hasWriteLock();
}
@Override
public boolean hasReadOrWriteLock() {
return hasReadLock() || hasWriteLock();
}
public int getReadHoldCount() {
@ -2038,7 +2034,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
private void verifyParentDir(String src) throws FileNotFoundException,
ParentNotDirectoryException, UnresolvedLinkException {
assert hasReadOrWriteLock();
assert hasReadLock();
Path parent = new Path(src).getParent();
if (parent != null) {
final INode parentNode = dir.getINode(parent.toString());
@ -2646,7 +2642,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
ExtendedBlock previous,
LocatedBlock[] onRetryBlock)
throws IOException {
assert hasReadOrWriteLock();
assert hasReadLock();
checkBlock(previous);
onRetryBlock[0] = null;
@ -2838,7 +2834,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private INodeFileUnderConstruction checkLease(String src, long fileId,
String holder, INode inode) throws LeaseExpiredException,
FileNotFoundException {
assert hasReadOrWriteLock();
assert hasReadLock();
if (inode == null || !inode.isFile()) {
Lease lease = leaseManager.getLease(holder);
throw new LeaseExpiredException(
@ -3796,7 +3792,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
@Override
public boolean isInSnapshot(BlockInfoUnderConstruction blockUC) {
assert hasReadOrWriteLock();
assert hasReadLock();
final BlockCollection bc = blockUC.getBlockCollection();
if (bc == null || !(bc instanceof INodeFileUnderConstruction)) {
return false;
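Aside (not from the patch): the hasReadLock() change above matters because java.util.concurrent.locks.ReentrantReadWriteLock reports a read-hold count of zero for a thread that only holds the write lock, so read-side assertions would fail under the write lock unless hasWriteLock() is also consulted (HDFS-5372). A self-contained illustration of that behaviour:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class ReadLockUnderWriteLockDemo {
      public static void main(String[] args) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
        lock.writeLock().lock();
        try {
          // Holding the write lock does not register as a read hold.
          System.out.println(lock.getReadHoldCount());   // prints 0
          // So a "do I hold the read lock?" check must also look at the write lock,
          // which is exactly what the reworked hasReadLock() does.
          boolean hasRead = lock.getReadHoldCount() > 0
              || lock.isWriteLockedByCurrentThread();
          System.out.println(hasRead);                   // prints true
        } finally {
          lock.writeLock().unlock();
        }
      }
    }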


@ -796,6 +796,10 @@ public class NameNode implements NameNodeStatusMXBean {
return httpServer.getHttpAddress();
}
public InetSocketAddress getHttpsAddress() {
return httpServer.getHttpsAddress();
}
/**
* Verify that configured directories exist, then
* Interactively confirm that formatting is desired


@ -119,7 +119,12 @@ public class NameNodeHttpServer {
httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
setupServlets(httpServer, conf);
httpServer.start();
httpAddress = new InetSocketAddress(bindAddress.getAddress(), httpServer.getPort());
httpAddress = new InetSocketAddress(bindAddress.getAddress(),
httpServer.getPort());
if (certSSL) {
httpsAddress = new InetSocketAddress(bindAddress.getAddress(),
httpServer.getConnectorPort(1));
}
}
private Map<String, String> getAuthFilterParams(Configuration conf)


@ -221,6 +221,8 @@ public class DelegationTokenFetcher {
.append(renewer);
}
boolean isHttps = nnUri.getScheme().equals("https");
HttpURLConnection conn = null;
DataInputStream dis = null;
InetSocketAddress serviceAddr = NetUtils.createSocketAddr(nnUri
@ -237,7 +239,7 @@ public class DelegationTokenFetcher {
dis = new DataInputStream(in);
ts.readFields(dis);
for (Token<?> token : ts.getAllTokens()) {
token.setKind(HftpFileSystem.TOKEN_KIND);
token.setKind(isHttps ? HsftpFileSystem.TOKEN_KIND : HftpFileSystem.TOKEN_KIND);
SecurityUtil.setTokenService(token, serviceAddr);
}
return ts;


@ -39,7 +39,4 @@ public interface RwLock {
/** Check if the current thread holds write lock. */
public boolean hasWriteLock();
/** Check if the current thread holds read or write lock. */
public boolean hasReadOrWriteLock();
}


@ -86,7 +86,7 @@ public class HftpFileSystem extends FileSystem
HttpURLConnection.setFollowRedirects(true);
}
URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
URLConnectionFactory connectionFactory;
public static final Text TOKEN_KIND = new Text("HFTP delegation");
@ -98,7 +98,7 @@ public class HftpFileSystem extends FileSystem
public static final String HFTP_TIMEZONE = "UTC";
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
private TokenAspect<HftpFileSystem> tokenAspect = new TokenAspect<HftpFileSystem>(this, TOKEN_KIND);
protected TokenAspect<HftpFileSystem> tokenAspect;
private Token<?> delegationToken;
private Token<?> renewToken;
@ -172,6 +172,16 @@ public class HftpFileSystem extends FileSystem
return SCHEME;
}
/**
* Initialize connectionFactory and tokenAspect. This function is intended to
* be overridden by HsFtpFileSystem.
*/
protected void initConnectionFactoryAndTokenAspect(Configuration conf)
throws IOException {
tokenAspect = new TokenAspect<HftpFileSystem>(this, TOKEN_KIND);
connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
}
@Override
public void initialize(final URI name, final Configuration conf)
throws IOException {
@ -179,6 +189,7 @@ public class HftpFileSystem extends FileSystem
setConf(conf);
this.ugi = UserGroupInformation.getCurrentUser();
this.nnUri = getNamenodeUri(name);
try {
this.hftpURI = new URI(name.getScheme(), name.getAuthority(),
null, null, null);
@ -186,6 +197,7 @@ public class HftpFileSystem extends FileSystem
throw new IllegalArgumentException(e);
}
initConnectionFactoryAndTokenAspect(conf);
if (UserGroupInformation.isSecurityEnabled()) {
tokenAspect.initDelegationToken(ugi);
}
@ -212,8 +224,8 @@ public class HftpFileSystem extends FileSystem
*
* For other operations, however, the client has to send a
* HDFS_DELEGATION_KIND token over the wire so that it can talk to Hadoop
* 0.20.3 clusters. Later releases fix this problem. See HDFS-5440 for more
* 0.20.203 clusters. Later releases fix this problem. See HDFS-5440 for
* details.
* more details.
*/
renewToken = token;
delegationToken = new Token<T>(token);
@ -229,13 +241,12 @@ public class HftpFileSystem extends FileSystem
return ugi.doAs(new PrivilegedExceptionAction<Token<?>>() {
@Override
public Token<?> run() throws IOException {
final String nnHttpUrl = nnUri.toString();
Credentials c;
try {
c = DelegationTokenFetcher.getDTfromRemote(connectionFactory, nnUri, renewer);
} catch (IOException e) {
if (e.getCause() instanceof ConnectException) {
LOG.warn("Couldn't connect to " + nnHttpUrl +
LOG.warn("Couldn't connect to " + nnUri +
", assuming security is disabled");
return null;
}


@ -18,31 +18,14 @@
package org.apache.hadoop.hdfs.web;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.security.GeneralSecurityException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.security.KeyStore;
import java.security.cert.X509Certificate;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.Time;
/**
* An implementation of a protocol for accessing filesystems over HTTPS. The
@ -55,9 +38,8 @@ import org.apache.hadoop.util.Time;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class HsftpFileSystem extends HftpFileSystem {
public static final Text TOKEN_KIND = new Text("HSFTP delegation");
private static final long MM_SECONDS_PER_DAY = 1000 * 60 * 60 * 24;
public static final String SCHEME = "hsftp";
private volatile int ExpWarnDays = 0;
/**
* Return the protocol scheme for the FileSystem.
@ -67,7 +49,7 @@ public class HsftpFileSystem extends HftpFileSystem {
*/
@Override
public String getScheme() {
return "hsftp";
return SCHEME;
}
/**
@ -79,66 +61,17 @@ public class HsftpFileSystem extends HftpFileSystem {
}
@Override
public void initialize(URI name, Configuration conf) throws IOException {
protected void initConnectionFactoryAndTokenAspect(Configuration conf) throws IOException {
super.initialize(name, conf);
tokenAspect = new TokenAspect<HftpFileSystem>(this, TOKEN_KIND);
setupSsl(conf);
ExpWarnDays = conf.getInt("ssl.expiration.warn.days", 30);
}
/**
connectionFactory = new URLConnectionFactory(
* Set up SSL resources
URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT);
*
* @throws IOException
*/
private static void setupSsl(Configuration conf) throws IOException {
Configuration sslConf = new HdfsConfiguration(false);
sslConf.addResource(conf.get(DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY,
DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
FileInputStream fis = null;
try {
SSLContext sc = SSLContext.getInstance("SSL");
connectionFactory.setConnConfigurator(URLConnectionFactory
KeyManager[] kms = null;
.newSslConnConfigurator(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
TrustManager[] tms = null;
conf));
if (sslConf.get("ssl.client.keystore.location") != null) {
} catch (GeneralSecurityException e) {
// initialize default key manager with keystore file and pass
throw new IOException(e);
KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
KeyStore ks = KeyStore.getInstance(sslConf.get(
"ssl.client.keystore.type", "JKS"));
char[] ksPass = sslConf.get("ssl.client.keystore.password", "changeit")
.toCharArray();
fis = new FileInputStream(sslConf.get("ssl.client.keystore.location",
"keystore.jks"));
ks.load(fis, ksPass);
kmf.init(ks, sslConf.get("ssl.client.keystore.keypassword", "changeit")
.toCharArray());
kms = kmf.getKeyManagers();
fis.close();
fis = null;
}
// initialize default trust manager with truststore file and pass
if (sslConf.getBoolean("ssl.client.do.not.authenticate.server", false)) {
// by pass trustmanager validation
tms = new DummyTrustManager[] { new DummyTrustManager() };
} else {
TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX");
KeyStore ts = KeyStore.getInstance(sslConf.get(
"ssl.client.truststore.type", "JKS"));
char[] tsPass = sslConf.get("ssl.client.truststore.password",
"changeit").toCharArray();
fis = new FileInputStream(sslConf.get("ssl.client.truststore.location",
"truststore.jks"));
ts.load(fis, tsPass);
tmf.init(ts);
tms = tmf.getTrustManagers();
}
sc.init(kms, tms, new java.security.SecureRandom());
HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
} catch (Exception e) {
throw new IOException("Could not initialize SSLContext", e);
} finally {
if (fis != null) {
fis.close();
}
}
}
@ -147,70 +80,4 @@ public class HsftpFileSystem extends HftpFileSystem {
return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY,
DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT);
}
@Override
protected HttpURLConnection openConnection(String path, String query)
throws IOException {
query = addDelegationTokenParam(query);
final URL url = new URL(getUnderlyingProtocol(), nnUri.getHost(),
nnUri.getPort(), path + '?' + query);
HttpsURLConnection conn;
conn = (HttpsURLConnection)connectionFactory.openConnection(url);
// bypass hostname verification
conn.setHostnameVerifier(new DummyHostnameVerifier());
conn.setRequestMethod("GET");
conn.connect();
// check cert expiration date
final int warnDays = ExpWarnDays;
if (warnDays > 0) { // make sure only check once
ExpWarnDays = 0;
long expTimeThreshold = warnDays * MM_SECONDS_PER_DAY + Time.now();
X509Certificate[] clientCerts = (X509Certificate[]) conn
.getLocalCertificates();
if (clientCerts != null) {
for (X509Certificate cert : clientCerts) {
long expTime = cert.getNotAfter().getTime();
if (expTime < expTimeThreshold) {
StringBuilder sb = new StringBuilder();
sb.append("\n Client certificate "
+ cert.getSubjectX500Principal().getName());
int dayOffSet = (int) ((expTime - Time.now()) / MM_SECONDS_PER_DAY);
sb.append(" have " + dayOffSet + " days to expire");
LOG.warn(sb.toString());
}
}
}
}
return (HttpURLConnection) conn;
}
/**
* Dummy hostname verifier that is used to bypass hostname checking
*/
protected static class DummyHostnameVerifier implements HostnameVerifier {
@Override
public boolean verify(String hostname, SSLSession session) {
return true;
}
}
/**
* Dummy trustmanager that is used to trust all server certificates
*/
protected static class DummyTrustManager implements X509TrustManager {
@Override
public void checkClientTrusted(X509Certificate[] chain, String authType) {
}
@Override
public void checkServerTrusted(X509Certificate[] chain, String authType) {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return null;
}
}
}


@ -57,6 +57,7 @@ final class TokenAspect<T extends FileSystem & Renewable> {
@Override
public boolean handleKind(Text kind) {
return kind.equals(HftpFileSystem.TOKEN_KIND)
|| kind.equals(HsftpFileSystem.TOKEN_KIND)
|| kind.equals(WebHdfsFileSystem.TOKEN_KIND);
}
@ -75,8 +76,11 @@ final class TokenAspect<T extends FileSystem & Renewable> {
final InetSocketAddress address = SecurityUtil.getTokenServiceAddr(token);
Text kind = token.getKind();
final URI uri;
if (kind.equals(HftpFileSystem.TOKEN_KIND)) {
uri = DFSUtil.createUri(HftpFileSystem.SCHEME, address);
} else if (kind.equals(HsftpFileSystem.TOKEN_KIND)) {
uri = DFSUtil.createUri(HsftpFileSystem.SCHEME, address);
} else if (kind.equals(WebHdfsFileSystem.TOKEN_KIND)) {
uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, address);
} else {
@ -144,6 +148,10 @@ final class TokenAspect<T extends FileSystem & Renewable> {
}
}
public synchronized void reset() {
hasInitedToken = false;
}
synchronized void initDelegationToken(UserGroupInformation ugi) {
Token<?> token = selectDelegationToken(ugi);
if (token != null) {


@ -22,6 +22,11 @@ import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.security.GeneralSecurityException;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -32,6 +37,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.ssl.SSLFactory;
/**
* Utilities for handling URLs
@ -64,6 +70,35 @@ public class URLConnectionFactory {
}
};
/**
* Create a new ConnectionConfigurator for SSL connections
*/
static ConnectionConfigurator newSslConnConfigurator(final int timeout,
Configuration conf) throws IOException, GeneralSecurityException {
final SSLFactory factory;
final SSLSocketFactory sf;
final HostnameVerifier hv;
factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
factory.init();
sf = factory.createSSLSocketFactory();
hv = factory.getHostnameVerifier();
return new ConnectionConfigurator() {
@Override
public HttpURLConnection configure(HttpURLConnection conn)
throws IOException {
if (conn instanceof HttpsURLConnection) {
HttpsURLConnection c = (HttpsURLConnection) conn;
c.setSSLSocketFactory(sf);
c.setHostnameVerifier(hv);
}
URLConnectionFactory.setTimeouts(conn, timeout);
return conn;
}
};
}
public URLConnectionFactory(int socketTimeout) {
this.socketTimeout = socketTimeout;
}
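Aside (sketch only): the configurator above is meant to be installed on a URLConnectionFactory instance, which is what the HsftpFileSystem change earlier in this commit does; here conf is assumed to be an org.apache.hadoop.conf.Configuration, and same-package access to setConnConfigurator and newSslConnConfigurator is assumed.

    URLConnectionFactory factory =
        new URLConnectionFactory(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT);
    try {
      // Wire up the SSLFactory-backed socket factory and hostname verifier for HTTPS.
      factory.setConnConfigurator(URLConnectionFactory.newSslConnConfigurator(
          URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT, conf));
    } catch (GeneralSecurityException e) {
      throw new IOException(e);
    }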


@ -118,38 +118,11 @@ public class WebHdfsFileSystem extends FileSystem
/** Delegation token kind */
public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
/** Token selector */
protected TokenAspect<WebHdfsFileSystem> tokenAspect = new TokenAspect<WebHdfsFileSystem>(
public static final DTSelecorByKind DT_SELECTOR
this, TOKEN_KIND);
= new DTSelecorByKind(TOKEN_KIND);
private DelegationTokenRenewer dtRenewer = null;
@VisibleForTesting
DelegationTokenRenewer.RenewAction<?> action;
@Override
public URI getCanonicalUri() {
return super.getCanonicalUri();
}
@VisibleForTesting
protected synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
if (dtRenewer == null) {
dtRenewer = DelegationTokenRenewer.getInstance();
}
action = dtRenewer.addRenewAction(webhdfs);
}
/** Is WebHDFS enabled in conf? */
public static boolean isEnabled(final Configuration conf, final Log log) {
final boolean b = conf.getBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY,
DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT);
return b;
}
private UserGroupInformation ugi;
private URI uri;
private boolean hasInitedToken;
private Token<?> delegationToken;
private RetryPolicy retryPolicy = null;
private Path workingDir;
@ -212,41 +185,27 @@ public class WebHdfsFileSystem extends FileSystem
this.workingDir = getHomeDirectory();
if (UserGroupInformation.isSecurityEnabled()) {
initDelegationToken();
tokenAspect.initDelegationToken(ugi);
}
}
protected void initDelegationToken() throws IOException {
@Override
// look for webhdfs token, then try hdfs
public URI getCanonicalUri() {
Token<?> token = selectDelegationToken(ugi);
return super.getCanonicalUri();
if (token != null) {
LOG.debug("Found existing DT for " + token.getService());
setDelegationToken(token);
hasInitedToken = true;
}
/** Is WebHDFS enabled in conf? */
public static boolean isEnabled(final Configuration conf, final Log log) {
final boolean b = conf.getBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY,
DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT);
return b;
}
protected synchronized Token<?> getDelegationToken() throws IOException {
// we haven't inited yet, or we used to have a token but it expired
tokenAspect.ensureTokenInitialized();
if (!hasInitedToken || (action != null && !action.isValid())) {
//since we don't already have a token, go get one
Token<?> token = getDelegationToken(null);
// security might be disabled
if (token != null) {
setDelegationToken(token);
addRenewAction(this);
LOG.debug("Created new DT for " + token.getService());
}
hasInitedToken = true;
}
return delegationToken;
}
protected Token<DelegationTokenIdentifier> selectDelegationToken(
UserGroupInformation ugi) {
return DT_SELECTOR.selectToken(getCanonicalUri(), ugi.getTokens(), getConf());
}
@Override
protected int getDefaultPort() {
return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
@ -370,7 +329,7 @@ public class WebHdfsFileSystem extends FileSystem
private synchronized void resetStateToFailOver() {
currentNNAddrIndex = (currentNNAddrIndex + 1) % nnAddrs.length;
delegationToken = null;
hasInitedToken = false;
tokenAspect.reset();
}
/**
@ -881,9 +840,7 @@ public class WebHdfsFileSystem extends FileSystem
@Override
public void close() throws IOException {
super.close();
if (dtRenewer != null) {
tokenAspect.removeRenewAction();
dtRenewer.removeRenewAction(this); // blocks
}
}
class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {


@ -1,39 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
public class TestNameNodeHttpServer {
@Test
public void testSslConfiguration() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_HTTPS_ENABLE_KEY, true);
System.setProperty("jetty.ssl.password", "foo");
System.setProperty("jetty.ssl.keypassword", "bar");
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.build();
cluster.shutdown();
}
}


@ -18,233 +18,77 @@
package org.apache.hadoop.hdfs.web;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.mockito.Matchers.anyBoolean;
import static org.junit.Assert.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.HttpURLConnection;
import java.net.Socket;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.net.URISyntaxException;
import java.net.URL;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.hdfs.web.HsftpFileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtilTestHelper;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
public class TestHftpDelegationToken {
/**
* Test whether HftpFileSystem maintain wire-compatibility for 0.20.203 when
* obtaining delegation token. See HDFS-5440 for more details.
*/
@Test
public void testHdfsDelegationToken() throws Exception {
public void testTokenCompatibilityFor203() throws IOException,
SecurityUtilTestHelper.setTokenServiceUseIp(true);
URISyntaxException, AuthenticationException {
final Configuration conf = new Configuration();
conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(conf);
UserGroupInformation user =
UserGroupInformation.createUserForTesting("oom",
new String[]{"memory"});
Token<?> token = new Token<TokenIdentifier>
(new byte[0], new byte[0],
DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
new Text("127.0.0.1:8020"));
user.addToken(token);
Token<?> token2 = new Token<TokenIdentifier>
(null, null, new Text("other token"), new Text("127.0.0.1:8021"));
user.addToken(token2);
assertEquals("wrong tokens in user", 2, user.getTokens().size());
FileSystem fs =
user.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws Exception {
return FileSystem.get(new URI("hftp://localhost:50470/"), conf);
}
});
assertSame("wrong kind of file system", HftpFileSystem.class,
fs.getClass());
assertSame("wrong token", token,
Whitebox.getInternalState(fs, "renewToken"));
}
@Test
public void testSelectHftpDelegationToken() throws Exception {
SecurityUtilTestHelper.setTokenServiceUseIp(true);
Configuration conf = new Configuration();
conf.setClass("fs.hftp.impl", HftpFileSystem.class, FileSystem.class);
HftpFileSystem fs = new HftpFileSystem();
int httpPort = 80;
Token<?> token = new Token<TokenIdentifier>(new byte[0], new byte[0],
int httpsPort = 443;
DelegationTokenIdentifier.HDFS_DELEGATION_KIND, new Text(
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY, httpPort);
"127.0.0.1:8020"));
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, httpsPort);
Credentials cred = new Credentials();
cred.addToken(HftpFileSystem.TOKEN_KIND, token);
ByteArrayOutputStream os = new ByteArrayOutputStream();
cred.write(new DataOutputStream(os));
// test with implicit default port
HttpURLConnection conn = mock(HttpURLConnection.class);
URI fsUri = URI.create("hftp://localhost");
doReturn(new ByteArrayInputStream(os.toByteArray())).when(conn)
HftpFileSystem fs = (HftpFileSystem) FileSystem.newInstance(fsUri, conf);
.getInputStream();
assertEquals(httpPort, fs.getCanonicalUri().getPort());
doReturn(HttpURLConnection.HTTP_OK).when(conn).getResponseCode();
checkTokenSelection(fs, httpPort, conf);
// test with explicit default port
URLConnectionFactory factory = mock(URLConnectionFactory.class);
// Make sure it uses the port from the hftp URI.
doReturn(conn).when(factory).openConnection(Mockito.<URL> any(),
fsUri = URI.create("hftp://localhost:"+httpPort);
anyBoolean());
fs = (HftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpPort, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpPort, conf);
// test with non-default port
fs.initialize(new URI("hftp://127.0.0.1:8020"), conf);
// Make sure it uses the port from the hftp URI.
fs.connectionFactory = factory;
fsUri = URI.create("hftp://localhost:"+(httpPort+1));
fs = (HftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpPort+1, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpPort + 1, conf);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, 5);
UserGroupInformation ugi = UserGroupInformation.createUserForTesting("foo",
}
new String[] { "bar" });
@Test
TokenAspect<HftpFileSystem> tokenAspect = new TokenAspect<HftpFileSystem>(
public void testSelectHsftpDelegationToken() throws Exception {
fs, HftpFileSystem.TOKEN_KIND);
SecurityUtilTestHelper.setTokenServiceUseIp(true);
Configuration conf = new Configuration();
tokenAspect.initDelegationToken(ugi);
conf.setClass("fs.hsftp.impl", HsftpFileSystem.class, FileSystem.class);
tokenAspect.ensureTokenInitialized();
int httpPort = 80;
Assert.assertSame(HftpFileSystem.TOKEN_KIND, fs.getRenewToken().getKind());
int httpsPort = 443;
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY, httpPort);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, httpsPort);
// test with implicit default port
Token<?> tok = (Token<?>) Whitebox.getInternalState(fs, "delegationToken");
URI fsUri = URI.create("hsftp://localhost");
Assert.assertNotSame("Not making a copy of the remote token", token, tok);
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.newInstance(fsUri, conf);
Assert.assertEquals(token.getKind(), tok.getKind());
assertEquals(httpsPort, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpsPort, conf);
// test with explicit default port
fsUri = URI.create("hsftp://localhost:"+httpsPort);
fs = (HsftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpsPort, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpsPort, conf);
// test with non-default port
fsUri = URI.create("hsftp://localhost:"+(httpsPort+1));
fs = (HsftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpsPort+1, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpsPort+1, conf);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, 5);
}
@Test
public void testInsecureRemoteCluster() throws Exception {
final ServerSocket socket = new ServerSocket(0); // just reserve a port
socket.close();
Configuration conf = new Configuration();
URI fsUri = URI.create("hsftp://localhost:"+socket.getLocalPort());
assertNull(FileSystem.newInstance(fsUri, conf).getDelegationToken(null));
}
@Test
public void testSecureClusterError() throws Exception {
final ServerSocket socket = new ServerSocket(0);
Thread t = new Thread() {
@Override
public void run() {
while (true) { // fetching does a few retries
try {
Socket s = socket.accept();
s.getOutputStream().write(1234);
s.shutdownOutput();
} catch (Exception e) {
break;
}
}
}
};
t.start();
try {
Configuration conf = new Configuration();
URI fsUri = URI.create("hsftp://localhost:"+socket.getLocalPort());
Exception ex = null;
try {
FileSystem.newInstance(fsUri, conf).getDelegationToken(null);
} catch (Exception e) {
ex = e;
}
assertNotNull(ex);
assertNotNull(ex.getCause());
assertEquals("Remote host closed connection during handshake",
ex.getCause().getMessage());
} finally {
t.interrupt();
}
}
private void checkTokenSelection(HftpFileSystem fs,
int port,
Configuration conf) throws IOException {
UserGroupInformation ugi =
UserGroupInformation.createUserForTesting(fs.getUri().getAuthority(), new String[]{});
@SuppressWarnings("unchecked")
TokenAspect<HftpFileSystem> aspect = (TokenAspect<HftpFileSystem>) Whitebox.getInternalState(fs, "tokenAspect");
// use ip-based tokens
SecurityUtilTestHelper.setTokenServiceUseIp(true);
// test fallback to hdfs token
Token<?> hdfsToken = new Token<TokenIdentifier>(
new byte[0], new byte[0],
DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
new Text("127.0.0.1:8020"));
ugi.addToken(hdfsToken);
// test fallback to hdfs token
Token<?> token = aspect.selectDelegationToken(ugi);
assertNotNull(token);
assertEquals(hdfsToken, token);
// test hftp is favored over hdfs
Token<?> hftpToken = new Token<TokenIdentifier>(
new byte[0], new byte[0],
HftpFileSystem.TOKEN_KIND, new Text("127.0.0.1:"+port));
ugi.addToken(hftpToken);
token = aspect.selectDelegationToken(ugi);
assertNotNull(token);
assertEquals(hftpToken, token);
// switch to using host-based tokens, no token should match
SecurityUtilTestHelper.setTokenServiceUseIp(false);
token = aspect.selectDelegationToken(ugi);
assertNull(token);
// test fallback to hdfs token
hdfsToken = new Token<TokenIdentifier>(
new byte[0], new byte[0],
DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
new Text("localhost:8020"));
ugi.addToken(hdfsToken);
token = aspect.selectDelegationToken(ugi);
assertNotNull(token);
assertEquals(hdfsToken, token);
// test hftp is favored over hdfs
hftpToken = new Token<TokenIdentifier>(
new byte[0], new byte[0],
HftpFileSystem.TOKEN_KIND, new Text("localhost:"+port));
ugi.addToken(hftpToken);
token = aspect.selectDelegationToken(ugi);
assertNotNull(token);
assertEquals(hftpToken, token);
} }
}


@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
@ -29,23 +30,22 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.util.Random;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.util.ServletUtil;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -53,8 +53,10 @@ import org.junit.BeforeClass;
import org.junit.Test;
public class TestHftpFileSystem {
private static final Random RAN = new Random();
private static final String BASEDIR = System.getProperty("test.build.dir",
"target/test-dir") + "/" + TestHftpFileSystem.class.getSimpleName();
private static String keystoresDir;
private static String sslConfDir;
private static Configuration config = null;
private static MiniDFSCluster cluster = null;
private static String blockPoolId = null;
@ -83,25 +85,28 @@ public class TestHftpFileSystem {
new Path("/foo\">bar/foo\">bar"), }; new Path("/foo\">bar/foo\">bar"), };
@BeforeClass @BeforeClass
public static void setUp() throws IOException { public static void setUp() throws Exception {
((Log4JLogger) HftpFileSystem.LOG).getLogger().setLevel(Level.ALL);
final long seed = RAN.nextLong();
System.out.println("seed=" + seed);
RAN.setSeed(seed);
config = new Configuration();
cluster = new MiniDFSCluster.Builder(config).numDataNodes(2).build();
blockPoolId = cluster.getNamesystem().getBlockPoolId();
hftpUri = "hftp://"
+ config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
File base = new File(BASEDIR);
FileUtil.fullyDelete(base);
base.mkdirs();
keystoresDir = new File(BASEDIR).getAbsolutePath();
sslConfDir = KeyStoreTestUtil.getClasspathDir(TestHftpFileSystem.class);
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, config, false);
} }
@AfterClass
public static void tearDown() throws IOException {
public static void tearDown() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
FileUtil.fullyDelete(new File(BASEDIR));
KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir);
} }
@Before
@ -352,9 +357,12 @@ public class TestHftpFileSystem {
Configuration conf = new Configuration();
URI uri = URI.create("hftp://localhost");
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
URLConnection conn = fs.connectionFactory.openConnection(new URL("http://localhost"));
URLConnection conn = fs.connectionFactory.openConnection(new URL(
assertEquals(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT, conn.getConnectTimeout());
"http://localhost"));
assertEquals(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT, conn.getReadTimeout());
assertEquals(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
conn.getConnectTimeout());
assertEquals(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
conn.getReadTimeout());
}
// /


@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import java.io.File;
import java.net.InetSocketAddress;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestHttpsFileSystem {
private static final String BASEDIR = System.getProperty("test.build.dir",
"target/test-dir") + "/" + TestHttpsFileSystem.class.getSimpleName();
private static MiniDFSCluster cluster;
private static Configuration conf;
private static String keystoresDir;
private static String sslConfDir;
private static String nnAddr;
@BeforeClass
public static void setUp() throws Exception {
conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_HTTPS_ENABLE_KEY, true);
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
File base = new File(BASEDIR);
FileUtil.fullyDelete(base);
base.mkdirs();
keystoresDir = new File(BASEDIR).getAbsolutePath();
sslConfDir = KeyStoreTestUtil.getClasspathDir(TestHttpsFileSystem.class);
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
cluster.getFileSystem().create(new Path("/test")).close();
InetSocketAddress addr = cluster.getNameNode().getHttpsAddress();
nnAddr = addr.getHostName() + ":" + addr.getPort();
}
@AfterClass
public static void tearDown() throws Exception {
cluster.shutdown();
FileUtil.fullyDelete(new File(BASEDIR));
KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir);
}
@Test
public void testHsftpFileSystem() throws Exception {
FileSystem fs = FileSystem.get(new URI("hsftp://" + nnAddr), conf);
Assert.assertTrue(fs.exists(new Path("/test")));
fs.close();
}
}


@ -19,13 +19,19 @@
package org.apache.hadoop.hdfs.web;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.FileNotFoundException;
@ -35,6 +41,7 @@ import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DelegationTokenRenewer; import org.apache.hadoop.fs.DelegationTokenRenewer;
import org.apache.hadoop.fs.DelegationTokenRenewer.RenewAction;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -163,15 +170,44 @@ public class TestTokenAspect {
} }
} }
private static RenewAction<?> getActionFromTokenAspect(
TokenAspect<DummyFs> tokenAspect) {
return (RenewAction<?>) Whitebox.getInternalState(tokenAspect, "action");
}
@Test @Test
public void testGetRemoteToken() throws IOException, URISyntaxException { public void testCachedInitialization() throws IOException, URISyntaxException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
UserGroupInformation.setConfiguration(conf);
DummyFs fs = spy(new DummyFs()); DummyFs fs = spy(new DummyFs());
Token<TokenIdentifier> token = new Token<TokenIdentifier>(new byte[0], Token<TokenIdentifier> token = new Token<TokenIdentifier>(new byte[0],
new byte[0], DummyFs.TOKEN_KIND, new Text("127.0.0.1:1234")); new byte[0], DummyFs.TOKEN_KIND, new Text("127.0.0.1:1234"));
doReturn(token).when(fs).getDelegationToken(anyString()); doReturn(token).when(fs).getDelegationToken(anyString());
doReturn(token).when(fs).getRenewToken();
fs.emulateSecurityEnabled = true;
fs.initialize(new URI("dummyfs://127.0.0.1:1234"), conf);
fs.tokenAspect.ensureTokenInitialized();
verify(fs, times(1)).getDelegationToken(null);
verify(fs, times(1)).setDelegationToken(token);
// For the second iteration, the token should be cached.
fs.tokenAspect.ensureTokenInitialized();
verify(fs, times(1)).getDelegationToken(null);
verify(fs, times(1)).setDelegationToken(token);
}
@Test
public void testGetRemoteToken() throws IOException, URISyntaxException {
Configuration conf = new Configuration();
DummyFs fs = spy(new DummyFs());
Token<TokenIdentifier> token = new Token<TokenIdentifier>(new byte[0],
new byte[0], DummyFs.TOKEN_KIND, new Text("127.0.0.1:1234"));
doReturn(token).when(fs).getDelegationToken(anyString());
doReturn(token).when(fs).getRenewToken();
fs.initialize(new URI("dummyfs://127.0.0.1:1234"), conf); fs.initialize(new URI("dummyfs://127.0.0.1:1234"), conf);
fs.tokenAspect.ensureTokenInitialized(); fs.tokenAspect.ensureTokenInitialized();
@ -186,7 +222,6 @@ public class TestTokenAspect {
public void testGetRemoteTokenFailure() throws IOException, public void testGetRemoteTokenFailure() throws IOException,
URISyntaxException { URISyntaxException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
UserGroupInformation.setConfiguration(conf);
DummyFs fs = spy(new DummyFs()); DummyFs fs = spy(new DummyFs());
IOException e = new IOException(); IOException e = new IOException();
doThrow(e).when(fs).getDelegationToken(anyString()); doThrow(e).when(fs).getDelegationToken(anyString());
@ -203,7 +238,6 @@ public class TestTokenAspect {
@Test @Test
public void testInitWithNoTokens() throws IOException, URISyntaxException { public void testInitWithNoTokens() throws IOException, URISyntaxException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
UserGroupInformation.setConfiguration(conf);
DummyFs fs = spy(new DummyFs()); DummyFs fs = spy(new DummyFs());
doReturn(null).when(fs).getDelegationToken(anyString()); doReturn(null).when(fs).getDelegationToken(anyString());
fs.initialize(new URI("dummyfs://127.0.0.1:1234"), conf); fs.initialize(new URI("dummyfs://127.0.0.1:1234"), conf);
@ -218,7 +252,6 @@ public class TestTokenAspect {
@Test @Test
public void testInitWithUGIToken() throws IOException, URISyntaxException { public void testInitWithUGIToken() throws IOException, URISyntaxException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
UserGroupInformation.setConfiguration(conf);
DummyFs fs = spy(new DummyFs()); DummyFs fs = spy(new DummyFs());
doReturn(null).when(fs).getDelegationToken(anyString()); doReturn(null).when(fs).getDelegationToken(anyString());
@ -241,6 +274,51 @@ public class TestTokenAspect {
assertNull(Whitebox.getInternalState(fs.tokenAspect, "action")); assertNull(Whitebox.getInternalState(fs.tokenAspect, "action"));
} }
@Test
public void testRenewal() throws Exception {
Configuration conf = new Configuration();
Token<?> token1 = mock(Token.class);
Token<?> token2 = mock(Token.class);
final long renewCycle = 100;
DelegationTokenRenewer.renewCycle = renewCycle;
UserGroupInformation ugi = UserGroupInformation.createUserForTesting("foo",
new String[] { "bar" });
DummyFs fs = spy(new DummyFs());
doReturn(token1).doReturn(token2).when(fs).getDelegationToken(null);
doReturn(token1).when(fs).getRenewToken();
// cause token renewer to abandon the token
doThrow(new IOException("renew failed")).when(token1).renew(conf);
doThrow(new IOException("get failed")).when(fs).addDelegationTokens(null,
null);
TokenAspect<DummyFs> tokenAspect = new TokenAspect<DummyFs>(fs,
DummyFs.TOKEN_KIND);
fs.initialize(new URI("dummyfs://127.0.0.1:1234"), conf);
tokenAspect.initDelegationToken(ugi);
// trigger token acquisition
tokenAspect.ensureTokenInitialized();
DelegationTokenRenewer.RenewAction<?> action = getActionFromTokenAspect(tokenAspect);
verify(fs).setDelegationToken(token1);
assertTrue(action.isValid());
// upon renewal, token will go bad based on above stubbing
Thread.sleep(renewCycle * 2);
assertSame(action, getActionFromTokenAspect(tokenAspect));
assertFalse(action.isValid());
// now that token is invalid, should get a new one
tokenAspect.ensureTokenInitialized();
verify(fs, times(2)).getDelegationToken(anyString());
verify(fs).setDelegationToken(token2);
assertNotSame(action, getActionFromTokenAspect(tokenAspect));
action = getActionFromTokenAspect(tokenAspect);
assertTrue(action.isValid());
}
@Test @Test
public void testTokenSelectionPreferences() throws IOException, public void testTokenSelectionPreferences() throws IOException,
URISyntaxException { URISyntaxException {
@ -252,7 +330,6 @@ public class TestTokenAspect {
DummyFs.TOKEN_KIND); DummyFs.TOKEN_KIND);
UserGroupInformation ugi = UserGroupInformation.createUserForTesting("foo", UserGroupInformation ugi = UserGroupInformation.createUserForTesting("foo",
new String[] { "bar" }); new String[] { "bar" });
UserGroupInformation.setConfiguration(conf);
// use ip-based tokens // use ip-based tokens
SecurityUtilTestHelper.setTokenServiceUseIp(true); SecurityUtilTestHelper.setTokenServiceUseIp(true);
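
The getActionFromTokenAspect helper introduced above pulls the private "action" field out of the aspect with Mockito's Whitebox. For readers unfamiliar with that utility, it is essentially field reflection; a stripped-down equivalent (a sketch only, ignoring the superclass walk and error handling the real Whitebox also does) would be:

  static Object readPrivateField(Object target, String fieldName) throws Exception {
    // Reflectively read a private field, which is the core of what
    // Whitebox.getInternalState does for these tests.
    java.lang.reflect.Field f = target.getClass().getDeclaredField(fieldName);
    f.setAccessible(true);
    return f.get(target);
  }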

@ -19,16 +19,20 @@
package org.apache.hadoop.hdfs.web; package org.apache.hadoop.hdfs.web;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS; import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.junit.Assert.*; import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.*; import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.*; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DelegationTokenRenewer;
import org.apache.hadoop.fs.DelegationTokenRenewer.RenewAction;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam; import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
import org.apache.hadoop.hdfs.web.resources.GetOpParam; import org.apache.hadoop.hdfs.web.resources.GetOpParam;
@ -40,96 +44,38 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
public class TestWebHdfsTokens { public class TestWebHdfsTokens {
static Configuration conf; private static Configuration conf;
static UserGroupInformation ugi;
@BeforeClass @BeforeClass
public static void setup() throws IOException { public static void setUp() {
conf = new Configuration(); conf = new Configuration();
SecurityUtil.setAuthenticationMethod(KERBEROS, conf); SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
ugi = UserGroupInformation.getCurrentUser(); }
private WebHdfsFileSystem spyWebhdfsInSecureSetup() throws IOException {
WebHdfsFileSystem fsOrig = new WebHdfsFileSystem();
fsOrig.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
WebHdfsFileSystem fs = spy(fsOrig);
Whitebox.setInternalState(fsOrig.tokenAspect, "fs", fs);
return fs;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test(timeout=1000) @Test(timeout = 1000)
public void testInitWithNoToken() throws IOException {
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
doReturn(null).when(fs).getDelegationToken(anyString());
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
// when not in ugi, don't get one
verify(fs).initDelegationToken();
verify(fs).selectDelegationToken(ugi);
verify(fs, never()).setDelegationToken(any(Token.class));
verify(fs, never()).getDelegationToken();
verify(fs, never()).getDelegationToken(anyString());
}
@SuppressWarnings("unchecked")
@Test(timeout=1000)
public void testInitWithUGIToken() throws IOException {
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
Token<DelegationTokenIdentifier> token = mock(Token.class);
doReturn(token).when(fs).selectDelegationToken(ugi);
doReturn(null).when(fs).getDelegationToken(anyString());
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
// when in the ugi, store it but don't renew it
verify(fs).initDelegationToken();
verify(fs).selectDelegationToken(ugi);
verify(fs).setDelegationToken(token);
verify(fs, never()).getDelegationToken();
verify(fs, never()).getDelegationToken(anyString());
verify(fs, never()).addRenewAction(fs);
}
@SuppressWarnings("unchecked")
@Test(timeout=1000)
public void testInternalGetDelegationToken() throws IOException {
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
Token<DelegationTokenIdentifier> token = mock(Token.class);
doReturn(null).when(fs).selectDelegationToken(ugi);
doReturn(token).when(fs).getDelegationToken(anyString());
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
// get token, store it, and renew it
Token<?> token2 = fs.getDelegationToken();
assertEquals(token2, token);
verify(fs).getDelegationToken(null);
verify(fs).setDelegationToken(token);
verify(fs).addRenewAction(fs);
reset(fs);
// just return token, don't get/set/renew
token2 = fs.getDelegationToken();
assertEquals(token2, token);
verify(fs, never()).getDelegationToken(null);
verify(fs, never()).setDelegationToken(any(Token.class));
verify(fs, never()).addRenewAction(fs);
}
@SuppressWarnings("unchecked")
@Test(timeout=1000)
public void testTokenForNonTokenOp() throws IOException { public void testTokenForNonTokenOp() throws IOException {
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem()); WebHdfsFileSystem fs = spyWebhdfsInSecureSetup();
Token<DelegationTokenIdentifier> token = mock(Token.class); Token<DelegationTokenIdentifier> token = mock(Token.class);
doReturn(null).when(fs).selectDelegationToken(ugi);
doReturn(token).when(fs).getDelegationToken(null); doReturn(token).when(fs).getDelegationToken(null);
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
// should get/set/renew token // should get/set/renew token
fs.toUrl(GetOpParam.Op.OPEN, null); fs.toUrl(GetOpParam.Op.OPEN, null);
verify(fs).getDelegationToken(); verify(fs).getDelegationToken();
verify(fs).getDelegationToken(null); verify(fs).getDelegationToken(null);
verify(fs).setDelegationToken(token); verify(fs).setDelegationToken(token);
verify(fs).addRenewAction(fs);
reset(fs); reset(fs);
// should return prior token // should return prior token
@ -137,30 +83,27 @@ public class TestWebHdfsTokens {
verify(fs).getDelegationToken(); verify(fs).getDelegationToken();
verify(fs, never()).getDelegationToken(null); verify(fs, never()).getDelegationToken(null);
verify(fs, never()).setDelegationToken(token); verify(fs, never()).setDelegationToken(token);
verify(fs, never()).addRenewAction(fs);
} }
@Test(timeout=1000) @Test(timeout = 1000)
public void testNoTokenForGetToken() throws IOException { public void testNoTokenForGetToken() throws IOException {
checkNoTokenForOperation(GetOpParam.Op.GETDELEGATIONTOKEN); checkNoTokenForOperation(GetOpParam.Op.GETDELEGATIONTOKEN);
} }
@Test(timeout=1000) @Test(timeout = 1000)
public void testNoTokenForCanclToken() throws IOException { public void testNoTokenForCanclToken() throws IOException {
checkNoTokenForOperation(PutOpParam.Op.RENEWDELEGATIONTOKEN); checkNoTokenForOperation(PutOpParam.Op.RENEWDELEGATIONTOKEN);
} }
@Test(timeout=1000) @Test(timeout = 1000)
public void testNoTokenForCancelToken() throws IOException { public void testNoTokenForCancelToken() throws IOException {
checkNoTokenForOperation(PutOpParam.Op.CANCELDELEGATIONTOKEN); checkNoTokenForOperation(PutOpParam.Op.CANCELDELEGATIONTOKEN);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void checkNoTokenForOperation(HttpOpParam.Op op) throws IOException { private void checkNoTokenForOperation(HttpOpParam.Op op) throws IOException {
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem()); WebHdfsFileSystem fs = spyWebhdfsInSecureSetup();
doReturn(null).when(fs).selectDelegationToken(ugi);
doReturn(null).when(fs).getDelegationToken(null); doReturn(null).when(fs).getDelegationToken(null);
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf); fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
// do not get a token! // do not get a token!
@ -168,10 +111,9 @@ public class TestWebHdfsTokens {
verify(fs, never()).getDelegationToken(); verify(fs, never()).getDelegationToken();
verify(fs, never()).getDelegationToken(null); verify(fs, never()).getDelegationToken(null);
verify(fs, never()).setDelegationToken(any(Token.class)); verify(fs, never()).setDelegationToken(any(Token.class));
verify(fs, never()).addRenewAction(fs);
} }
@Test(timeout=1000) @Test(timeout = 1000)
public void testGetOpRequireAuth() { public void testGetOpRequireAuth() {
for (HttpOpParam.Op op : GetOpParam.Op.values()) { for (HttpOpParam.Op op : GetOpParam.Op.values()) {
boolean expect = (op == GetOpParam.Op.GETDELEGATIONTOKEN); boolean expect = (op == GetOpParam.Op.GETDELEGATIONTOKEN);
@ -179,72 +121,25 @@ public class TestWebHdfsTokens {
} }
} }
@Test(timeout=1000) @Test(timeout = 1000)
public void testPutOpRequireAuth() { public void testPutOpRequireAuth() {
for (HttpOpParam.Op op : PutOpParam.Op.values()) { for (HttpOpParam.Op op : PutOpParam.Op.values()) {
boolean expect = (op == PutOpParam.Op.RENEWDELEGATIONTOKEN || boolean expect = (op == PutOpParam.Op.RENEWDELEGATIONTOKEN || op == PutOpParam.Op.CANCELDELEGATIONTOKEN);
op == PutOpParam.Op.CANCELDELEGATIONTOKEN);
assertEquals(expect, op.getRequireAuth()); assertEquals(expect, op.getRequireAuth());
} }
} }
@Test(timeout=1000) @Test(timeout = 1000)
public void testPostOpRequireAuth() { public void testPostOpRequireAuth() {
for (HttpOpParam.Op op : PostOpParam.Op.values()) { for (HttpOpParam.Op op : PostOpParam.Op.values()) {
assertFalse(op.getRequireAuth()); assertFalse(op.getRequireAuth());
} }
} }
@Test(timeout=1000) @Test(timeout = 1000)
public void testDeleteOpRequireAuth() { public void testDeleteOpRequireAuth() {
for (HttpOpParam.Op op : DeleteOpParam.Op.values()) { for (HttpOpParam.Op op : DeleteOpParam.Op.values()) {
assertFalse(op.getRequireAuth()); assertFalse(op.getRequireAuth());
} }
} }
@Test
public void testGetTokenAfterFailure() throws Exception {
Configuration conf = mock(Configuration.class);
Token<?> token1 = mock(Token.class);
Token<?> token2 = mock(Token.class);
long renewCycle = 1000;
DelegationTokenRenewer.renewCycle = renewCycle;
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
doReturn(conf).when(fs).getConf();
doReturn(token1).doReturn(token2).when(fs).getDelegationToken(null);
// cause token renewer to abandon the token
doThrow(new IOException("renew failed")).when(token1).renew(conf);
doThrow(new IOException("get failed")).when(fs).addDelegationTokens(null, null);
// trigger token acquisition
Token<?> token = fs.getDelegationToken();
RenewAction<?> action = fs.action;
assertSame(token1, token);
assertTrue(action.isValid());
// fetch again and make sure it's the same as before
token = fs.getDelegationToken();
assertSame(token1, token);
assertSame(action, fs.action);
assertTrue(fs.action.isValid());
// upon renewal, token will go bad based on above stubbing
Thread.sleep(renewCycle);
assertSame(action, fs.action);
assertFalse(fs.action.isValid());
// now that token is invalid, should get a new one
token = fs.getDelegationToken();
assertSame(token2, token);
assertNotSame(action, fs.action);
assertTrue(fs.action.isValid());
action = fs.action;
// should get same one again
token = fs.getDelegationToken();
assertSame(token2, token);
assertSame(action, fs.action);
assertTrue(fs.action.isValid());
}
} }

@ -20,8 +20,6 @@ package org.apache.hadoop.hdfs.web;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS; import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import java.io.IOException; import java.io.IOException;
@ -36,15 +34,20 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.web.resources.*; import org.apache.hadoop.hdfs.web.resources.DelegationParam;
import org.apache.hadoop.hdfs.web.resources.DoAsParam;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
import org.apache.hadoop.hdfs.web.resources.UserParam;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.SecurityUtilTestHelper;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.junit.Assert;
import org.junit.*; import org.junit.Before;
import org.junit.Test;
public class TestWebHdfsUrl { public class TestWebHdfsUrl {
// NOTE: port is never used // NOTE: port is never used
@ -306,95 +309,4 @@ public class TestWebHdfsUrl {
} }
return (WebHdfsFileSystem) FileSystem.get(uri, conf); return (WebHdfsFileSystem) FileSystem.get(uri, conf);
} }
@Test(timeout=60000)
public void testSelectHdfsDelegationToken() throws Exception {
SecurityUtilTestHelper.setTokenServiceUseIp(true);
Configuration conf = new Configuration();
conf.setClass("fs.webhdfs.impl", MyWebHdfsFileSystem.class, FileSystem.class);
// test with implicit default port
URI fsUri = URI.create("webhdfs://localhost");
MyWebHdfsFileSystem fs = (MyWebHdfsFileSystem) FileSystem.get(fsUri, conf);
checkTokenSelection(fs, conf);
// test with explicit default port
fsUri = URI.create("webhdfs://localhost:"+fs.getDefaultPort());
fs = (MyWebHdfsFileSystem) FileSystem.get(fsUri, conf);
checkTokenSelection(fs, conf);
// test with non-default port
fsUri = URI.create("webhdfs://localhost:"+(fs.getDefaultPort()-1));
fs = (MyWebHdfsFileSystem) FileSystem.get(fsUri, conf);
checkTokenSelection(fs, conf);
}
private void checkTokenSelection(MyWebHdfsFileSystem fs,
Configuration conf) throws IOException {
int port = fs.getCanonicalUri().getPort();
// can't clear tokens from ugi, so create a new user everytime
UserGroupInformation ugi =
UserGroupInformation.createUserForTesting(fs.getUri().getAuthority(), new String[]{});
// use ip-based tokens
SecurityUtilTestHelper.setTokenServiceUseIp(true);
// test fallback to hdfs token
Token<?> hdfsToken = new Token<TokenIdentifier>(
new byte[0], new byte[0],
DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
new Text("127.0.0.1:8020"));
ugi.addToken(hdfsToken);
// test fallback to hdfs token
Token<?> token = fs.selectDelegationToken(ugi);
assertNotNull(token);
assertEquals(hdfsToken, token);
// test webhdfs is favored over hdfs
Token<?> webHdfsToken = new Token<TokenIdentifier>(
new byte[0], new byte[0],
WebHdfsFileSystem.TOKEN_KIND, new Text("127.0.0.1:"+port));
ugi.addToken(webHdfsToken);
token = fs.selectDelegationToken(ugi);
assertNotNull(token);
assertEquals(webHdfsToken, token);
// switch to using host-based tokens, no token should match
SecurityUtilTestHelper.setTokenServiceUseIp(false);
token = fs.selectDelegationToken(ugi);
assertNull(token);
// test fallback to hdfs token
hdfsToken = new Token<TokenIdentifier>(
new byte[0], new byte[0],
DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
new Text("localhost:8020"));
ugi.addToken(hdfsToken);
token = fs.selectDelegationToken(ugi);
assertNotNull(token);
assertEquals(hdfsToken, token);
// test webhdfs is favored over hdfs
webHdfsToken = new Token<TokenIdentifier>(
new byte[0], new byte[0],
WebHdfsFileSystem.TOKEN_KIND, new Text("localhost:"+port));
ugi.addToken(webHdfsToken);
token = fs.selectDelegationToken(ugi);
assertNotNull(token);
assertEquals(webHdfsToken, token);
}
static class MyWebHdfsFileSystem extends WebHdfsFileSystem {
@Override
public URI getCanonicalUri() {
return super.getCanonicalUri();
}
@Override
public int getDefaultPort() {
return super.getDefaultPort();
}
}
} }

@ -1,26 +0,0 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<!-- Turn off SSL server authentication for tests by default -->
<property>
<name>ssl.client.do.not.authenticate.server</name>
<value>true</value>
</property>
</configuration>

@@ -220,6 +220,9 @@ Release 2.3.0 - UNRELEASED
     MAPREDUCE-5616. MR Client-AppMaster RPC max retries on socket timeout is too
     high. (cnauroth)

+    MAPREDUCE-5625. TestFixedLengthInputFormat fails in jdk7 environment
+    (Mariappan Asokan via jeagles)
+
 Release 2.2.1 - UNRELEASED

   INCOMPATIBLE CHANGES
@@ -1546,6 +1549,9 @@ Release 0.23.10 - UNRELEASED
     MAPREDUCE-5587. TestTextOutputFormat fails on JDK7 (jeagles)

+    MAPREDUCE-5373. TestFetchFailure.testFetchFailureMultipleReduces could fail
+    intermittently (jeagles)
+
 Release 0.23.9 - 2013-07-08

   INCOMPATIBLE CHANGES

@@ -323,8 +323,7 @@ public class TestFetchFailure {
     app.waitForState(reduceAttempt3, TaskAttemptState.RUNNING);
     updateStatus(app, reduceAttempt3, Phase.SHUFFLE);
-    //send 3 fetch failures from reduce to trigger map re execution
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    //send 2 fetch failures from reduce to prepare for map re execution
     sendFetchFailure(app, reduceAttempt, mapAttempt1);
     sendFetchFailure(app, reduceAttempt, mapAttempt1);
@@ -333,6 +332,7 @@ public class TestFetchFailure {
     updateStatus(app, reduceAttempt2, Phase.REDUCE);
     updateStatus(app, reduceAttempt3, Phase.REDUCE);
+    //send 3rd fetch failures from reduce to trigger map re execution
     sendFetchFailure(app, reduceAttempt, mapAttempt1);
     //wait for map Task state move back to RUNNING

@ -99,8 +99,7 @@ public class TestFixedLengthInputFormat {
Path file = new Path(workDir, new String("testFormat.txt")); Path file = new Path(workDir, new String("testFormat.txt"));
createFile(file, null, 10, 10); createFile(file, null, 10, 10);
// Set the fixed length record length config property // Set the fixed length record length config property
Configuration testConf = new Configuration(defaultConf); JobConf job = new JobConf(defaultConf);
JobConf job = new JobConf(testConf);
FileInputFormat.setInputPaths(job, workDir); FileInputFormat.setInputPaths(job, workDir);
FixedLengthInputFormat format = new FixedLengthInputFormat(); FixedLengthInputFormat format = new FixedLengthInputFormat();
format.configure(job); format.configure(job);
@ -127,8 +126,7 @@ public class TestFixedLengthInputFormat {
Path file = new Path(workDir, new String("testFormat.txt")); Path file = new Path(workDir, new String("testFormat.txt"));
createFile(file, null, 10, 10); createFile(file, null, 10, 10);
// Set the fixed length record length config property // Set the fixed length record length config property
Configuration testConf = new Configuration(defaultConf); JobConf job = new JobConf(defaultConf);
JobConf job = new JobConf(testConf);
FileInputFormat.setInputPaths(job, workDir); FileInputFormat.setInputPaths(job, workDir);
FixedLengthInputFormat format = new FixedLengthInputFormat(); FixedLengthInputFormat format = new FixedLengthInputFormat();
format.setRecordLength(job, 0); format.setRecordLength(job, 0);
@ -156,8 +154,7 @@ public class TestFixedLengthInputFormat {
Path file = new Path(workDir, new String("testFormat.txt")); Path file = new Path(workDir, new String("testFormat.txt"));
createFile(file, null, 10, 10); createFile(file, null, 10, 10);
// Set the fixed length record length config property // Set the fixed length record length config property
Configuration testConf = new Configuration(defaultConf); JobConf job = new JobConf(defaultConf);
JobConf job = new JobConf(testConf);
FileInputFormat.setInputPaths(job, workDir); FileInputFormat.setInputPaths(job, workDir);
FixedLengthInputFormat format = new FixedLengthInputFormat(); FixedLengthInputFormat format = new FixedLengthInputFormat();
format.setRecordLength(job, -10); format.setRecordLength(job, -10);
@ -206,8 +203,8 @@ public class TestFixedLengthInputFormat {
writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip, writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
"ten nine eightsevensix five four threetwo one "); "ten nine eightsevensix five four threetwo one ");
FixedLengthInputFormat format = new FixedLengthInputFormat(); FixedLengthInputFormat format = new FixedLengthInputFormat();
format.setRecordLength(defaultConf, 5);
JobConf job = new JobConf(defaultConf); JobConf job = new JobConf(defaultConf);
format.setRecordLength(job, 5);
FileInputFormat.setInputPaths(job, workDir); FileInputFormat.setInputPaths(job, workDir);
ReflectionUtils.setConf(gzip, job); ReflectionUtils.setConf(gzip, job);
format.configure(job); format.configure(job);
@ -290,9 +287,9 @@ public class TestFixedLengthInputFormat {
ArrayList<String> recordList ArrayList<String> recordList
= createFile(file, codec, recordLength, totalRecords); = createFile(file, codec, recordLength, totalRecords);
assertTrue(localFs.exists(file)); assertTrue(localFs.exists(file));
// Set the fixed length record length config property // Create the job and set the fixed length record length config property
Configuration testConf = new Configuration(defaultConf); JobConf job = new JobConf(defaultConf);
FixedLengthInputFormat.setRecordLength(testConf, recordLength); FixedLengthInputFormat.setRecordLength(job, recordLength);
int numSplits = 1; int numSplits = 1;
// Arbitrarily set number of splits. // Arbitrarily set number of splits.
@ -313,8 +310,7 @@ public class TestFixedLengthInputFormat {
LOG.info("Number of splits set to: " + numSplits); LOG.info("Number of splits set to: " + numSplits);
} }
// Create the job, and setup the input path // Setup the input path
JobConf job = new JobConf(testConf);
FileInputFormat.setInputPaths(job, workDir); FileInputFormat.setInputPaths(job, workDir);
// Try splitting the file in a variety of sizes // Try splitting the file in a variety of sizes
FixedLengthInputFormat format = new FixedLengthInputFormat(); FixedLengthInputFormat format = new FixedLengthInputFormat();
@ -390,8 +386,8 @@ public class TestFixedLengthInputFormat {
writeFile(localFs, new Path(workDir, fileName.toString()), codec, writeFile(localFs, new Path(workDir, fileName.toString()), codec,
"one two threefour five six seveneightnine ten"); "one two threefour five six seveneightnine ten");
FixedLengthInputFormat format = new FixedLengthInputFormat(); FixedLengthInputFormat format = new FixedLengthInputFormat();
format.setRecordLength(defaultConf, 5);
JobConf job = new JobConf(defaultConf); JobConf job = new JobConf(defaultConf);
format.setRecordLength(job, 5);
FileInputFormat.setInputPaths(job, workDir); FileInputFormat.setInputPaths(job, workDir);
if (codec != null) { if (codec != null) {
ReflectionUtils.setConf(codec, job); ReflectionUtils.setConf(codec, job);

@ -104,9 +104,8 @@ public class TestFixedLengthInputFormat {
localFs.delete(workDir, true); localFs.delete(workDir, true);
Path file = new Path(workDir, new String("testFormat.txt")); Path file = new Path(workDir, new String("testFormat.txt"));
createFile(file, null, 10, 10); createFile(file, null, 10, 10);
// Set the fixed length record length config property // Create the job and do not set fixed record length
Configuration testConf = new Configuration(defaultConf); Job job = Job.getInstance(defaultConf);
Job job = Job.getInstance(testConf);
FileInputFormat.setInputPaths(job, workDir); FileInputFormat.setInputPaths(job, workDir);
FixedLengthInputFormat format = new FixedLengthInputFormat(); FixedLengthInputFormat format = new FixedLengthInputFormat();
List<InputSplit> splits = format.getSplits(job); List<InputSplit> splits = format.getSplits(job);
@ -139,11 +138,10 @@ public class TestFixedLengthInputFormat {
localFs.delete(workDir, true); localFs.delete(workDir, true);
Path file = new Path(workDir, new String("testFormat.txt")); Path file = new Path(workDir, new String("testFormat.txt"));
createFile(file, null, 10, 10); createFile(file, null, 10, 10);
Job job = Job.getInstance(defaultConf);
// Set the fixed length record length config property // Set the fixed length record length config property
Configuration testConf = new Configuration(defaultConf);
FixedLengthInputFormat format = new FixedLengthInputFormat(); FixedLengthInputFormat format = new FixedLengthInputFormat();
format.setRecordLength(testConf, 0); format.setRecordLength(job.getConfiguration(), 0);
Job job = Job.getInstance(testConf);
FileInputFormat.setInputPaths(job, workDir); FileInputFormat.setInputPaths(job, workDir);
List<InputSplit> splits = format.getSplits(job); List<InputSplit> splits = format.getSplits(job);
boolean exceptionThrown = false; boolean exceptionThrown = false;
@ -177,10 +175,9 @@ public class TestFixedLengthInputFormat {
Path file = new Path(workDir, new String("testFormat.txt")); Path file = new Path(workDir, new String("testFormat.txt"));
createFile(file, null, 10, 10); createFile(file, null, 10, 10);
// Set the fixed length record length config property // Set the fixed length record length config property
Configuration testConf = new Configuration(defaultConf); Job job = Job.getInstance(defaultConf);
FixedLengthInputFormat format = new FixedLengthInputFormat(); FixedLengthInputFormat format = new FixedLengthInputFormat();
format.setRecordLength(testConf, -10); format.setRecordLength(job.getConfiguration(), -10);
Job job = Job.getInstance(testConf);
FileInputFormat.setInputPaths(job, workDir); FileInputFormat.setInputPaths(job, workDir);
List<InputSplit> splits = format.getSplits(job); List<InputSplit> splits = format.getSplits(job);
boolean exceptionThrown = false; boolean exceptionThrown = false;
@ -233,10 +230,10 @@ public class TestFixedLengthInputFormat {
"one two threefour five six seveneightnine ten "); "one two threefour five six seveneightnine ten ");
writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip, writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
"ten nine eightsevensix five four threetwo one "); "ten nine eightsevensix five four threetwo one ");
FixedLengthInputFormat format = new FixedLengthInputFormat();
format.setRecordLength(defaultConf, 5);
ReflectionUtils.setConf(gzip, defaultConf);
Job job = Job.getInstance(defaultConf); Job job = Job.getInstance(defaultConf);
FixedLengthInputFormat format = new FixedLengthInputFormat();
format.setRecordLength(job.getConfiguration(), 5);
ReflectionUtils.setConf(gzip, job.getConfiguration());
FileInputFormat.setInputPaths(job, workDir); FileInputFormat.setInputPaths(job, workDir);
List<InputSplit> splits = format.getSplits(job); List<InputSplit> splits = format.getSplits(job);
assertEquals("compressed splits == 2", 2, splits.size()); assertEquals("compressed splits == 2", 2, splits.size());
@ -317,9 +314,10 @@ public class TestFixedLengthInputFormat {
ArrayList<String> recordList = ArrayList<String> recordList =
createFile(file, codec, recordLength, totalRecords); createFile(file, codec, recordLength, totalRecords);
assertTrue(localFs.exists(file)); assertTrue(localFs.exists(file));
// Set the fixed length record length config property // Create the job and set the fixed length record length config property
Configuration testConf = new Configuration(defaultConf); Job job = Job.getInstance(defaultConf);
FixedLengthInputFormat.setRecordLength(testConf, recordLength); FixedLengthInputFormat.setRecordLength(job.getConfiguration(),
recordLength);
int numSplits = 1; int numSplits = 1;
// Arbitrarily set number of splits. // Arbitrarily set number of splits.
@ -339,11 +337,11 @@ public class TestFixedLengthInputFormat {
} }
LOG.info("Number of splits set to: " + numSplits); LOG.info("Number of splits set to: " + numSplits);
} }
testConf.setLong("mapreduce.input.fileinputformat.split.maxsize", job.getConfiguration().setLong(
"mapreduce.input.fileinputformat.split.maxsize",
(long)(fileSize/numSplits)); (long)(fileSize/numSplits));
// Create the job, and setup the input path // setup the input path
Job job = Job.getInstance(testConf);
FileInputFormat.setInputPaths(job, workDir); FileInputFormat.setInputPaths(job, workDir);
// Try splitting the file in a variety of sizes // Try splitting the file in a variety of sizes
FixedLengthInputFormat format = new FixedLengthInputFormat(); FixedLengthInputFormat format = new FixedLengthInputFormat();
@ -429,18 +427,18 @@ public class TestFixedLengthInputFormat {
private void runPartialRecordTest(CompressionCodec codec) throws Exception { private void runPartialRecordTest(CompressionCodec codec) throws Exception {
localFs.delete(workDir, true); localFs.delete(workDir, true);
Job job = Job.getInstance(defaultConf);
// Create a file with fixed length records with 5 byte long // Create a file with fixed length records with 5 byte long
// records with a partial record at the end. // records with a partial record at the end.
StringBuilder fileName = new StringBuilder("testFormat.txt"); StringBuilder fileName = new StringBuilder("testFormat.txt");
if (codec != null) { if (codec != null) {
fileName.append(".gz"); fileName.append(".gz");
ReflectionUtils.setConf(codec, defaultConf); ReflectionUtils.setConf(codec, job.getConfiguration());
} }
writeFile(localFs, new Path(workDir, fileName.toString()), codec, writeFile(localFs, new Path(workDir, fileName.toString()), codec,
"one two threefour five six seveneightnine ten"); "one two threefour five six seveneightnine ten");
FixedLengthInputFormat format = new FixedLengthInputFormat(); FixedLengthInputFormat format = new FixedLengthInputFormat();
format.setRecordLength(defaultConf, 5); format.setRecordLength(job.getConfiguration(), 5);
Job job = Job.getInstance(defaultConf);
FileInputFormat.setInputPaths(job, workDir); FileInputFormat.setInputPaths(job, workDir);
List<InputSplit> splits = format.getSplits(job); List<InputSplit> splits = format.getSplits(job);
if (codec != null) { if (codec != null) {
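
Across both TestFixedLengthInputFormat variants the recurring edit is the same: the record length and codec settings are no longer written into the shared defaultConf but into the configuration of the Job (or JobConf) actually being run, so each test case stays isolated regardless of execution order. A minimal sketch of the resulting setup for the new API (the input path is a placeholder and the wrapper class exists only for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat;

public class FixedLengthJobSetupSketch {
  public static Job newFixedLengthJob(Configuration base) throws Exception {
    Job job = Job.getInstance(base);
    // Set the record length on the job's own configuration so that
    // FixedLengthInputFormat sees it when computing splits and records.
    FixedLengthInputFormat.setRecordLength(job.getConfiguration(), 10);
    FileInputFormat.setInputPaths(job, new Path("/data/fixed-width-input"));
    return job;
  }
}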

@@ -136,6 +136,9 @@ Release 2.3.0 - UNRELEASED
     YARN-1401. With zero sleep-delay-before-sigkill.ms, no signal is ever sent
     (Gera Shegalov via Sandy Ryza)

+    YARN-1411. HA config shouldn't affect NodeManager RPC addresses (Karthik
+    Kambatla via bikas)
+
 Release 2.2.1 - UNRELEASED

   INCOMPATIBLE CHANGES

@@ -23,29 +23,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;

-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.List;

 @InterfaceAudience.Private
 public class HAUtil {
   private static Log LOG = LogFactory.getLog(HAUtil.class);

-  public static final List<String> RPC_ADDRESS_CONF_KEYS =
-      Collections.unmodifiableList(Arrays.asList(
-          YarnConfiguration.RM_ADDRESS,
-          YarnConfiguration.RM_SCHEDULER_ADDRESS,
-          YarnConfiguration.RM_ADMIN_ADDRESS,
-          YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
-          YarnConfiguration.RM_WEBAPP_ADDRESS,
-          YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS,
-          // TODO Remove after YARN-1318
-          YarnConfiguration.RM_HA_ADMIN_ADDRESS));
-
   public static final String BAD_CONFIG_MESSAGE_PREFIX =
     "Invalid configuration! ";
@@ -139,7 +124,7 @@ public class HAUtil {
   }

   public static void verifyAndSetAllRpcAddresses(Configuration conf) {
-    for (String confKey : RPC_ADDRESS_CONF_KEYS) {
+    for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
       verifyAndSetConfValue(confKey, conf);
     }
   }
@@ -188,9 +173,12 @@ public class HAUtil {
       ids.toString() + ")";
   }

-  private static String getConfKeyForRMInstance(String prefix,
-      Configuration conf) {
-    return addSuffix(prefix, getRMHAId(conf));
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  static String getConfKeyForRMInstance(String prefix, Configuration conf) {
+    return YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS.contains(prefix)
+        ? addSuffix(prefix, getRMHAId(conf))
+        : prefix;
   }

   public static String getConfValueForRMInstance(String prefix,

@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.conf;

 import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;

 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
@@ -295,6 +297,17 @@ public class YarnConfiguration extends Configuration {
   public static final int DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT = 1;
   // end @Private

+  public static final List<String> RM_RPC_ADDRESS_CONF_KEYS =
+      Collections.unmodifiableList(Arrays.asList(
+          RM_ADDRESS,
+          RM_SCHEDULER_ADDRESS,
+          RM_ADMIN_ADDRESS,
+          RM_RESOURCE_TRACKER_ADDRESS,
+          RM_WEBAPP_ADDRESS,
+          RM_WEBAPP_HTTPS_ADDRESS,
+          // TODO Remove after YARN-1318
+          RM_HA_ADMIN_ADDRESS));
+
 ////////////////////////////////
 // RM state store configs
 ////////////////////////////////
@@ -924,7 +937,7 @@ public class YarnConfiguration extends Configuration {
   public InetSocketAddress getSocketAddr(
       String name, String defaultAddress, int defaultPort) {
     String address;
-    if (HAUtil.isHAEnabled(this)) {
+    if (HAUtil.isHAEnabled(this) && RM_RPC_ADDRESS_CONF_KEYS.contains(name)) {
       address = HAUtil.getConfValueForRMInstance(name, defaultAddress, this);
     } else {
       address = get(name, defaultAddress);
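
Taken together with the HAUtil change above, only the RM's own RPC keys are resolved per HA instance; any other key, such as the NodeManager address, is now read unchanged even when HA is enabled. A rough usage sketch of that behaviour (the class name, host names, ports and the rm1 id are made-up values, and the results in the comments are what the patched getSocketAddr is expected to return):

import java.net.InetSocketAddress;

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class HaAddressResolutionSketch {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
    conf.set(YarnConfiguration.RM_HA_ID, "rm1");
    // RM key: listed in RM_RPC_ADDRESS_CONF_KEYS, so the rm1-suffixed value is used.
    conf.set(YarnConfiguration.RM_ADDRESS + ".rm1", "rm1.example.com:8032");
    // NM key: not an RM RPC key, so it is read as-is despite HA being enabled.
    conf.set(YarnConfiguration.NM_ADDRESS, "0.0.0.0:45454");

    InetSocketAddress rmAddr = conf.getSocketAddr(
        YarnConfiguration.RM_ADDRESS, "0.0.0.0:8032", 8032);     // rm1.example.com:8032
    InetSocketAddress nmAddr = conf.getSocketAddr(
        YarnConfiguration.NM_ADDRESS, "0.0.0.0:45454", 45454);   // 0.0.0.0:45454
    System.out.println(rmAddr + " / " + nmAddr);
  }
}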

@ -28,7 +28,9 @@ import org.junit.Test;
import java.util.Collection; import java.util.Collection;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
public class TestHAUtil { public class TestHAUtil {
@ -51,7 +53,7 @@ public class TestHAUtil {
conf.set(YarnConfiguration.RM_HA_IDS, RM_NODE_IDS_UNTRIMMED); conf.set(YarnConfiguration.RM_HA_IDS, RM_NODE_IDS_UNTRIMMED);
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID_UNTRIMMED); conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID_UNTRIMMED);
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) { for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
// configuration key itself cannot contains space/tab/return chars. // configuration key itself cannot contains space/tab/return chars.
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS_UNTRIMMED); conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS_UNTRIMMED);
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS); conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
@ -92,7 +94,7 @@ public class TestHAUtil {
StringUtils.getStringCollection(RM_NODE_IDS), HAUtil.getRMHAIds(conf)); StringUtils.getStringCollection(RM_NODE_IDS), HAUtil.getRMHAIds(conf));
assertEquals("Should be saved as Trimmed string", assertEquals("Should be saved as Trimmed string",
RM1_NODE_ID, HAUtil.getRMHAId(conf)); RM1_NODE_ID, HAUtil.getRMHAId(conf));
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) { for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
assertEquals("RPC address not set for " + confKey, assertEquals("RPC address not set for " + confKey,
RM1_ADDRESS, conf.get(confKey)); RM1_ADDRESS, conf.get(confKey));
} }
@ -111,7 +113,7 @@ public class TestHAUtil {
conf.clear(); conf.clear();
conf.set(YarnConfiguration.RM_HA_ID, RM_INVALID_NODE_ID); conf.set(YarnConfiguration.RM_HA_ID, RM_INVALID_NODE_ID);
conf.set(YarnConfiguration.RM_HA_IDS, RM_INVALID_NODE_ID); conf.set(YarnConfiguration.RM_HA_IDS, RM_INVALID_NODE_ID);
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) { for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
// simulate xml with invalid node id // simulate xml with invalid node id
conf.set(confKey + RM_INVALID_NODE_ID, RM_INVALID_NODE_ID); conf.set(confKey + RM_INVALID_NODE_ID, RM_INVALID_NODE_ID);
} }
@ -126,7 +128,7 @@ public class TestHAUtil {
} }
conf.clear(); conf.clear();
// simulate the case HAUtil.RPC_ADDRESS_CONF_KEYS are not set // simulate the case HAUtil.RM_RPC_ADDRESS_CONF_KEYS are not set
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID); conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID); conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID);
try { try {
@ -145,7 +147,7 @@ public class TestHAUtil {
conf.clear(); conf.clear();
conf.set(YarnConfiguration.RM_HA_IDS, RM2_NODE_ID + "," + RM3_NODE_ID); conf.set(YarnConfiguration.RM_HA_IDS, RM2_NODE_ID + "," + RM3_NODE_ID);
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID_UNTRIMMED); conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID_UNTRIMMED);
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) { for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS_UNTRIMMED); conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS_UNTRIMMED);
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS); conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
} }
@ -158,4 +160,14 @@ public class TestHAUtil {
e.getMessage()); e.getMessage());
} }
} }
@Test
public void testGetConfKeyForRMInstance() {
assertTrue("RM instance id is not suffixed",
HAUtil.getConfKeyForRMInstance(YarnConfiguration.RM_ADDRESS, conf)
.contains(HAUtil.getRMHAId(conf)));
assertFalse("RM instance id is suffixed",
HAUtil.getConfKeyForRMInstance(YarnConfiguration.NM_ADDRESS, conf)
.contains(HAUtil.getRMHAId(conf)));
}
} }

@@ -23,6 +23,12 @@ import junit.framework.Assert;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 import org.junit.Test;

+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class TestYarnConfiguration {

   @Test
@@ -52,4 +58,21 @@ public class TestYarnConfiguration {
         "http://rmtesting:24543", rmWebUrl);
   }
+
+  @Test
+  public void testGetSocketAddressForNMWithHA() {
+    YarnConfiguration conf = new YarnConfiguration();
+
+    // Set NM address
+    conf.set(YarnConfiguration.NM_ADDRESS, "0.0.0.0:1234");
+
+    // Set HA
+    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    conf.set(YarnConfiguration.RM_HA_ID, "rm1");
+    assertTrue(HAUtil.isHAEnabled(conf));
+
+    InetSocketAddress addr = conf.getSocketAddr(YarnConfiguration.NM_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_PORT);
+    assertEquals(1234, addr.getPort());
+  }
 }

@@ -51,7 +51,7 @@ public class TestRMHA {
     Configuration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
     conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID);
-    for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
+    for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
       conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
     }
     conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);

@@ -104,7 +104,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
     conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
     conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
     conf.set(YarnConfiguration.RM_HA_ID, rmId);
-    for (String rpcAddress : HAUtil.RPC_ADDRESS_CONF_KEYS) {
+    for (String rpcAddress : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
       conf.set(HAUtil.addSuffix(rpcAddress, rmId), "localhost:0");
     }
     conf.set(YarnConfiguration.RM_HA_ADMIN_ADDRESS, "localhost:" + adminPort);

pom.xml
@@ -274,6 +274,11 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
         <artifactId>maven-clover2-plugin</artifactId>
         <version>3.0.5</version>
       </plugin>
+      <plugin>
+        <groupId>org.apache.felix</groupId>
+        <artifactId>maven-bundle-plugin</artifactId>
+        <version>2.4.0</version>
+      </plugin>
     </plugins>
   </pluginManagement>
@@ -333,6 +338,12 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
         </execution>
       </executions>
     </plugin>
+    <plugin>
+      <groupId>org.apache.felix</groupId>
+      <artifactId>maven-bundle-plugin</artifactId>
+      <inherited>true</inherited>
+      <extensions>true</extensions>
+    </plugin>
   </plugins>
 </build>