Merging r1525759 through r1526365 from trunk to branch HDFS-2832

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1526366 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-09-26 03:41:09 +00:00
commit 77856140f4
63 changed files with 1062 additions and 255 deletions

View File

@ -342,9 +342,6 @@ Release 2.3.0 - UNRELEASED
HADOOP-9582. Non-existent file to "hadoop fs -conf" doesn't throw error
(Ashwin Shankar via jlowe)
HADOOP-9761. ViewFileSystem#rename fails when using DistributedFileSystem.
(Andrew Wang via Colin Patrick McCabe)
HADOOP-9817. FileSystem#globStatus and FileContext#globStatus need to work
with symlinks. (Colin Patrick McCabe via Andrew Wang)
@ -372,6 +369,9 @@ Release 2.3.0 - UNRELEASED
HADOOP-9791. Add a test case covering long paths for new FileUtil access
check methods (ivanmi)
HADOOP-9981. globStatus should minimize its listStatus and getFileStatus
calls. (Contributed by Colin Patrick McCabe)
Release 2.2.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -384,6 +384,24 @@ Release 2.2.0 - UNRELEASED
BUG FIXES
Release 2.1.2 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
OPTIMIZATIONS
BUG FIXES
HADOOP-9776. HarFileSystem.listStatus() returns invalid authority if port
number is empty. (Shanyu Zhao via ivanmi)
HADOOP-9761. ViewFileSystem#rename fails when using DistributedFileSystem.
(Andrew Wang via Colin Patrick McCabe)
Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES

View File

@ -83,6 +83,15 @@ class Globber {
}
}
/**
* Convert a path component that contains backslash ecape sequences to a
* literal string. This is necessary when you want to explicitly refer to a
* path that contains globber metacharacters.
*/
private static String unescapePathComponent(String name) {
return name.replaceAll("\\\\(.)", "$1");
}
/**
* Translate an absolute path into a list of path components.
* We merge double slashes into a single slash here.
@ -166,37 +175,72 @@ class Globber {
new Path(scheme, authority, Path.SEPARATOR)));
}
for (String component : components) {
for (int componentIdx = 0; componentIdx < components.size();
componentIdx++) {
ArrayList<FileStatus> newCandidates =
new ArrayList<FileStatus>(candidates.size());
GlobFilter globFilter = new GlobFilter(component);
GlobFilter globFilter = new GlobFilter(components.get(componentIdx));
String component = unescapePathComponent(components.get(componentIdx));
if (globFilter.hasPattern()) {
sawWildcard = true;
}
if (candidates.isEmpty() && sawWildcard) {
// Optimization: if there are no more candidates left, stop examining
// the path components. We can only do this if we've already seen
// a wildcard component-- otherwise, we still need to visit all path
// components in case one of them is a wildcard.
break;
}
if ((componentIdx < components.size() - 1) &&
(!globFilter.hasPattern())) {
// Optimization: if this is not the terminal path component, and we
// are not matching against a glob, assume that it exists. If it
// doesn't exist, we'll find out later when resolving a later glob
// or the terminal path component.
for (FileStatus candidate : candidates) {
candidate.setPath(new Path(candidate.getPath(), component));
}
continue;
}
for (FileStatus candidate : candidates) {
FileStatus resolvedCandidate = candidate;
if (candidate.isSymlink()) {
// We have to resolve symlinks, because otherwise we don't know
// whether they are directories.
resolvedCandidate = getFileStatus(candidate.getPath());
}
if (resolvedCandidate == null ||
resolvedCandidate.isDirectory() == false) {
continue;
}
FileStatus[] children = listStatus(candidate.getPath());
for (FileStatus child : children) {
// Set the child path based on the parent path.
// This keeps the symlinks in our path.
child.setPath(new Path(candidate.getPath(),
child.getPath().getName()));
if (globFilter.accept(child.getPath())) {
newCandidates.add(child);
if (globFilter.hasPattern()) {
FileStatus[] children = listStatus(candidate.getPath());
if (children.length == 1) {
// If we get back only one result, this could be either a listing
// of a directory with one entry, or it could reflect the fact
// that what we listed resolved to a file.
//
// Unfortunately, we can't just compare the returned paths to
// figure this out. Consider the case where you have /a/b, where
// b is a symlink to "..". In that case, listing /a/b will give
// back "/a/b" again. If we just went by returned pathname, we'd
// incorrectly conclude that /a/b was a file and should not match
// /a/*/*. So we use getFileStatus of the path we just listed to
// disambiguate.
if (!getFileStatus(candidate.getPath()).isDirectory()) {
continue;
}
}
}
for (FileStatus child : children) {
// Set the child path based on the parent path.
child.setPath(new Path(candidate.getPath(),
child.getPath().getName()));
if (globFilter.accept(child.getPath())) {
newCandidates.add(child);
}
}
} else {
// When dealing with non-glob components, use getFileStatus
// instead of listStatus. This is an optimization, but it also
// is necessary for correctness in HDFS, since there are some
// special HDFS directories like .reserved and .snapshot that are
// not visible to listStatus, but which do exist. (See HADOOP-9877)
FileStatus childStatus = getFileStatus(
new Path(candidate.getPath(), component));
if (childStatus != null) {
newCandidates.add(childStatus);
}
}
}
candidates = newCandidates;
}

View File

