Merge trunk to HDFS-4685.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4685@1569833 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Chris Nauroth 2014-02-19 17:24:02 +00:00
commit 909eb7e4f1
27 changed files with 301 additions and 127 deletions

View File

@ -301,6 +301,18 @@ Trunk (Unreleased)
HADOOP-8589. ViewFs tests fail when tests and home dirs are nested (sanjay Radia) HADOOP-8589. ViewFs tests fail when tests and home dirs are nested (sanjay Radia)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
OPTIMIZATIONS
BUG FIXES
Release 2.4.0 - UNRELEASED Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -345,6 +357,8 @@ Release 2.4.0 - UNRELEASED
HADOOP-10249. LdapGroupsMapping should trim ldap password read from file. HADOOP-10249. LdapGroupsMapping should trim ldap password read from file.
(Dilli Armugam via suresh) (Dilli Armugam via suresh)
HADOOP-10346. Deadlock while logging tokens (jlowe)
Release 2.3.1 - UNRELEASED Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -105,15 +105,18 @@ public byte[] getIdentifier() {
return identifier; return identifier;
} }
private static synchronized Class<? extends TokenIdentifier> private static Class<? extends TokenIdentifier>
getClassForIdentifier(Text kind) { getClassForIdentifier(Text kind) {
Class<? extends TokenIdentifier> cls = null;
synchronized (Token.class) {
if (tokenKindMap == null) { if (tokenKindMap == null) {
tokenKindMap = Maps.newHashMap(); tokenKindMap = Maps.newHashMap();
for (TokenIdentifier id : ServiceLoader.load(TokenIdentifier.class)) { for (TokenIdentifier id : ServiceLoader.load(TokenIdentifier.class)) {
tokenKindMap.put(id.getKind(), id.getClass()); tokenKindMap.put(id.getKind(), id.getClass());
} }
} }
Class<? extends TokenIdentifier> cls = tokenKindMap.get(kind); cls = tokenKindMap.get(kind);
}
if (cls == null) { if (cls == null) {
LOG.warn("Cannot find class for token kind " + kind); LOG.warn("Cannot find class for token kind " + kind);
return null; return null;

View File

@ -259,6 +259,18 @@ Trunk (Unreleased)
HDFS-5794. Fix the inconsistency of layout version number of HDFS-5794. Fix the inconsistency of layout version number of
ADD_DATANODE_AND_STORAGE_UUIDS between trunk and branch-2. (jing9) ADD_DATANODE_AND_STORAGE_UUIDS between trunk and branch-2. (jing9)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
OPTIMIZATIONS
BUG FIXES
Release 2.4.0 - UNRELEASED Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -411,13 +423,25 @@ Release 2.4.0 - UNRELEASED
HDFS-5716. Allow WebHDFS to use pluggable authentication filter HDFS-5716. Allow WebHDFS to use pluggable authentication filter
(Haohui Mai via brandonli) (Haohui Mai via brandonli)
HDFS-5953. TestBlockReaderFactory fails in trunk. (Akira Ajisaka via wang) HDFS-5953. TestBlockReaderFactory fails if libhadoop.so has not been built.
(Akira Ajisaka via wang)
HDFS-5759. Web UI does not show up during the period of loading FSImage. HDFS-5759. Web UI does not show up during the period of loading FSImage.
(Haohui Mai via Arpit Agarwal) (Haohui Mai via Arpit Agarwal)
HDFS-5942. Fix javadoc in OfflineImageViewer. (Akira Ajisaka via cnauroth) HDFS-5942. Fix javadoc in OfflineImageViewer. (Akira Ajisaka via cnauroth)
HDFS-5780. TestRBWBlockInvalidation times out intemittently. (Mit Desai
via kihwal)
HDFS-5803. TestBalancer.testBalancer0 fails. (Chen He via kihwal)
HDFS-5893. HftpFileSystem.RangeHeaderUrlOpener uses the default
URLConnectionFactory which does not import SSL certificates. (Haohui Mai via
jing9)
HDFS-5961. OIV cannot load fsimages containing a symbolic link. (kihwal)
BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9) HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)
@ -472,6 +496,9 @@ Release 2.4.0 - UNRELEASED
HDFS-5847. Consolidate INodeReference into a separate section. (jing9) HDFS-5847. Consolidate INodeReference into a separate section. (jing9)
HDFS-5959. Fix typo at section name in FSImageFormatProtobuf.java.
(Akira Ajisaka via suresh)
Release 2.3.1 - UNRELEASED Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -237,7 +237,7 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) {
inodeLoader.loadINodeSection(in); inodeLoader.loadINodeSection(in);
} }
break; break;
case INODE_REFRENCE: case INODE_REFERENCE:
snapshotLoader.loadINodeReferenceSection(in); snapshotLoader.loadINodeReferenceSection(in);
break; break;
case INODE_DIR: case INODE_DIR:
@ -551,7 +551,7 @@ public enum SectionName {
STRING_TABLE("STRING_TABLE"), STRING_TABLE("STRING_TABLE"),
EXTENDED_ACL("EXTENDED_ACL"), EXTENDED_ACL("EXTENDED_ACL"),
INODE("INODE"), INODE("INODE"),
INODE_REFRENCE("INODE_REFRENCE"), INODE_REFERENCE("INODE_REFERENCE"),
SNAPSHOT("SNAPSHOT"), SNAPSHOT("SNAPSHOT"),
INODE_DIR("INODE_DIR"), INODE_DIR("INODE_DIR"),
FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION"), FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION"),

View File

