Merging r1518852 through r1519883 from trunk to branch HDFS-2832

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1519885 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-09-03 23:20:45 +00:00
commit f353769d0c
75 changed files with 1548 additions and 565 deletions

View File

@ -357,6 +357,9 @@ Release 2.3.0 - UNRELEASED
HADOOP-9877. Fix listing of snapshot directories in globStatus.
(Binglin Chang via Andrew Wang)
HADOOP-9909. org.apache.hadoop.fs.Stat should permit other LANG.
(Shinichi Yamashita via Andrew Wang)
Release 2.1.1-beta - UNRELEASED
INCOMPATIBLE CHANGES
@ -396,6 +399,9 @@ Release 2.1.1-beta - UNRELEASED
HADOOP-9906. Move HAZKUtil to o.a.h.util.ZKUtil and make inner-classes
public (Karthik Kambatla via Sandy Ryza)
HADOOP-9918. Add addIfService to CompositeService (Karthik Kambatla via
Sandy Ryza)
OPTIMIZATIONS
BUG FIXES
@ -435,6 +441,9 @@ Release 2.1.1-beta - UNRELEASED
HADOOP-9894. Race condition in Shell leads to logged error stream handling
exceptions (Arpit Agarwal)
HADOOP-9774. RawLocalFileSystem.listStatus() return absolute paths when
input path is relative on Windows. (Shanyu Zhao via ivanmi)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES

View File

@ -182,6 +182,18 @@ public class Path implements Comparable {
/** Construct a Path from components. */
public Path(String scheme, String authority, String path) {
checkPathArg( path );
// add a slash in front of paths with Windows drive letters
if (hasWindowsDrive(path) && path.charAt(0) != '/') {
path = "/" + path;
}
// add "./" in front of Linux relative paths so that a path containing
// a colon e.q. "a:b" will not be interpreted as scheme "a".
if (!WINDOWS && path.charAt(0) != '/') {
path = "./" + path;
}
initialize(scheme, authority, path, null);
}

View File

@ -393,7 +393,7 @@ public class RawLocalFileSystem extends FileSystem {
new DeprecatedRawLocalFileStatus(localf, getDefaultBlockSize(f), this)};
}
File[] names = localf.listFiles();
String[] names = localf.list();
if (names == null) {
return null;
}
@ -401,7 +401,9 @@ public class RawLocalFileSystem extends FileSystem {
int j = 0;
for (int i = 0; i < names.length; i++) {
try {
results[j] = getFileStatus(new Path(names[i].getAbsolutePath()));
// Assemble the path using the Path 3 arg constructor to make sure
// paths with colon are properly resolved on Linux
results[j] = getFileStatus(new Path(f, new Path(null, null, names[i])));
j++;
} catch (FileNotFoundException e) {
// ignore the files not found since the dir list may have have changed

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.fs;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.StringTokenizer;
@ -62,6 +64,10 @@ public class Stat extends Shell {
this.path = new Path(qualified.toUri().getPath());
this.blockSize = blockSize;
this.dereference = deref;
// LANG = C setting
Map<String, String> env = new HashMap<String, String>();
env.put("LANG", "C");
setEnvironment(env);
}
public FileStatus getFileStatus() throws IOException {

View File

@ -64,6 +64,11 @@ public class CompositeService extends AbstractService {
}
}
/**
* Add the passed {@link Service} to the list of services managed by this
* {@link CompositeService}
* @param service the {@link Service} to be added
*/
protected void addService(Service service) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding service " + service.getName());
@ -73,6 +78,21 @@ public class CompositeService extends AbstractService {
}
}
/**
* If the passed object is an instance of {@link Service},
* add it to the list of services managed by this {@link CompositeService}
* @param object
* @return true if a service is added, false otherwise.
*/
protected boolean addIfService(Object object) {
if (object instanceof Service) {
addService((Service) object);
return true;
} else {
return false;
}
}
protected synchronized boolean removeService(Service service) {
synchronized (serviceList) {
return serviceList.add(service);

View File

@ -540,6 +540,13 @@ abstract public class Shell {
protected abstract void parseExecResult(BufferedReader lines)
throws IOException;
/**
* Get the environment variable
*/
public String getEnvironment(String env) {
return environment.get(env);
}
/** get the current sub-process executing the given command
* @return process executing the command
*/

View File

@ -280,6 +280,21 @@ public class TestLocalFileSystem {
stats[0].getPath().toUri().getPath());
}
@Test
public void testListStatusReturnConsistentPathOnWindows() throws IOException {
assumeTrue(Shell.WINDOWS);
String dirNoDriveSpec = TEST_ROOT_DIR;
if (dirNoDriveSpec.charAt(1) == ':')
dirNoDriveSpec = dirNoDriveSpec.substring(2);
File file = new File(dirNoDriveSpec, "foo");
file.mkdirs();
FileStatus[] stats = fileSys.listStatus(new Path(dirNoDriveSpec));
assertEquals("Unexpected number of stats", 1, stats.length);
assertEquals("Bad path from stat", new Path(file.getPath()).toUri().getPath(),
stats[0].getPath().toUri().getPath());
}
@Test(timeout = 10000)
public void testReportChecksumFailure() throws IOException {
base.mkdirs();

View File

@ -159,6 +159,42 @@ public class TestPath extends TestCase {
}
}
@Test (timeout = 30000)
public void testPathThreeArgContructor() {
assertEquals(new Path("foo"), new Path(null, null, "foo"));
assertEquals(new Path("scheme:///foo"), new Path("scheme", null, "/foo"));
assertEquals(
new Path("scheme://authority/foo"),
new Path("scheme", "authority", "/foo"));
if (Path.WINDOWS) {
assertEquals(new Path("c:/foo/bar"), new Path(null, null, "c:/foo/bar"));
assertEquals(new Path("c:/foo/bar"), new Path(null, null, "/c:/foo/bar"));
} else {
assertEquals(new Path("./a:b"), new Path(null, null, "a:b"));
}
// Resolution tests
if (Path.WINDOWS) {
assertEquals(
new Path("c:/foo/bar"),
new Path("/fou", new Path(null, null, "c:/foo/bar")));
assertEquals(
new Path("c:/foo/bar"),
new Path("/fou", new Path(null, null, "/c:/foo/bar")));
assertEquals(
new Path("/foo/bar"),
new Path("/foo", new Path(null, null, "bar")));
} else {
assertEquals(
new Path("/foo/bar/a:b"),
new Path("/foo/bar", new Path(null, null, "a:b")));
assertEquals(
new Path("/a:b"),
new Path("/foo/bar", new Path(null, null, "/a:b")));
}
}
@Test (timeout = 30000)
public void testEquals() {
assertFalse(new Path("/").equals(new Path("/foo")));

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.fs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -119,4 +120,9 @@ public class TestStat {
// expected
}
}
@Test(timeout=10000)
public void testStatEnvironment() throws Exception {
assertEquals(stat.getEnvironment("LANG"), "C");
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.mount;
import java.util.List;
import org.apache.hadoop.nfs.security.NfsExports;
import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
@ -61,13 +62,26 @@ public class MountResponse {
}
/** Response for RPC call {@link MountInterface.MNTPROC#EXPORT} */
public static XDR writeExportList(XDR xdr, int xid, List<String> exports) {
public static XDR writeExportList(XDR xdr, int xid, List<String> exports,
List<NfsExports> hostMatcher) {
assert (exports.size() == hostMatcher.size());
RpcAcceptedReply.voidReply(xdr, xid);
for (String export : exports) {
for (int i = 0; i < exports.size(); i++) {
xdr.writeBoolean(true); // Value follows - yes
xdr.writeString(export);
xdr.writeInt(0);
xdr.writeString(exports.get(i));
// List host groups
String[] hostGroups = hostMatcher.get(i).getHostGroupList();
if (hostGroups.length > 0) {
for (int j = 0; j < hostGroups.length; j++) {
xdr.writeBoolean(true); // Value follows - yes
xdr.writeVariableOpaque(hostGroups[j].getBytes());
}
}
xdr.writeBoolean(false); // Value follows - no more group
}
xdr.writeBoolean(false); // Value follows - no
return xdr;
}

View File

@ -192,13 +192,13 @@ public class Nfs3Constant {
public static final String EXPORTS_ALLOWED_HOSTS_SEPARATOR = ";";
/** Allowed hosts for nfs exports */
public static final String EXPORTS_ALLOWED_HOSTS_KEY = "hdfs.nfs.exports.allowed.hosts";
public static final String EXPORTS_ALLOWED_HOSTS_KEY = "dfs.nfs.exports.allowed.hosts";
public static final String EXPORTS_ALLOWED_HOSTS_KEY_DEFAULT = "* rw";
/** Size for nfs exports cache */
public static final String EXPORTS_CACHE_SIZE_KEY = "hdfs.nfs.exports.cache.size";
public static final String EXPORTS_CACHE_SIZE_KEY = "dfs.nfs.exports.cache.size";
public static final int EXPORTS_CACHE_SIZE_DEFAULT = 512;
/** Expiration time for nfs exports cache entry */
public static final String EXPORTS_CACHE_EXPIRYTIME_MILLIS_KEY = "hdfs.nfs.exports.cache.expirytime.millis";
public static final String EXPORTS_CACHE_EXPIRYTIME_MILLIS_KEY = "dfs.nfs.exports.cache.expirytime.millis";
public static final long EXPORTS_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 15 * 60 * 1000; // 15 min
public static final String FILE_DUMP_DIR_KEY = "dfs.nfs3.dump.dir";

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.security;
package org.apache.hadoop.nfs.security;
public enum AccessPrivilege {
READ_ONLY,

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.security;
package org.apache.hadoop.nfs.security;
import java.net.InetAddress;
import java.util.ArrayList;
@ -153,6 +153,19 @@ public class NfsExports {
}
}
/**
* Return the configured group list
*/
public String[] getHostGroupList() {
int listSize = mMatches.size();
String[] hostGroups = new String[listSize];
for (int i = 0; i < mMatches.size(); i++) {
hostGroups[i] = mMatches.get(i).getHostGroup();
}
return hostGroups;
}
public AccessPrivilege getAccessPrivilege(InetAddress addr) {
return getAccessPrivilege(addr.getHostAddress(),
addr.getCanonicalHostName());
@ -191,6 +204,7 @@ public class NfsExports {
}
public abstract boolean isIncluded(String address, String hostname);
public abstract String getHostGroup();
}
/**
@ -202,9 +216,14 @@ public class NfsExports {
}
@Override
public boolean isIncluded(String ip, String hostname) {
public boolean isIncluded(String address, String hostname) {
return true;
}
@Override
public String getHostGroup() {
return "*";
}
}
/**
@ -235,6 +254,11 @@ public class NfsExports {
}
return false;
}
@Override
public String getHostGroup() {
return subnetInfo.getAddress() + "/" + subnetInfo.getNetmask();
}
}
/**
@ -264,6 +288,11 @@ public class NfsExports {
}
return false;
}
@Override
public String getHostGroup() {
return ipOrHost;
}
}
/**
@ -293,6 +322,11 @@ public class NfsExports {
}
return false;
}
@Override
public String getHostGroup() {
return pattern.toString();
}
}
/**

View File

@ -15,12 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs.security;
package org.apache.hadoop.nfs.security;
import junit.framework.Assert;
import org.apache.hadoop.hdfs.nfs.security.AccessPrivilege;
import org.apache.hadoop.hdfs.nfs.security.NfsExports;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.junit.Test;

View File

@ -27,8 +27,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.nfs.security.AccessPrivilege;
import org.apache.hadoop.hdfs.nfs.security.NfsExports;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.mount.MountEntry;
@ -36,6 +34,8 @@ import org.apache.hadoop.mount.MountInterface;
import org.apache.hadoop.mount.MountResponse;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.nfs.security.AccessPrivilege;
import org.apache.hadoop.nfs.security.NfsExports;
import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.RpcProgram;
@ -184,7 +184,10 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
} else if (mntproc == MNTPROC.UMNTALL) {
umntall(out, xid, client);
} else if (mntproc == MNTPROC.EXPORT) {
out = MountResponse.writeExportList(out, xid, exports);
// Currently only support one NFS export "/"
List<NfsExports> hostsMatchers = new ArrayList<NfsExports>();
hostsMatchers.add(hostsMatcher);
out = MountResponse.writeExportList(out, xid, exports, hostsMatchers);
} else {
// Invalid procedure
RpcAcceptedReply.voidReply(out, xid,

View File

@ -26,10 +26,10 @@ import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Options;
@ -38,8 +38,6 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.nfs.security.AccessPrivilege;
import org.apache.hadoop.hdfs.nfs.security.NfsExports;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -98,6 +96,8 @@ import org.apache.hadoop.nfs.nfs3.response.VoidResponse;
import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
import org.apache.hadoop.nfs.nfs3.response.WccAttr;
import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.nfs.security.AccessPrivilege;
import org.apache.hadoop.nfs.security.NfsExports;
import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
import org.apache.hadoop.oncrpc.RpcAuthSys;

View File

@ -264,6 +264,9 @@ Release 2.3.0 - UNRELEASED
HDFS-4994. Audit log getContentSummary() calls. (Robert Parker via kihwal)
HDFS-5144. Document time unit to NameNodeMetrics. (Akira Ajisaka via
suresh)
OPTIMIZATIONS
BUG FIXES
@ -310,6 +313,9 @@ Release 2.1.1-beta - UNRELEASED
HDFS-5078 Support file append in NFSv3 gateway to enable data streaming
to HDFS (brandonli)
HDFS-5136 MNT EXPORT should give the full group list which can mount the
exports (brandonli)
IMPROVEMENTS
HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may
@ -342,6 +348,8 @@ Release 2.1.1-beta - UNRELEASED
HDFS-5128. Allow multiple net interfaces to be used with HA namenode RPC
server. (kihwal)
HDFS-5150. Allow per NN SPN for internal SPNEGO. (kihwal)
OPTIMIZATIONS
BUG FIXES
@ -404,6 +412,9 @@ Release 2.1.1-beta - UNRELEASED
HDFS-5077. NPE in FSNamesystem.commitBlockSynchronization().
(Plamen Jeliazkov via shv)
HDFS-5140. Too many safemode monitor threads being created in the standby
namenode causing it to fail with out of memory error. (jing9)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES

View File

@ -373,7 +373,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
final LeaseManager leaseManager = new LeaseManager(this);
Daemon smmthread = null; // SafeModeMonitor thread
volatile Daemon smmthread = null; // SafeModeMonitor thread
Daemon nnrmthread = null; // NamenodeResourceMonitor thread
@ -4575,7 +4575,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// Have to have write-lock since leaving safemode initializes
// repl queues, which requires write lock
assert hasWriteLock();
if (needEnter()) {
// if smmthread is already running, the block threshold must have been
// reached before, there is no need to enter the safe mode again
if (smmthread == null && needEnter()) {
enter();
// check if we are ready to initialize replication queues
if (canInitializeReplQueues() && !isPopulatingReplQueues()) {
@ -4584,7 +4586,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
reportStatus("STATE* Safe mode ON.", false);
return;
}
// the threshold is reached
// the threshold is reached or was reached before
if (!isOn() || // safe mode is off
extension <= 0 || threshold <= 0) { // don't need to wait
this.leave(); // leave safe mode
@ -4596,9 +4598,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
// start monitor
reached = now();
if (smmthread == null) {
smmthread = new Daemon(new SafeModeMonitor());
smmthread.start();
reportStatus("STATE* Safe mode extension entered.", true);
}
// check if we are ready to initialize replication queues
if (canInitializeReplQueues() && !isPopulatingReplQueues()) {
@ -4834,6 +4838,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if (safeMode.canLeave()) {
// Leave safe mode.
safeMode.leave();
smmthread = null;
break;
}
} finally {
@ -4849,7 +4854,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if (!fsRunning) {
LOG.info("NameNode is being shutdown, exit SafeModeMonitor thread");
}
smmthread = null;
}
}

View File

@ -182,6 +182,7 @@ public class NameNode implements NameNodeStatusMXBean {
DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY,
DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY,
DFS_NAMENODE_USER_NAME_KEY,
DFS_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY,
DFS_HA_FENCE_METHODS_KEY,
DFS_HA_ZKFC_PORT_KEY,
DFS_HA_FENCE_METHODS_KEY

View File

@ -80,8 +80,10 @@ public class NameNodeMetrics {
@Metric("Block report") MutableRate blockReport;
MutableQuantiles[] blockReportQuantiles;
@Metric("Duration in SafeMode at startup") MutableGaugeInt safeModeTime;
@Metric("Time loading FS Image at startup") MutableGaugeInt fsImageLoadTime;
@Metric("Duration in SafeMode at startup in msec")
MutableGaugeInt safeModeTime;
@Metric("Time loading FS Image at startup in msec")
MutableGaugeInt fsImageLoadTime;
NameNodeMetrics(String processName, String sessionId, int[] intervals) {
registry.tag(ProcessName, processName).tag(SessionId, sessionId);

View File

@ -162,6 +162,8 @@ Release 2.3.0 - UNRELEASED
OPTIMIZATIONS
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
BUG FIXES
MAPREDUCE-5316. job -list-attempt-ids command does not handle illegal

View File

@ -75,9 +75,9 @@ class YarnChild {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
LOG.debug("Child starting");
final JobConf defaultConf = new JobConf();
defaultConf.addResource(MRJobConfig.JOB_CONF_FILE);
UserGroupInformation.setConfiguration(defaultConf);
final JobConf job = new JobConf();
job.addResource(MRJobConfig.JOB_CONF_FILE);
UserGroupInformation.setConfiguration(job);
String host = args[0];
int port = Integer.parseInt(args[1]);
@ -111,7 +111,7 @@ class YarnChild {
@Override
public TaskUmbilicalProtocol run() throws Exception {
return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
TaskUmbilicalProtocol.versionID, address, defaultConf);
TaskUmbilicalProtocol.versionID, address, job);
}
});
@ -140,7 +140,7 @@ class YarnChild {
YarnChild.taskid = task.getTaskID();
// Create the job-conf and set credentials
final JobConf job = configureTask(task, credentials, jt);
configureTask(job, task, credentials, jt);
// log the system properties
String systemPropsToLog = MRApps.getSystemPropertiesToLog(job);
@ -260,9 +260,8 @@ class YarnChild {
job.set(MRJobConfig.JOB_LOCAL_DIR,workDir.toString());
}
private static JobConf configureTask(Task task, Credentials credentials,
Token<JobTokenIdentifier> jt) throws IOException {
final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
private static void configureTask(JobConf job, Task task,
Credentials credentials, Token<JobTokenIdentifier> jt) throws IOException {
job.setCredentials(credentials);
ApplicationAttemptId appAttemptId =
@ -306,7 +305,6 @@ class YarnChild {
writeLocalJobFile(localTaskFile, job);
task.setJobFile(localTaskFile.toString());
task.setConf(job);
return job;
}
/**

View File

@ -626,12 +626,6 @@ public class MRAppMaster extends CompositeService {
}
}
protected void addIfService(Object object) {
if (object instanceof Service) {
addService((Service) object);
}
}
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
this.jobHistoryEventHandler = new JobHistoryEventHandler(context,

View File

@ -21,6 +21,9 @@ Release 2.3.0 - UNRELEASED
NEW FEATURES
YARN-649. Added a new NM web-service to serve container logs in plain text
over HTTP. (Sandy Ryza via vinodkv)
IMPROVEMENTS
YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza)
@ -39,6 +42,9 @@ Release 2.1.1-beta - UNRELEASED
INCOMPATIBLE CHANGES
YARN-707. Added user information also in the YARN ClientToken so that AMs
can implement authorization based on incoming users. (Jason Lowe via vinodkv)
NEW FEATURES
IMPROVEMENTS
@ -66,6 +72,20 @@ Release 2.1.1-beta - UNRELEASED
YARN-1080. Improved help message for "yarn logs" command. (Xuan Gong via
vinodkv)
YARN-771. AMRMClient support for resource blacklisting (Junping Du via
bikas)
YARN-1117. Improved help messages for "yarn application" and "yarn node"
commands. (Xuan Gong via vinodkv)
YARN-1120. Made ApplicationConstants.Environment.USER definition OS neutral
as the corresponding value is now set correctly end-to-end. (Chuan Liu via
vinodkv)
YARN-1124. Modified YARN CLI application list to display new and submitted
applications together with running apps by default, following up YARN-1074.
(Xuan Gong via vinodkv)
OPTIMIZATIONS
BUG FIXES
@ -126,6 +146,9 @@ Release 2.1.1-beta - UNRELEASED
YARN-1101. Active nodes can be decremented below 0 (Robert Parker
via tgraves)
YARN-1077. Fixed TestContainerLaunch test failure on Windows. (Chuan Liu via
vinodkv)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES

View File

@ -80,7 +80,7 @@ public interface ApplicationConstants {
* $USER
* Final, non-modifiable.
*/
USER(Shell.WINDOWS ? "USERNAME": "USER"),
USER("USER"),
/**
* $LOGNAME

View File

@ -286,4 +286,15 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
Priority priority,
String resourceName,
Resource capability);
/**
* Update application's blacklist with addition or removal resources.
*
* @param blacklistAdditions list of resources which should be added to the
* application blacklist
* @param blacklistRemovals list of resources which should be removed from the
* application blacklist
*/
public abstract void updateBlacklist(List<String> blacklistAdditions,
List<String> blacklistRemovals);
}

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
@ -80,6 +81,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
protected Resource clusterAvailableResources;
protected int clusterNodeCount;
protected final Set<String> blacklistAdditions = new HashSet<String>();
protected final Set<String> blacklistRemovals = new HashSet<String>();
class ResourceRequestInfo {
ResourceRequest remoteRequest;
LinkedHashSet<T> containerRequests;
@ -199,9 +203,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
Preconditions.checkArgument(progressIndicator >= 0,
"Progress indicator should not be negative");
AllocateResponse allocateResponse = null;
ArrayList<ResourceRequest> askList = null;
ArrayList<ContainerId> releaseList = null;
List<ResourceRequest> askList = null;
List<ContainerId> releaseList = null;
AllocateRequest allocateRequest = null;
List<String> blacklistToAdd = new ArrayList<String>();
List<String> blacklistToRemove = new ArrayList<String>();
try {
synchronized (this) {
@ -217,9 +223,22 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
// optimistically clear this collection assuming no RPC failure
ask.clear();
release.clear();
blacklistToAdd.addAll(blacklistAdditions);
blacklistToRemove.addAll(blacklistRemovals);
ResourceBlacklistRequest blacklistRequest =
(blacklistToAdd != null) || (blacklistToRemove != null) ?
ResourceBlacklistRequest.newInstance(blacklistToAdd,
blacklistToRemove) : null;
allocateRequest =
AllocateRequest.newInstance(lastResponseId, progressIndicator,
askList, releaseList, null);
askList, releaseList, blacklistRequest);
// clear blacklistAdditions and blacklistRemovals before
// unsynchronized part
blacklistAdditions.clear();
blacklistRemovals.clear();
}
allocateResponse = rmClient.allocate(allocateRequest);
@ -253,6 +272,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
ask.add(oldAsk);
}
}
blacklistAdditions.addAll(blacklistToAdd);
blacklistRemovals.addAll(blacklistToRemove);
}
}
}
@ -604,4 +626,31 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
+ " #asks=" + ask.size());
}
}
@Override
public synchronized void updateBlacklist(List<String> blacklistAdditions,
List<String> blacklistRemovals) {
if (blacklistAdditions != null) {
this.blacklistAdditions.addAll(blacklistAdditions);
// if some resources are also in blacklistRemovals updated before, we
// should remove them here.
this.blacklistRemovals.removeAll(blacklistAdditions);
}
if (blacklistRemovals != null) {
this.blacklistRemovals.addAll(blacklistRemovals);
// if some resources are in blacklistAdditions before, we should remove
// them here.
this.blacklistAdditions.removeAll(blacklistRemovals);
}
if (blacklistAdditions != null && blacklistRemovals != null
&& blacklistAdditions.removeAll(blacklistRemovals)) {
// we allow resources to appear in addition list and removal list in the
// same invocation of updateBlacklist(), but should get a warn here.
LOG.warn("The same resources appear in both blacklistAdditions and " +
"blacklistRemovals in updateBlacklist.");
}
}
}