@ -283,8 +283,9 @@ public class HarFileSystem extends FilterFileSystem {
private String getHarAuth(URI underLyingUri) {
String auth = underLyingUri.getScheme() + "-";
if (underLyingUri.getHost() != null) {
auth += underLyingUri.getHost() + ":";
auth += underLyingUri.getHost();
if (underLyingUri.getPort() != -1) {
auth += ":";
auth += underLyingUri.getPort();
}
}

View File

@ -1226,4 +1226,19 @@
</description>
</property>
<property>
<name>nfs3.server.port</name>
<value>2049</value>
<description>
Specify the port number used by Hadoop NFS.
</description>
</property>
<property>
<name>nfs3.mountd.port</name>
<value>4242</value>
<description>
Specify the port number used by Hadoop mount daemon.
</description>
</property>
</configuration>

View File

@ -311,7 +311,7 @@ Hadoop MapReduce Next Generation - Cluster Setup
| | | Only applicable if log-aggregation is enabled. |
*-------------------------+-------------------------+------------------------+
| <<<yarn.nodemanager.aux-services>>> | | |
| | mapreduce.shuffle | |
| | mapreduce_shuffle | |
| | | Shuffle service that needs to be set for Map Reduce applications. |
*-------------------------+-------------------------+------------------------+

View File

@ -140,7 +140,7 @@ Add the following configs to your <<<yarn-site.xml>>>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce.shuffle</value>
<value>mapreduce_shuffle</value>
<description>shuffle service that needs to be set for Map Reduce to run </description>
</property>
+---+

View File

@ -221,6 +221,17 @@ public class TestHarFileSystemBasics {
hfs.initialize(uri, new Configuration());
}
@Test
public void testPositiveListFilesNotEndInColon() throws Exception {
// re-initialize the har file system with host name
// make sure the qualified path name does not append ":" at the end of host name
final URI uri = new URI("har://file-localhost" + harPath.toString());
harFileSystem.initialize(uri, conf);
Path p1 = new Path("har://file-localhost" + harPath.toString());
Path p2 = harFileSystem.makeQualified(p1);
assertTrue(p2.toUri().toString().startsWith("har://file-localhost/"));
}
// ========== Negative:
@Test

View File

@ -69,4 +69,13 @@
<value>simple</value>
</property>
<property>
<name>nfs3.server.port</name>
<value>2079</value>
</property>
<property>
<name>nfs3.mountd.port</name>
<value>4272</value>
</property>
</configuration>

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.nfs.nfs3;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mount.MountdBase;
import org.apache.hadoop.oncrpc.RpcProgram;
import org.apache.hadoop.oncrpc.RpcUtil;
@ -38,6 +39,7 @@ public abstract class Nfs3Base {
public static final Log LOG = LogFactory.getLog(Nfs3Base.class);
private final MountdBase mountd;
private final RpcProgram rpcProgram;
private final int nfsPort;
public MountdBase getMountBase() {
return mountd;
@ -47,9 +49,17 @@ public abstract class Nfs3Base {
return rpcProgram;
}
protected Nfs3Base(MountdBase mountd, RpcProgram program, Configuration conf) {
this.mountd = mountd;
this.rpcProgram = program;
this.nfsPort = conf.getInt("nfs3.server.port", Nfs3Constant.PORT);
LOG.info("NFS server port set to: "+nfsPort);
}
protected Nfs3Base(MountdBase mountd, RpcProgram program) {
this.mountd = mountd;
this.rpcProgram = program;
this.nfsPort = Nfs3Constant.PORT;
}
public void start(boolean register) {
@ -61,7 +71,7 @@ public abstract class Nfs3Base {
}
private void startTCPServer() {
SimpleTcpServer tcpServer = new SimpleTcpServer(Nfs3Constant.PORT,
SimpleTcpServer tcpServer = new SimpleTcpServer(nfsPort,
rpcProgram, 0) {
@Override
public ChannelPipelineFactory getPipelineFactory() {

View File

@ -76,7 +76,8 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
public RpcProgramMountd(List<String> exports, Configuration config)
throws IOException {
// Note that RPC cache is not enabled
super("mountd", "localhost", PORT, PROGRAM, VERSION_1, VERSION_3, 0);
super("mountd", "localhost", config.getInt("nfs3.mountd.port", PORT),
PROGRAM, VERSION_1, VERSION_3, 0);
this.hostsMatcher = NfsExports.getInstance(config);
this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());

View File

@ -42,7 +42,7 @@ public class Nfs3 extends Nfs3Base {
}
public Nfs3(List<String> exports, Configuration config) throws IOException {
super(new Mountd(exports, config), new RpcProgramNfs3(config));
super(new Mountd(exports, config), new RpcProgramNfs3(config), config);
}
public static void main(String[] args) throws IOException {

View File

@ -199,9 +199,9 @@ class OpenFileCtx {
try {
synchronized (this) {
// check if alive again
Preconditions.checkState(dumpFile.createNewFile(),
"The dump file should not exist: %s", dumpFilePath);
dumpOut = new FileOutputStream(dumpFile);
Preconditions.checkState(dumpFile.createNewFile(),
"The dump file should not exist: %s", dumpFilePath);
dumpOut = new FileOutputStream(dumpFile);
}
} catch (IOException e) {
LOG.error("Got failure when creating dump stream " + dumpFilePath, e);
@ -239,6 +239,10 @@ class OpenFileCtx {
&& nonSequentialWriteInMemory.get() > 0) {
OffsetRange key = it.next();
WriteCtx writeCtx = pendingWrites.get(key);
if (writeCtx == null) {
// This write was just deleted
continue;
}
try {
long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
if (dumpedDataSize > 0) {
@ -262,16 +266,30 @@ class OpenFileCtx {
@Override
public void run() {
while (activeState && enabledDump) {
if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
dump();
}
synchronized (OpenFileCtx.this) {
if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
try {
OpenFileCtx.this.wait();
} catch (InterruptedException e) {
try {
if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
dump();
}
synchronized (OpenFileCtx.this) {
if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
try {
OpenFileCtx.this.wait();
if (LOG.isDebugEnabled()) {
LOG.debug("Dumper woke up");
}
} catch (InterruptedException e) {
LOG.info("Dumper is interrupted, dumpFilePath= "
+ OpenFileCtx.this.dumpFilePath);
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Dumper checking OpenFileCtx activeState: " + activeState
+ " enabledDump: " + enabledDump);
}
} catch (Throwable t) {
LOG.info("Dumper get Throwable: " + t + ". dumpFilePath: "
+ OpenFileCtx.this.dumpFilePath);
}
}
}

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>nfs3.server.port</name>
<value>2079</value>
</property>
<property>
<name>nfs3.mountd.port</name>
<value>4272</value>
</property>
</configuration>

View File

@ -323,10 +323,28 @@ Release 2.2.0 - UNRELEASED
OPTIMIZATIONS
BUG FIXES
Release 2.1.2 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
OPTIMIZATIONS
HDFS-5246. Make Hadoop nfs server port and mount daemon port
configurable. (Jinghui Wang via brandonli)
BUG FIXES
HDFS-5139. Remove redundant -R option from setrep.
HDFS-5251. Race between the initialization of NameNode and the http
server. (Haohui Mai via suresh)
Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES
@ -494,6 +512,8 @@ Release 2.1.1-beta - 2013-09-23
HDFS-5231. Fix broken links in the document of HDFS Federation. (Haohui Mai
via jing9)
HDFS-5249. Fix dumper thread which may die silently. (brandonli)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES
@ -947,6 +967,9 @@ Release 2.1.0-beta - 2013-08-22
HDFS-5016. Deadlock in pipeline recovery causes Datanode to be marked dead.
(suresh)
HDFS-5228. The RemoteIterator returned by DistributedFileSystem.listFiles
may throw NullPointerException. (szetszwo and cnauroth via szetszwo)
BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS
HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.

View File

@ -713,6 +713,7 @@ public class DistributedFileSystem extends FileSystem {
protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path p,
final PathFilter filter)
throws IOException {
final Path absF = fixRelativePart(p);
return new RemoteIterator<LocatedFileStatus>() {
private DirectoryListing thisListing;
private int i;
@ -722,7 +723,7 @@ public class DistributedFileSystem extends FileSystem {
{ // initializer
// Fully resolve symlinks in path first to avoid additional resolution
// round-trips as we fetch more batches of listings
src = getPathName(resolvePath(p));
src = getPathName(resolvePath(absF));
// fetch the first batch of entries in the directory
thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME, true);
statistics.incrementReadOps(1);
@ -736,7 +737,7 @@ public class DistributedFileSystem extends FileSystem {
while (curStat == null && hasNextNoFilter()) {
LocatedFileStatus next =
((HdfsLocatedFileStatus)thisListing.getPartialListing()[i++])
.makeQualifiedLocated(getUri(), p);
.makeQualifiedLocated(getUri(), absF);
if (filter.accept(next.getPath())) {
curStat = next;
}

View File

@ -102,6 +102,10 @@ class NamenodeJspHelper {
}
static String getRollingUpgradeText(FSNamesystem fsn) {
if (fsn == null) {
return "";
}
DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
Map<String, Integer> list = dm.getDatanodesSoftwareVersions();
if(list.size() > 1) {

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.*;
@ -361,17 +362,6 @@ public class TestGlobPaths {
status = fs.globStatus(new Path(USER_DIR+"{/dir*}"));
checkStatus(status, d1, d2, d3, d4);
/*
* true filter
*/
PathFilter trueFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
return true;
}
};
status = fs.globStatus(new Path(Path.SEPARATOR), trueFilter);
checkStatus(status, new Path(Path.SEPARATOR));
@ -843,6 +833,8 @@ public class TestGlobPaths {
}
}
private static final PathFilter trueFilter = new AcceptAllPathFilter();
/**
* Accept only paths ending in Z.
*/
@ -893,11 +885,13 @@ public class TestGlobPaths {
}
}
@Ignore
@Test
public void testGlobWithSymlinksOnFS() throws Exception {
testOnFileSystem(new TestGlobWithSymlinks());
}
@Ignore
@Test
public void testGlobWithSymlinksOnFC() throws Exception {
testOnFileContext(new TestGlobWithSymlinks());
@ -970,11 +964,13 @@ public class TestGlobPaths {
}
}
@Ignore
@Test
public void testGlobWithSymlinksToSymlinksOnFS() throws Exception {
testOnFileSystem(new TestGlobWithSymlinksToSymlinks());
}
@Ignore
@Test
public void testGlobWithSymlinksToSymlinksOnFC() throws Exception {
testOnFileContext(new TestGlobWithSymlinksToSymlinks());
@ -1019,11 +1015,13 @@ public class TestGlobPaths {
}
}
@Ignore
@Test
public void testGlobSymlinksWithCustomPathFilterOnFS() throws Exception {
testOnFileSystem(new TestGlobSymlinksWithCustomPathFilter());
}
@Ignore
@Test
public void testGlobSymlinksWithCustomPathFilterOnFC() throws Exception {
testOnFileContext(new TestGlobSymlinksWithCustomPathFilter());
@ -1044,7 +1042,7 @@ public class TestGlobPaths {
new Path(USER_DIR + "/alphaLink"), new AcceptAllPathFilter());
Assert.assertEquals(1, statuses.length);
Path path = statuses[0].getPath();
Assert.assertEquals(USER_DIR + "/alphaLink", path.toUri().getPath());
Assert.assertEquals(USER_DIR + "/alpha", path.toUri().getPath());
Assert.assertEquals("hdfs", path.toUri().getScheme());
if (fc != null) {
// If we're using FileContext, then we can list a file:/// URI.
@ -1150,4 +1148,31 @@ public class TestGlobPaths {
public void testGlobAccessDeniedOnFC() throws Exception {
testOnFileContext(new TestGlobAccessDenied());
}
/**
* Test that trying to list a reserved path on HDFS via the globber works.
**/
private static class TestReservedHdfsPaths implements FSTestWrapperGlobTest {
public void run(FSTestWrapper wrap, FSTestWrapper unprivilegedWrap,
FileSystem fs, FileContext fc) throws Exception {
String reservedRoot = "/.reserved/.inodes/" + INodeId.ROOT_INODE_ID;
Assert.assertEquals(reservedRoot,
TestPath.mergeStatuses(unprivilegedWrap.
globStatus(new Path(reservedRoot), new AcceptAllPathFilter())));
// These inodes don't show up via listStatus.
Assert.assertEquals("",
TestPath.mergeStatuses(unprivilegedWrap.
globStatus(new Path("/.reserved/*"), new AcceptAllPathFilter())));
}
}
@Test
public void testReservedHdfsPathsOnFS() throws Exception {
testOnFileSystem(new TestReservedHdfsPaths());
}
@Test
public void testReservedHdfsPathsOnFC() throws Exception {
testOnFileContext(new TestReservedHdfsPaths());
}
}

View File

@ -31,8 +31,10 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import org.apache.commons.lang.ArrayUtils;
@ -47,9 +49,11 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
@ -226,7 +230,7 @@ public class TestDistributedFileSystem {
final long millis = Time.now();
{
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
final DistributedFileSystem dfs = cluster.getFileSystem();
dfs.dfs.getLeaseRenewer().setGraceSleepPeriod(grace);
assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
@ -326,7 +330,7 @@ public class TestDistributedFileSystem {
}
{
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
final DistributedFileSystem dfs = cluster.getFileSystem();
assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
//open and check the file
@ -835,4 +839,25 @@ public class TestDistributedFileSystem {
}
}
@Test(timeout=60000)
public void testListFiles() throws IOException {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
DistributedFileSystem fs = cluster.getFileSystem();
final Path relative = new Path("relative");
fs.create(new Path(relative, "foo")).close();
final List<LocatedFileStatus> retVal = new ArrayList<LocatedFileStatus>();
final RemoteIterator<LocatedFileStatus> iter = fs.listFiles(relative, true);
while (iter.hasNext()) {
retVal.add(iter.next());
}
System.out.println("retVal = " + retVal);
} finally {
cluster.shutdown();
}
}
}

View File

@ -18,8 +18,13 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase.*;
import static org.mockito.Mockito.*;
import static org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase.LOADING_EDITS;
import static org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase.LOADING_FSIMAGE;
import static org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase.SAFEMODE;
import static org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase.SAVING_CHECKPOINT;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.List;
@ -107,6 +112,11 @@ public class TestNameNodeJspHelper {
Assert.assertTrue(containsMatch(contents, SAFEMODE.getDescription()));
}
@Test
public void testGetRollingUpgradeText() {
Assert.assertEquals("", NamenodeJspHelper.getRollingUpgradeText(null));
}
/**
* Checks if the list contains any string that partially matches the regex.
*

View File

@ -191,6 +191,18 @@ Release 2.2.0 - UNRELEASED
OPTIMIZATIONS
BUG FIXES
Release 2.1.2 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
OPTIMIZATIONS
BUG FIXES
MAPREDUCE-5504. mapred queue -info inconsistent with types (Kousuke Saruta
@ -204,6 +216,11 @@ Release 2.2.0 - UNRELEASED
needs to set up its own certificates etc and not depend on clusters'.
(Omkar Vinit Joshi via vinodkv)
MAPREDUCE-5505. Clients should be notified job finished only after job
successfully unregistered (Zhijie Shen via bikas)
MAPREDUCE-5503. Fixed a test issue in TestMRJobClient. (Jian He via vinodkv)
Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES
@ -221,6 +238,10 @@ Release 2.1.1-beta - 2013-09-23
MAPREDUCE-5379. Include token tracking ids in jobconf. (kkambatl via tucu)
MAPREDUCE-5523. Added separate configuration properties for https for JHS
without which even when https is enabled, it starts on http port itself.
(Omkar Vinit Joshi via vinodkv)
OPTIMIZATIONS
MAPREDUCE-5446. TestJobHistoryEvents and TestJobHistoryParsing have race

View File

@ -39,11 +39,11 @@ export YARN_CONF_DIR=$HADOOP_CONF_DIR
Step 7) Setup config: for running mapreduce applications, which now are in user land, you need to setup nodemanager with the following configuration in your yarn-site.xml before you start the nodemanager.
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce.shuffle</value>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>

View File

@ -63,4 +63,7 @@ public interface AppContext {
ClientToAMTokenSecretManager getClientToAMTokenSecretManager();
boolean isLastAMRetry();
boolean safeToReportTerminationToUser();
}

View File

@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
@ -209,6 +210,10 @@ public class MRAppMaster extends CompositeService {
private long recoveredJobStartTime = 0;
@VisibleForTesting
protected AtomicBoolean safeToReportTerminationToUser =
new AtomicBoolean(false);
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
long appSubmitTime, int maxAppAttempts) {
@ -554,8 +559,10 @@ public class MRAppMaster extends CompositeService {
LOG.info("Calling stop for all the services");
MRAppMaster.this.stop();
// TODO: Stop ClientService last, since only ClientService should wait for
// some time so clients can know the final states. Will be removed once RM come on.
// Except ClientService, other services are already stopped, it is safe to
// let clients know the final states. ClientService should wait for some
// time so clients have enough time to know the final states.
safeToReportTerminationToUser.set(true);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
@ -964,6 +971,11 @@ public class MRAppMaster extends CompositeService {
public boolean isLastAMRetry(){
return isLastAMRetry;
}
@Override
public boolean safeToReportTerminationToUser() {
return safeToReportTerminationToUser.get();
}
}
@SuppressWarnings("unchecked")

View File

@ -641,6 +641,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
private ScheduledFuture failWaitTriggerScheduledFuture;
private JobState lastNonFinalState = JobState.NEW;
public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener,
@ -928,7 +930,14 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
public JobState getState() {
readLock.lock();
try {
return getExternalState(getInternalState());
JobState state = getExternalState(getInternalState());
if (!appContext.safeToReportTerminationToUser()
&& (state == JobState.SUCCEEDED || state == JobState.FAILED
|| state == JobState.KILLED || state == JobState.ERROR)) {
return lastNonFinalState;
} else {
return state;
}
} finally {
readLock.unlock();
}
@ -972,6 +981,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
if (oldState != getInternalState()) {
LOG.info(jobId + "Job Transitioned from " + oldState + " to "
+ getInternalState());
rememberLastNonFinalState(oldState);
}
}
@ -980,6 +990,15 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
}
}
private void rememberLastNonFinalState(JobStateInternal stateInternal) {
JobState state = getExternalState(stateInternal);
// if state is not the final state, set lastNonFinalState
if (state != JobState.SUCCEEDED && state != JobState.FAILED
&& state != JobState.KILLED && state != JobState.ERROR) {
lastNonFinalState = state;
}
}
@Private
public JobStateInternal getInternalState() {
readLock.lock();

View File

@ -37,11 +37,11 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.AppInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.StringHelper;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.Controller;
import org.apache.hadoop.yarn.webapp.View;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.base.Joiner;
import com.google.inject.Inject;
@ -62,7 +62,7 @@ public class AppController extends Controller implements AMParams {
set(APP_ID, app.context.getApplicationID().toString());
set(RM_WEB,
JOINER.join(WebAppUtil.getSchemePrefix(),
YarnConfiguration.getRMWebAppHostAndPort(conf)));
WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf)));
}
@Inject

View File

@ -17,6 +17,10 @@
*/
package org.apache.hadoop.mapreduce.v2.app.webapp;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
public class WebAppUtil {
private static boolean isSSLEnabledInYARN;
@ -36,4 +40,21 @@ public class WebAppUtil {
return "http://";
}
}
}
public static void setJHSWebAppURLWithoutScheme(Configuration conf,
String hostAddress) {
if (HttpConfig.isSecure()) {
conf.set(JHAdminConfig.MR_HISTORY_WEBAPP_HTTPS_ADDRESS, hostAddress);
} else {
conf.set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS, hostAddress);
}
}
public static String getJHSWebAppURLWithoutScheme(Configuration conf) {
if (HttpConfig.isSecure()) {
return conf.get(JHAdminConfig.MR_HISTORY_WEBAPP_HTTPS_ADDRESS);
} else {
return conf.get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS);
}
}
}

View File

@ -135,11 +135,22 @@ public class MRApp extends MRAppMaster {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, Clock clock, boolean shutdown) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock,
shutdown);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, boolean shutdown) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1, shutdown);
}
@Override
protected void initJobCredentialsAndUGI(Configuration conf) {
// Fake a shuffle secret that normally is provided by the job client.
@ -169,23 +180,43 @@ public class MRApp extends MRAppMaster {
new SystemClock());
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, boolean shutdown) {
this(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
new SystemClock(), shutdown);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, Clock clock, boolean shutdown) {
this(getApplicationAttemptId(applicationId, startCount), getContainerId(
applicationId, startCount), maps, reduces, autoComplete, testName,
cleanOnStart, startCount, clock, shutdown);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, Clock clock) {
this(getApplicationAttemptId(applicationId, startCount), getContainerId(
applicationId, startCount), maps, reduces, autoComplete, testName,
cleanOnStart, startCount, clock);
cleanOnStart, startCount, clock, true);
}
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, boolean shutdown) {
this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
cleanOnStart, startCount, new SystemClock(), shutdown);
}
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) {
this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
cleanOnStart, startCount, new SystemClock());
cleanOnStart, startCount, new SystemClock(), true);
}
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, Clock clock) {
boolean cleanOnStart, int startCount, Clock clock, boolean shutdown) {
super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
this.testWorkDir = new File("target", testName);
@ -204,6 +235,9 @@ public class MRApp extends MRAppMaster {
this.maps = maps;
this.reduces = reduces;
this.autoComplete = autoComplete;
// If safeToReportTerminationToUser is set to true, we can verify whether
// the job can reaches the final state when MRAppMaster shuts down.
this.safeToReportTerminationToUser.set(shutdown);
}
@Override

View File

@ -135,4 +135,11 @@ public class MockAppContext implements AppContext {
public boolean isLastAMRetry() {
return false;
}
@Override
public boolean safeToReportTerminationToUser() {
// bogus - Not Required
return true;
}
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@ -374,6 +375,19 @@ public class TestMRApp {
app.waitForState(job, JobState.ERROR);
}
@SuppressWarnings("resource")
@Test
public void testJobSuccess() throws Exception {
MRApp app = new MRApp(2, 2, true, this.getClass().getName(), true, false);
JobImpl job = (JobImpl) app.submit(new Configuration());
app.waitForInternalState(job, JobStateInternal.SUCCEEDED);
// AM is not unregistered
Assert.assertEquals(JobState.RUNNING, job.getState());
// imitate that AM is unregistered
app.safeToReportTerminationToUser.set(true);
app.waitForState(job, JobState.SUCCEEDED);
}
@Test
public void testJobRebootNotLastRetry() throws Exception {
MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true);

View File

@ -867,5 +867,12 @@ public class TestRuntimeEstimators {
public boolean isLastAMRetry() {
return false;
}
@Override
public boolean safeToReportTerminationToUser() {
// bogus - Not Required
return true;
}
}
}

View File

@ -275,6 +275,7 @@ public class TestJobImpl {
AppContext mockContext = mock(AppContext.class);
when(mockContext.isLastAMRetry()).thenReturn(true);
when(mockContext.safeToReportTerminationToUser()).thenReturn(false);
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, mockContext);
completeJobTasks(job);
assertJobState(job, JobStateInternal.COMMITTING);
@ -282,7 +283,9 @@ public class TestJobImpl {
syncBarrier.await();
job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
assertJobState(job, JobStateInternal.REBOOT);
// return the external state as FAILED since this is last retry.
// return the external state as ERROR since this is last retry.
Assert.assertEquals(JobState.RUNNING, job.getState());
when(mockContext.safeToReportTerminationToUser()).thenReturn(true);
Assert.assertEquals(JobState.ERROR, job.getState());
dispatcher.stop();
@ -590,12 +593,14 @@ public class TestJobImpl {
final JobDiagnosticsUpdateEvent diagUpdateEvent =
new JobDiagnosticsUpdateEvent(jobId, diagMsg);
MRAppMetrics mrAppMetrics = MRAppMetrics.create();
AppContext mockContext = mock(AppContext.class);
when(mockContext.safeToReportTerminationToUser()).thenReturn(true);
JobImpl job = new JobImpl(jobId, Records
.newRecord(ApplicationAttemptId.class), new Configuration(),
mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null,
mrAppMetrics, null, true, null, 0, null, null, null, null);
mrAppMetrics, null, true, null, 0, null, mockContext, null, null);
job.handle(diagUpdateEvent);
String diagnostics = job.getReport().getDiagnostics();
Assert.assertNotNull(diagnostics);
@ -606,7 +611,7 @@ public class TestJobImpl {
mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null,
mrAppMetrics, null, true, null, 0, null, null, null, null);
mrAppMetrics, null, true, null, 0, null, mockContext, null, null);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
job.handle(diagUpdateEvent);
diagnostics = job.getReport().getDiagnostics();
@ -699,7 +704,9 @@ public class TestJobImpl {
commitHandler.init(conf);
commitHandler.start();
JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
AppContext mockContext = mock(AppContext.class);
when(mockContext.safeToReportTerminationToUser()).thenReturn(false);
JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext);
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
@ -707,12 +714,15 @@ public class TestJobImpl {
assertJobState(job, JobStateInternal.FAILED);
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
Assert.assertEquals(JobState.FAILED, job.getState());
assertJobState(job, JobStateInternal.FAILED);
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
Assert.assertEquals(JobState.FAILED, job.getState());
assertJobState(job, JobStateInternal.FAILED);
job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED));
Assert.assertEquals(JobState.FAILED, job.getState());
assertJobState(job, JobStateInternal.FAILED);
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
assertJobState(job, JobStateInternal.FAILED);
Assert.assertEquals(JobState.RUNNING, job.getState());
when(mockContext.safeToReportTerminationToUser()).thenReturn(true);
Assert.assertEquals(JobState.FAILED, job.getState());
dispatcher.stop();
@ -750,6 +760,10 @@ public class TestJobImpl {
Dispatcher dispatcher, int numSplits, AppContext appContext) {
JobID jobID = JobID.forName("job_1234567890000_0001");
JobId jobId = TypeConverter.toYarn(jobID);
if (appContext == null) {
appContext = mock(AppContext.class);
when(appContext.safeToReportTerminationToUser()).thenReturn(true);
}
StubbedJob job = new StubbedJob(jobId,
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0),
conf,dispatcher.getEventHandler(), true, "somebody", numSplits, appContext);

View File

@ -18,8 +18,15 @@
package org.apache.hadoop.mapreduce.v2.jobhistory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils;
/**
* Stores Job History configuration keys that can be set by administrators of
@ -130,6 +137,13 @@ public class JHAdminConfig {
public static final String DEFAULT_MR_HISTORY_WEBAPP_ADDRESS =
"0.0.0.0:" + DEFAULT_MR_HISTORY_WEBAPP_PORT;
/**The https address the history server webapp is on.*/
public static final String MR_HISTORY_WEBAPP_HTTPS_ADDRESS =
MR_HISTORY_PREFIX + "webapp.https.address";
public static final int DEFAULT_MR_HISTORY_WEBAPP_HTTPS_PORT = 19890;
public static final String DEFAULT_MR_HISTORY_WEBAPP_HTTPS_ADDRESS =
"0.0.0.0:" + DEFAULT_MR_HISTORY_WEBAPP_HTTPS_PORT;
/**The kerberos principal to be used for spnego filter for history server*/
public static final String MR_WEBAPP_SPNEGO_USER_NAME_KEY =
MR_HISTORY_PREFIX + "webapp.spnego-principal";
@ -160,4 +174,36 @@ public class JHAdminConfig {
*/
public static boolean DEFAULT_MR_HISTORY_MINICLUSTER_FIXED_PORTS = false;
public static String getResolvedMRHistoryWebAppURLWithoutScheme(
Configuration conf) {
InetSocketAddress address = null;
if (HttpConfig.isSecure()) {
address =
conf.getSocketAddr(JHAdminConfig.MR_HISTORY_WEBAPP_HTTPS_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_HTTPS_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_HTTPS_PORT);
} else {
address =
conf.getSocketAddr(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_PORT); }
address = NetUtils.getConnectAddress(address);
StringBuffer sb = new StringBuffer();
InetAddress resolved = address.getAddress();
if (resolved == null || resolved.isAnyLocalAddress() ||
resolved.isLoopbackAddress()) {
String lh = address.getHostName();
try {
lh = InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
//Ignore and fallback.
}
sb.append(lh);
} else {
sb.append(address.getHostName());
}
sb.append(":").append(address.getPort());
return sb.toString();
}
}

View File

@ -83,14 +83,14 @@ Hadoop MapReduce Next Generation - Pluggable Shuffle and Pluggable Sort
*--------------------------------------+---------------------+-----------------+
| <<Property>> | <<Default Value>> | <<Explanation>> |
*--------------------------------------+---------------------+-----------------+
| <<<yarn.nodemanager.aux-services>>> | <<<...,mapreduce.shuffle>>> | The auxiliary service name |
| <<<yarn.nodemanager.aux-services>>> | <<<...,mapreduce_shuffle>>> | The auxiliary service name |
*--------------------------------------+---------------------+-----------------+
| <<<yarn.nodemanager.aux-services.mapreduce.shuffle.class>>> | <<<org.apache.hadoop.mapred.ShuffleHandler>>> | The auxiliary service class to use |
| <<<yarn.nodemanager.aux-services.mapreduce_shuffle.class>>> | <<<org.apache.hadoop.mapred.ShuffleHandler>>> | The auxiliary service class to use |
*--------------------------------------+---------------------+-----------------+
<<IMPORTANT:>> If setting an auxiliary service in addition the default
<<<mapreduce.shuffle>>> service, then a new service key should be added to the
<<<mapreduce_shuffle>>> service, then a new service key should be added to the
<<<yarn.nodemanager.aux-services>>> property, for example <<<mapred.shufflex>>>.
Then the property defining the corresponding class must be
<<<yarn.nodemanager.aux-services.mapreduce.shufflex.class>>>.
<<<yarn.nodemanager.aux-services.mapreduce_shufflex.class>>>.

View File

@ -387,4 +387,11 @@ public class JobHistory extends AbstractService implements HistoryContext {
// bogus - Not Required
return false;
}
@Override
public boolean safeToReportTerminationToUser() {
// bogus - Not Required
return true;
}
}

View File

@ -79,7 +79,7 @@ public class TestJobClient extends TestMRJobClient {
Configuration conf = createJobConf();
String jobId = runJob();
testGetCounter(jobId, conf);
testJobList(jobId, conf);
testAllJobList(jobId, conf);
testChangingJobPriority(jobId, conf);
}

View File

@ -29,6 +29,8 @@ import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -60,6 +62,22 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
return job;
}
private Job runJobInBackGround(Configuration conf) throws Exception {
String input = "hello1\nhello2\nhello3\n";
Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
1, 1, input);
job.setJobName("mr");
job.setPriority(JobPriority.NORMAL);
job.submit();
int i = 0;
while (i++ < 200 && job.getJobID() == null) {
LOG.info("waiting for jobId...");
Thread.sleep(100);
}
return job;
}
public static int runTool(Configuration conf, Tool tool, String[] args,
OutputStream out) throws Exception {
PrintStream oldOut = System.out;
@ -108,8 +126,10 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
Job job = runJob(conf);
String jobId = job.getJobID().toString();
// test jobs list
testJobList(jobId, conf);
// test all jobs list
testAllJobList(jobId, conf);
// test only submitted jobs list
testSubmittedJobList(conf);
// test job counter
testGetCounter(jobId, conf);
// status
@ -131,38 +151,37 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
// submit job from file
testSubmit(conf);
// kill a task
testKillTask(job, conf);
testKillTask(conf);
// fail a task
testfailTask(job, conf);
testfailTask(conf);
// kill job
testKillJob(jobId, conf);
testKillJob(conf);
}
/**
* test fail task
*/
private void testfailTask(Job job, Configuration conf) throws Exception {
private void testfailTask(Configuration conf) throws Exception {
Job job = runJobInBackGround(conf);
CLI jc = createJobClient();
TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
TaskAttemptID taid = new TaskAttemptID(tid, 1);
ByteArrayOutputStream out = new ByteArrayOutputStream();
// TaskAttemptId is not set
// TaskAttemptId is not set
int exitCode = runTool(conf, jc, new String[] { "-fail-task" }, out);
assertEquals("Exit code", -1, exitCode);
try {
runTool(conf, jc, new String[] { "-fail-task", taid.toString() }, out);
fail(" this task should field");
} catch (IOException e) {
// task completed !
assertTrue(e.getMessage().contains("_0001_m_000000_1"));
}
runTool(conf, jc, new String[] { "-fail-task", taid.toString() }, out);
String answer = new String(out.toByteArray(), "UTF-8");
Assert
.assertTrue(answer.contains("Killed task " + taid + " by failing it"));
}
/**
* test a kill task
*/
private void testKillTask(Job job, Configuration conf) throws Exception {
private void testKillTask(Configuration conf) throws Exception {
Job job = runJobInBackGround(conf);
CLI jc = createJobClient();
TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
TaskAttemptID taid = new TaskAttemptID(tid, 1);
@ -171,20 +190,17 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
int exitCode = runTool(conf, jc, new String[] { "-kill-task" }, out);
assertEquals("Exit code", -1, exitCode);
try {
runTool(conf, jc, new String[] { "-kill-task", taid.toString() }, out);
fail(" this task should be killed");
} catch (IOException e) {
System.out.println(e);
// task completed
assertTrue(e.getMessage().contains("_0001_m_000000_1"));
}
runTool(conf, jc, new String[] { "-kill-task", taid.toString() }, out);
String answer = new String(out.toByteArray(), "UTF-8");
Assert.assertTrue(answer.contains("Killed task " + taid));
}
/**
* test a kill job
*/
private void testKillJob(String jobId, Configuration conf) throws Exception {
private void testKillJob(Configuration conf) throws Exception {
Job job = runJobInBackGround(conf);
String jobId = job.getJobID().toString();
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
@ -435,7 +451,8 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
/**
* print a job list
*/
protected void testJobList(String jobId, Configuration conf) throws Exception {
protected void testAllJobList(String jobId, Configuration conf)
throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
// bad options
@ -458,23 +475,31 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
}
assertEquals(1, counter);
out.reset();
// only submitted
exitCode = runTool(conf, createJobClient(), new String[] { "-list" }, out);
}
protected void testSubmittedJobList(Configuration conf) throws Exception {
Job job = runJobInBackGround(conf);
ByteArrayOutputStream out = new ByteArrayOutputStream();
String line;
int counter = 0;
// only submitted
int exitCode =
runTool(conf, createJobClient(), new String[] { "-list" }, out);
assertEquals("Exit code", 0, exitCode);
br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(
out.toByteArray())));
BufferedReader br =
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(
out.toByteArray())));
counter = 0;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
if (line.contains(jobId)) {
if (line.contains(job.getJobID().toString())) {
counter++;
}
}
// all jobs submitted! no current
assertEquals(1, counter);
}
protected void verifyJobPriority(String jobId, String priority,
Configuration conf, CLI jc) throws Exception {
PipedInputStream pis = new PipedInputStream();

View File

@ -28,10 +28,12 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.mapred.LocalContainerLauncher;
import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil;
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
@ -43,6 +45,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.hamcrest.core.IsEqual;
/**
* Configures and starts the MR-specific components in the YARN cluster.
@ -155,8 +159,7 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
// pick free random ports.
getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
hostname + ":0");
getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
hostname + ":0");
WebAppUtil.setJHSWebAppURLWithoutScheme(getConfig(), hostname + ":0");
getConfig().set(JHAdminConfig.JHS_ADMIN_ADDRESS,
hostname + ":0");
}
@ -182,17 +185,17 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
//need to do this because historyServer.init creates a new Configuration
getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
historyServer.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
historyServer.getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS));
WebAppUtil.setJHSWebAppURLWithoutScheme(getConfig(),
WebAppUtil.getJHSWebAppURLWithoutScheme(historyServer.getConfig()));
LOG.info("MiniMRYARN ResourceManager address: " +
getConfig().get(YarnConfiguration.RM_ADDRESS));
LOG.info("MiniMRYARN ResourceManager web address: " +
getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS));
WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
LOG.info("MiniMRYARN HistoryServer address: " +
getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
LOG.info("MiniMRYARN HistoryServer web address: " +
getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS));
JHAdminConfig.getResolvedMRHistoryWebAppURLWithoutScheme(getConfig()));
}
@Override

View File

@ -146,7 +146,7 @@ public class ShuffleHandler extends AuxiliaryService {
private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
public static final String MAPREDUCE_SHUFFLE_SERVICEID =
"mapreduce.shuffle";
"mapreduce_shuffle";
private static final Map<String,String> userRsrc =
new ConcurrentHashMap<String,String>();

View File

@ -56,11 +56,33 @@ Release 2.2.0 - UNRELEASED
OPTIMIZATIONS
BUG FIXES
Release 2.1.2 - UNRELEASED
INCOMPATIBLE CHANGES
YARN-1229. Define constraints on Auxiliary Service names. Change
ShuffleHandler service name from mapreduce.shuffle to
mapreduce_shuffle (Xuan Gong via sseth)
NEW FEATURES
IMPROVEMENTS
OPTIMIZATIONS
BUG FIXES
YARN-1128. FifoPolicy.computeShares throws NPE on empty list of Schedulables
(Karthik Kambatla via Sandy Ryza)
YARN-1214. Register ClientToken MasterKey in SecretManager after it is
saved (Jian He via bikas)
YARN-49. Improve distributed shell application to work on a secure cluster.
(Vinod Kumar Vavilapalli via hitesh)
Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES
@ -135,6 +157,10 @@ Release 2.1.1-beta - 2013-09-23
YARN-1203. Changed YARN web-app proxy to handle http and https URLs from
AM registration and finish correctly. (Omkar Vinit Joshi via vinodkv)
YARN-1204. Added separate configuration properties for https for RM and NM
without which servers enabled with https will also start on http ports.
(Omkar Vinit Joshi via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -26,20 +26,15 @@ import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import com.google.common.base.Joiner;
@Public
@Evolving
public class YarnConfiguration extends Configuration {
private static final Joiner JOINER = Joiner.on("");
private static final String YARN_DEFAULT_XML_FILE = "yarn-default.xml";
private static final String YARN_SITE_XML_FILE = "yarn-site.xml";
@ -163,6 +158,14 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_RM_WEBAPP_ADDRESS = "0.0.0.0:" +
DEFAULT_RM_WEBAPP_PORT;
/** The https address of the RM web application.*/
public static final String RM_WEBAPP_HTTPS_ADDRESS =
RM_PREFIX + "webapp.https.address";
public static final int DEFAULT_RM_WEBAPP_HTTPS_PORT = 8090;
public static final String DEFAULT_RM_WEBAPP_HTTPS_ADDRESS = "0.0.0.0:"
+ DEFAULT_RM_WEBAPP_HTTPS_PORT;
public static final String RM_RESOURCE_TRACKER_ADDRESS =
RM_PREFIX + "resource-tracker.address";
public static final int DEFAULT_RM_RESOURCE_TRACKER_PORT = 8031;
@ -548,6 +551,13 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_NM_WEBAPP_ADDRESS = "0.0.0.0:" +
DEFAULT_NM_WEBAPP_PORT;
/** NM Webapp https address.**/
public static final String NM_WEBAPP_HTTPS_ADDRESS = NM_PREFIX
+ "webapp.https.address";
public static final int DEFAULT_NM_WEBAPP_HTTPS_PORT = 8044;
public static final String DEFAULT_NM_WEBAPP_HTTPS_ADDRESS = "0.0.0.0:"
+ DEFAULT_NM_WEBAPP_HTTPS_PORT;
/** How often to monitor containers.*/
public final static String NM_CONTAINER_MON_INTERVAL_MS =
NM_PREFIX + "container-monitor.interval-ms";
@ -833,42 +843,4 @@ public class YarnConfiguration extends Configuration {
this.reloadConfiguration();
}
}
public static String getProxyHostAndPort(Configuration conf) {
String addr = conf.get(PROXY_ADDRESS);
if(addr == null || addr.isEmpty()) {
addr = getRMWebAppHostAndPort(conf);
}
return addr;
}
public static String getRMWebAppHostAndPort(Configuration conf) {
InetSocketAddress address = conf.getSocketAddr(
YarnConfiguration.RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_PORT);
address = NetUtils.getConnectAddress(address);
StringBuffer sb = new StringBuffer();
InetAddress resolved = address.getAddress();
if (resolved == null || resolved.isAnyLocalAddress() ||
resolved.isLoopbackAddress()) {
String lh = address.getHostName();
try {
lh = InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
//Ignore and fallback.
}
sb.append(lh);
} else {
sb.append(address.getHostName());
}
sb.append(":").append(address.getPort());
return sb.toString();
}
public static String getRMWebAppURL(Configuration conf) {
return JOINER.join(HttpConfig.getSchemePrefix(),
getRMWebAppHostAndPort(conf));
}
}

View File

@ -26,6 +26,7 @@ import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Vector;
@ -43,10 +44,14 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@ -73,6 +78,7 @@ import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
@ -147,7 +153,7 @@ public class ApplicationMaster {
// Handle to communicate with the Resource Manager
@SuppressWarnings("rawtypes")
private AMRMClientAsync resourceManager;
private AMRMClientAsync amRMClient;
// Handle to communicate with the Node Manager
private NMClientAsync nmClientAsync;
@ -206,7 +212,9 @@ public class ApplicationMaster {
private volatile boolean done;
private volatile boolean success;
private ByteBuffer allTokens;
// Launch threads
private List<Thread> launchThreads = new ArrayList<Thread>();
@ -441,11 +449,24 @@ public class ApplicationMaster {
public boolean run() throws YarnException, IOException {
LOG.info("Starting ApplicationMaster");
Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
// Now remove the AM->RM token so that containers cannot access it.
Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
while (iter.hasNext()) {
Token<?> token = iter.next();
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
iter.remove();
}
}
allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
resourceManager =
AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
resourceManager.init(conf);
resourceManager.start();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
amRMClient.start();
containerListener = new NMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
@ -460,7 +481,7 @@ public class ApplicationMaster {
// Register self with ResourceManager
// This will start heartbeating to the RM
RegisterApplicationMasterResponse response = resourceManager
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl);
// Dump out information about cluster capability as seen by the
@ -485,7 +506,7 @@ public class ApplicationMaster {
// executed on them ( regardless of success/failure).
for (int i = 0; i < numTotalContainers; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
resourceManager.addContainerRequest(containerAsk);
amRMClient.addContainerRequest(containerAsk);
}
numRequestedContainers.set(numTotalContainers);
@ -535,7 +556,7 @@ public class ApplicationMaster {
success = false;
}
try {
resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
} catch (YarnException ex) {
LOG.error("Failed to unregister application", ex);
} catch (IOException e) {
@ -543,7 +564,7 @@ public class ApplicationMaster {
}
done = true;
resourceManager.stop();
amRMClient.stop();
}
private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
@ -595,7 +616,7 @@ public class ApplicationMaster {
if (askCount > 0) {
for (int i = 0; i < askCount; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
resourceManager.addContainerRequest(containerAsk);
amRMClient.addContainerRequest(containerAsk);
}
}
@ -651,7 +672,7 @@ public class ApplicationMaster {
@Override
public void onError(Throwable e) {
done = true;
resourceManager.stop();
amRMClient.stop();
}
}
@ -807,6 +828,14 @@ public class ApplicationMaster {
commands.add(command.toString());
ctx.setCommands(commands);
// Set up tokens for the container too. Today, for normal shell commands,
// the container in distribute-shell doesn't need any tokens. We are
// populating them mainly for NodeManagers to be able to download any
// files in the distributed file-system. The tokens are otherwise also
// useful in cases, for e.g., when one is running a "hadoop dfs" command
// inside the distributed shell.
ctx.setTokens(allTokens);
containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx);
}

View File

@ -20,8 +20,8 @@ package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -40,9 +40,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -543,8 +547,28 @@ public class Client {
// Not needed in this scenario
// amContainer.setServiceData(serviceData);
// The following are not required for launching an application master
// amContainer.setContainerId(containerId);
// Setup security tokens
if (UserGroupInformation.isSecurityEnabled()) {
Credentials credentials = new Credentials();
String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
if (tokenRenewer == null || tokenRenewer.length() == 0) {
throw new IOException(
"Can't get Master Kerberos principal for the RM to use as renewer");
}
// For now, only getting tokens for the default file-system.
final Token<?> tokens[] =
fs.addDelegationTokens(tokenRenewer, credentials);
if (tokens != null) {
for (Token<?> token : tokens) {
LOG.info("Got dt for " + fs.getUri() + "; " + token);
}
}
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
amContainer.setTokens(fsTokens);
}
appContext.setAMContainerSpec(amContainer);

View File

@ -166,7 +166,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
protected void serviceStart() throws Exception {
final YarnConfiguration conf = new YarnConfiguration(getConfig());
try {
rmClient = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
rmClient =
ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
} catch (IOException e) {
throw new YarnRuntimeException(e);
}

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.webapp.util;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.google.common.base.Joiner;
@Private
@Evolving
public class WebAppUtils {
private static final Joiner JOINER = Joiner.on("");
public static void setRMWebAppPort(Configuration conf, int port) {
String hostname = getRMWebAppURLWithoutScheme(conf);
hostname =
(hostname.contains(":")) ? hostname.substring(0, hostname.indexOf(":"))
: hostname;
setRMWebAppHostnameAndPort(conf, hostname, port);
}
public static void setRMWebAppHostnameAndPort(Configuration conf,
String hostname, int port) {
String resolvedAddress = hostname + ":" + port;
if (HttpConfig.isSecure()) {
conf.set(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, resolvedAddress);
} else {
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, resolvedAddress);
}
}
public static void setNMWebAppHostNameAndPort(Configuration conf,
String hostName, int port) {
if (HttpConfig.isSecure()) {
conf.set(YarnConfiguration.NM_WEBAPP_HTTPS_ADDRESS,
hostName + ":" + port);
} else {
conf.set(YarnConfiguration.NM_WEBAPP_ADDRESS,
hostName + ":" + port);
}
}
public static String getRMWebAppURLWithScheme(Configuration conf) {
return JOINER.join(HttpConfig.getSchemePrefix(),
HttpConfig.isSecure() ? conf.get(
YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS) : conf.get(
YarnConfiguration.RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS));
}
public static String getRMWebAppURLWithoutScheme(Configuration conf) {
if (HttpConfig.isSecure()) {
return conf.get(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS);
}else {
return conf.get(YarnConfiguration.RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS);
}
}
public static String getProxyHostAndPort(Configuration conf) {
String addr = conf.get(YarnConfiguration.PROXY_ADDRESS);
if(addr == null || addr.isEmpty()) {
addr = getResolvedRMWebAppURLWithoutScheme(conf);
}
return addr;
}
public static String getResolvedRMWebAppURLWithScheme(Configuration conf) {
return HttpConfig.getSchemePrefix()
+ getResolvedRMWebAppURLWithoutScheme(conf);
}
public static String getResolvedRMWebAppURLWithoutScheme(Configuration conf) {
InetSocketAddress address = null;
if (HttpConfig.isSecure()) {
address =
conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT);
} else {
address =
conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_PORT);
}
address = NetUtils.getConnectAddress(address);
StringBuffer sb = new StringBuffer();
InetAddress resolved = address.getAddress();
if (resolved == null || resolved.isAnyLocalAddress() ||
resolved.isLoopbackAddress()) {
String lh = address.getHostName();
try {
lh = InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
//Ignore and fallback.
}
sb.append(lh);
} else {
sb.append(address.getHostName());
}
sb.append(":").append(address.getPort());
return sb.toString();
}
public static String getNMWebAppURLWithoutScheme(Configuration conf) {
if (HttpConfig.isSecure()) {
return conf.get(YarnConfiguration.NM_WEBAPP_HTTPS_ADDRESS,
YarnConfiguration.DEFAULT_NM_WEBAPP_HTTPS_ADDRESS);
} else {
return conf.get(YarnConfiguration.NM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS);
}
}
}

View File

@ -705,9 +705,10 @@
</property>
<property>
<description>the valid service name should only contain a-zA-Z0-9_ and can not start with numbers</description>
<name>yarn.nodemanager.aux-services</name>
<value></value>
<!-- <value>mapreduce.shuffle</value> -->
<!--<value>mapreduce_shuffle</value>-->
</property>
<property>
@ -763,7 +764,7 @@
<!--Map Reduce configuration-->
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.conf;
import junit.framework.Assert;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.junit.Test;
public class TestYarnConfiguration {
@ -28,7 +28,7 @@ public class TestYarnConfiguration {
@Test
public void testDefaultRMWebUrl() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
String rmWebUrl = YarnConfiguration.getRMWebAppURL(conf);
String rmWebUrl = WebAppUtils.getRMWebAppURLWithScheme(conf);
// shouldn't have a "/" on the end of the url as all the other uri routinnes
// specifically add slashes and Jetty doesn't handle double slashes.
Assert.assertNotSame("RM Web Url is not correct", "http://0.0.0.0:8088",
@ -43,7 +43,7 @@ public class TestYarnConfiguration {
// for host and use the port from the RM_WEBAPP_ADDRESS
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "fortesting:24543");
conf.set(YarnConfiguration.RM_ADDRESS, "rmtesting:9999");
String rmWebUrl = YarnConfiguration.getRMWebAppURL(conf);
String rmWebUrl = WebAppUtils.getRMWebAppURLWithScheme(conf);
String[] parts = rmWebUrl.split(":");
Assert.assertEquals("RM Web URL Port is incrrect", 24543,
Integer.valueOf(parts[parts.length - 1]).intValue());

View File

@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -40,6 +41,8 @@ import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import com.google.common.base.Preconditions;
public class AuxServices extends AbstractService
implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> {
@ -48,6 +51,8 @@ public class AuxServices extends AbstractService
protected final Map<String,AuxiliaryService> serviceMap;
protected final Map<String,ByteBuffer> serviceMetaData;
private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$");
public AuxServices() {
super(AuxServices.class.getName());
serviceMap =
@ -90,6 +95,13 @@ public class AuxServices extends AbstractService
YarnConfiguration.NM_AUX_SERVICES);
for (final String sName : auxNames) {
try {
Preconditions
.checkArgument(
validateAuxServiceName(sName),
"The ServiceName: " + sName + " set in " +
YarnConfiguration.NM_AUX_SERVICES +" is invalid." +
"The valid service name should only contain a-zA-Z0-9_ " +
"and can not start with numbers");
Class<? extends AuxiliaryService> sClass = conf.getClass(
String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, sName), null,
AuxiliaryService.class);
@ -199,4 +211,10 @@ public class AuxServices extends AbstractService
}
}
private boolean validateAuxServiceName(String name) {
if (name == null || name.trim().isEmpty()) {
return false;
}
return p.matcher(name).matches();
}
}

View File

@ -19,8 +19,9 @@
package org.apache.hadoop.yarn.server.nodemanager.webapp;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import com.google.inject.Inject;
@ -37,8 +38,9 @@ public class NavBlock extends HtmlBlock implements YarnWebParams {
@Override
protected void render(Block html) {
String RMWebAppURL = YarnConfiguration.getRMWebAppURL(this.conf);
html
String RMWebAppURL =
WebAppUtils.getResolvedRMWebAppURLWithScheme(this.conf);
html
.div("#nav")
.h3()._("ResourceManager")._()
.ul()

View File

@ -22,7 +22,7 @@ import static org.apache.hadoop.yarn.util.StringHelper.pajoin;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
public class WebServer extends AbstractService {
@ -54,8 +55,8 @@ public class WebServer extends AbstractService {
@Override
protected void serviceStart() throws Exception {
String bindAddress = getConfig().get(YarnConfiguration.NM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS);
String bindAddress = WebAppUtils.getNMWebAppURLWithoutScheme(getConfig());
LOG.info("Instantiating NMWebApp at " + bindAddress);
try {
this.webApp =

View File

@ -31,6 +31,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -288,4 +290,33 @@ public class TestAuxServices {
assertTrue(aux.getServices().isEmpty());
}
@Test
public void testValidAuxServiceName() {
final AuxServices aux = new AuxServices();
Configuration conf = new Configuration();
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"Asrv1", "Bsrv_2"});
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv1"),
ServiceA.class, Service.class);
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv_2"),
ServiceB.class, Service.class);
try {
aux.init(conf);
} catch (Exception ex) {
Assert.fail("Should not receive the exception.");
}
//Test bad auxService Name
final AuxServices aux1 = new AuxServices();
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"1Asrv1"});
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "1Asrv1"),
ServiceA.class, Service.class);
try {
aux1.init(conf);
Assert.fail("Should receive the exception.");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("The ServiceName: 1Asrv1 set in " +
"yarn.nodemanager.aux-services is invalid.The valid service name " +
"should only contain a-zA-Z0-9_ and can not start with numbers"));
}
}
}

View File

@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.security.SecurityUtil;
@ -88,6 +89,7 @@ import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.WebApps.Builder;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
@ -441,12 +443,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String hostname = getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS);
hostname = (hostname.contains(":")) ? hostname.substring(0, hostname.indexOf(":")) : hostname;
int port = webApp.port();
String resolvedAddress = hostname + ":" + port;
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, resolvedAddress);
WebAppUtils.setRMWebAppPort(conf, port);
}
super.serviceStart();
@ -704,10 +702,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY)
.withHttpSpnegoKeytabKey(
YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
.at(this.conf.get(YarnConfiguration.RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS));
String proxyHostAndPort = YarnConfiguration.getProxyHostAndPort(conf);
if(YarnConfiguration.getRMWebAppHostAndPort(conf).
.at(WebAppUtils.getRMWebAppURLWithoutScheme(conf));
String proxyHostAndPort = WebAppUtils.getProxyHostAndPort(conf);
if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf).
equals(proxyHostAndPort)) {
AppReportFetcher fetcher = new AppReportFetcher(conf, getClientRMService());
builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
@ -904,7 +901,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
// recover applications
rmAppManager.recover(state);
}
public static void main(String argv[]) {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);