@ -27,7 +27,6 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -61,14 +60,9 @@ private URL createRedirectURL(String path, String encodedPath, HdfsFileStatus st
} else { } else {
hostname = host.getIpAddr(); hostname = host.getIpAddr();
} }
int port = host.getInfoPort();
if ("https".equals(scheme)) { int port = "https".equals(scheme) ? host.getInfoSecurePort() : host
final Integer portObject = (Integer) getServletContext().getAttribute( .getInfoPort();
DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY);
if (portObject != null) {
port = portObject;
}
}
String dtParam = ""; String dtParam = "";
if (dt != null) { if (dt != null) {

View File

@ -402,7 +402,7 @@ public void serializeINodeReferenceSection(OutputStream out)
INodeReferenceSection.INodeReference.Builder rb = buildINodeReference(ref); INodeReferenceSection.INodeReference.Builder rb = buildINodeReference(ref);
rb.build().writeDelimitedTo(out); rb.build().writeDelimitedTo(out);
} }
parent.commitSection(headers, SectionName.INODE_REFRENCE); parent.commitSection(headers, SectionName.INODE_REFERENCE);
} }
private INodeReferenceSection.INodeReference.Builder buildINodeReference( private INodeReferenceSection.INodeReference.Builder buildINodeReference(

View File

@ -737,6 +737,7 @@ private void processINode(DataInputStream in, ImageVisitor v,
processPermission(in, v); processPermission(in, v);
} else if (numBlocks == -2) { } else if (numBlocks == -2) {
v.visit(ImageElement.SYMLINK, Text.readString(in)); v.visit(ImageElement.SYMLINK, Text.readString(in));
processPermission(in, v);
} else if (numBlocks == -3) { // reference node } else if (numBlocks == -3) { // reference node
final boolean isWithName = in.readBoolean(); final boolean isWithName = in.readBoolean();
int snapshotId = in.readInt(); int snapshotId = in.readInt();

View File

@ -115,7 +115,7 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) {
case INODE: case INODE:
loadINodeSection(is); loadINodeSection(is);
break; break;
case INODE_REFRENCE: case INODE_REFERENCE:
loadINodeReferenceSection(is); loadINodeReferenceSection(is);
break; break;
case INODE_DIR: case INODE_DIR:

View File

@ -113,7 +113,7 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) {
case INODE: case INODE:
dumpINodeSection(is); dumpINodeSection(is);
break; break;
case INODE_REFRENCE: case INODE_REFERENCE:
dumpINodeReferenceSection(is); dumpINodeReferenceSection(is);
break; break;
case INODE_DIR: case INODE_DIR:

View File

@ -344,14 +344,15 @@ protected String addDelegationTokenParam(String query) throws IOException {
} }
static class RangeHeaderUrlOpener extends ByteRangeInputStream.URLOpener { static class RangeHeaderUrlOpener extends ByteRangeInputStream.URLOpener {
URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY; private final URLConnectionFactory connFactory;
RangeHeaderUrlOpener(final URL url) { RangeHeaderUrlOpener(URLConnectionFactory connFactory, final URL url) {
super(url); super(url);
this.connFactory = connFactory;
} }
protected HttpURLConnection openConnection() throws IOException { protected HttpURLConnection openConnection() throws IOException {
return (HttpURLConnection)connectionFactory.openConnection(url); return (HttpURLConnection)connFactory.openConnection(url);
} }
/** Use HTTP Range header for specifying offset. */ /** Use HTTP Range header for specifying offset. */
@ -381,8 +382,9 @@ static class RangeHeaderInputStream extends ByteRangeInputStream {
super(o, r); super(o, r);
} }
RangeHeaderInputStream(final URL url) { RangeHeaderInputStream(URLConnectionFactory connFactory, final URL url) {
this(new RangeHeaderUrlOpener(url), new RangeHeaderUrlOpener(null)); this(new RangeHeaderUrlOpener(connFactory, url),
new RangeHeaderUrlOpener(connFactory, null));
} }
@Override @Override
@ -397,7 +399,7 @@ public FSDataInputStream open(Path f, int buffersize) throws IOException {
String path = "/data" + ServletUtil.encodePath(f.toUri().getPath()); String path = "/data" + ServletUtil.encodePath(f.toUri().getPath());
String query = addDelegationTokenParam("ugi=" + getEncodedUgiParameter()); String query = addDelegationTokenParam("ugi=" + getEncodedUgiParameter());
URL u = getNamenodeURL(path, query); URL u = getNamenodeURL(path, query);
return new FSDataInputStream(new RangeHeaderInputStream(u)); return new FSDataInputStream(new RangeHeaderInputStream(connectionFactory, u));
} }
@Override @Override

View File

@ -74,7 +74,7 @@ public class TestBalancer {
ClientProtocol client; ClientProtocol client;
static final long TIMEOUT = 20000L; //msec static final long TIMEOUT = 40000L; //msec
static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5% static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5%
static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta
static final int DEFAULT_BLOCK_SIZE = 10; static final int DEFAULT_BLOCK_SIZE = 10;

View File

@ -66,7 +66,7 @@ private static NumberReplicas countReplicas(final FSNamesystem namesystem,
* datanode, namenode should ask to invalidate that corrupted block and * datanode, namenode should ask to invalidate that corrupted block and
* schedule replication for one more replica for that under replicated block. * schedule replication for one more replica for that under replicated block.
*/ */
@Test(timeout=60000) @Test(timeout=600000)
public void testBlockInvalidationWhenRBWReplicaMissedInDN() public void testBlockInvalidationWhenRBWReplicaMissedInDN()
throws IOException, InterruptedException { throws IOException, InterruptedException {
// This test cannot pass on Windows due to file locking enforcement. It will // This test cannot pass on Windows due to file locking enforcement. It will
@ -75,7 +75,7 @@ public void testBlockInvalidationWhenRBWReplicaMissedInDN()
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2); conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 100); conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 300);
conf.setLong(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1); conf.setLong(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
@ -105,22 +105,23 @@ public void testBlockInvalidationWhenRBWReplicaMissedInDN()
out.close(); out.close();
// Check datanode has reported the corrupt block. int liveReplicas = 0;
int corruptReplicas = 0;
while (true) { while (true) {
if ((corruptReplicas = countReplicas(namesystem, blk).corruptReplicas()) > 0) { if ((liveReplicas = countReplicas(namesystem, blk).liveReplicas()) < 2) {
// This confirms we have a corrupt replica
LOG.info("Live Replicas after corruption: " + liveReplicas);
break; break;
} }
Thread.sleep(100); Thread.sleep(100);
} }
assertEquals("There should be 1 replica in the corruptReplicasMap", 1, assertEquals("There should be less than 2 replicas in the "
corruptReplicas); + "liveReplicasMap", 1, liveReplicas);
// Check the block has got replicated to another datanode.
blk = DFSTestUtil.getFirstBlock(fs, testPath);
int liveReplicas = 0;
while (true) { while (true) {
if ((liveReplicas = countReplicas(namesystem, blk).liveReplicas()) > 1) { if ((liveReplicas =
countReplicas(namesystem, blk).liveReplicas()) > 1) {
//Wait till the live replica count becomes equal to Replication Factor
LOG.info("Live Replicas after Rereplication: " + liveReplicas);
break; break;
} }
Thread.sleep(100); Thread.sleep(100);
@ -128,9 +129,9 @@ public void testBlockInvalidationWhenRBWReplicaMissedInDN()
assertEquals("There should be two live replicas", 2, assertEquals("There should be two live replicas", 2,
liveReplicas); liveReplicas);
// sleep for 1 second, so that by this time datanode reports the corrupt // sleep for 2 seconds, so that by this time datanode reports the corrupt
// block after a live replica of block got replicated. // block after a live replica of block got replicated.
Thread.sleep(1000); Thread.sleep(2000);
// Check that there is no corrupt block in the corruptReplicasMap. // Check that there is no corrupt block in the corruptReplicasMap.
assertEquals("There should not be any replica in the corruptReplicasMap", assertEquals("There should not be any replica in the corruptReplicasMap",

View File

@ -97,12 +97,13 @@ public String getHeaderField(String field) {
@Test @Test
public void testByteRange() throws IOException { public void testByteRange() throws IOException {
URLConnectionFactory factory = mock(URLConnectionFactory.class);
HftpFileSystem.RangeHeaderUrlOpener ospy = spy( HftpFileSystem.RangeHeaderUrlOpener ospy = spy(
new HftpFileSystem.RangeHeaderUrlOpener(new URL("http://test/"))); new HftpFileSystem.RangeHeaderUrlOpener(factory, new URL("http://test/")));
doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy) doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy)
.openConnection(); .openConnection();
HftpFileSystem.RangeHeaderUrlOpener rspy = spy( HftpFileSystem.RangeHeaderUrlOpener rspy = spy(
new HftpFileSystem.RangeHeaderUrlOpener((URL) null)); new HftpFileSystem.RangeHeaderUrlOpener(factory, (URL) null));
doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy) doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy)
.openConnection(); .openConnection();
ByteRangeInputStream is = new HftpFileSystem.RangeHeaderInputStream(ospy, rspy); ByteRangeInputStream is = new HftpFileSystem.RangeHeaderInputStream(ospy, rspy);
@ -171,12 +172,15 @@ public void testByteRange() throws IOException {
assertEquals("Should fail because incorrect response code was sent", assertEquals("Should fail because incorrect response code was sent",
"HTTP_OK expected, received 206", e.getMessage()); "HTTP_OK expected, received 206", e.getMessage());
} }
is.close();
} }
@Test @Test
public void testPropagatedClose() throws IOException { public void testPropagatedClose() throws IOException {
ByteRangeInputStream brs = spy( URLConnectionFactory factory = mock(URLConnectionFactory.class);
new HftpFileSystem.RangeHeaderInputStream(new URL("http://test/")));
ByteRangeInputStream brs = spy(new HftpFileSystem.RangeHeaderInputStream(
factory, new URL("http://test/")));
InputStream mockStream = mock(InputStream.class); InputStream mockStream = mock(InputStream.class);
doReturn(mockStream).when(brs).openInputStream(); doReturn(mockStream).when(brs).openInputStream();

View File

@ -19,6 +19,7 @@
import java.io.File; import java.io.File;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
@ -30,6 +31,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil; import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -65,9 +67,11 @@ public static void setUp() throws Exception {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive(); cluster.waitActive();
cluster.getFileSystem().create(new Path("/test")).close(); OutputStream os = cluster.getFileSystem().create(new Path("/test"));
os.write(23);
os.close();
InetSocketAddress addr = cluster.getNameNode().getHttpsAddress(); InetSocketAddress addr = cluster.getNameNode().getHttpsAddress();
nnAddr = addr.getHostName() + ":" + addr.getPort(); nnAddr = NetUtils.getHostPortString(addr);
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, nnAddr); conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, nnAddr);
} }
@ -82,6 +86,9 @@ public static void tearDown() throws Exception {
public void testHsftpFileSystem() throws Exception { public void testHsftpFileSystem() throws Exception {
FileSystem fs = FileSystem.get(new URI("hsftp://" + nnAddr), conf); FileSystem fs = FileSystem.get(new URI("hsftp://" + nnAddr), conf);
Assert.assertTrue(fs.exists(new Path("/test"))); Assert.assertTrue(fs.exists(new Path("/test")));
InputStream is = fs.open(new Path("/test"));
Assert.assertEquals(23, is.read());
is.close();
fs.close(); fs.close();
} }

View File

@ -139,6 +139,18 @@ Trunk (Unreleased)
MAPREDUCE-5717. Task pings are interpreted as task progress (jlowe) MAPREDUCE-5717. Task pings are interpreted as task progress (jlowe)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
OPTIMIZATIONS
BUG FIXES
Release 2.4.0 - UNRELEASED Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.hs; package org.apache.hadoop.mapreduce.v2.hs;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -169,8 +170,21 @@ protected HistoryServerStateStoreService createStateStore(
} }
protected void doSecureLogin(Configuration conf) throws IOException { protected void doSecureLogin(Configuration conf) throws IOException {
InetSocketAddress socAddr = getBindAddress(conf);
SecurityUtil.login(conf, JHAdminConfig.MR_HISTORY_KEYTAB, SecurityUtil.login(conf, JHAdminConfig.MR_HISTORY_KEYTAB,
JHAdminConfig.MR_HISTORY_PRINCIPAL); JHAdminConfig.MR_HISTORY_PRINCIPAL, socAddr.getHostName());
}
/**
* Retrieve JHS bind address from configuration
*
* @param conf
* @return InetSocketAddress
*/
public static InetSocketAddress getBindAddress(Configuration conf) {
return conf.getSocketAddr(JHAdminConfig.MR_HISTORY_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_PORT);
} }
@Override @Override

View File

@ -15,6 +15,18 @@ Trunk - Unreleased
YARN-524 TestYarnVersionInfo failing if generated properties doesn't YARN-524 TestYarnVersionInfo failing if generated properties doesn't
include an SVN URL. (stevel) include an SVN URL. (stevel)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
OPTIMIZATIONS
BUG FIXES
Release 2.4.0 - UNRELEASED Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -277,6 +289,20 @@ Release 2.4.0 - UNRELEASED
instead rely on the http policy framework. And also fix some bugs related instead rely on the http policy framework. And also fix some bugs related
to https handling in YARN web-apps. (Haohui Mai via vinodkv) to https handling in YARN web-apps. (Haohui Mai via vinodkv)
YARN-1721. When moving app between queues in Fair Scheduler, grab lock on
FSSchedulerApp (Sandy Ryza)
YARN-1724. Race condition in Fair Scheduler when continuous scheduling is
turned on (Sandy Ryza)
YARN-1590. Fixed ResourceManager, web-app proxy and MR JobHistoryServer to
expand _HOST properly in their kerberos principles. (Mohammad Kamrul Islam
va vinodkv)
YARN-1428. Fixed RM to write the final state of RMApp/RMAppAttempt to the
application history store in the transition to the final state. (Contributed
by Zhijie Shen)
Release 2.3.1 - UNRELEASED Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -861,6 +861,9 @@ public class YarnConfiguration extends Configuration {
/** The address for the web proxy.*/ /** The address for the web proxy.*/
public static final String PROXY_ADDRESS = public static final String PROXY_ADDRESS =
PROXY_PREFIX + "address"; PROXY_PREFIX + "address";
public static final int DEFAULT_PROXY_PORT = 9099;
public static final String DEFAULT_PROXY_ADDRESS =
"0.0.0.0:" + DEFAULT_RM_PORT;
/** /**
* YARN Service Level Authorization * YARN Service Level Authorization

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -894,8 +895,9 @@ protected void serviceStart() throws Exception {
} }
protected void doSecureLogin() throws IOException { protected void doSecureLogin() throws IOException {
InetSocketAddress socAddr = getBindAddress(conf);
SecurityUtil.login(this.conf, YarnConfiguration.RM_KEYTAB, SecurityUtil.login(this.conf, YarnConfiguration.RM_KEYTAB,
YarnConfiguration.RM_PRINCIPAL); YarnConfiguration.RM_PRINCIPAL, socAddr.getHostName());
} }
@Override @Override
@ -1042,4 +1044,17 @@ private void resetDispatcher() {
addIfService(rmDispatcher); addIfService(rmDispatcher);
rmContext.setDispatcher(rmDispatcher); rmContext.setDispatcher(rmDispatcher);
} }
/**
* Retrieve RM bind address from configuration
*
* @param conf
* @return InetSocketAddress
*/
public static InetSocketAddress getBindAddress(Configuration conf) {
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
}
} }

View File

@ -43,9 +43,12 @@
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
/** /**
@ -219,12 +222,13 @@ public void applicationStarted(RMApp app) {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void applicationFinished(RMApp app) { public void applicationFinished(RMApp app, RMAppState finalState) {
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new WritingApplicationFinishEvent(app.getApplicationId(), new WritingApplicationFinishEvent(app.getApplicationId(),
ApplicationFinishData.newInstance(app.getApplicationId(), ApplicationFinishData.newInstance(app.getApplicationId(),
app.getFinishTime(), app.getDiagnostics().toString(), app.getFinishTime(), app.getDiagnostics().toString(),
app.getFinalApplicationStatus(), app.createApplicationState()))); app.getFinalApplicationStatus(),
RMServerUtils.createApplicationState(finalState))));
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -239,15 +243,16 @@ public void applicationAttemptStarted(RMAppAttempt appAttempt) {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void applicationAttemptFinished(RMAppAttempt appAttempt) { public void applicationAttemptFinished(RMAppAttempt appAttempt,
RMAppAttemptState finalState) {
if (historyServiceEnabled) { if (historyServiceEnabled) {
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new WritingApplicationAttemptFinishEvent(appAttempt.getAppAttemptId(), new WritingApplicationAttemptFinishEvent(appAttempt.getAppAttemptId(),
ApplicationAttemptFinishData.newInstance( ApplicationAttemptFinishData.newInstance(
appAttempt.getAppAttemptId(), appAttempt.getDiagnostics() appAttempt.getAppAttemptId(), appAttempt.getDiagnostics()
.toString(), appAttempt.getTrackingUrl(), appAttempt .toString(), appAttempt.getTrackingUrl(), appAttempt
.getFinalApplicationStatus(), appAttempt .getFinalApplicationStatus(),
.createApplicationAttemptState()))); RMServerUtils.createApplicationAttemptState(finalState))));
} }
} }

View File

@ -112,9 +112,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private long storedFinishTime = 0; private long storedFinishTime = 0;
private RMAppAttempt currentAttempt; private RMAppAttempt currentAttempt;
private String queue; private String queue;
@SuppressWarnings("rawtypes")
private EventHandler handler; private EventHandler handler;
private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
private static final AppFinishedTransition FINISHED_TRANSITION = private static final AppFinishedTransition FINISHED_TRANSITION =
new AppFinishedTransition(); new AppFinishedTransition();
@ -705,7 +703,6 @@ public void transition(RMAppImpl app, RMAppEvent event) {
* either as an exception for failure or null for success, or the client will * either as an exception for failure or null for success, or the client will
* be left waiting forever. * be left waiting forever.
*/ */
@SuppressWarnings("unchecked")
private static final class RMAppMoveTransition extends RMAppTransition { private static final class RMAppMoveTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
RMAppMoveEvent moveEvent = (RMAppMoveEvent) event; RMAppMoveEvent moveEvent = (RMAppMoveEvent) event;
@ -723,7 +720,6 @@ public void transition(RMAppImpl app, RMAppEvent event) {
} }
} }
@SuppressWarnings("unchecked")
private static final class RMAppRecoveredTransition implements private static final class RMAppRecoveredTransition implements
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> { MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
@ -742,7 +738,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
// The app has completed. // The app has completed.
if (app.recoveredFinalState != null) { if (app.recoveredFinalState != null) {
FINAL_TRANSITION.transition(app, event); new FinalTransition(app.recoveredFinalState).transition(app, event);
return app.recoveredFinalState; return app.recoveredFinalState;
} }
@ -824,7 +820,7 @@ public void transition(RMAppImpl app, RMAppEvent event) {
LOG.info(msg); LOG.info(msg);
app.diagnostics.append(msg); app.diagnostics.append(msg);
// Inform the node for app-finish // Inform the node for app-finish
FINAL_TRANSITION.transition(app, event); new FinalTransition(RMAppState.FAILED).transition(app, event);
} }
} }
@ -937,6 +933,10 @@ public void transition(RMAppImpl app, RMAppEvent event) {
} }
private static class AppFinishedTransition extends FinalTransition { private static class AppFinishedTransition extends FinalTransition {
public AppFinishedTransition() {
super(RMAppState.FINISHED);
}
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
RMAppFinishedAttemptEvent finishedEvent = RMAppFinishedAttemptEvent finishedEvent =
(RMAppFinishedAttemptEvent)event; (RMAppFinishedAttemptEvent)event;
@ -980,6 +980,10 @@ public void transition(RMAppImpl app, RMAppEvent event) {
private static class AppKilledTransition extends FinalTransition { private static class AppKilledTransition extends FinalTransition {
public AppKilledTransition() {
super(RMAppState.KILLED);
}
@Override @Override
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
app.diagnostics.append(getAppKilledDiagnostics()); app.diagnostics.append(getAppKilledDiagnostics());
@ -1002,6 +1006,10 @@ public void transition(RMAppImpl app, RMAppEvent event) {
private static final class AppRejectedTransition extends private static final class AppRejectedTransition extends
FinalTransition{ FinalTransition{
public AppRejectedTransition() {
super(RMAppState.FAILED);
}
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent)event; RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent)event;
app.diagnostics.append(rejectedEvent.getMessage()); app.diagnostics.append(rejectedEvent.getMessage());
@ -1011,6 +1019,12 @@ public void transition(RMAppImpl app, RMAppEvent event) {
private static class FinalTransition extends RMAppTransition { private static class FinalTransition extends RMAppTransition {
private final RMAppState finalState;
public FinalTransition(RMAppState finalState) {
this.finalState = finalState;
}
private Set<NodeId> getNodesOnWhichAttemptRan(RMAppImpl app) { private Set<NodeId> getNodesOnWhichAttemptRan(RMAppImpl app) {
Set<NodeId> nodes = new HashSet<NodeId>(); Set<NodeId> nodes = new HashSet<NodeId>();
for (RMAppAttempt attempt : app.attempts.values()) { for (RMAppAttempt attempt : app.attempts.values()) {
@ -1035,10 +1049,8 @@ public void transition(RMAppImpl app, RMAppEvent event) {
new RMAppManagerEvent(app.applicationId, new RMAppManagerEvent(app.applicationId,
RMAppManagerEventType.APP_COMPLETED)); RMAppManagerEventType.APP_COMPLETED));
// TODO: We need to fix for the problem that RMApp enters the final state
// after RMAppAttempt in the killing case
app.rmContext.getRMApplicationHistoryWriter() app.rmContext.getRMApplicationHistoryWriter()
.applicationFinished(app); .applicationFinished(app, finalState);
}; };
} }

View File

@ -1053,7 +1053,7 @@ public void transition(RMAppAttemptImpl appAttempt,
appAttempt.removeCredentials(appAttempt); appAttempt.removeCredentials(appAttempt);
appAttempt.rmContext.getRMApplicationHistoryWriter() appAttempt.rmContext.getRMApplicationHistoryWriter()
.applicationAttemptFinished(appAttempt); .applicationAttemptFinished(appAttempt, finalAttemptState);
} }
} }

View File

@ -989,7 +989,13 @@ private synchronized void nodeUpdate(RMNode nm) {
private void continuousScheduling() { private void continuousScheduling() {
while (true) { while (true) {
List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet()); List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
// Sort the nodes by space available on them, so that we offer
// containers on emptier nodes first, facilitating an even spread. This
// requires holding the scheduler lock, so that the space available on a
// node doesn't change during the sort.
synchronized (this) {
Collections.sort(nodeIdList, nodeAvailableResourceComparator); Collections.sort(nodeIdList, nodeAvailableResourceComparator);
}
// iterate all nodes // iterate all nodes
for (NodeId nodeId : nodeIdList) { for (NodeId nodeId : nodeIdList) {
@ -1366,7 +1372,8 @@ public synchronized String moveApplication(ApplicationId appId,
throw new YarnException("App to be moved " + appId + " not found."); throw new YarnException("App to be moved " + appId + " not found.");
} }
FSSchedulerApp attempt = (FSSchedulerApp) app.getCurrentAppAttempt(); FSSchedulerApp attempt = (FSSchedulerApp) app.getCurrentAppAttempt();
// To serialize with FairScheduler#allocate, synchronize on app attempt
synchronized (attempt) {
FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue(); FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
FSLeafQueue targetQueue = queueMgr.getLeafQueue(queueName, false); FSLeafQueue targetQueue = queueMgr.getLeafQueue(queueName, false);
if (targetQueue == null) { if (targetQueue == null) {
@ -1385,6 +1392,7 @@ public synchronized String moveApplication(ApplicationId appId,
executeMove(app, attempt, oldQueue, targetQueue); executeMove(app, attempt, oldQueue, targetQueue);
return targetQueue.getQueueName(); return targetQueue.getQueueName();
} }
}
private void verifyMoveDoesNotViolateConstraints(FSSchedulerApp app, private void verifyMoveDoesNotViolateConstraints(FSSchedulerApp app,
FSLeafQueue oldQueue, FSLeafQueue targetQueue) throws YarnException { FSLeafQueue oldQueue, FSLeafQueue targetQueue) throws YarnException {
@ -1420,8 +1428,8 @@ private void verifyMoveDoesNotViolateConstraints(FSSchedulerApp app,
} }
/** /**
* Helper for moveApplication, which is synchronized, so all operations will * Helper for moveApplication, which has appropriate synchronization, so all
* be atomic. * operations will be atomic.
*/ */
private void executeMove(SchedulerApplication app, FSSchedulerApp attempt, private void executeMove(SchedulerApplication app, FSSchedulerApp attempt,
FSLeafQueue oldQueue, FSLeafQueue newQueue) { FSLeafQueue oldQueue, FSLeafQueue newQueue) {

View File

@ -137,8 +137,6 @@ private static RMApp createRMApp(ApplicationId appId) {
new StringBuilder("test diagnostics info")); new StringBuilder("test diagnostics info"));
when(app.getFinalApplicationStatus()).thenReturn( when(app.getFinalApplicationStatus()).thenReturn(
FinalApplicationStatus.UNDEFINED); FinalApplicationStatus.UNDEFINED);
when(app.createApplicationState())
.thenReturn(YarnApplicationState.FINISHED);
return app; return app;
} }
@ -156,8 +154,6 @@ private static RMAppAttempt createRMAppAttempt(
when(appAttempt.getTrackingUrl()).thenReturn("test url"); when(appAttempt.getTrackingUrl()).thenReturn("test url");
when(appAttempt.getFinalApplicationStatus()).thenReturn( when(appAttempt.getFinalApplicationStatus()).thenReturn(
FinalApplicationStatus.UNDEFINED); FinalApplicationStatus.UNDEFINED);
when(appAttempt.createApplicationAttemptState()).thenReturn(
YarnApplicationAttemptState.FINISHED);
return appAttempt; return appAttempt;
} }
@ -200,7 +196,7 @@ public void testWriteApplication() throws Exception {
Assert.assertEquals(0L, appHD.getSubmitTime()); Assert.assertEquals(0L, appHD.getSubmitTime());
Assert.assertEquals(1L, appHD.getStartTime()); Assert.assertEquals(1L, appHD.getStartTime());
writer.applicationFinished(app); writer.applicationFinished(app, RMAppState.FINISHED);
for (int i = 0; i < MAX_RETRIES; ++i) { for (int i = 0; i < MAX_RETRIES; ++i) {
appHD = store.getApplication(ApplicationId.newInstance(0, 1)); appHD = store.getApplication(ApplicationId.newInstance(0, 1));
if (appHD.getYarnApplicationState() != null) { if (appHD.getYarnApplicationState() != null) {
@ -241,7 +237,7 @@ public void testWriteApplicationAttempt() throws Exception {
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1), 1), ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1), 1),
appAttemptHD.getMasterContainerId()); appAttemptHD.getMasterContainerId());
writer.applicationAttemptFinished(appAttempt); writer.applicationAttemptFinished(appAttempt, RMAppAttemptState.FINISHED);
for (int i = 0; i < MAX_RETRIES; ++i) { for (int i = 0; i < MAX_RETRIES; ++i) {
appAttemptHD = appAttemptHD =
store.getApplicationAttempt(ApplicationAttemptId.newInstance( store.getApplicationAttempt(ApplicationAttemptId.newInstance(
@ -326,9 +322,10 @@ public void testParallelWrite() throws Exception {
writer.containerStarted(container); writer.containerStarted(container);
writer.containerFinished(container); writer.containerFinished(container);
} }
writer.applicationAttemptFinished(appAttempt); writer.applicationAttemptFinished(
appAttempt, RMAppAttemptState.FINISHED);
} }
writer.applicationFinished(app); writer.applicationFinished(app, RMAppState.FINISHED);
} }
for (int i = 0; i < MAX_RETRIES; ++i) { for (int i = 0; i < MAX_RETRIES; ++i) {
if (allEventsHandled(20 * 10 * 10 + 20 * 10 + 20)) { if (allEventsHandled(20 * 10 * 10 + 20 * 10 + 20)) {
@ -369,7 +366,7 @@ public void applicationStarted(RMApp app) {
} }
@Override @Override
public void applicationFinished(RMApp app) { public void applicationFinished(RMApp app, RMAppState finalState) {
} }
@Override @Override
@ -377,7 +374,8 @@ public void applicationAttemptStarted(RMAppAttempt appAttempt) {
} }
@Override @Override
public void applicationAttemptFinished(RMAppAttempt appAttempt) { public void applicationAttemptFinished(
RMAppAttempt appAttempt, RMAppAttemptState finalState) {
} }
@Override @Override

View File

@ -57,7 +57,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@ -71,6 +70,7 @@
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;
@RunWith(value = Parameterized.class) @RunWith(value = Parameterized.class)
@ -308,16 +308,6 @@ private void assertKilled(RMApp application) {
"Application killed by user.", diag.toString()); "Application killed by user.", diag.toString());
} }
private void assertAppAndAttemptKilled(RMApp application)
throws InterruptedException {
sendAttemptUpdateSavedEvent(application);
sendAppUpdateSavedEvent(application);
assertKilled(application);
Assert.assertEquals(RMAppAttemptState.KILLED, application
.getCurrentAppAttempt().getAppAttemptState());
assertAppFinalStateSaved(application);
}
private void assertFailed(RMApp application, String regex) { private void assertFailed(RMApp application, String regex) {
assertTimesAtFinish(application); assertTimesAtFinish(application);
assertAppState(RMAppState.FAILED, application); assertAppState(RMAppState.FAILED, application);
@ -511,7 +501,7 @@ public void testAppNewKill() throws IOException {
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
assertKilled(application); assertKilled(application);
assertAppFinalStateNotSaved(application); assertAppFinalStateNotSaved(application);
verify(writer).applicationFinished(any(RMApp.class)); verifyApplicationFinished(RMAppState.KILLED);
} }
@Test @Test
@ -528,7 +518,7 @@ public void testAppNewReject() throws IOException {
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
assertFailed(application, rejectedText); assertFailed(application, rejectedText);
assertAppFinalStateNotSaved(application); assertAppFinalStateNotSaved(application);
verify(writer).applicationFinished(any(RMApp.class)); verifyApplicationFinished(RMAppState.FAILED);
} }
@Test (timeout = 30000) @Test (timeout = 30000)
@ -543,7 +533,7 @@ public void testAppNewSavingKill() throws IOException {
rmDispatcher.await(); rmDispatcher.await();
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
assertKilled(application); assertKilled(application);
verify(writer).applicationFinished(any(RMApp.class)); verifyApplicationFinished(RMAppState.KILLED);
} }
@Test (timeout = 30000) @Test (timeout = 30000)
@ -560,7 +550,7 @@ public void testAppNewSavingReject() throws IOException {
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
assertFailed(application, rejectedText); assertFailed(application, rejectedText);
assertAppFinalStateSaved(application); assertAppFinalStateSaved(application);
verify(writer).applicationFinished(any(RMApp.class)); verifyApplicationFinished(RMAppState.FAILED);
} }
@Test (timeout = 30000) @Test (timeout = 30000)
@ -577,7 +567,7 @@ public void testAppSubmittedRejected() throws IOException {
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
assertFailed(application, rejectedText); assertFailed(application, rejectedText);
assertAppFinalStateSaved(application); assertAppFinalStateSaved(application);
verify(writer).applicationFinished(any(RMApp.class)); verifyApplicationFinished(RMAppState.FAILED);
} }
@Test @Test
@ -592,7 +582,7 @@ public void testAppSubmittedKill() throws IOException, InterruptedException {
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
assertKilled(application); assertKilled(application);
assertAppFinalStateSaved(application); assertAppFinalStateSaved(application);
verify(writer).applicationFinished(any(RMApp.class)); verifyApplicationFinished(RMAppState.KILLED);
} }
@Test @Test
@ -627,7 +617,7 @@ public void testAppAcceptedFailed() throws IOException {
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
assertFailed(application, ".*" + message + ".*Failing the application.*"); assertFailed(application, ".*" + message + ".*Failing the application.*");
assertAppFinalStateSaved(application); assertAppFinalStateSaved(application);
verify(writer).applicationFinished(any(RMApp.class)); verifyApplicationFinished(RMAppState.FAILED);
} }
@Test @Test
@ -649,7 +639,7 @@ public void testAppAcceptedKill() throws IOException, InterruptedException {
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
assertKilled(application); assertKilled(application);
assertAppFinalStateSaved(application); assertAppFinalStateSaved(application);
verify(writer).applicationFinished(any(RMApp.class)); verifyApplicationFinished(RMAppState.KILLED);
} }
@Test @Test
@ -672,7 +662,7 @@ public void testAppRunningKill() throws IOException {
sendAttemptUpdateSavedEvent(application); sendAttemptUpdateSavedEvent(application);
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
assertKilled(application); assertKilled(application);
verify(writer).applicationFinished(any(RMApp.class)); verifyApplicationFinished(RMAppState.KILLED);
} }
@Test @Test
@ -727,7 +717,7 @@ public void testAppRunningFailed() throws IOException {
rmDispatcher.await(); rmDispatcher.await();
assertFailed(application, ".*Failing the application.*"); assertFailed(application, ".*Failing the application.*");
assertAppFinalStateSaved(application); assertAppFinalStateSaved(application);
verify(writer).applicationFinished(any(RMApp.class)); verifyApplicationFinished(RMAppState.FAILED);
} }
@Test @Test
@ -785,7 +775,7 @@ public void testAppFinishedFinished() throws IOException {
StringBuilder diag = application.getDiagnostics(); StringBuilder diag = application.getDiagnostics();
Assert.assertEquals("application diagnostics is not correct", Assert.assertEquals("application diagnostics is not correct",
"", diag.toString()); "", diag.toString());
verify(writer).applicationFinished(any(RMApp.class)); verifyApplicationFinished(RMAppState.FINISHED);
} }
@Test (timeout = 30000) @Test (timeout = 30000)
@ -810,10 +800,10 @@ public void testAppFailedFailed() throws IOException {
rmDispatcher.await(); rmDispatcher.await();
assertTimesAtFinish(application); assertTimesAtFinish(application);
assertAppState(RMAppState.FAILED, application); assertAppState(RMAppState.FAILED, application);
verifyApplicationFinished(RMAppState.FAILED);
assertTimesAtFinish(application); assertTimesAtFinish(application);
assertAppState(RMAppState.FAILED, application); assertAppState(RMAppState.FAILED, application);
verify(writer).applicationFinished(any(RMApp.class));
} }
@Test (timeout = 30000) @Test (timeout = 30000)
@ -856,10 +846,10 @@ public void testAppKilledKilled() throws IOException {
rmDispatcher.await(); rmDispatcher.await();
assertTimesAtFinish(application); assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application); assertAppState(RMAppState.KILLED, application);
verifyApplicationFinished(RMAppState.KILLED);
assertTimesAtFinish(application); assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application); assertAppState(RMAppState.KILLED, application);
verify(writer).applicationFinished(any(RMApp.class));
} }
@Test @Test
@ -871,4 +861,11 @@ public void testGetAppReport() {
report = app.createAndGetApplicationReport("clientuser", true); report = app.createAndGetApplicationReport("clientuser", true);
Assert.assertNotNull(report.getApplicationResourceUsageReport()); Assert.assertNotNull(report.getApplicationResourceUsageReport());
} }
private void verifyApplicationFinished(RMAppState state) {
ArgumentCaptor<RMAppState> finalState =
ArgumentCaptor.forClass(RMAppState.class);
verify(writer).applicationFinished(any(RMApp.class), finalState.capture());
Assert.assertEquals(state, finalState.getValue());
}
} }