View File

@ -29,6 +29,7 @@ import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.MissingArgumentException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -70,32 +71,38 @@ public class ApplicationCLI extends YarnCLI {
Options opts = new Options();
opts.addOption(STATUS_CMD, true, "Prints the status of the application.");
opts.addOption(LIST_CMD, false, "List applications from the RM. " +
"Supports optional use of --appTypes to filter applications " +
"Supports optional use of -appTypes to filter applications " +
"based on application type, " +
"and --appStates to filter applications based on application state");
"and -appStates to filter applications based on application state");
opts.addOption(KILL_CMD, true, "Kills the application.");
opts.addOption(HELP_CMD, false, "Displays help for all commands.");
Option appTypeOpt = new Option(APP_TYPE_CMD, true,
"Works with --list to filter applications based on their type.");
Option appTypeOpt = new Option(APP_TYPE_CMD, true, "Works with -list to " +
"filter applications based on " +
"input comma-separated list of application types.");
appTypeOpt.setValueSeparator(',');
appTypeOpt.setArgs(Option.UNLIMITED_VALUES);
appTypeOpt.setArgName("Comma-separated list of application types");
appTypeOpt.setArgName("Types");
opts.addOption(appTypeOpt);
Option appStateOpt =
new Option(
APP_STATE_CMD,
true,
"Works with --list to filter applications based on their state. "
+ getAllValidApplicationStates());
Option appStateOpt = new Option(APP_STATE_CMD, true, "Works with -list " +
"to filter applications based on input comma-separated list of " +
"application states. " + getAllValidApplicationStates());
appStateOpt.setValueSeparator(',');
appStateOpt.setArgs(Option.UNLIMITED_VALUES);
appStateOpt.setArgName("Comma-separated list of application states");
appStateOpt.setArgName("States");
opts.addOption(appStateOpt);
opts.getOption(KILL_CMD).setArgName("Application ID");
opts.getOption(STATUS_CMD).setArgName("Application ID");
CommandLine cliParser = new GnuParser().parse(opts, args);
int exitCode = -1;
CommandLine cliParser = null;
try {
cliParser = new GnuParser().parse(opts, args);
} catch (MissingArgumentException ex) {
sysout.println("Missing argument for options");
printUsage(opts);
return exitCode;
}
if (cliParser.hasOption(STATUS_CMD)) {
if (args.length != 2) {
printUsage(opts);
@ -187,6 +194,8 @@ public class ApplicationCLI extends YarnCLI {
} else {
if (appStates.isEmpty()) {
appStates.add(YarnApplicationState.RUNNING);
appStates.add(YarnApplicationState.ACCEPTED);
appStates.add(YarnApplicationState.SUBMITTED);
}
}

View File

@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.MissingArgumentException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.lang.time.DateFormatUtils;
@ -64,20 +65,29 @@ public class NodeCLI extends YarnCLI {
Options opts = new Options();
opts.addOption(STATUS_CMD, true, "Prints the status report of the node.");
opts.addOption(LIST_CMD, false, "List all running nodes. " +
"Supports optional use of --states to filter nodes " +
"based on node state, all --all to list all nodes.");
"Supports optional use of -states to filter nodes " +
"based on node state, all -all to list all nodes.");
Option nodeStateOpt = new Option(NODE_STATE_CMD, true,
"Works with -list to filter nodes based on their states.");
"Works with -list to filter nodes based on input comma-separated list of node states.");
nodeStateOpt.setValueSeparator(',');
nodeStateOpt.setArgs(Option.UNLIMITED_VALUES);
nodeStateOpt.setArgName("Comma-separated list of node states");
nodeStateOpt.setArgName("States");
opts.addOption(nodeStateOpt);
Option allOpt = new Option(NODE_ALL, false,
"Works with -list to list all nodes.");
opts.addOption(allOpt);
CommandLine cliParser = new GnuParser().parse(opts, args);
opts.getOption(STATUS_CMD).setArgName("NodeId");
int exitCode = -1;
CommandLine cliParser = null;
try {
cliParser = new GnuParser().parse(opts, args);
} catch (MissingArgumentException ex) {
sysout.println("Missing argument for options");
printUsage(opts);
return exitCode;
}
if (cliParser.hasOption("status")) {
if (args.length != 2) {
printUsage(opts);

View File

@ -18,13 +18,16 @@
package org.apache.hadoop.yarn.client.api.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -97,6 +100,7 @@ public class TestAMRMClient {
static String rack;
static String[] nodes;
static String[] racks;
private final static int DEFAULT_ITERATION = 3;
@BeforeClass
public static void setup() throws Exception {
@ -477,6 +481,144 @@ public class TestAMRMClient {
}
}
@Test (timeout=60000)
public void testAllocationWithBlacklist() throws YarnException, IOException {
AMRMClientImpl<ContainerRequest> amClient = null;
try {
// start am rm client
amClient =
(AMRMClientImpl<ContainerRequest>) AMRMClient
.<ContainerRequest> createAMRMClient();
amClient.init(conf);
amClient.start();
amClient.registerApplicationMaster("Host", 10000, "");
assertTrue(amClient.ask.size() == 0);
assertTrue(amClient.release.size() == 0);
ContainerRequest storedContainer1 =
new ContainerRequest(capability, nodes, racks, priority);
amClient.addContainerRequest(storedContainer1);
assertTrue(amClient.ask.size() == 3);
assertTrue(amClient.release.size() == 0);
List<String> localNodeBlacklist = new ArrayList<String>();
localNodeBlacklist.add(node);
// put node in black list, so no container assignment
amClient.updateBlacklist(localNodeBlacklist, null);
int allocatedContainerCount = getAllocatedContainersNumber(amClient,
DEFAULT_ITERATION);
// the only node is in blacklist, so no allocation
assertTrue(allocatedContainerCount == 0);
// Remove node from blacklist, so get assigned with 2
amClient.updateBlacklist(null, localNodeBlacklist);
ContainerRequest storedContainer2 =
new ContainerRequest(capability, nodes, racks, priority);
amClient.addContainerRequest(storedContainer2);
allocatedContainerCount = getAllocatedContainersNumber(amClient,
DEFAULT_ITERATION);
assertEquals(allocatedContainerCount, 2);
// Test in case exception in allocate(), blacklist is kept
assertTrue(amClient.blacklistAdditions.isEmpty());
assertTrue(amClient.blacklistRemovals.isEmpty());
// create a invalid ContainerRequest - memory value is minus
ContainerRequest invalidContainerRequest =
new ContainerRequest(Resource.newInstance(-1024, 1),
nodes, racks, priority);
amClient.addContainerRequest(invalidContainerRequest);
amClient.updateBlacklist(localNodeBlacklist, null);
try {
// allocate() should complain as ContainerRequest is invalid.
amClient.allocate(0.1f);
fail("there should be an exception here.");
} catch (Exception e) {
assertEquals(amClient.blacklistAdditions.size(), 1);
}
} finally {
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
amClient.stop();
}
}
}
@Test (timeout=60000)
public void testAMRMClientWithBlacklist() throws YarnException, IOException {
AMRMClientImpl<ContainerRequest> amClient = null;
try {
// start am rm client
amClient =
(AMRMClientImpl<ContainerRequest>) AMRMClient
.<ContainerRequest> createAMRMClient();
amClient.init(conf);
amClient.start();
amClient.registerApplicationMaster("Host", 10000, "");
String[] nodes = {"node1", "node2", "node3"};
// Add nodes[0] and nodes[1]
List<String> nodeList01 = new ArrayList<String>();
nodeList01.add(nodes[0]);
nodeList01.add(nodes[1]);
amClient.updateBlacklist(nodeList01, null);
assertEquals(amClient.blacklistAdditions.size(),2);
assertEquals(amClient.blacklistRemovals.size(),0);
// Add nodes[0] again, verify it is not added duplicated.
List<String> nodeList02 = new ArrayList<String>();
nodeList02.add(nodes[0]);
nodeList02.add(nodes[2]);
amClient.updateBlacklist(nodeList02, null);
assertEquals(amClient.blacklistAdditions.size(),3);
assertEquals(amClient.blacklistRemovals.size(),0);
// Add nodes[1] and nodes[2] to removal list,
// Verify addition list remove these two nodes.
List<String> nodeList12 = new ArrayList<String>();
nodeList12.add(nodes[1]);
nodeList12.add(nodes[2]);
amClient.updateBlacklist(null, nodeList12);
assertEquals(amClient.blacklistAdditions.size(),1);
assertEquals(amClient.blacklistRemovals.size(),2);
// Add nodes[1] again to addition list,
// Verify removal list will remove this node.
List<String> nodeList1 = new ArrayList<String>();
nodeList1.add(nodes[1]);
amClient.updateBlacklist(nodeList1, null);
assertEquals(amClient.blacklistAdditions.size(),2);
assertEquals(amClient.blacklistRemovals.size(),1);
} finally {
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
amClient.stop();
}
}
}
private int getAllocatedContainersNumber(
AMRMClientImpl<ContainerRequest> amClient, int iterationsLeft)
throws YarnException, IOException {
int allocatedContainerCount = 0;
while (iterationsLeft-- > 0) {
Log.info(" == alloc " + allocatedContainerCount + " it left " + iterationsLeft);
AllocateResponse allocResponse = amClient.allocate(0.1f);
assertTrue(amClient.ask.size() == 0);
assertTrue(amClient.release.size() == 0);
assertTrue(nodeCount == amClient.getClusterNodeCount());
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
if(allocatedContainerCount == 0) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(100);
}
}
return allocatedContainerCount;
}
@Test (timeout=60000)
public void testAMRMClient() throws YarnException, IOException {
AMRMClient<ContainerRequest> amClient = null;

View File

@ -86,6 +86,7 @@ public class TestYarnClient {
client.init(conf);
client.start();
client.stop();
rm.stop();
}
@Test (timeout = 30000)

View File

@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doThrow;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.util.ArrayList;
@ -72,6 +73,7 @@ public class TestYarnCLI {
sysOut = spy(new PrintStream(sysOutStream));
sysErrStream = new ByteArrayOutputStream();
sysErr = spy(new PrintStream(sysErrStream));
System.setOut(sysOut);
}
@Test
@ -167,15 +169,35 @@ public class TestYarnCLI {
null);
applicationReports.add(newApplicationReport4);
ApplicationId applicationId5 = ApplicationId.newInstance(1234, 9);
ApplicationReport newApplicationReport5 = ApplicationReport.newInstance(
applicationId5, ApplicationAttemptId.newInstance(applicationId5, 5),
"user5", "queue5", "appname5", "host5", 128, null,
YarnApplicationState.ACCEPTED, "diagnostics5", "url5", 5, 5,
FinalApplicationStatus.KILLED, null, "N/A", 0.93789f, "HIVE",
null);
applicationReports.add(newApplicationReport5);
ApplicationId applicationId6 = ApplicationId.newInstance(1234, 10);
ApplicationReport newApplicationReport6 = ApplicationReport.newInstance(
applicationId6, ApplicationAttemptId.newInstance(applicationId6, 6),
"user6", "queue6", "appname6", "host6", 129, null,
YarnApplicationState.SUBMITTED, "diagnostics6", "url6", 6, 6,
FinalApplicationStatus.KILLED, null, "N/A", 0.99789f, "PIG",
null);
applicationReports.add(newApplicationReport6);
// Test command yarn application -list
// if the set appStates is empty, RUNNING state will be automatically added
// to the appStates list
// the output of yarn application -list should be the same as
// equals to yarn application -list --appStates RUNNING
// equals to yarn application -list --appStates RUNNING,ACCEPTED,SUBMITTED
Set<String> appType1 = new HashSet<String>();
EnumSet<YarnApplicationState> appState1 =
EnumSet.noneOf(YarnApplicationState.class);
appState1.add(YarnApplicationState.RUNNING);
appState1.add(YarnApplicationState.ACCEPTED);
appState1.add(YarnApplicationState.SUBMITTED);
when(client.getApplications(appType1, appState1)).thenReturn(
getApplicationReports(applicationReports, appType1, appState1, false));
int result = cli.run(new String[] { "-list" });
@ -185,7 +207,7 @@ public class TestYarnCLI {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter pw = new PrintWriter(baos);
pw.println("Total number of applications (application-types: " + appType1
+ " and states: " + appState1 + ")" + ":" + 2);
+ " and states: " + appState1 + ")" + ":" + 4);
pw.print(" Application-Id\t Application-Name");
pw.print("\t Application-Type");
pw.print("\t User\t Queue\t State\t ");
@ -201,6 +223,16 @@ public class TestYarnCLI {
pw.print("queue3\t RUNNING\t ");
pw.print("SUCCEEDED\t 73.79%");
pw.println("\t N/A");
pw.print(" application_1234_0009\t ");
pw.print("appname5\t HIVE\t user5\t ");
pw.print("queue5\t ACCEPTED\t ");
pw.print("KILLED\t 93.79%");
pw.println("\t N/A");
pw.print(" application_1234_0010\t ");
pw.print("appname6\t PIG\t user6\t ");
pw.print("queue6\t SUBMITTED\t ");
pw.print("KILLED\t 99.79%");
pw.println("\t N/A");
pw.close();
String appsReportStr = baos.toString("UTF-8");
Assert.assertEquals(appsReportStr, sysOutStream.toString());
@ -208,7 +240,8 @@ public class TestYarnCLI {
//Test command yarn application -list --appTypes apptype1,apptype2
//the output should be the same as
//yarn application -list --appTypes apptyp1, apptype2 --appStates RUNNING
// yarn application -list --appTypes apptyp1, apptype2 --appStates
// RUNNING,ACCEPTED,SUBMITTED
sysOutStream.reset();
Set<String> appType2 = new HashSet<String>();
appType2.add("YARN");
@ -217,6 +250,8 @@ public class TestYarnCLI {
EnumSet<YarnApplicationState> appState2 =
EnumSet.noneOf(YarnApplicationState.class);
appState2.add(YarnApplicationState.RUNNING);
appState2.add(YarnApplicationState.ACCEPTED);
appState2.add(YarnApplicationState.SUBMITTED);
when(client.getApplications(appType2, appState2)).thenReturn(
getApplicationReports(applicationReports, appType2, appState2, false));
result =
@ -358,7 +393,7 @@ public class TestYarnCLI {
baos = new ByteArrayOutputStream();
pw = new PrintWriter(baos);
pw.println("Total number of applications (application-types: " + appType5
+ " and states: " + appState5 + ")" + ":" + 4);
+ " and states: " + appState5 + ")" + ":" + 6);
pw.print(" Application-Id\t Application-Name");
pw.print("\t Application-Type");
pw.print("\t User\t Queue\t State\t ");
@ -384,6 +419,16 @@ public class TestYarnCLI {
pw.print("queue4\t FAILED\t ");
pw.print("SUCCEEDED\t 83.79%");
pw.println("\t N/A");
pw.print(" application_1234_0009\t ");
pw.print("appname5\t HIVE\t user5\t ");
pw.print("queue5\t ACCEPTED\t ");
pw.print("KILLED\t 93.79%");
pw.println("\t N/A");
pw.print(" application_1234_0010\t ");
pw.print("appname6\t PIG\t user6\t ");
pw.print("queue6\t SUBMITTED\t ");
pw.print("KILLED\t 99.79%");
pw.println("\t N/A");
pw.close();
appsReportStr = baos.toString("UTF-8");
Assert.assertEquals(appsReportStr, sysOutStream.toString());
@ -456,21 +501,40 @@ public class TestYarnCLI {
}
@Test (timeout = 10000)
public void testHelpCommand() throws Exception {
public void testAppsHelpCommand() throws Exception {
ApplicationCLI cli = createAndGetAppCLI();
ApplicationCLI spyCli = spy(cli);
int result = spyCli.run(new String[] { "-help" });
Assert.assertTrue(result == 0);
verify(spyCli).printUsage(any(Options.class));
Assert.assertEquals(createApplicationCLIHelpMessage(),
sysOutStream.toString());
sysOutStream.reset();
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
result =
cli.run(new String[] { "-kill", applicationId.toString(), "args" });
verify(spyCli).printUsage(any(Options.class));
Assert.assertEquals(createApplicationCLIHelpMessage(),
sysOutStream.toString());
sysOutStream.reset();
NodeId nodeId = NodeId.newInstance("host0", 0);
result = cli.run(new String[] { "-status", nodeId.toString(), "args" });
verify(spyCli).printUsage(any(Options.class));
Assert.assertEquals(createApplicationCLIHelpMessage(),
sysOutStream.toString());
}
@Test (timeout = 5000)
public void testNodesHelpCommand() throws Exception {
NodeCLI nodeCLI = new NodeCLI();
nodeCLI.setClient(client);
nodeCLI.setSysOutPrintStream(sysOut);
nodeCLI.setSysErrPrintStream(sysErr);
nodeCLI.run(new String[] {});
Assert.assertEquals(createNodeCLIHelpMessage(),
sysOutStream.toString());
}
@Test
@ -806,6 +870,25 @@ public class TestYarnCLI {
verifyUsageInfo(new NodeCLI());
}
@Test
public void testMissingArguments() throws Exception {
ApplicationCLI cli = createAndGetAppCLI();
int result = cli.run(new String[] { "-status" });
Assert.assertEquals(result, -1);
Assert.assertEquals("Missing argument for options\n"
+ createApplicationCLIHelpMessage(), sysOutStream.toString());
sysOutStream.reset();
NodeCLI nodeCLI = new NodeCLI();
nodeCLI.setClient(client);
nodeCLI.setSysOutPrintStream(sysOut);
nodeCLI.setSysErrPrintStream(sysErr);
result = nodeCLI.run(new String[] { "-status" });
Assert.assertEquals(result, -1);
Assert.assertEquals("Missing argument for options\n"
+ createNodeCLIHelpMessage(), sysOutStream.toString());
}
private void verifyUsageInfo(YarnCLI cli) throws Exception {
cli.setSysErrPrintStream(sysErr);
cli.run(new String[0]);
@ -832,4 +915,45 @@ public class TestYarnCLI {
return cli;
}
private String createApplicationCLIHelpMessage() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter pw = new PrintWriter(baos);
pw.println("usage: application");
pw.println(" -appStates <States> Works with -list to filter applications based");
pw.println(" on input comma-separated list of application");
pw.println(" states. The valid application state can be one");
pw.println(" of the following:");
pw.println(" ALL,NEW,NEW_SAVING,SUBMITTED,ACCEPTED,RUNNING,");
pw.println(" FINISHED,FAILED,KILLED");
pw.println(" -appTypes <Types> Works with -list to filter applications based");
pw.println(" on input comma-separated list of application");
pw.println(" types.");
pw.println(" -help Displays help for all commands.");
pw.println(" -kill <Application ID> Kills the application.");
pw.println(" -list List applications from the RM. Supports");
pw.println(" optional use of -appTypes to filter");
pw.println(" applications based on application type, and");
pw.println(" -appStates to filter applications based on");
pw.println(" application state");
pw.println(" -status <Application ID> Prints the status of the application.");
pw.close();
String appsHelpStr = baos.toString("UTF-8");
return appsHelpStr;
}
private String createNodeCLIHelpMessage() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter pw = new PrintWriter(baos);
pw.println("usage: node");
pw.println(" -all Works with -list to list all nodes.");
pw.println(" -list List all running nodes. Supports optional use of");
pw.println(" -states to filter nodes based on node state, all -all");
pw.println(" to list all nodes.");
pw.println(" -states <States> Works with -list to filter nodes based on input");
pw.println(" comma-separated list of node states.");
pw.println(" -status <NodeId> Prints the status report of the node.");
pw.close();
String nodesHelpStr = baos.toString("UTF-8");
return nodesHelpStr;
}
}

View File

@ -125,7 +125,7 @@ public class ResourceBlacklistRequestPBImpl extends ResourceBlacklistRequest {
@Override
public void setBlacklistAdditions(List<String> resourceNames) {
if (resourceNames == null) {
if (resourceNames == null || resourceNames.isEmpty()) {
if (this.blacklistAdditions != null) {
this.blacklistAdditions.clear();
}
@ -144,7 +144,7 @@ public class ResourceBlacklistRequestPBImpl extends ResourceBlacklistRequest {
@Override
public void setBlacklistRemovals(List<String> resourceNames) {
if (resourceNames == null) {
if (resourceNames == null || resourceNames.isEmpty()) {
if (this.blacklistRemovals != null) {
this.blacklistRemovals.clear();
}

View File

@ -39,6 +39,7 @@ public class ClientToAMTokenIdentifier extends TokenIdentifier {
public static final Text KIND_NAME = new Text("YARN_CLIENT_TOKEN");
private ApplicationAttemptId applicationAttemptId;
private Text clientName = new Text();
// TODO: Add more information in the tokenID such that it is not
// transferrable, more secure etc.
@ -46,21 +47,27 @@ public class ClientToAMTokenIdentifier extends TokenIdentifier {
public ClientToAMTokenIdentifier() {
}
public ClientToAMTokenIdentifier(ApplicationAttemptId id) {
public ClientToAMTokenIdentifier(ApplicationAttemptId id, String client) {
this();
this.applicationAttemptId = id;
this.clientName = new Text(client);
}
public ApplicationAttemptId getApplicationAttemptID() {
return this.applicationAttemptId;
}
public String getClientName() {
return this.clientName.toString();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(this.applicationAttemptId.getApplicationId()
.getClusterTimestamp());
out.writeInt(this.applicationAttemptId.getApplicationId().getId());
out.writeInt(this.applicationAttemptId.getAttemptId());
this.clientName.write(out);
}
@Override
@ -68,6 +75,7 @@ public class ClientToAMTokenIdentifier extends TokenIdentifier {
this.applicationAttemptId =
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(in.readLong(), in.readInt()), in.readInt());
this.clientName.readFields(in);
}
@Override
@ -77,10 +85,10 @@ public class ClientToAMTokenIdentifier extends TokenIdentifier {
@Override
public UserGroupInformation getUser() {
if (this.applicationAttemptId == null) {
if (this.clientName == null) {
return null;
}
return UserGroupInformation.createRemoteUser(this.applicationAttemptId.toString());
return UserGroupInformation.createRemoteUser(this.clientName.toString());
}
@InterfaceAudience.Private

View File

@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
public class ClientToAMTokenSecretManager extends
BaseClientToAMTokenSecretManager {
// Only one client-token and one master-key for AM
// Only one master-key for AM
private SecretKey masterKey;
public ClientToAMTokenSecretManager(
@ -53,7 +53,7 @@ public class ClientToAMTokenSecretManager extends
@Override
public SecretKey getMasterKey(ApplicationAttemptId applicationAttemptID) {
// Only one client-token and one master-key for AM, just return that.
// Only one master-key for AM, just return that.
return this.masterKey;
}

View File

@ -198,7 +198,8 @@ public class ConverterUtils {
Iterator<String> it = _split(appIdStr).iterator();
if (!it.next().equals(APPLICATION_PREFIX)) {
throw new IllegalArgumentException("Invalid ApplicationId prefix: "
+ appIdStr);
+ appIdStr + ". The valid ApplicationId should start with prefix "
+ APPLICATION_PREFIX);
}
try {
return toApplicationId(it);

View File

@ -242,9 +242,8 @@ public class WebApps {
for(Map.Entry<String, Object> entry : attributes.entrySet()) {
server.setAttribute(entry.getKey(), entry.getValue());
}
String webAppPath = "/" + name + "/*";
server.defineFilter(server.getWebAppContext(), "guice",
GuiceFilter.class.getName(), null, new String[] { webAppPath, "/" });
GuiceFilter.class.getName(), null, new String[] { "/*" });
webapp.setConf(conf);
webapp.setHttpServer(server);

View File

@ -19,11 +19,14 @@
package org.apache.hadoop.yarn.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.BreakableService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
@ -315,6 +318,26 @@ public class TestCompositeService {
assertInState(STATE.INITED, child);
}
@Test (timeout = 1000)
public void testAddIfService() {
CompositeService testService = new CompositeService("TestService") {
Service service;
@Override
public void serviceInit(Configuration conf) {
Integer notAService = new Integer(0);
assertFalse("Added an integer as a service",
addIfService(notAService));
service = new AbstractService("Service") {};
assertTrue("Unable to add a service", addIfService(service));
}
};
testService.init(new Configuration());
assertEquals("Incorrect number of services",
1, testService.getServices().size());
}
public static class CompositeServiceAddingAChild extends CompositeService{
Service child;

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by joblicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.webapp;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import javax.ws.rs.ext.ContextResolver;
import javax.ws.rs.ext.Provider;
import javax.xml.bind.JAXBContext;
import org.apache.hadoop.yarn.webapp.MyTestWebService.MyInfo;
import com.google.inject.Singleton;
import com.sun.jersey.api.json.JSONConfiguration;
import com.sun.jersey.api.json.JSONJAXBContext;
@Singleton
@Provider
public class MyTestJAXBContextResolver implements ContextResolver<JAXBContext> {
private JAXBContext context;
private final Set<Class> types;
// you have to specify all the dao classes here
private final Class[] cTypes = { MyInfo.class };
public MyTestJAXBContextResolver() throws Exception {
this.types = new HashSet<Class>(Arrays.asList(cTypes));
this.context =
new JSONJAXBContext(JSONConfiguration.natural().rootUnwrapping(false)
.build(), cTypes);
}
@Override
public JAXBContext getContext(Class<?> objectType) {
return (types.contains(objectType)) ? context : null;
}
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by joblicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.webapp;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import com.google.inject.Singleton;
@Singleton
@Path("/ws/v1/test")
public class MyTestWebService {
@GET
@Produces({ MediaType.APPLICATION_XML })
public MyInfo get() {
return new MyInfo();
}
@XmlRootElement(name = "myInfo")
@XmlAccessorType(XmlAccessType.FIELD)
static class MyInfo {
public MyInfo() {
}
}
}

View File

@ -18,30 +18,47 @@
package org.apache.hadoop.yarn.webapp;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.webapp.Controller;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.view.HtmlPage;
import org.apache.hadoop.yarn.webapp.view.JQueryUI;
import org.apache.hadoop.yarn.webapp.view.TextPage;
import com.google.inject.Inject;
import static org.apache.hadoop.yarn.util.StringHelper.join;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_TABLE;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI._INFO_WRAP;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI._TH;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import static org.apache.hadoop.yarn.util.StringHelper.*;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.ext.ContextResolver;
import javax.ws.rs.ext.Provider;
import javax.xml.bind.JAXBContext;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.webapp.view.HtmlPage;
import org.apache.hadoop.yarn.webapp.view.JQueryUI;
import org.apache.hadoop.yarn.webapp.view.TextPage;
import org.junit.Test;
import static org.junit.Assert.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.sun.jersey.api.json.JSONConfiguration;
import com.sun.jersey.api.json.JSONJAXBContext;
public class TestWebApp {
static final Logger LOG = LoggerFactory.getLogger(TestWebApp.class);
@ -227,8 +244,13 @@ public class TestWebApp {
}
@Test public void testCustomRoutes() throws Exception {
WebApp app = WebApps.$for("test", this).start(new WebApp() {
@Override public void setup() {
WebApp app =
WebApps.$for("test", TestWebApp.class, this, "ws").start(new WebApp() {
@Override
public void setup() {
bind(MyTestJAXBContextResolver.class);
bind(MyTestWebService.class);
route("/:foo", FooController.class);
route("/bar/foo", FooController.class, "bar");
route("/foo/:foo", DefaultController.class);
@ -245,6 +267,31 @@ public class TestWebApp {
assertEquals("default1", getContent(baseUrl +"test/foo/1").trim());
assertEquals("default2", getContent(baseUrl +"test/foo/bar/2").trim());
assertEquals(404, getResponseCode(baseUrl +"test/goo"));
assertEquals(200, getResponseCode(baseUrl +"ws/v1/test"));
assertTrue(getContent(baseUrl +"ws/v1/test").contains("myInfo"));
} finally {
app.stop();
}
}
// This is to test the GuiceFilter should only be applied to webAppContext,
// not to staticContext and logContext;
@Test public void testYARNWebAppContext() throws Exception {
// setting up the log context
System.setProperty("hadoop.log.dir", "/Not/Existing/dir");
WebApp app = WebApps.$for("test", this).start(new WebApp() {
@Override public void setup() {
route("/", FooController.class);
}
});
String baseUrl = baseUrl(app);
try {
// should not redirect to foo
assertFalse("foo".equals(getContent(baseUrl +"static").trim()));
// Not able to access a non-existing dir, should not redirect to foo.
assertEquals(404, getResponseCode(baseUrl +"logs"));
// should be able to redirect to foo.
assertEquals("foo", getContent(baseUrl).trim());
} finally {
app.stop();
}

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
/**
* Context interface for sharing information across components in the
@ -61,4 +62,8 @@ public interface Context {
NodeHealthStatus getNodeHealthStatus();
ContainerManagementProtocol getContainerManager();
LocalDirsHandlerService getLocalDirsHandler();
ApplicationACLsManager getApplicationACLsManager();
}

View File

@ -123,7 +123,8 @@ public class NodeManager extends CompositeService
protected NMContext createNMContext(
NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager) {
return new NMContext(containerTokenSecretManager, nmTokenSecretManager);
return new NMContext(containerTokenSecretManager, nmTokenSecretManager,
dirsHandler, aclsManager);
}
protected void doSecureLogin() throws IOException {
@ -142,9 +143,6 @@ public class NodeManager extends CompositeService
NMTokenSecretManagerInNM nmTokenSecretManager =
new NMTokenSecretManagerInNM();
this.context =
createNMContext(containerTokenSecretManager, nmTokenSecretManager);
this.aclsManager = new ApplicationACLsManager(conf);
ContainerExecutor exec = ReflectionUtils.newInstance(
@ -165,6 +163,8 @@ public class NodeManager extends CompositeService
addService(nodeHealthChecker);
dirsHandler = nodeHealthChecker.getDiskHandler();
this.context = createNMContext(containerTokenSecretManager,
nmTokenSecretManager);
nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
@ -319,14 +319,19 @@ public class NodeManager extends CompositeService
private final NMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInNM nmTokenSecretManager;
private ContainerManagementProtocol containerManager;
private final LocalDirsHandlerService dirsHandler;
private final ApplicationACLsManager aclsManager;
private WebServer webServer;
private final NodeHealthStatus nodeHealthStatus = RecordFactoryProvider
.getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager) {
NMTokenSecretManagerInNM nmTokenSecretManager,
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager) {
this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager;
this.dirsHandler = dirsHandler;
this.aclsManager = aclsManager;
this.nodeHealthStatus.setIsNodeHealthy(true);
this.nodeHealthStatus.setHealthReport("Healthy");
this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
@ -386,6 +391,16 @@ public class NodeManager extends CompositeService
public void setNodeId(NodeId nodeId) {
this.nodeId = nodeId;
}
@Override
public LocalDirsHandlerService getLocalDirsHandler() {
return dirsHandler;
}
@Override
public ApplicationACLsManager getApplicationACLsManager() {
return aclsManager;
}
}

View File

@ -85,7 +85,6 @@ import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@ -193,12 +192,6 @@ public class ContainerManagerImpl extends CompositeService implements
super.serviceInit(conf);
}
private void addIfService(Object object) {
if (object instanceof Service) {
addService((Service) object);
}
}
protected LogHandler createLogHandler(Configuration conf, Context context,
DeletionService deletionService) {
if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
@ -475,8 +468,7 @@ public class ContainerManagerImpl extends CompositeService implements
// Create the application
Application application =
new ApplicationImpl(dispatcher, this.aclsManager, user, applicationID,
credentials, context);
new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
if (null == context.getApplications().putIfAbsent(applicationID,
application)) {
LOG.info("Creating a new application reference for app " + applicationID);

View File

@ -73,14 +73,13 @@ public class ApplicationImpl implements Application {
Map<ContainerId, Container> containers =
new HashMap<ContainerId, Container>();
public ApplicationImpl(Dispatcher dispatcher,
ApplicationACLsManager aclsManager, String user, ApplicationId appId,
public ApplicationImpl(Dispatcher dispatcher, String user, ApplicationId appId,
Credentials credentials, Context context) {
this.dispatcher = dispatcher;
this.user = user;
this.appId = appId;
this.credentials = credentials;
this.aclsManager = aclsManager;
this.aclsManager = context.getApplicationACLsManager();
this.context = context;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();

View File

@ -525,7 +525,8 @@ public class ContainerLaunch implements Callable<Integer> {
@Override
public void env(String key, String value) {
line("@set ", key, "=", value);
line("@set ", key, "=", value,
"\nif %errorlevel% neq 0 exit /b %errorlevel%");
}
@Override

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@ -149,6 +150,7 @@ public class ContainersLauncher extends AbstractService
dispatcher.getEventHandler().handle(
new ContainerExitEvent(containerId,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
ExitCode.TERMINATED.getExitCode(),
"Container terminated before launch."));
}

View File

@ -28,36 +28,21 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import org.mortbay.log.Log;
import com.google.inject.Inject;
@ -90,19 +75,11 @@ public class ContainerLogsPage extends NMView {
public static class ContainersLogsBlock extends HtmlBlock implements
YarnWebParams {
private final Configuration conf;
private final Context nmContext;
private final ApplicationACLsManager aclsManager;
private final LocalDirsHandlerService dirsHandler;
@Inject
public ContainersLogsBlock(Configuration conf, Context context,
ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
this.conf = conf;
public ContainersLogsBlock(Context context) {
this.nmContext = context;
this.aclsManager = aclsManager;
this.dirsHandler = dirsHandler;
}
@Override
@ -118,102 +95,29 @@ public class ContainerLogsPage extends NMView {
ContainerId containerId;
try {
containerId = ConverterUtils.toContainerId($(CONTAINER_ID));
} catch (IllegalArgumentException e) {
html.h1("Invalid containerId " + $(CONTAINER_ID));
} catch (IllegalArgumentException ex) {
html.h1("Invalid container ID: " + $(CONTAINER_ID));
return;
}
ApplicationId applicationId = containerId.getApplicationAttemptId()
.getApplicationId();
Application application = this.nmContext.getApplications().get(
applicationId);
Container container = this.nmContext.getContainers().get(containerId);
if (application == null) {
html.h1(
"Unknown container. Container either has not started or "
+ "has already completed or "
+ "doesn't belong to this node at all.");
return;
}
if (container == null) {
// Container may have alerady completed, but logs not aggregated yet.
printLogs(html, containerId, applicationId, application);
return;
}
if (EnumSet.of(ContainerState.NEW, ContainerState.LOCALIZING,
ContainerState.LOCALIZED).contains(container.getContainerState())) {
html.h1("Container is not yet running. Current state is "
+ container.getContainerState());
return;
}
if (container.getContainerState() == ContainerState.LOCALIZATION_FAILED) {
html.h1("Container wasn't started. Localization failed.");
return;
}
if (EnumSet.of(ContainerState.RUNNING,
ContainerState.EXITED_WITH_FAILURE,
ContainerState.EXITED_WITH_SUCCESS).contains(
container.getContainerState())) {
printLogs(html, containerId, applicationId, application);
return;
}
if (EnumSet.of(ContainerState.KILLING,
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerState.CONTAINER_RESOURCES_CLEANINGUP).contains(
container.getContainerState())) {
//Container may have generated some logs before being killed.
printLogs(html, containerId, applicationId, application);
return;
}
if (container.getContainerState().equals(ContainerState.DONE)) {
// Prev state unknown. Logs may be available.
printLogs(html, containerId, applicationId, application);
return;
} else {
html.h1("Container is no longer running...");
return;
}
}
private void printLogs(Block html, ContainerId containerId,
ApplicationId applicationId, Application application) {
// Check for the authorization.
String remoteUser = request().getRemoteUser();
UserGroupInformation callerUGI = null;
if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
if (callerUGI != null
&& !this.aclsManager.checkAccess(callerUGI,
ApplicationAccessType.VIEW_APP, application.getUser(),
applicationId)) {
html.h1(
"User [" + remoteUser
+ "] is not authorized to view the logs for application "
+ applicationId);
return;
}
if (!$(CONTAINER_LOG_TYPE).isEmpty()) {
File logFile = null;
try {
URI logPathURI = new URI(this.dirsHandler.getLogPathToRead(
ContainerLaunch.getRelativeContainerLogDir(
applicationId.toString(), containerId.toString())
+ Path.SEPARATOR + $(CONTAINER_LOG_TYPE)).toString());
logFile = new File(logPathURI.getPath());
} catch (URISyntaxException e) {
html.h1("Cannot find this log on the local disk.");
return;
} catch (Exception e) {
html.h1("Cannot find this log on the local disk.");
return;
if ($(CONTAINER_LOG_TYPE).isEmpty()) {
List<File> logFiles = ContainerLogsUtils.getContainerLogDirs(containerId,
request().getRemoteUser(), nmContext);
printLogFileDirectory(html, logFiles);
} else {
File logFile = ContainerLogsUtils.getContainerLogFile(containerId,
$(CONTAINER_LOG_TYPE), request().getRemoteUser(), nmContext);
printLogFile(html, logFile);
}
} catch (YarnException ex) {
html.h1(ex.getMessage());
} catch (NotFoundException ex) {
html.h1(ex.getMessage());
}
}
private void printLogFile(Block html, File logFile) {
long start =
$("start").isEmpty() ? -4 * 1024 : Long.parseLong($("start"));
start = start < 0 ? logFile.length() + start : start;
@ -230,22 +134,10 @@ public class ContainerLogsPage extends NMView {
FileInputStream logByteStream = null;
try {
logByteStream =
SecureIOUtils.openForRead(logFile, application.getUser(), null);
} catch (IOException e) {
LOG.error(
"Exception reading log file " + logFile.getAbsolutePath(), e);
if (e.getMessage().contains(
"did not match expected owner '" + application.getUser()
+ "'")) {
html.h1("Exception reading log file. Application submitted by '"
+ application.getUser()
+ "' doesn't own requested log file : "
+ logFile.getName());
} else {
html.h1("Exception reading log file. It might be because log "
+ "file was aggregated : " + logFile.getName());
}
logByteStream = ContainerLogsUtils.openLogFileForRead($(CONTAINER_ID),
logFile, nmContext);
} catch (IOException ex) {
html.h1(ex.getMessage());
return;
}
@ -257,7 +149,7 @@ public class ContainerLogsPage extends NMView {
logFile.getName(), "?start=0"), "here").
_(" for full log")._();
}
// TODO Fix findBugs close warning along with IOUtils change
IOUtils.skipFully(logByteStream, start);
InputStreamReader reader = new InputStreamReader(logByteStream);
int bufferSize = 65536;
@ -292,10 +184,10 @@ public class ContainerLogsPage extends NMView {
}
}
}
} else {
}
private void printLogFileDirectory(Block html, List<File> containerLogsDirs) {
// Print out log types in lexical order
List<File> containerLogsDirs = getContainerLogDirs(containerId,
dirsHandler);
Collections.sort(containerLogsDirs);
boolean foundLogFile = false;
for (File containerLogsDir : containerLogsDirs) {
@ -313,30 +205,9 @@ public class ContainerLogsPage extends NMView {
}
}
if (!foundLogFile) {
html.h1("No logs available for container " + containerId.toString());
html.h1("No logs available for container " + $(CONTAINER_ID));
return;
}
}
return;
}
static List<File> getContainerLogDirs(ContainerId containerId,
LocalDirsHandlerService dirsHandler) {
List<String> logDirs = dirsHandler.getLogDirs();
List<File> containerLogDirs = new ArrayList<File>(logDirs.size());
for (String logDir : logDirs) {
try {
logDir = new URI(logDir).getPath();
} catch (URISyntaxException e) {
Log.warn(e.getMessage());
}
String appIdStr = ConverterUtils.toString(containerId
.getApplicationAttemptId().getApplicationId());
File appLogDir = new File(logDir, appIdStr);
String containerIdStr = ConverterUtils.toString(containerId);
containerLogDirs.add(new File(appLogDir, containerIdStr));
}
return containerLogDirs;
}
}
}

View File

@ -0,0 +1,190 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.webapp;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Contains utilities for fetching a user's log file in a secure fashion.
*/
public class ContainerLogsUtils {
public static final Logger LOG = LoggerFactory.getLogger(ContainerLogsUtils.class);
/**
* Finds the local directories that logs for the given container are stored
* on.
*/
public static List<File> getContainerLogDirs(ContainerId containerId,
String remoteUser, Context context) throws YarnException {
Container container = context.getContainers().get(containerId);
if (container == null) {
throw new YarnException("Container does not exist.");
}
Application application = getApplicationForContainer(containerId, context);
checkAccess(remoteUser, application, context);
checkState(container.getContainerState());
return getContainerLogDirs(containerId, context.getLocalDirsHandler());
}
static List<File> getContainerLogDirs(ContainerId containerId,
LocalDirsHandlerService dirsHandler) throws YarnException {
List<String> logDirs = dirsHandler.getLogDirs();
List<File> containerLogDirs = new ArrayList<File>(logDirs.size());
for (String logDir : logDirs) {
try {
logDir = new URI(logDir).getPath();
} catch (URISyntaxException e) {
throw new YarnException("Internal error", e);
}
String appIdStr = ConverterUtils.toString(containerId
.getApplicationAttemptId().getApplicationId());
File appLogDir = new File(logDir, appIdStr);
containerLogDirs.add(new File(appLogDir, containerId.toString()));
}
return containerLogDirs;
}
/**
* Finds the log file with the given filename for the given container.
*/
public static File getContainerLogFile(ContainerId containerId,
String fileName, String remoteUser, Context context) throws YarnException {
Container container = context.getContainers().get(containerId);
if (container == null) {
throw new NotFoundException("Container with id " + containerId
+ " not found.");
}
Application application = getApplicationForContainer(containerId, context);
checkAccess(remoteUser, application, context);
checkState(container.getContainerState());
try {
LocalDirsHandlerService dirsHandler = context.getLocalDirsHandler();
String relativeContainerLogDir = ContainerLaunch.getRelativeContainerLogDir(
application.getAppId().toString(), containerId.toString());
Path logPath = dirsHandler.getLogPathToRead(
relativeContainerLogDir + Path.SEPARATOR + fileName);
URI logPathURI = new URI(logPath.toString());
File logFile = new File(logPathURI.getPath());
return logFile;
} catch (URISyntaxException e) {
throw new YarnException("Internal error", e);
} catch (IOException e) {
LOG.warn("Failed to find log file", e);
throw new NotFoundException("Cannot find this log on the local disk.");
}
}
private static Application getApplicationForContainer(ContainerId containerId,
Context context) {
ApplicationId applicationId = containerId.getApplicationAttemptId()
.getApplicationId();
Application application = context.getApplications().get(
applicationId);
if (application == null) {
throw new NotFoundException(
"Unknown container. Container either has not started or "
+ "has already completed or "
+ "doesn't belong to this node at all.");
}
return application;
}
private static void checkAccess(String remoteUser, Application application,
Context context) throws YarnException {
UserGroupInformation callerUGI = null;
if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
if (callerUGI != null
&& !context.getApplicationACLsManager().checkAccess(callerUGI,
ApplicationAccessType.VIEW_APP, application.getUser(),
application.getAppId())) {
throw new YarnException(
"User [" + remoteUser
+ "] is not authorized to view the logs for application "
+ application.getAppId());
}
}
private static void checkState(ContainerState state) {
if (state == ContainerState.NEW || state == ContainerState.LOCALIZING ||
state == ContainerState.LOCALIZED) {
throw new NotFoundException("Container is not yet running. Current state is "
+ state);
}
if (state == ContainerState.LOCALIZATION_FAILED) {
throw new NotFoundException("Container wasn't started. Localization failed.");
}
}
public static FileInputStream openLogFileForRead(String containerIdStr, File logFile,
Context context) throws IOException {
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
ApplicationId applicationId = containerId.getApplicationAttemptId()
.getApplicationId();
String user = context.getApplications().get(
applicationId).getUser();
try {
return SecureIOUtils.openForRead(logFile, user, null);
} catch (IOException e) {
if (e.getMessage().contains(
"did not match expected owner '" + user
+ "'")) {
LOG.error(
"Exception reading log file " + logFile.getAbsolutePath(), e);
throw new IOException("Exception reading log file. Application submitted by '"
+ user
+ "' doesn't own requested log file : "
+ logFile.getName(), e);
} else {
throw new IOException("Exception reading log file. It might be because log "
+ "file was aggregated : " + logFile.getName(), e);
}
}
}
}

View File

@ -17,19 +17,31 @@
package org.apache.hadoop.yarn.server.nodemanager.webapp;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map.Entry;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -59,6 +71,9 @@ public class NMWebServices {
private static RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private @javax.ws.rs.core.Context
HttpServletRequest request;
private @javax.ws.rs.core.Context
HttpServletResponse response;
@ -180,4 +195,65 @@ public class NMWebServices {
}
/**
* Returns the contents of a container's log file in plain text.
*
* Only works for containers that are still in the NodeManager's memory, so
* logs are no longer available after the corresponding application is no
* longer running.
*
* @param containerIdStr
* The container ID
* @param filename
* The name of the log file
* @return
* The contents of the container's log file
*/
@GET
@Path("/containerlogs/{containerid}/{filename}")
@Produces({ MediaType.TEXT_PLAIN })
@Public
@Unstable
public Response getLogs(@PathParam("containerid") String containerIdStr,
@PathParam("filename") String filename) {
ContainerId containerId;
try {
containerId = ConverterUtils.toContainerId(containerIdStr);
} catch (IllegalArgumentException ex) {
return Response.status(Status.BAD_REQUEST).build();
}
File logFile = null;
try {
logFile = ContainerLogsUtils.getContainerLogFile(
containerId, filename, request.getRemoteUser(), nmContext);
} catch (NotFoundException ex) {
return Response.status(Status.NOT_FOUND).entity(ex.getMessage()).build();
} catch (YarnException ex) {
return Response.serverError().entity(ex.getMessage()).build();
}
try {
final FileInputStream fis = ContainerLogsUtils.openLogFileForRead(
containerIdStr, logFile, nmContext);
StreamingOutput stream = new StreamingOutput() {
@Override
public void write(OutputStream os) throws IOException,
WebApplicationException {
int bufferSize = 65536;
byte[] buf = new byte[bufferSize];
int len;
while ((len = fis.read(buf, 0, bufferSize)) > 0) {
os.write(buf, 0, len);
}
os.flush();
}
};
return Response.ok(stream).build();
} catch (IOException ex) {
return Response.serverError().entity(ex.getMessage()).build();
}
}
}

View File

@ -79,7 +79,7 @@ public class TestEventFlow {
YarnConfiguration conf = new YarnConfiguration();
Context context = new NMContext(new NMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInNM()) {
new NMTokenSecretManagerInNM(), null, null) {
@Override
public int getHttpPort() {
return 1234;

View File

@ -1185,7 +1185,7 @@ public class TestNodeStatusUpdater {
public MyNMContext(
NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager) {
super(containerTokenSecretManager, nmTokenSecretManager);
super(containerTokenSecretManager, nmTokenSecretManager, null, null);
}
@Override

View File

@ -100,7 +100,7 @@ public abstract class BaseContainerManagerTest {
protected static final int HTTP_PORT = 5412;
protected Configuration conf = new YarnConfiguration();
protected Context context = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM()) {
conf), new NMTokenSecretManagerInNM(), null, new ApplicationACLsManager(conf)) {
public int getHttpPort() {
return HTTP_PORT;
};

View File

@ -490,6 +490,8 @@ public class TestApplication {
when(context.getContainerTokenSecretManager()).thenReturn(
new NMContainerTokenSecretManager(conf));
when(context.getApplicationACLsManager()).thenReturn(
new ApplicationACLsManager(conf));
// Setting master key
MasterKey masterKey = new MasterKeyPBImpl();
@ -501,8 +503,7 @@ public class TestApplication {
this.user = user;
this.appId = BuilderUtils.newApplicationId(timestamp, id);
app = new ApplicationImpl(dispatcher, new ApplicationACLsManager(
new Configuration()), this.user, appId, null, context);
app = new ApplicationImpl(dispatcher, this.user, appId, null, context);
containers = new ArrayList<Container>();
for (int i = 0; i < numContainers; i++) {
Container container = createMockedContainer(this.appId, i);

View File

@ -240,15 +240,10 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
File shellFile = null;
try {
shellFile = Shell.appendScriptExtension(tmpDir, "hello");
String timeoutCommand = Shell.WINDOWS ? "@echo \"hello\"" :
"echo \"hello\"";
PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile));
FileUtil.setExecutable(shellFile, true);
writer.println(timeoutCommand);
writer.close();
Map<Path, List<String>> resources =
new HashMap<Path, List<String>>();
FileOutputStream fos = new FileOutputStream(shellFile);
FileUtil.setExecutable(shellFile, true);
Map<String, String> env = new HashMap<String, String>();
// invalid env
@ -270,7 +265,9 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
} catch(ExitCodeException e){
diagnostics = e.getMessage();
}
Assert.assertTrue(diagnostics.contains("command not found"));
Assert.assertTrue(diagnostics.contains(Shell.WINDOWS ?
"is not recognized as an internal or external command" :
"command not found"));
Assert.assertTrue(shexc.getExitCode() != 0);
}
finally {
@ -289,7 +286,8 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
try {
shellFile = Shell.appendScriptExtension(tmpDir, "hello");
// echo "hello" to stdout and "error" to stderr and exit code with 2;
String command = Shell.WINDOWS ? "@echo \"hello\"; @echo \"error\" 1>&2; exit 2;" :
String command = Shell.WINDOWS ?
"@echo \"hello\" & @echo \"error\" 1>&2 & exit /b 2" :
"echo \"hello\"; echo \"error\" 1>&2; exit 2;";
PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile));
FileUtil.setExecutable(shellFile, true);
@ -297,7 +295,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
writer.close();
Map<Path, List<String>> resources =
new HashMap<Path, List<String>>();
FileOutputStream fos = new FileOutputStream(shellFile);
FileOutputStream fos = new FileOutputStream(shellFile, true);
Map<String, String> env = new HashMap<String, String>();
List<String> commands = new ArrayList<String>();

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -50,7 +51,6 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsPage.ContainersLogsBlock;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.test.WebAppTests;
@ -63,7 +63,7 @@ import com.google.inject.Module;
public class TestContainerLogsPage {
@Test(timeout=30000)
public void testContainerLogDirs() throws IOException {
public void testContainerLogDirs() throws IOException, YarnException {
File absLogDir = new File("target",
TestNMWebServer.class.getSimpleName() + "LogDir").getAbsoluteFile();
String logdirwithFile = absLogDir.toURI().toString();
@ -86,7 +86,7 @@ public class TestContainerLogsPage {
ContainerId container1 = BuilderUtils.newContainerId(recordFactory, appId,
appAttemptId, 0);
List<File> files = null;
files = ContainerLogsPage.ContainersLogsBlock.getContainerLogDirs(
files = ContainerLogsUtils.getContainerLogDirs(
container1, dirsHandler);
Assert.assertTrue(!(files.get(0).toString().contains("file:")));
}
@ -146,8 +146,6 @@ public class TestContainerLogsPage {
out.write("Log file Content".getBytes());
out.close();
ApplicationACLsManager aclsManager = mock(ApplicationACLsManager.class);
Context context = mock(Context.class);
ConcurrentMap<ApplicationId, Application> appMap =
new ConcurrentHashMap<ApplicationId, Application>();
@ -157,7 +155,7 @@ public class TestContainerLogsPage {
new ConcurrentHashMap<ContainerId, Container>());
ContainersLogsBlock cLogsBlock =
new ContainersLogsBlock(conf, context, aclsManager, dirsHandler);
new ContainersLogsBlock(context);
Map<String, String> params = new HashMap<String, String>();
params.put(YarnWebParams.CONTAINER_ID, container1.toString());

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -76,7 +77,7 @@ public class TestNMWebServer {
}
private int startNMWebAppServer(String webAddr) {
Context nmContext = new NodeManager.NMContext(null, null);
Context nmContext = new NodeManager.NMContext(null, null, null, null);
ResourceView resourceView = new ResourceView() {
@Override
public long getVmemAllocatedForContainers() {
@ -133,8 +134,8 @@ public class TestNMWebServer {
}
@Test
public void testNMWebApp() throws IOException {
Context nmContext = new NodeManager.NMContext(null, null);
public void testNMWebApp() throws IOException, YarnException {
Context nmContext = new NodeManager.NMContext(null, null, null, null);
ResourceView resourceView = new ResourceView() {
@Override
public long getVmemAllocatedForContainers() {
@ -219,10 +220,10 @@ public class TestNMWebServer {
private void writeContainerLogs(Context nmContext,
ContainerId containerId, LocalDirsHandlerService dirsHandler)
throws IOException {
throws IOException, YarnException {
// ContainerLogDir should be created
File containerLogDir =
ContainerLogsPage.ContainersLogsBlock.getContainerLogDirs(containerId,
ContainerLogsUtils.getContainerLogDirs(containerId,
dirsHandler).get(0);
containerLogDir.mkdirs();
for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {

View File

@ -23,24 +23,38 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import javax.ws.rs.core.MediaType;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer.NMWebApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebApp;
@ -86,7 +100,14 @@ public class TestNMWebServices extends JerseyTest {
private Injector injector = Guice.createInjector(new ServletModule() {
@Override
protected void configureServlets() {
nmContext = new NodeManager.NMContext(null, null);
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
healthChecker.init(conf);
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager);
NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042);
((NodeManager.NMContext)nmContext).setNodeId(nodeId);
resourceView = new ResourceView() {
@ -110,13 +131,6 @@ public class TestNMWebServices extends JerseyTest {
return true;
}
};
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
healthChecker.init(conf);
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
nmWebApp = new NMWebApp(resourceView, aclsManager, dirsHandler);
bind(JAXBContextResolver.class);
bind(NMWebServices.class);
@ -293,6 +307,53 @@ public class TestNMWebServices extends JerseyTest {
verifyNodesXML(nodes);
}
@Test
public void testContainerLogs() throws IOException {
WebResource r = resource();
final ContainerId containerId = BuilderUtils.newContainerId(0, 0, 0, 0);
final String containerIdStr = BuilderUtils.newContainerId(0, 0, 0, 0)
.toString();
final ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();
final ApplicationId appId = appAttemptId.getApplicationId();
final String appIdStr = appId.toString();
final String filename = "logfile1";
final String logMessage = "log message\n";
nmContext.getApplications().put(appId, new ApplicationImpl(null, "user",
appId, null, nmContext));
MockContainer container = new MockContainer(appAttemptId,
new AsyncDispatcher(), new Configuration(), "user", appId, 1);
container.setState(ContainerState.RUNNING);
nmContext.getContainers().put(containerId, container);
// write out log file
Path path = dirsHandler.getLogPathForWrite(
ContainerLaunch.getRelativeContainerLogDir(
appIdStr, containerIdStr) + "/" + filename, false);
File logFile = new File(path.toUri().getPath());
logFile.deleteOnExit();
assertTrue("Failed to create log dir", logFile.getParentFile().mkdirs());
PrintWriter pw = new PrintWriter(logFile);
pw.print(logMessage);
pw.close();
// ask for it
ClientResponse response = r.path("ws").path("v1").path("node")
.path("containerlogs").path(containerIdStr).path(filename)
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
String responseText = response.getEntity(String.class);
assertEquals(logMessage, responseText);
// ask for file that doesn't exist
response = r.path("ws").path("v1").path("node")
.path("containerlogs").path(containerIdStr).path("uhhh")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
Assert.assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
responseText = response.getEntity(String.class);
assertTrue(responseText.contains("Cannot find this log on the local disk."));
}
public void verifyNodesXML(NodeList nodes) throws JSONException, Exception {
for (int i = 0; i < nodes.getLength(); i++) {
Element element = (Element) nodes.item(i);

View File

@ -93,7 +93,13 @@ public class TestNMWebServicesApps extends JerseyTest {
private Injector injector = Guice.createInjector(new ServletModule() {
@Override
protected void configureServlets() {
nmContext = new NodeManager.NMContext(null, null);
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
healthChecker.init(conf);
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager);
NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999);
((NodeManager.NMContext)nmContext).setNodeId(nodeId);
resourceView = new ResourceView() {
@ -119,12 +125,6 @@ public class TestNMWebServicesApps extends JerseyTest {
return true;
}
};
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
healthChecker.init(conf);
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
nmWebApp = new NMWebApp(resourceView, aclsManager, dirsHandler);
bind(JAXBContextResolver.class);
bind(NMWebServices.class);

View File

@ -93,15 +93,6 @@ public class TestNMWebServicesContainers extends JerseyTest {
private Injector injector = Guice.createInjector(new ServletModule() {
@Override
protected void configureServlets() {
nmContext = new NodeManager.NMContext(null, null) {
public NodeId getNodeId() {
return NodeId.newInstance("testhost.foo.com", 8042);
};
public int getHttpPort() {
return 1234;
};
};
resourceView = new ResourceView() {
@Override
public long getVmemAllocatedForContainers() {
@ -131,6 +122,15 @@ public class TestNMWebServicesContainers extends JerseyTest {
healthChecker.init(conf);
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager) {
public NodeId getNodeId() {
return NodeId.newInstance("testhost.foo.com", 8042);
};
public int getHttpPort() {
return 1234;
};
};
nmWebApp = new NMWebApp(resourceView, aclsManager, dirsHandler);
bind(JAXBContextResolver.class);
bind(NMWebServices.class);

View File

@ -248,7 +248,8 @@ public class ClientRMService extends AbstractService implements
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.VIEW_APP, applicationId);
ApplicationReport report =
application.createAndGetApplicationReport(allowAccess);
application.createAndGetApplicationReport(callerUGI.getUserName(),
allowAccess);
GetApplicationReportResponse response = recordFactory
.newRecordInstance(GetApplicationReportResponse.class);
@ -425,7 +426,8 @@ public class ClientRMService extends AbstractService implements
}
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.VIEW_APP, application.getApplicationId());
reports.add(application.createAndGetApplicationReport(allowAccess));
reports.add(application.createAndGetApplicationReport(
callerUGI.getUserName(), allowAccess));
}
GetApplicationsResponse response =
@ -471,7 +473,7 @@ public class ClientRMService extends AbstractService implements
apps.size());
for (RMApp app : apps) {
if (app.getQueue().equals(queueInfo.getQueueName())) {
appReports.add(app.createAndGetApplicationReport(true));
appReports.add(app.createAndGetApplicationReport(null, true));
}
}
}

View File

@ -33,7 +33,6 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
@ -302,12 +301,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
return new AsyncDispatcher();
}
protected void addIfService(Object object) {
if (object instanceof Service) {
addService((Service) object);
}
}
protected AMRMTokenSecretManager createAMRMTokenSecretManager(
Configuration conf) {
return new AMRMTokenSecretManager(conf);

View File

@ -24,6 +24,8 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -44,7 +46,6 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@ -75,14 +76,14 @@ public abstract class RMStateStore extends AbstractService {
public static class ApplicationAttemptState {
final ApplicationAttemptId attemptId;
final Container masterContainer;
final Credentials appAttemptTokens;
final Credentials appAttemptCredentials;
public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer,
Credentials appAttemptTokens) {
Credentials appAttemptCredentials) {
this.attemptId = attemptId;
this.masterContainer = masterContainer;
this.appAttemptTokens = appAttemptTokens;
this.appAttemptCredentials = appAttemptCredentials;
}
public Container getMasterContainer() {
@ -91,8 +92,8 @@ public abstract class RMStateStore extends AbstractService {
public ApplicationAttemptId getAttemptId() {
return attemptId;
}
public Credentials getAppAttemptTokens() {
return appAttemptTokens;
public Credentials getAppAttemptCredentials() {
return appAttemptCredentials;
}
}
@ -265,7 +266,7 @@ public abstract class RMStateStore extends AbstractService {
* RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt
*/
public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) {
Credentials credentials = getTokensFromAppAttempt(appAttempt);
Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttempt.getAppAttemptId(),
@ -365,7 +366,7 @@ public abstract class RMStateStore extends AbstractService {
app.getSubmitTime(), app.getApplicationSubmissionContext(),
app.getUser());
for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
Credentials credentials = getTokensFromAppAttempt(appAttempt);
Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttempt.getAppAttemptId(),
appAttempt.getMasterContainer(), credentials);
@ -396,16 +397,20 @@ public abstract class RMStateStore extends AbstractService {
public static final Text AM_RM_TOKEN_SERVICE = new Text(
"AM_RM_TOKEN_SERVICE");
private Credentials getTokensFromAppAttempt(RMAppAttempt appAttempt) {
public static final Text AM_CLIENT_TOKEN_MASTER_KEY_NAME =
new Text("YARN_CLIENT_TOKEN_MASTER_KEY");
private Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) {
Credentials credentials = new Credentials();
Token<AMRMTokenIdentifier> appToken = appAttempt.getAMRMToken();
if(appToken != null){
credentials.addToken(AM_RM_TOKEN_SERVICE, appToken);
}
Token<ClientToAMTokenIdentifier> clientToAMToken =
appAttempt.getClientToAMToken();
if(clientToAMToken != null){
credentials.addToken(clientToAMToken.getService(), clientToAMToken);
SecretKey clientTokenMasterKey =
appAttempt.getClientTokenMasterKey();
if(clientTokenMasterKey != null){
credentials.addSecretKey(AM_CLIENT_TOKEN_MASTER_KEY_NAME,
clientTokenMasterKey.getEncoded());
}
return credentials;
}
@ -445,7 +450,7 @@ public abstract class RMStateStore extends AbstractService {
((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
Exception storedException = null;
Credentials credentials = attemptState.getAppAttemptTokens();
Credentials credentials = attemptState.getAppAttemptCredentials();
ByteBuffer appAttemptTokens = null;
try {
if(credentials != null){

View File

@ -128,10 +128,12 @@ public interface RMApp extends EventHandler<RMAppEvent> {
* <li>resource usage report - all values are -1</li>
* </ul>
*
* @param clientUserName the user name of the client requesting the report
* @param allowAccess whether to allow full access to the report
* @return the {@link ApplicationReport} detailing the status of the application.
*/
ApplicationReport createAndGetApplicationReport(boolean allowAccess);
ApplicationReport createAndGetApplicationReport(String clientUserName,
boolean allowAccess);
/**
* To receive the collection of all {@link RMNode}s whose updates have been

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@ -411,7 +410,8 @@ public class RMAppImpl implements RMApp, Recoverable {
}
@Override
public ApplicationReport createAndGetApplicationReport(boolean allowAccess) {
public ApplicationReport createAndGetApplicationReport(String clientUserName,
boolean allowAccess) {
this.readLock.lock();
try {
@ -432,11 +432,14 @@ public class RMAppImpl implements RMApp, Recoverable {
currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
trackingUrl = this.currentAttempt.getTrackingUrl();
origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl();
if (UserGroupInformation.isSecurityEnabled()
&& clientUserName != null) {
Token<ClientToAMTokenIdentifier> attemptClientToAMToken =
this.currentAttempt.getClientToAMToken();
if (attemptClientToAMToken != null) {
clientToAMToken =
BuilderUtils.newClientToAMToken(
new Token<ClientToAMTokenIdentifier>(
new ClientToAMTokenIdentifier(
currentApplicationAttemptId, clientUserName),
rmContext.getClientToAMTokenSecretManager());
clientToAMToken = BuilderUtils.newClientToAMToken(
attemptClientToAMToken.getIdentifier(),
attemptClientToAMToken.getKind().toString(),
attemptClientToAMToken.getPassword(),
@ -451,9 +454,8 @@ public class RMAppImpl implements RMApp, Recoverable {
if (currentAttempt != null &&
currentAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
try {
if (getApplicationSubmissionContext().getUnmanagedAM() &&
getUser().equals(UserGroupInformation.getCurrentUser().getUserName())) {
clientUserName != null && getUser().equals(clientUserName)) {
Token<AMRMTokenIdentifier> token = currentAttempt.getAMRMToken();
if (token != null) {
amrmToken = BuilderUtils.newAMRMToken(token.getIdentifier(),
@ -461,10 +463,6 @@ public class RMAppImpl implements RMApp, Recoverable {
token.getService().toString());
}
}
} catch (IOException ex) {
LOG.warn("UserGroupInformation.getCurrentUser() error: " +
ex.toString(), ex);
}
}
}

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import java.util.List;
import java.util.Set;
import javax.crypto.SecretKey;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@ -32,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
/**
@ -90,12 +91,6 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
*/
String getWebProxyBase();
/**
* The token required by the clients to talk to the application attempt
* @return the token required by the clients to talk to the application attempt
*/
Token<ClientToAMTokenIdentifier> getClientToAMToken();
/**
* Diagnostics information for the application attempt.
* @return diagnostics information for the application attempt.
@ -154,6 +149,12 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
*/
Token<AMRMTokenIdentifier> getAMRMToken();
/**
* The master key for client-to-AM tokens for this app attempt
* @return The master key for client-to-AM tokens for this app attempt
*/
SecretKey getClientTokenMasterKey();
/**
* Get application container and resource usage information.
* @return an ApplicationResourceUsageReport object.

View File

@ -33,12 +33,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import javax.crypto.SecretKey;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@ -60,8 +61,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSelector;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
@ -126,9 +125,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private final WriteLock writeLock;
private final ApplicationAttemptId applicationAttemptId;
private Token<ClientToAMTokenIdentifier> clientToAMToken;
private final ApplicationSubmissionContext submissionContext;
private Token<AMRMTokenIdentifier> amrmToken = null;
private SecretKey clientTokenMasterKey = null;
//nodes on while this attempt's containers ran
private final Set<NodeId> ranNodes =
@ -499,8 +498,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
}
@Override
public Token<ClientToAMTokenIdentifier> getClientToAMToken() {
return this.clientToAMToken;
public SecretKey getClientTokenMasterKey() {
return this.clientTokenMasterKey;
}
@Override
@ -659,7 +658,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId());
assert attemptState != null;
setMasterContainer(attemptState.getMasterContainer());
recoverAppAttemptTokens(attemptState.getAppAttemptTokens());
recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials());
LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId()
+ " AttemptId: " + getAppAttemptId()
+ " MasterContainer: " + masterContainer);
@ -668,17 +667,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEventType.RECOVER));
}
private void recoverAppAttemptTokens(Credentials appAttemptTokens) {
private void recoverAppAttemptCredentials(Credentials appAttemptTokens) {
if (appAttemptTokens == null) {
return;
}
if (UserGroupInformation.isSecurityEnabled()) {
ClientToAMTokenSelector clientToAMTokenSelector =
new ClientToAMTokenSelector();
this.clientToAMToken =
clientToAMTokenSelector.selectToken(new Text(),
appAttemptTokens.getAllTokens());
if (UserGroupInformation.isSecurityEnabled()) {
byte[] clientTokenMasterKeyBytes = appAttemptTokens.getSecretKey(
RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME);
clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager()
.registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes);
}
// Only one AMRMToken is stored per-attempt, so this should be fine. Can't
@ -715,15 +713,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
.registerAppAttempt(appAttempt.applicationAttemptId);
if (UserGroupInformation.isSecurityEnabled()) {
appAttempt.rmContext.getClientToAMTokenSecretManager()
appAttempt.clientTokenMasterKey = appAttempt.rmContext
.getClientToAMTokenSecretManager()
.registerApplication(appAttempt.applicationAttemptId);
// create clientToAMToken
appAttempt.clientToAMToken =
new Token<ClientToAMTokenIdentifier>(new ClientToAMTokenIdentifier(
appAttempt.applicationAttemptId),
appAttempt.rmContext.getClientToAMTokenSecretManager());
}
// create AMRMToken
@ -762,7 +754,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
message)
);
appAttempt.removeTokens(appAttempt);
appAttempt.removeCredentials(appAttempt);
}
}
@ -895,7 +887,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.eventHandler.handle(new AppRemovedSchedulerEvent(appAttemptId,
finalAttemptState));
appAttempt.removeTokens(appAttempt);
appAttempt.removeCredentials(appAttempt);
}
}
@ -1256,7 +1248,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
store.storeApplicationAttempt(this);
}
private void removeTokens(RMAppAttemptImpl appAttempt) {
private void removeCredentials(RMAppAttemptImpl appAttempt) {
// Unregister from the ClientToAMTokenSecretManager
if (UserGroupInformation.isSecurityEnabled()) {
appAttempt.rmContext.getClientToAMTokenSecretManager()

View File

@ -33,9 +33,18 @@ public class ClientToAMTokenSecretManagerInRM extends
private Map<ApplicationAttemptId, SecretKey> masterKeys =
new HashMap<ApplicationAttemptId, SecretKey>();
public synchronized void registerApplication(
public synchronized SecretKey registerApplication(
ApplicationAttemptId applicationAttemptID) {
this.masterKeys.put(applicationAttemptID, generateSecret());
SecretKey key = generateSecret();
this.masterKeys.put(applicationAttemptID, key);
return key;
}
public synchronized SecretKey registerMasterKey(
ApplicationAttemptId applicationAttemptID, byte[] keyData) {
SecretKey key = createSecretKey(keyData);
this.masterKeys.put(applicationAttemptID, key);
return key;
}
public synchronized void unRegisterApplication(

View File

@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Level;
@ -387,6 +388,10 @@ public class MockRM extends ResourceManager {
return this.rmDTSecretManager;
}
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
return this.clientToAMSecretManager;
}
@Override
protected void startWepApp() {
// override to disable webapp

View File

@ -541,16 +541,21 @@ public class TestRMRestart {
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId());
// the appToken and clientToAMToken that are generated when RMAppAttempt
// is created,
// the appToken and clientTokenMasterKey that are generated when
// RMAppAttempt is created,
HashSet<Token<?>> tokenSet = new HashSet<Token<?>>();
tokenSet.add(attempt1.getAMRMToken());
tokenSet.add(attempt1.getClientToAMToken());
byte[] clientTokenMasterKey =
attempt1.getClientTokenMasterKey().getEncoded();
// assert application Token is saved
// assert application credentials are saved
Credentials savedCredentials = attemptState.getAppAttemptCredentials();
HashSet<Token<?>> savedTokens = new HashSet<Token<?>>();
savedTokens.addAll(attemptState.getAppAttemptTokens().getAllTokens());
savedTokens.addAll(savedCredentials.getAllTokens());
Assert.assertEquals(tokenSet, savedTokens);
Assert.assertArrayEquals("client token master key not saved",
clientTokenMasterKey, savedCredentials.getSecretKey(
RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
// start new RM
MockRM rm2 = new TestSecurityMockRM(conf, memStore);
@ -564,13 +569,18 @@ public class TestRMRestart {
Assert.assertNotNull(loadedAttempt1);
savedTokens.clear();
savedTokens.add(loadedAttempt1.getAMRMToken());
savedTokens.add(loadedAttempt1.getClientToAMToken());
Assert.assertEquals(tokenSet, savedTokens);
// assert clientToAMToken is recovered back to api-versioned
// clientToAMToken
Assert.assertEquals(attempt1.getClientToAMToken(),
loadedAttempt1.getClientToAMToken());
// assert client token master key is recovered back to api-versioned
// client token master key
Assert.assertEquals("client token master key not restored",
attempt1.getClientTokenMasterKey(),
loadedAttempt1.getClientTokenMasterKey());
// assert secret manager also knows about the key
Assert.assertArrayEquals(clientTokenMasterKey,
rm2.getClientToAMTokenSecretManager().getMasterKey(attemptId1)
.getEncoded());
// Not testing ApplicationTokenSecretManager has the password populated back,
// that is needed in work-preserving restart

View File

@ -115,7 +115,8 @@ public abstract class MockAsm extends MockApps {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public ApplicationReport createAndGetApplicationReport(boolean allowAccess) {
public ApplicationReport createAndGetApplicationReport(
String clientUserName,boolean allowAccess) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -25,12 +26,12 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import javax.crypto.SecretKey;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
@ -55,7 +56,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
@ -198,7 +198,7 @@ public class TestRMStateStore {
ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
String containerIdStr, Token<AMRMTokenIdentifier> appToken,
Token<ClientToAMTokenIdentifier> clientToAMToken, TestDispatcher dispatcher)
SecretKey clientTokenMasterKey, TestDispatcher dispatcher)
throws Exception {
Container container = new ContainerPBImpl();
@ -207,7 +207,8 @@ public class TestRMStateStore {
when(mockAttempt.getAppAttemptId()).thenReturn(attemptId);
when(mockAttempt.getMasterContainer()).thenReturn(container);
when(mockAttempt.getAMRMToken()).thenReturn(appToken);
when(mockAttempt.getClientToAMToken()).thenReturn(clientToAMToken);
when(mockAttempt.getClientTokenMasterKey())
.thenReturn(clientTokenMasterKey);
dispatcher.attemptId = attemptId;
dispatcher.storedException = null;
store.storeApplicationAttempt(mockAttempt);
@ -215,7 +216,6 @@ public class TestRMStateStore {
return container.getId();
}
@SuppressWarnings("unchecked")
void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
long submitTime = System.currentTimeMillis();
Configuration conf = new YarnConfiguration();
@ -233,33 +233,33 @@ public class TestRMStateStore {
ApplicationId appId1 = attemptId1.getApplicationId();
storeApp(store, appId1, submitTime);
// create application token1 for attempt1
List<Token<?>> appAttemptToken1 =
generateTokens(attemptId1, appTokenMgr, clientToAMTokenMgr, conf);
// create application token and client token key for attempt1
Token<AMRMTokenIdentifier> appAttemptToken1 =
generateAMRMToken(attemptId1, appTokenMgr);
HashSet<Token<?>> attemptTokenSet1 = new HashSet<Token<?>>();
attemptTokenSet1.addAll(appAttemptToken1);
attemptTokenSet1.add(appAttemptToken1);
SecretKey clientTokenKey1 =
clientToAMTokenMgr.registerApplication(attemptId1);
ContainerId containerId1 = storeAttempt(store, attemptId1,
"container_1352994193343_0001_01_000001",
(Token<AMRMTokenIdentifier>) (appAttemptToken1.get(0)),
(Token<ClientToAMTokenIdentifier>)(appAttemptToken1.get(1)),
dispatcher);
appAttemptToken1, clientTokenKey1, dispatcher);
String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002";
ApplicationAttemptId attemptId2 =
ConverterUtils.toApplicationAttemptId(appAttemptIdStr2);
// create application token2 for attempt2
List<Token<?>> appAttemptToken2 =
generateTokens(attemptId2, appTokenMgr, clientToAMTokenMgr, conf);
// create application token and client token key for attempt2
Token<AMRMTokenIdentifier> appAttemptToken2 =
generateAMRMToken(attemptId2, appTokenMgr);
HashSet<Token<?>> attemptTokenSet2 = new HashSet<Token<?>>();
attemptTokenSet2.addAll(appAttemptToken2);
attemptTokenSet2.add(appAttemptToken2);
SecretKey clientTokenKey2 =
clientToAMTokenMgr.registerApplication(attemptId2);
ContainerId containerId2 = storeAttempt(store, attemptId2,
"container_1352994193343_0001_02_000001",
(Token<AMRMTokenIdentifier>) (appAttemptToken2.get(0)),
(Token<ClientToAMTokenIdentifier>)(appAttemptToken2.get(1)),
dispatcher);
appAttemptToken2, clientTokenKey2, dispatcher);
ApplicationAttemptId attemptIdRemoved = ConverterUtils
.toApplicationAttemptId("appattempt_1352994193343_0002_000001");
@ -306,8 +306,12 @@ public class TestRMStateStore {
assertEquals(containerId1, attemptState.getMasterContainer().getId());
// attempt1 applicationToken is loaded correctly
HashSet<Token<?>> savedTokens = new HashSet<Token<?>>();
savedTokens.addAll(attemptState.getAppAttemptTokens().getAllTokens());
savedTokens.addAll(attemptState.getAppAttemptCredentials().getAllTokens());
assertEquals(attemptTokenSet1, savedTokens);
// attempt1 client token master key is loaded correctly
assertArrayEquals(clientTokenKey1.getEncoded(),
attemptState.getAppAttemptCredentials()
.getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
attemptState = appState.getAttempt(attemptId2);
// attempt2 is loaded correctly
@ -317,8 +321,12 @@ public class TestRMStateStore {
assertEquals(containerId2, attemptState.getMasterContainer().getId());
// attempt2 applicationToken is loaded correctly
savedTokens.clear();
savedTokens.addAll(attemptState.getAppAttemptTokens().getAllTokens());
savedTokens.addAll(attemptState.getAppAttemptCredentials().getAllTokens());
assertEquals(attemptTokenSet2, savedTokens);
// attempt2 client token master key is loaded correctly
assertArrayEquals(clientTokenKey2.getEncoded(),
attemptState.getAppAttemptCredentials()
.getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
// assert store is in expected state after everything is cleaned
assertTrue(stateStoreHelper.isFinalStateValid());
@ -357,24 +365,14 @@ public class TestRMStateStore {
Assert.assertEquals(sequenceNumber, secretManagerState.getDTSequenceNumber());
}
private List<Token<?>> generateTokens(ApplicationAttemptId attemptId,
AMRMTokenSecretManager appTokenMgr,
ClientToAMTokenSecretManagerInRM clientToAMTokenMgr, Configuration conf) {
private Token<AMRMTokenIdentifier> generateAMRMToken(
ApplicationAttemptId attemptId,
AMRMTokenSecretManager appTokenMgr) {
AMRMTokenIdentifier appTokenId =
new AMRMTokenIdentifier(attemptId);
Token<AMRMTokenIdentifier> appToken =
new Token<AMRMTokenIdentifier>(appTokenId, appTokenMgr);
appToken.setService(new Text("appToken service"));
ClientToAMTokenIdentifier clientToAMTokenId =
new ClientToAMTokenIdentifier(attemptId);
clientToAMTokenMgr.registerApplication(attemptId);
Token<ClientToAMTokenIdentifier> clientToAMToken =
new Token<ClientToAMTokenIdentifier>(clientToAMTokenId, clientToAMTokenMgr);
clientToAMToken.setService(new Text("clientToAMToken service"));
List<Token<?>> tokenPair = new ArrayList<Token<?>>();
tokenPair.add(0, appToken);
tokenPair.add(1, clientToAMToken);
return tokenPair;
return appToken;
}
}

View File

@ -143,7 +143,8 @@ public class MockRMApp implements RMApp {
}
@Override
public ApplicationReport createAndGetApplicationReport(boolean allowAccess) {
public ApplicationReport createAndGetApplicationReport(
String clientUserName, boolean allowAccess) {
throw new UnsupportedOperationException("Not supported yet.");
}

View File

@ -726,7 +726,9 @@ public class TestRMAppTransitions {
public void testGetAppReport() {
RMApp app = createNewTestApp(null);
assertAppState(RMAppState.NEW, app);
ApplicationReport report = app.createAndGetApplicationReport(true);
ApplicationReport report = app.createAndGetApplicationReport(null, true);
Assert.assertNotNull(report.getApplicationResourceUsageReport());
report = app.createAndGetApplicationReport("clientuser", true);
Assert.assertNotNull(report.getApplicationResourceUsageReport());
}
}

View File

@ -115,7 +115,6 @@ public class TestClientToAMTokens {
private final byte[] secretKey;
private InetSocketAddress address;
private boolean pinged = false;
private ClientToAMTokenSecretManager secretManager;
public CustomAM(ApplicationAttemptId appId, byte[] secretKey) {
super("CustomAM");
@ -132,12 +131,14 @@ public class TestClientToAMTokens {
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
secretManager = new ClientToAMTokenSecretManager(this.appAttemptId, secretKey);
Server server;
try {
server =
new RPC.Builder(conf).setProtocol(CustomProtocol.class)
.setNumHandlers(1).setSecretManager(secretManager)
new RPC.Builder(conf)
.setProtocol(CustomProtocol.class)
.setNumHandlers(1)
.setSecretManager(
new ClientToAMTokenSecretManager(this.appAttemptId, secretKey))
.setInstance(this).build();
} catch (Exception e) {
throw new YarnRuntimeException(e);
@ -146,14 +147,10 @@ public class TestClientToAMTokens {
this.address = NetUtils.getConnectAddress(server);
super.serviceStart();
}
public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
return this.secretManager;
}
}
@Test
public void testClientToAMs() throws Exception {
public void testClientToAMTokens() throws Exception {
final Configuration conf = new Configuration();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
@ -204,7 +201,7 @@ public class TestClientToAMTokens {
GetApplicationReportResponse reportResponse =
rm.getClientRMService().getApplicationReport(request);
ApplicationReport appReport = reportResponse.getApplicationReport();
org.apache.hadoop.yarn.api.records.Token clientToAMToken =
org.apache.hadoop.yarn.api.records.Token originalClientToAMToken =
appReport.getClientToAMToken();
ApplicationAttemptId appAttempt = app.getCurrentAppAttempt().getAppAttemptId();
@ -259,17 +256,47 @@ public class TestClientToAMTokens {
Assert.assertFalse(am.pinged);
}
// Verify denial for a malicious user
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("me");
Token<ClientToAMTokenIdentifier> token =
ConverterUtils.convertFromYarn(clientToAMToken, am.address);
ConverterUtils.convertFromYarn(originalClientToAMToken, am.address);
// Verify denial for a malicious user with tampered ID
verifyTokenWithTamperedID(conf, am, token);
// Verify denial for a malicious user with tampered user-name
verifyTokenWithTamperedUserName(conf, am, token);
// Now for an authenticated user
verifyValidToken(conf, am, token);
}
private void verifyTokenWithTamperedID(final Configuration conf,
final CustomAM am, Token<ClientToAMTokenIdentifier> token)
throws IOException {
// Malicious user, messes with appId
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("me");
ClientToAMTokenIdentifier maliciousID =
new ClientToAMTokenIdentifier(BuilderUtils.newApplicationAttemptId(
BuilderUtils.newApplicationId(app.getApplicationId()
.getClusterTimestamp(), 42), 43));
BuilderUtils.newApplicationId(am.appAttemptId.getApplicationId()
.getClusterTimestamp(), 42), 43), UserGroupInformation
.getCurrentUser().getShortUserName());
verifyTamperedToken(conf, am, token, ugi, maliciousID);
}
private void verifyTokenWithTamperedUserName(final Configuration conf,
final CustomAM am, Token<ClientToAMTokenIdentifier> token)
throws IOException {
// Malicious user, messes with appId
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("me");
ClientToAMTokenIdentifier maliciousID =
new ClientToAMTokenIdentifier(am.appAttemptId, "evilOrc");
verifyTamperedToken(conf, am, token, ugi, maliciousID);
}
private void verifyTamperedToken(final Configuration conf, final CustomAM am,
Token<ClientToAMTokenIdentifier> token, UserGroupInformation ugi,
ClientToAMTokenIdentifier maliciousID) {
Token<ClientToAMTokenIdentifier> maliciousToken =
new Token<ClientToAMTokenIdentifier>(maliciousID.getBytes(),
token.getPassword(), token.getKind(),
@ -309,8 +336,12 @@ public class TestClientToAMTokens {
+ "Mismatched response."));
Assert.assertFalse(am.pinged);
}
}
// Now for an authenticated user
private void verifyValidToken(final Configuration conf, final CustomAM am,
Token<ClientToAMTokenIdentifier> token) throws IOException,
InterruptedException {
UserGroupInformation ugi;
ugi = UserGroupInformation.createRemoteUser("me");
ugi.addToken(token);
@ -326,5 +357,4 @@ public class TestClientToAMTokens {
}
});
}
}