View File

@ -652,10 +652,18 @@ public class RMAppImpl implements RMApp, Recoverable {
};
}
private static final class RMAppFinishingTransition extends
RMAppTransition {
private static final class RMAppFinishingTransition extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
if (event.getType().equals(RMAppEventType.APP_REMOVED)) {
RMAppRemovedEvent removeEvent = (RMAppRemovedEvent) event;
if (removeEvent.getRemovedException() != null) {
LOG.error(
"Failed to remove application: " + removeEvent.getApplicationId(),
removeEvent.getRemovedException());
ExitUtil.terminate(1, removeEvent.getRemovedException());
}
}
app.finishTime = System.currentTimeMillis();
}
}

View File

@ -23,6 +23,7 @@ import java.util.Set;
import javax.crypto.SecretKey;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@ -151,9 +152,12 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
Token<AMRMTokenIdentifier> getAMRMToken();
/**
* The master key for client-to-AM tokens for this app attempt
* The master key for client-to-AM tokens for this app attempt. This is only
* used for RMStateStore. Normal operation must invoke the secret manager to
* get the key and not use the local key directly.
* @return The master key for client-to-AM tokens for this app attempt
*/
@LimitedPrivate("RMStateStore")
SecretKey getClientTokenMasterKey();
/**

View File

@ -57,7 +57,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -100,6 +99,7 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@SuppressWarnings({"unchecked", "rawtypes"})
public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@ -480,7 +480,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
try {
URI trackingUri = StringUtils.isEmpty(trackingUriWithoutScheme) ? null :
ProxyUriUtils.getUriFromAMUrl(trackingUriWithoutScheme);
String proxy = YarnConfiguration.getProxyHostAndPort(conf);
String proxy = WebAppUtils.getProxyHostAndPort(conf);
URI proxyUri = ProxyUriUtils.getUriFromAMUrl(proxy);
URI result = ProxyUriUtils.getProxyUri(trackingUri, proxyUri,
applicationAttemptId.getApplicationId());
@ -496,11 +496,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private void setTrackingUrlToRMAppPage() {
origTrackingUrl = pjoin(
YarnConfiguration.getRMWebAppHostAndPort(conf),
WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf),
"cluster", "app", getAppAttemptId().getApplicationId());
proxiedTrackingUrl = origTrackingUrl;
}
// This is only used for RMStateStore. Normal operation must invoke the secret
// manager to get the key and not use the local key directly.
@Override
public SecretKey getClientTokenMasterKey() {
return this.clientTokenMasterKey;
@ -734,9 +736,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
.registerAppAttempt(appAttempt.applicationAttemptId);
if (UserGroupInformation.isSecurityEnabled()) {
appAttempt.clientTokenMasterKey = appAttempt.rmContext
.getClientToAMTokenSecretManager()
.registerApplication(appAttempt.applicationAttemptId);
appAttempt.clientTokenMasterKey =
appAttempt.rmContext.getClientToAMTokenSecretManager()
.createMasterKey(appAttempt.applicationAttemptId);
}
// create AMRMToken
@ -922,6 +924,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEvent event) {
// Register with AMLivelinessMonitor
appAttempt.attemptLaunched();
// register the ClientTokenMasterKey after it is saved in the store,
// otherwise client may hold an invalid ClientToken after RM restarts.
appAttempt.rmContext.getClientToAMTokenSecretManager()
.registerApplication(appAttempt.getAppAttemptId(),
appAttempt.getClientTokenMasterKey());
}
}

View File

@ -33,17 +33,21 @@ public class ClientToAMTokenSecretManagerInRM extends
private Map<ApplicationAttemptId, SecretKey> masterKeys =
new HashMap<ApplicationAttemptId, SecretKey>();
public synchronized SecretKey registerApplication(
public synchronized SecretKey createMasterKey(
ApplicationAttemptId applicationAttemptID) {
SecretKey key = generateSecret();
this.masterKeys.put(applicationAttemptID, key);
return key;
return generateSecret();
}
public synchronized void registerApplication(
ApplicationAttemptId applicationAttemptID, SecretKey key) {
this.masterKeys.put(applicationAttemptID, key);
}
// Only for RM recovery
public synchronized SecretKey registerMasterKey(
ApplicationAttemptId applicationAttemptID, byte[] keyData) {
SecretKey key = createSecretKey(keyData);
this.masterKeys.put(applicationAttemptID, key);
registerApplication(applicationAttemptID, key);
return key;
}

View File

@ -289,7 +289,7 @@ public class TestRMStateStore extends ClientBaseWithFixes{
HashSet<Token<?>> attemptTokenSet1 = new HashSet<Token<?>>();
attemptTokenSet1.add(appAttemptToken1);
SecretKey clientTokenKey1 =
clientToAMTokenMgr.registerApplication(attemptId1);
clientToAMTokenMgr.createMasterKey(attemptId1);
ContainerId containerId1 = storeAttempt(store, attemptId1,
"container_1352994193343_0001_01_000001",
@ -305,7 +305,7 @@ public class TestRMStateStore extends ClientBaseWithFixes{
HashSet<Token<?>> attemptTokenSet2 = new HashSet<Token<?>>();
attemptTokenSet2.add(appAttemptToken2);
SecretKey clientTokenKey2 =
clientToAMTokenMgr.registerApplication(attemptId2);
clientToAMTokenMgr.createMasterKey(attemptId2);
ContainerId containerId2 = storeAttempt(store, attemptId2,
"container_1352994193343_0001_02_000001",

View File

@ -18,13 +18,18 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import static org.mockito.Mockito.mock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import junit.framework.Assert;
@ -35,10 +40,13 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
@ -54,7 +62,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
@ -62,7 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretMan
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.junit.After;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -81,6 +92,7 @@ public class TestRMAppTransitions {
private static int appId = 1;
private DrainDispatcher rmDispatcher;
private RMStateStore store;
private YarnScheduler scheduler;
// ignore all the RM application attempt events
private static final class TestApplicationAttemptEventDispatcher implements
@ -206,7 +218,8 @@ public class TestRMAppTransitions {
String queue = MockApps.newQueue();
// ensure max application attempts set to known value
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, maxAppAttempts);
YarnScheduler scheduler = mock(YarnScheduler.class);
scheduler = mock(YarnScheduler.class);
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
@ -392,8 +405,7 @@ public class TestRMAppTransitions {
RMApp application = testCreateAppRemoving(submissionContext);
// REMOVING => FINISHING event RMAppEventType.APP_REMOVED
RMAppEvent finishingEvent =
new RMAppEvent(application.getApplicationId(),
RMAppEventType.APP_REMOVED);
new RMAppRemovedEvent(application.getApplicationId(), null);
application.handle(finishingEvent);
assertAppState(RMAppState.FINISHING, application);
assertTimesAtFinish(application);
@ -816,6 +828,14 @@ public class TestRMAppTransitions {
app = testCreateAppRunning(null);
rmDispatcher.await();
assertAppState(RMAppState.RUNNING, app);
report = app.createAndGetApplicationReport("clientuser", true);
Assert.assertNull(report.getClientToAMToken());
// this method is to make AMLaunchedTransition invoked inside which
// ClientTokenMasterKey is registered in ClientTokenSecretManager
moveCurrentAttemptToLaunchedState(app.getCurrentAppAttempt());
report = app.createAndGetApplicationReport(null, true);
Assert.assertNull(report.getClientToAMToken());
report = app.createAndGetApplicationReport("clientuser", true);
@ -830,4 +850,33 @@ public class TestRMAppTransitions {
report = app.createAndGetApplicationReport("clientuser", true);
Assert.assertNull(report.getClientToAMToken());
}
@SuppressWarnings("unchecked")
private void moveCurrentAttemptToLaunchedState(RMAppAttempt attempt) {
attempt.handle(new RMAppAttemptEvent(attempt.getAppAttemptId(),
RMAppAttemptEventType.APP_ACCEPTED));
// Mock the allocation of AM container
Container container = mock(Container.class);
Resource resource = BuilderUtils.newResource(2048, 1);
when(container.getId()).thenReturn(
BuilderUtils.newContainerId(attempt.getAppAttemptId(), 1));
when(container.getResource()).thenReturn(resource);
Allocation allocation = mock(Allocation.class);
when(allocation.getContainers()).thenReturn(
Collections.singletonList(container));
when(allocation.getContainers()).
thenReturn(Collections.singletonList(container));
when(
scheduler.allocate(any(ApplicationAttemptId.class), any(List.class),
any(List.class), any(List.class), any(List.class))).thenReturn(
allocation);
attempt.handle(new RMAppAttemptContainerAllocatedEvent(attempt
.getAppAttemptId(), container));
attempt
.handle(new RMAppAttemptStoredEvent(attempt.getAppAttemptId(), null));
attempt.handle(new RMAppAttemptEvent(attempt.getAppAttemptId(),
RMAppAttemptEventType.LAUNCHED));
assertEquals(RMAppAttemptState.LAUNCHED, attempt.getAppAttemptState());
}
}

View File

@ -25,10 +25,10 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
import java.util.Arrays;
import java.util.Collection;
@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
@ -83,9 +82,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -100,7 +100,7 @@ public class TestRMAppAttemptTransitions {
private static final String EMPTY_DIAGNOSTICS = "";
private static final String RM_WEBAPP_ADDR =
YarnConfiguration.getRMWebAppHostAndPort(new Configuration());
WebAppUtils.getResolvedRMWebAppURLWithoutScheme(new Configuration());
private boolean isSecurityEnabled;
private RMContext rmContext;
@ -294,9 +294,11 @@ public class TestRMAppAttemptTransitions {
assertEquals(0, applicationAttempt.getRanNodes().size());
assertNull(applicationAttempt.getFinalApplicationStatus());
if (UserGroupInformation.isSecurityEnabled()) {
verify(clientToAMTokenManager).registerApplication(
verify(clientToAMTokenManager).createMasterKey(
applicationAttempt.getAppAttemptId());
assertNotNull(applicationAttempt.createClientToken("some client"));
// can't create ClientToken as at this time ClientTokenMasterKey has
// not been registered in the SecretManager
assertNull(applicationAttempt.createClientToken("some client"));
}
assertNull(applicationAttempt.createClientToken(null));
assertNotNull(applicationAttempt.getAMRMToken());
@ -428,7 +430,11 @@ public class TestRMAppAttemptTransitions {
assertEquals(RMAppAttemptState.LAUNCHED,
applicationAttempt.getAppAttemptState());
assertEquals(container, applicationAttempt.getMasterContainer());
if (UserGroupInformation.isSecurityEnabled()) {
// ClientTokenMasterKey has been registered in SecretManager, it's able to
// create ClientToken now
assertNotNull(applicationAttempt.createClientToken("some client"));
}
// TODO - need to add more checks relevant to this state
}
@ -561,6 +567,11 @@ public class TestRMAppAttemptTransitions {
}
private void launchApplicationAttempt(Container container) {
if (UserGroupInformation.isSecurityEnabled()) {
// Before LAUNCHED state, can't create ClientToken as at this time
// ClientTokenMasterKey has not been registered in the SecretManager
assertNull(applicationAttempt.createClientToken("some client"));
}
applicationAttempt.handle(
new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.LAUNCHED));

View File

@ -194,16 +194,6 @@ public class TestClientToAMTokens {
nm1.nodeHeartbeat(true);
dispatcher.await();
// Get the app-report.
GetApplicationReportRequest request =
Records.newRecord(GetApplicationReportRequest.class);
request.setApplicationId(app.getApplicationId());
GetApplicationReportResponse reportResponse =
rm.getClientRMService().getApplicationReport(request);
ApplicationReport appReport = reportResponse.getApplicationReport();
org.apache.hadoop.yarn.api.records.Token originalClientToAMToken =
appReport.getClientToAMToken();
ApplicationAttemptId appAttempt = app.getCurrentAppAttempt().getAppAttemptId();
final MockAM mockAM =
new MockAM(rm.getRMContext(), rm.getApplicationMasterService(),
@ -224,7 +214,17 @@ public class TestClientToAMTokens {
return response;
}
});
// Get the app-report.
GetApplicationReportRequest request =
Records.newRecord(GetApplicationReportRequest.class);
request.setApplicationId(app.getApplicationId());
GetApplicationReportResponse reportResponse =
rm.getClientRMService().getApplicationReport(request);
ApplicationReport appReport = reportResponse.getApplicationReport();
org.apache.hadoop.yarn.api.records.Token originalClientToAMToken =
appReport.getClientToAMToken();
// ClientToAMToken master key should have been received on register
// application master response.
Assert.assertNotNull(response.getClientToAMTokenMasterKey());

View File

@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
@ -39,7 +40,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
/**
* Embedded Yarn minicluster for testcases that need to interact with a cluster.
@ -204,8 +205,7 @@ public class MiniYARNCluster extends CompositeService {
hostname + ":0");
getConfig().set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
hostname + ":0");
getConfig().set(YarnConfiguration.RM_WEBAPP_ADDRESS,
hostname + ":0");
WebAppUtils.setRMWebAppHostnameAndPort(getConfig(), hostname, 0);
}
resourceManager = new ResourceManager() {
@Override
@ -238,7 +238,7 @@ public class MiniYARNCluster extends CompositeService {
LOG.info("MiniYARN ResourceManager address: " +
getConfig().get(YarnConfiguration.RM_ADDRESS));
LOG.info("MiniYARN ResourceManager web address: " +
getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS));
WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
}
@Override
@ -317,8 +317,9 @@ public class MiniYARNCluster extends CompositeService {
MiniYARNCluster.getHostname() + ":0");
getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
getConfig().set(YarnConfiguration.NM_WEBAPP_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
WebAppUtils
.setNMWebAppHostNameAndPort(getConfig(),
MiniYARNCluster.getHostname(), 0);
// Disable resource checks by default
if (!getConfig().getBoolean(

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -62,7 +63,7 @@ public class WebAppProxy extends AbstractService {
CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION +
" of " + auth);
}
String proxy = YarnConfiguration.getProxyHostAndPort(conf);
String proxy = WebAppUtils.getProxyHostAndPort(conf);
String[] proxyParts = proxy.split(":");
proxyHost = proxyParts[0];

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.util.StringHelper;
import org.apache.hadoop.yarn.util.TrackingUriPlugin;
import org.apache.hadoop.yarn.webapp.MimeType;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
public class WebAppProxyServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
@ -94,7 +95,7 @@ public class WebAppProxyServlet extends HttpServlet {
conf.getInstances(YarnConfiguration.YARN_TRACKING_URL_GENERATOR,
TrackingUriPlugin.class);
this.rmAppPageUrlBase = StringHelper.pjoin(
YarnConfiguration.getRMWebAppURL(conf), "cluster", "app");
WebAppUtils.getResolvedRMWebAppURLWithScheme(conf), "cluster", "app");
}
/**

View File

@ -26,7 +26,7 @@ import org.apache.hadoop.http.FilterContainer;
import org.apache.hadoop.http.FilterInitializer;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
public class AmFilterInitializer extends FilterInitializer {
private static final String FILTER_NAME = "AM_PROXY_FILTER";
@ -35,7 +35,7 @@ public class AmFilterInitializer extends FilterInitializer {
@Override
public void initFilter(FilterContainer container, Configuration conf) {
Map<String, String> params = new HashMap<String, String>();
String proxy = YarnConfiguration.getProxyHostAndPort(conf);
String proxy = WebAppUtils.getProxyHostAndPort(conf);
String[] parts = proxy.split(":");
params.put(AmIpFilter.PROXY_HOST, parts[0]);
params.put(AmIpFilter.PROXY_URI_BASE,