View File

@ -103,6 +103,7 @@
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;
@RunWith(value = Parameterized.class) @RunWith(value = Parameterized.class)
public class TestRMAppAttemptTransitions { public class TestRMAppAttemptTransitions {
@ -367,6 +368,7 @@ private void testAppAttemptSubmittedToFailedState(String diagnostics) {
// verify(application).handle(anyObject()); // verify(application).handle(anyObject());
verify(application).handle(any(RMAppRejectedEvent.class)); verify(application).handle(any(RMAppRejectedEvent.class));
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
} }
/** /**
@ -384,9 +386,9 @@ private void testAppAttemptKilledState(Container amContainer,
assertEquals(0, applicationAttempt.getRanNodes().size()); assertEquals(0, applicationAttempt.getRanNodes().size());
assertNull(applicationAttempt.getFinalApplicationStatus()); assertNull(applicationAttempt.getFinalApplicationStatus());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verify(writer).applicationAttemptFinished(any(RMAppAttempt.class));
verifyAttemptFinalStateSaved(); verifyAttemptFinalStateSaved();
assertFalse(transferStateFromPreviousAttempt); assertFalse(transferStateFromPreviousAttempt);
verifyApplicationAttemptFinished(RMAppAttemptState.KILLED);
} }
/** /**
@ -460,8 +462,8 @@ private void testAppAttemptFailedState(Container container,
// Check events // Check events
verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class)); verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class));
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verify(writer).applicationAttemptFinished(any(RMAppAttempt.class));
verifyAttemptFinalStateSaved(); verifyAttemptFinalStateSaved();
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
} }
/** /**
@ -496,7 +498,6 @@ private void testAppAttemptRunningState(Container container,
assertEquals(getProxyUrl(applicationAttempt), assertEquals(getProxyUrl(applicationAttempt),
applicationAttempt.getTrackingUrl()); applicationAttempt.getTrackingUrl());
} }
verify(writer).applicationAttemptStarted(any(RMAppAttempt.class));
// TODO - need to add more checks relevant to this state // TODO - need to add more checks relevant to this state
} }
@ -544,6 +545,7 @@ private void testAppAttemptFinishedState(Container container,
assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus()); assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
assertFalse(transferStateFromPreviousAttempt); assertFalse(transferStateFromPreviousAttempt);
verifyApplicationAttemptFinished(RMAppAttemptState.FINISHED);
} }
@ -806,7 +808,7 @@ public void testAMCrashAtAllocated() {
assertEquals(RMAppAttemptState.FAILED, assertEquals(RMAppAttemptState.FAILED,
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verify(writer).applicationAttemptFinished(any(RMAppAttempt.class)); verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
} }
@Test @Test
@ -846,6 +848,7 @@ public void testRunningToFailed() {
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
verifyAMHostAndPortInvalidated(); verifyAMHostAndPortInvalidated();
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
} }
@Test @Test
@ -883,6 +886,7 @@ public void testRunningToKilled() {
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verifyAMHostAndPortInvalidated(); verifyAMHostAndPortInvalidated();
verifyApplicationAttemptFinished(RMAppAttemptState.KILLED);
} }
@Test(timeout=10000) @Test(timeout=10000)
@ -903,6 +907,7 @@ public void testLaunchedExpire() {
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
} }
@Test(timeout=20000) @Test(timeout=20000)
@ -925,6 +930,7 @@ public void testRunningExpire() {
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verifyAMHostAndPortInvalidated(); verifyAMHostAndPortInvalidated();
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
} }
@Test @Test
@ -1177,6 +1183,7 @@ public void testFailedToFailed() {
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());
// should not kill containers when attempt fails. // should not kill containers when attempt fails.
assertTrue(transferStateFromPreviousAttempt); assertTrue(transferStateFromPreviousAttempt);
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
// failed attempt captured the container finished event. // failed attempt captured the container finished event.
assertEquals(0, applicationAttempt.getJustFinishedContainers().size()); assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
@ -1216,6 +1223,7 @@ scheduler, masterService, submissionContext, new Configuration(),
assertEquals(RMAppAttemptState.FAILED, assertEquals(RMAppAttemptState.FAILED,
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());
assertFalse(transferStateFromPreviousAttempt); assertFalse(transferStateFromPreviousAttempt);
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
} }
private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) { private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {
@ -1245,4 +1253,13 @@ private void verifyAMHostAndPortInvalidated() {
assertEquals("N/A", applicationAttempt.getHost()); assertEquals("N/A", applicationAttempt.getHost());
assertEquals(-1, applicationAttempt.getRpcPort()); assertEquals(-1, applicationAttempt.getRpcPort());
} }
private void verifyApplicationAttemptFinished(RMAppAttemptState state) {
ArgumentCaptor<RMAppAttemptState> finalState =
ArgumentCaptor.forClass(RMAppAttemptState.class);
verify(writer).applicationAttemptFinished(
any(RMAppAttempt.class), finalState.capture());
Assert.assertEquals(state, finalState.getValue());
}
} }

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.webproxy; package org.apache.hadoop.yarn.server.webproxy;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -69,8 +70,21 @@ protected void serviceInit(Configuration conf) throws Exception {
* @throws IOException on any error. * @throws IOException on any error.
*/ */
protected void doSecureLogin(Configuration conf) throws IOException { protected void doSecureLogin(Configuration conf) throws IOException {
InetSocketAddress socAddr = getBindAddress(conf);
SecurityUtil.login(conf, YarnConfiguration.PROXY_KEYTAB, SecurityUtil.login(conf, YarnConfiguration.PROXY_KEYTAB,
YarnConfiguration.PROXY_PRINCIPAL); YarnConfiguration.PROXY_PRINCIPAL, socAddr.getHostName());
}
/**
* Retrieve PROXY bind address from configuration
*
* @param conf
* @return InetSocketAddress
*/
public static InetSocketAddress getBindAddress(Configuration conf) {
return conf.getSocketAddr(YarnConfiguration.PROXY_ADDRESS,
YarnConfiguration.DEFAULT_PROXY_ADDRESS,
YarnConfiguration.DEFAULT_PROXY_PORT);
} }
public static void main(String[] args) { public static void main(String[] args) {