Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1230696 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-01-12 18:40:50 +00:00
commit 8610a9231a
32 changed files with 1351 additions and 1301 deletions

View File

@ -266,6 +266,9 @@ Release 0.23.1 - Unreleased
HADOOP-7963. Fix ViewFS to catch a null canonical service-name and pass
tests TestViewFileSystem* (Siddharth Seth via vinodkv)
HADOOP-7964. Deadlock in NetUtils and SecurityUtil class initialization.
(Daryn Sharp via suresh)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -51,12 +51,6 @@
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.annotations.VisibleForTesting;
//this will need to be replaced someday when there is a suitable replacement
import sun.net.dns.ResolverConfiguration;
import sun.net.util.IPAddressUtil;
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Unstable
public class NetUtils {
@ -72,26 +66,6 @@ public class NetUtils {
/** Base URL of the Hadoop Wiki: {@value} */
public static final String HADOOP_WIKI = "http://wiki.apache.org/hadoop/";
private static HostResolver hostResolver;
static {
// SecurityUtils requires a more secure host resolver if tokens are
// using hostnames
setUseQualifiedHostResolver(!SecurityUtil.getTokenServiceUseIp());
}
/**
* This method is intended for use only by SecurityUtils!
* @param flag where the qualified or standard host resolver is used
* to create socket addresses
*/
@InterfaceAudience.Private
public static void setUseQualifiedHostResolver(boolean flag) {
hostResolver = flag
? new QualifiedHostResolver()
: new StandardHostResolver();
}
/**
* Get the socket factory for the given class according to its
* configuration parameter
@ -249,7 +223,7 @@ public static InetSocketAddress createSocketAddrForHost(String host, int port) {
InetSocketAddress addr;
try {
InetAddress iaddr = hostResolver.getByName(resolveHost);
InetAddress iaddr = SecurityUtil.getByName(resolveHost);
// if there is a static entry for the host, make the returned
// address look like the original given host
if (staticHost != null) {
@ -262,151 +236,6 @@ public static InetSocketAddress createSocketAddrForHost(String host, int port) {
return addr;
}
interface HostResolver {
InetAddress getByName(String host) throws UnknownHostException;
}
/**
* Uses standard java host resolution
*/
static class StandardHostResolver implements HostResolver {
public InetAddress getByName(String host) throws UnknownHostException {
return InetAddress.getByName(host);
}
}
/**
* This an alternate resolver with important properties that the standard
* java resolver lacks:
* 1) The hostname is fully qualified. This avoids security issues if not
* all hosts in the cluster do not share the same search domains. It
* also prevents other hosts from performing unnecessary dns searches.
* In contrast, InetAddress simply returns the host as given.
* 2) The InetAddress is instantiated with an exact host and IP to prevent
* further unnecessary lookups. InetAddress may perform an unnecessary
* reverse lookup for an IP.
* 3) A call to getHostName() will always return the qualified hostname, or
* more importantly, the IP if instantiated with an IP. This avoids
* unnecessary dns timeouts if the host is not resolvable.
* 4) Point 3 also ensures that if the host is re-resolved, ex. during a
* connection re-attempt, that a reverse lookup to host and forward
* lookup to IP is not performed since the reverse/forward mappings may
* not always return the same IP. If the client initiated a connection
* with an IP, then that IP is all that should ever be contacted.
*
* NOTE: this resolver is only used if:
* hadoop.security.token.service.use_ip=false
*/
protected static class QualifiedHostResolver implements HostResolver {
@SuppressWarnings("unchecked")
private List<String> searchDomains =
ResolverConfiguration.open().searchlist();
/**
* Create an InetAddress with a fully qualified hostname of the given
* hostname. InetAddress does not qualify an incomplete hostname that
* is resolved via the domain search list.
* {@link InetAddress#getCanonicalHostName()} will fully qualify the
* hostname, but it always return the A record whereas the given hostname
* may be a CNAME.
*
* @param host a hostname or ip address
* @return InetAddress with the fully qualified hostname or ip
* @throws UnknownHostException if host does not exist
*/
public InetAddress getByName(String host) throws UnknownHostException {
InetAddress addr = null;
if (IPAddressUtil.isIPv4LiteralAddress(host)) {
// use ipv4 address as-is
byte[] ip = IPAddressUtil.textToNumericFormatV4(host);
addr = InetAddress.getByAddress(host, ip);
} else if (IPAddressUtil.isIPv6LiteralAddress(host)) {
// use ipv6 address as-is
byte[] ip = IPAddressUtil.textToNumericFormatV6(host);
addr = InetAddress.getByAddress(host, ip);
} else if (host.endsWith(".")) {
// a rooted host ends with a dot, ex. "host."
// rooted hosts never use the search path, so only try an exact lookup
addr = getByExactName(host);
} else if (host.contains(".")) {
// the host contains a dot (domain), ex. "host.domain"
// try an exact host lookup, then fallback to search list
addr = getByExactName(host);
if (addr == null) {
addr = getByNameWithSearch(host);
}
} else {
// it's a simple host with no dots, ex. "host"
// try the search list, then fallback to exact host
InetAddress loopback = InetAddress.getByName(null);
if (host.equalsIgnoreCase(loopback.getHostName())) {
addr = InetAddress.getByAddress(host, loopback.getAddress());
} else {
addr = getByNameWithSearch(host);
if (addr == null) {
addr = getByExactName(host);
}
}
}
// unresolvable!
if (addr == null) {
throw new UnknownHostException(host);
}
return addr;
}
InetAddress getByExactName(String host) {
InetAddress addr = null;
// InetAddress will use the search list unless the host is rooted
// with a trailing dot. The trailing dot will disable any use of the
// search path in a lower level resolver. See RFC 1535.
String fqHost = host;
if (!fqHost.endsWith(".")) fqHost += ".";
try {
addr = getInetAddressByName(fqHost);
// can't leave the hostname as rooted or other parts of the system
// malfunction, ex. kerberos principals are lacking proper host
// equivalence for rooted/non-rooted hostnames
addr = InetAddress.getByAddress(host, addr.getAddress());
} catch (UnknownHostException e) {
// ignore, caller will throw if necessary
}
return addr;
}
InetAddress getByNameWithSearch(String host) {
InetAddress addr = null;
if (host.endsWith(".")) { // already qualified?
addr = getByExactName(host);
} else {
for (String domain : searchDomains) {
String dot = !domain.startsWith(".") ? "." : "";
addr = getByExactName(host + dot + domain);
if (addr != null) break;
}
}
return addr;
}
// implemented as a separate method to facilitate unit testing
InetAddress getInetAddressByName(String host) throws UnknownHostException {
return InetAddress.getByName(host);
}
void setSearchDomains(String ... domains) {
searchDomains = Arrays.asList(domains);
}
}
/**
* This is for testing only!
*/
@VisibleForTesting
static void setHostResolver(HostResolver newResolver) {
hostResolver = newResolver;
}
/**
* Resolve the uri's hostname and add the default port if not in the uri
* @param uri to resolve
@ -447,7 +276,7 @@ private static String canonicalizeHost(String host) {
String fqHost = canonicalizedHostCache.get(host);
if (fqHost == null) {
try {
fqHost = hostResolver.getByName(host).getHostName();
fqHost = SecurityUtil.getByName(host).getHostName();
// slight race condition, but won't hurt
canonicalizedHostCache.put(host, fqHost);
} catch (UnknownHostException e) {

View File

@ -23,6 +23,8 @@
import java.net.URL;
import java.net.UnknownHostException;
import java.security.AccessController;
import java.util.Arrays;
import java.util.List;
import java.util.ServiceLoader;
import java.util.Set;
@ -41,6 +43,11 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenInfo;
import com.google.common.annotations.VisibleForTesting;
//this will need to be replaced someday when there is a suitable replacement
import sun.net.dns.ResolverConfiguration;
import sun.net.util.IPAddressUtil;
import sun.security.jgss.krb5.Krb5Util;
import sun.security.krb5.Credentials;
import sun.security.krb5.PrincipalName;
@ -53,7 +60,10 @@ public class SecurityUtil {
// controls whether buildTokenService will use an ip or host/ip as given
// by the user
private static boolean useIpForTokenService;
@VisibleForTesting
static boolean useIpForTokenService;
@VisibleForTesting
static HostResolver hostResolver;
static {
boolean useIp = new Configuration().getBoolean(
@ -68,16 +78,9 @@ public class SecurityUtil {
@InterfaceAudience.Private
static void setTokenServiceUseIp(boolean flag) {
useIpForTokenService = flag;
NetUtils.setUseQualifiedHostResolver(!flag);
}
/**
* Intended only for temporary use by NetUtils. Do not use.
* @return whether tokens use an IP address
*/
@InterfaceAudience.Private
public static boolean getTokenServiceUseIp() {
return useIpForTokenService;
hostResolver = !useIpForTokenService
? new QualifiedHostResolver()
: new StandardHostResolver();
}
/**
@ -142,7 +145,7 @@ protected static boolean isOriginalTGT(KerberosTicket ticket) {
* it will be removed when the Java behavior is changed.
*
* @param remoteHost Target URL the krb-https client will access
* @throws IOException
* @throws IOException if the service ticket cannot be retrieved
*/
public static void fetchServiceTicket(URL remoteHost) throws IOException {
if(!UserGroupInformation.isSecurityEnabled())
@ -179,7 +182,7 @@ public static void fetchServiceTicket(URL remoteHost) throws IOException {
* @param hostname
* the fully-qualified domain name used for substitution
* @return converted Kerberos principal name
* @throws IOException
* @throws IOException if the client address cannot be determined
*/
public static String getServerPrincipal(String principalConfig,
String hostname) throws IOException {
@ -204,7 +207,7 @@ public static String getServerPrincipal(String principalConfig,
* @param addr
* InetAddress of the host used for substitution
* @return converted Kerberos principal name
* @throws IOException
* @throws IOException if the client address cannot be determined
*/
public static String getServerPrincipal(String principalConfig,
InetAddress addr) throws IOException {
@ -251,7 +254,7 @@ static String getLocalHostName() throws UnknownHostException {
* the key to look for keytab file in conf
* @param userNameKey
* the key to look for user's Kerberos principal name in conf
* @throws IOException
* @throws IOException if login fails
*/
public static void login(final Configuration conf,
final String keytabFileKey, final String userNameKey) throws IOException {
@ -271,7 +274,7 @@ public static void login(final Configuration conf,
* the key to look for user's Kerberos principal name in conf
* @param hostname
* hostname to use for substitution
* @throws IOException
* @throws IOException if the config doesn't specify a keytab
*/
public static void login(final Configuration conf,
final String keytabFileKey, final String userNameKey, String hostname)
@ -363,7 +366,7 @@ public static void setSecurityInfoProviders(SecurityInfo... providers) {
* Look up the TokenInfo for a given protocol. It searches all known
* SecurityInfo providers.
* @param protocol The protocol class to get the information for.
* @conf conf Configuration object
* @param conf Configuration object
* @return the TokenInfo or null if it has no KerberosInfo defined
*/
public static TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
@ -442,4 +445,155 @@ public static Text buildTokenService(InetSocketAddress addr) {
public static Text buildTokenService(URI uri) {
return buildTokenService(NetUtils.createSocketAddr(uri.getAuthority()));
}
/**
* Resolves a host subject to the security requirements determined by
* hadoop.security.token.service.use_ip.
*
* @param hostname host or ip to resolve
* @return a resolved host
* @throws UnknownHostException if the host doesn't exist
*/
@InterfaceAudience.Private
public static
InetAddress getByName(String hostname) throws UnknownHostException {
return hostResolver.getByName(hostname);
}
interface HostResolver {
InetAddress getByName(String host) throws UnknownHostException;
}
/**
* Uses standard java host resolution
*/
static class StandardHostResolver implements HostResolver {
public InetAddress getByName(String host) throws UnknownHostException {
return InetAddress.getByName(host);
}
}
/**
* This an alternate resolver with important properties that the standard
* java resolver lacks:
* 1) The hostname is fully qualified. This avoids security issues if not
* all hosts in the cluster do not share the same search domains. It
* also prevents other hosts from performing unnecessary dns searches.
* In contrast, InetAddress simply returns the host as given.
* 2) The InetAddress is instantiated with an exact host and IP to prevent
* further unnecessary lookups. InetAddress may perform an unnecessary
* reverse lookup for an IP.
* 3) A call to getHostName() will always return the qualified hostname, or
* more importantly, the IP if instantiated with an IP. This avoids
* unnecessary dns timeouts if the host is not resolvable.
* 4) Point 3 also ensures that if the host is re-resolved, ex. during a
* connection re-attempt, that a reverse lookup to host and forward
* lookup to IP is not performed since the reverse/forward mappings may
* not always return the same IP. If the client initiated a connection
* with an IP, then that IP is all that should ever be contacted.
*
* NOTE: this resolver is only used if:
* hadoop.security.token.service.use_ip=false
*/
protected static class QualifiedHostResolver implements HostResolver {
@SuppressWarnings("unchecked")
private List<String> searchDomains =
ResolverConfiguration.open().searchlist();
/**
* Create an InetAddress with a fully qualified hostname of the given
* hostname. InetAddress does not qualify an incomplete hostname that
* is resolved via the domain search list.
* {@link InetAddress#getCanonicalHostName()} will fully qualify the
* hostname, but it always return the A record whereas the given hostname
* may be a CNAME.
*
* @param host a hostname or ip address
* @return InetAddress with the fully qualified hostname or ip
* @throws UnknownHostException if host does not exist
*/
public InetAddress getByName(String host) throws UnknownHostException {
InetAddress addr = null;
if (IPAddressUtil.isIPv4LiteralAddress(host)) {
// use ipv4 address as-is
byte[] ip = IPAddressUtil.textToNumericFormatV4(host);
addr = InetAddress.getByAddress(host, ip);
} else if (IPAddressUtil.isIPv6LiteralAddress(host)) {
// use ipv6 address as-is
byte[] ip = IPAddressUtil.textToNumericFormatV6(host);
addr = InetAddress.getByAddress(host, ip);
} else if (host.endsWith(".")) {
// a rooted host ends with a dot, ex. "host."
// rooted hosts never use the search path, so only try an exact lookup
addr = getByExactName(host);
} else if (host.contains(".")) {
// the host contains a dot (domain), ex. "host.domain"
// try an exact host lookup, then fallback to search list
addr = getByExactName(host);
if (addr == null) {
addr = getByNameWithSearch(host);
}
} else {
// it's a simple host with no dots, ex. "host"
// try the search list, then fallback to exact host
InetAddress loopback = InetAddress.getByName(null);
if (host.equalsIgnoreCase(loopback.getHostName())) {
addr = InetAddress.getByAddress(host, loopback.getAddress());
} else {
addr = getByNameWithSearch(host);
if (addr == null) {
addr = getByExactName(host);
}
}
}
// unresolvable!
if (addr == null) {
throw new UnknownHostException(host);
}
return addr;
}
InetAddress getByExactName(String host) {
InetAddress addr = null;
// InetAddress will use the search list unless the host is rooted
// with a trailing dot. The trailing dot will disable any use of the
// search path in a lower level resolver. See RFC 1535.
String fqHost = host;
if (!fqHost.endsWith(".")) fqHost += ".";
try {
addr = getInetAddressByName(fqHost);
// can't leave the hostname as rooted or other parts of the system
// malfunction, ex. kerberos principals are lacking proper host
// equivalence for rooted/non-rooted hostnames
addr = InetAddress.getByAddress(host, addr.getAddress());
} catch (UnknownHostException e) {
// ignore, caller will throw if necessary
}
return addr;
}
InetAddress getByNameWithSearch(String host) {
InetAddress addr = null;
if (host.endsWith(".")) { // already qualified?
addr = getByExactName(host);
} else {
for (String domain : searchDomains) {
String dot = !domain.startsWith(".") ? "." : "";
addr = getByExactName(host + dot + domain);
if (addr != null) break;
}
}
return addr;
}
// implemented as a separate method to facilitate unit testing
InetAddress getInetAddressByName(String host) throws UnknownHostException {
return InetAddress.getByName(host);
}
void setSearchDomains(String ... domains) {
searchDomains = Arrays.asList(domains);
}
}
}

View File

@ -25,7 +25,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.net.NetUtilsTestResolver;
import org.apache.hadoop.security.NetUtilsTestResolver;
import org.apache.hadoop.util.Progressable;
import org.junit.Test;

View File

@ -37,6 +37,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.NetUtilsTestResolver;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.net;
package org.apache.hadoop.security;
import java.net.InetAddress;
import java.net.UnknownHostException;
@ -25,7 +25,7 @@
import java.util.List;
import java.util.Map;
import org.apache.hadoop.net.NetUtils.QualifiedHostResolver;
import org.apache.hadoop.security.SecurityUtil.QualifiedHostResolver;
/**
* provides a dummy dns search resolver with a configurable search path
@ -41,7 +41,7 @@ public static NetUtilsTestResolver install() {
resolver.addResolvedHost("host.a.b.", "1.1.1.1");
resolver.addResolvedHost("b-host.b.", "2.2.2.2");
resolver.addResolvedHost("simple.", "3.3.3.3");
NetUtils.setHostResolver(resolver);
SecurityUtil.hostResolver = resolver;
return resolver;
}
@ -56,7 +56,8 @@ public void addResolvedHost(String host, String ip) {
resolvedHosts.put(host, addr);
}
InetAddress getInetAddressByName(String host) throws UnknownHostException {
@Override
public InetAddress getInetAddressByName(String host) throws UnknownHostException {
hostSearches.add(host);
if (!resolvedHosts.containsKey(host)) {
throw new UnknownHostException(host);
@ -64,11 +65,21 @@ InetAddress getInetAddressByName(String host) throws UnknownHostException {
return resolvedHosts.get(host);
}
String[] getHostSearches() {
@Override
public InetAddress getByExactName(String host) {
return super.getByExactName(host);
}
@Override
public InetAddress getByNameWithSearch(String host) {
return super.getByNameWithSearch(host);
}
public String[] getHostSearches() {
return hostSearches.toArray(new String[0]);
}
void reset() {
public void reset() {
hostSearches.clear();
}
}

View File

@ -225,7 +225,7 @@ void runBadPortPermutes(String arg, boolean validIfPosPort) {
assertTrue(!addr.isUnresolved());
// don't know what the standard resolver will return for hostname.
// should be host for host; host or ip for ip is ambiguous
if (!SecurityUtil.getTokenServiceUseIp()) {
if (!SecurityUtil.useIpForTokenService) {
assertEquals(host, addr.getHostName());
assertEquals(host, addr.getAddress().getHostName());
}

View File

@ -170,6 +170,9 @@ Trunk (unreleased changes)
HDFS-2739. SecondaryNameNode doesn't start up. (jitendra)
HDFS-2776. Missing interface annotation on JournalSet.
(Brandon Li via jitendra)
Release 0.23.1 - UNRELEASED
INCOMPATIBLE CHANGES
@ -259,6 +262,8 @@ Release 0.23.1 - UNRELEASED
HDFS-1314. Make dfs.blocksize accept size-indicating prefixes (Sho Shimauchi via harsh)
HDFS-69. Improve the 'dfsadmin' commandline help. (harsh)
OPTIMIZATIONS
HDFS-2130. Switch default checksum to CRC32C. (todd)

View File

@ -35,11 +35,14 @@
import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Manages a collection of Journals. None of the methods are synchronized, it is
* assumed that FSEditLog methods, that use this class, use proper
* synchronization.
*/
@InterfaceAudience.Private
public class JournalSet implements JournalManager {
static final Log LOG = LogFactory.getLog(FSEditLog.class);

View File

@ -131,8 +131,9 @@ private static class SetQuotaCommand extends DFSAdminCommand {
"\t\ton the number of names in the directory tree\n" +
"\t\tFor each directory, attempt to set the quota. An error will be reported if\n" +
"\t\t1. N is not a positive integer, or\n" +
"\t\t2. user is not an administrator, or\n" +
"\t\t3. the directory does not exist or is a file, or\n";
"\t\t2. User is not an administrator, or\n" +
"\t\t3. The directory does not exist or is a file.\n" +
"\t\tNote: A quota of 1 would force the directory to remain empty.\n";
private final long quota; // the quota to be set
@ -917,6 +918,7 @@ private static void printUsage(String cmd) {
+ " [-setBalancerBandwidth <bandwidth in bytes per second>]");
} else {
System.err.println("Usage: java DFSAdmin");
System.err.println("Note: Administrative commands can only be run as the HDFS superuser.");
System.err.println(" [-report]");
System.err.println(" [-safemode enter | leave | get | wait]");
System.err.println(" [-saveNamespace]");

View File

@ -178,6 +178,9 @@ Release 0.23.1 - Unreleased
Improved the earlier patch to not to JobHistoryServer repeatedly.
(Anupam Seth via vinodkv)
MAPREDUCE-3553. Add support for data returned when exceptions thrown from web
service apis to be in either xml or in JSON. (Thomas Graves via mahadev)
OPTIMIZATIONS
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
@ -198,6 +201,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3618. Fixed TaskHeartbeatHandler to not hold a global lock for all
task-updates. (Siddarth Seth via vinodkv)
MAPREDUCE-3512. Batching JobHistory flushing to DFS so that we don't flush
for every event slowing down AM. (Siddarth Seth via vinodkv)
BUG FIXES
MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob
@ -462,6 +468,17 @@ Release 0.23.1 - Unreleased
authenticated. (mahadev)
MAPREDUCE-3648. TestJobConf failing. (Thomas Graves via mahadev)
MAPREDUCE-3651. TestQueueManagerRefresh fails. (Thomas Graves via mahadev)
MAPREDUCE-3645. TestJobHistory fails. (Thomas Graves via mahadev)
MAPREDUCE-3652. org.apache.hadoop.mapred.TestWebUIAuthorization.testWebUIAuthorization
fails. (Thomas Graves via mahadev)
MAPREDUCE-3625. CapacityScheduler web-ui display of queue's used capacity is broken.
(Jason Lowe via mahadev)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -20,9 +20,12 @@
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -70,13 +73,20 @@ public class JobHistoryEventHandler extends AbstractService
private FileSystem stagingDirFS; // log Dir FileSystem
private FileSystem doneDirFS; // done Dir FileSystem
private Configuration conf;
private Path stagingDirPath = null;
private Path doneDirPrefixPath = null; // folder for completed jobs
private int maxUnflushedCompletionEvents;
private int postJobCompletionMultiplier;
private long flushTimeout;
private int minQueueSizeForBatchingFlushes; // TODO: Rename
private BlockingQueue<JobHistoryEvent> eventQueue =
private int numUnflushedCompletionEvents = 0;
private boolean isTimerActive;
protected BlockingQueue<JobHistoryEvent> eventQueue =
new LinkedBlockingQueue<JobHistoryEvent>();
protected Thread eventHandlingThread;
private volatile boolean stopped;
@ -103,8 +113,6 @@ public JobHistoryEventHandler(AppContext context, int startCount) {
@Override
public void init(Configuration conf) {
this.conf = conf;
String stagingDirStr = null;
String doneDirStr = null;
String userDoneDirStr = null;
@ -184,6 +192,27 @@ public void init(Configuration conf) {
throw new YarnException(e);
}
// Maximum number of unflushed completion-events that can stay in the queue
// before flush kicks in.
maxUnflushedCompletionEvents =
conf.getInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS,
MRJobConfig.DEFAULT_MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS);
// We want to cut down flushes after job completes so as to write quicker,
// so we increase maxUnflushedEvents post Job completion by using the
// following multiplier.
postJobCompletionMultiplier =
conf.getInt(
MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER,
MRJobConfig.DEFAULT_MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER);
// Max time until which flush doesn't take place.
flushTimeout =
conf.getLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
MRJobConfig.DEFAULT_MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS);
minQueueSizeForBatchingFlushes =
conf.getInt(
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
super.init(conf);
}
@ -256,14 +285,28 @@ public void stop() {
stopped = true;
//do not interrupt while event handling is in progress
synchronized(lock) {
if (eventHandlingThread != null)
eventHandlingThread.interrupt();
}
try {
if (eventHandlingThread != null)
eventHandlingThread.join();
} catch (InterruptedException ie) {
LOG.info("Interruped Exception while stopping", ie);
}
// Cancel all timers - so that they aren't invoked during or after
// the metaInfo object is wrapped up.
for (MetaInfo mi : fileMap.values()) {
try {
mi.shutDownTimer();
} catch (IOException e) {
LOG.info("Exception while cancelling delayed flush timer. "
+ "Likely caused by a failed flush " + e.getMessage());
}
}
//write all the events remaining in queue
Iterator<JobHistoryEvent> it = eventQueue.iterator();
while(it.hasNext()) {
@ -284,6 +327,12 @@ public void stop() {
super.stop();
}
protected EventWriter createEventWriter(Path historyFilePath)
throws IOException {
FSDataOutputStream out = stagingDirFS.create(historyFilePath, true);
return new EventWriter(out);
}
/**
* Create an event writer for the Job represented by the jobID.
* Writes out the job configuration to the log directory.
@ -319,8 +368,7 @@ protected void setupEventWriter(JobId jobId)
JobHistoryUtils.getStagingConfFile(stagingDirPath, jobId, startCount);
if (writer == null) {
try {
FSDataOutputStream out = stagingDirFS.create(historyFile, true);
writer = new EventWriter(out);
writer = createEventWriter(historyFile);
LOG.info("Event Writer setup for JobId: " + jobId + ", File: "
+ historyFile);
} catch (IOException ioe) {
@ -371,12 +419,26 @@ public void closeWriter(JobId id) throws IOException {
@Override
public void handle(JobHistoryEvent event) {
try {
if (isJobCompletionEvent(event.getHistoryEvent())) {
// When the job is complete, flush slower but write faster.
maxUnflushedCompletionEvents =
maxUnflushedCompletionEvents * postJobCompletionMultiplier;
}
eventQueue.put(event);
} catch (InterruptedException e) {
throw new YarnException(e);
}
}
private boolean isJobCompletionEvent(HistoryEvent historyEvent) {
if (EnumSet.of(EventType.JOB_FINISHED, EventType.JOB_FAILED,
EventType.JOB_KILLED).contains(historyEvent.getEventType())) {
return true;
}
return false;
}
protected void handleEvent(JobHistoryEvent event) {
synchronized (lock) {
@ -615,32 +677,81 @@ protected void closeEventWriter(JobId jobId) throws IOException {
}
}
private class FlushTimerTask extends TimerTask {
private MetaInfo metaInfo;
private IOException ioe = null;
private volatile boolean shouldRun = true;
FlushTimerTask(MetaInfo metaInfo) {
this.metaInfo = metaInfo;
}
@Override
public void run() {
synchronized (lock) {
try {
if (!metaInfo.isTimerShutDown() && shouldRun)
metaInfo.flush();
} catch (IOException e) {
ioe = e;
}
}
}
public IOException getException() {
return ioe;
}
public void stop() {
shouldRun = false;
this.cancel();
}
}
private class MetaInfo {
private Path historyFile;
private Path confFile;
private EventWriter writer;
JobIndexInfo jobIndexInfo;
JobSummary jobSummary;
Timer flushTimer;
FlushTimerTask flushTimerTask;
private boolean isTimerShutDown = false;
MetaInfo(Path historyFile, Path conf, EventWriter writer,
String user, String jobName, JobId jobId) {
MetaInfo(Path historyFile, Path conf, EventWriter writer, String user,
String jobName, JobId jobId) {
this.historyFile = historyFile;
this.confFile = conf;
this.writer = writer;
this.jobIndexInfo = new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1,
null);
this.jobIndexInfo =
new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null);
this.jobSummary = new JobSummary();
this.flushTimer = new Timer("FlushTimer", true);
}
Path getHistoryFile() { return historyFile; }
Path getHistoryFile() {
return historyFile;
}
Path getConfFile() {return confFile; }
Path getConfFile() {
return confFile;
}
JobIndexInfo getJobIndexInfo() { return jobIndexInfo; }
JobIndexInfo getJobIndexInfo() {
return jobIndexInfo;
}
JobSummary getJobSummary() { return jobSummary; }
JobSummary getJobSummary() {
return jobSummary;
}
boolean isWriterActive() {return writer != null ; }
boolean isWriterActive() {
return writer != null;
}
boolean isTimerShutDown() {
return isTimerShutDown;
}
void closeWriter() throws IOException {
synchronized (lock) {
@ -655,7 +766,67 @@ void writeEvent(HistoryEvent event) throws IOException {
synchronized (lock) {
if (writer != null) {
writer.write(event);
processEventForFlush(event);
maybeFlush(event);
}
}
}
void processEventForFlush(HistoryEvent historyEvent) throws IOException {
if (EnumSet.of(EventType.MAP_ATTEMPT_FINISHED,
EventType.MAP_ATTEMPT_FAILED, EventType.MAP_ATTEMPT_KILLED,
EventType.REDUCE_ATTEMPT_FINISHED, EventType.REDUCE_ATTEMPT_FAILED,
EventType.REDUCE_ATTEMPT_KILLED, EventType.TASK_FINISHED,
EventType.TASK_FAILED, EventType.JOB_FINISHED, EventType.JOB_FAILED,
EventType.JOB_KILLED).contains(historyEvent.getEventType())) {
numUnflushedCompletionEvents++;
if (!isTimerActive) {
resetFlushTimer();
if (!isTimerShutDown) {
flushTimerTask = new FlushTimerTask(this);
flushTimer.schedule(flushTimerTask, flushTimeout);
}
}
}
}
void resetFlushTimer() throws IOException {
if (flushTimerTask != null) {
IOException exception = flushTimerTask.getException();
flushTimerTask.stop();
if (exception != null) {
throw exception;
}
flushTimerTask = null;
}
isTimerActive = false;
}
void maybeFlush(HistoryEvent historyEvent) throws IOException {
if ((eventQueue.size() < minQueueSizeForBatchingFlushes
&& numUnflushedCompletionEvents > 0)
|| numUnflushedCompletionEvents >= maxUnflushedCompletionEvents
|| isJobCompletionEvent(historyEvent)) {
this.flush();
}
}
void flush() throws IOException {
synchronized (lock) {
if (numUnflushedCompletionEvents != 0) { // skipped timer cancel.
writer.flush();
numUnflushedCompletionEvents = 0;
resetFlushTimer();
}
}
}
void shutDownTimer() throws IOException {
synchronized (lock) {
isTimerShutDown = true;
flushTimer.cancel();
if (flushTimerTask != null && flushTimerTask.getException() != null) {
throw flushTimerTask.getException();
}
}
}
@ -682,7 +853,7 @@ private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
doneDirFS.delete(toPath, true);
}
boolean copied = FileUtil.copy(stagingDirFS, fromPath, doneDirFS, toPath,
false, conf);
false, getConfig());
if (copied)
LOG.info("Copied to done location: " + toPath);

View File

@ -49,6 +49,7 @@
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskCounterInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TasksInfo;
import org.apache.hadoop.yarn.webapp.RemoteExceptionData;
@Singleton
@Provider
@ -64,7 +65,7 @@ public class JAXBContextResolver implements ContextResolver<JAXBContext> {
JobCounterInfo.class, TaskCounterInfo.class, CounterGroupInfo.class,
JobInfo.class, JobsInfo.class, ReduceTaskAttemptInfo.class,
TaskAttemptInfo.class, TaskInfo.class, TasksInfo.class,
TaskAttemptsInfo.class, ConfEntryInfo.class};
TaskAttemptsInfo.class, ConfEntryInfo.class, RemoteExceptionData.class};
public JAXBContextResolver() throws Exception {
this.types = new HashSet<Class>(Arrays.asList(cTypes));

View File

@ -0,0 +1,310 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.jobhistory;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
public class TestJobHistoryEventHandler {
private static final Log LOG = LogFactory
.getLog(TestJobHistoryEventHandler.class);
@Test
public void testFirstFlushOnCompletionEvent() throws Exception {
TestParams t = new TestParams();
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
60 * 1000l);
conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 10);
conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 10);
conf.setInt(
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 200);
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
EventWriter mockWriter = null;
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
mockWriter = jheh.getEventWriter();
verify(mockWriter).write(any(HistoryEvent.class));
for (int i = 0; i < 100; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskStartedEvent(
t.taskID, 0, TaskType.MAP, "")));
}
handleNextNEvents(jheh, 100);
verify(mockWriter, times(0)).flush();
// First completion event, but min-queue-size for batching flushes is 10
handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
t.taskID, 0, TaskType.MAP, "", null)));
verify(mockWriter).flush();
} finally {
jheh.stop();
verify(mockWriter).close();
}
}
@Test
public void testMaxUnflushedCompletionEvents() throws Exception {
TestParams t = new TestParams();
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
60 * 1000l);
conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 10);
conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 10);
conf.setInt(
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 5);
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
EventWriter mockWriter = null;
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
mockWriter = jheh.getEventWriter();
verify(mockWriter).write(any(HistoryEvent.class));
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
t.taskID, 0, TaskType.MAP, "", null)));
}
handleNextNEvents(jheh, 9);
verify(mockWriter, times(0)).flush();
handleNextNEvents(jheh, 1);
verify(mockWriter).flush();
handleNextNEvents(jheh, 50);
verify(mockWriter, times(6)).flush();
} finally {
jheh.stop();
verify(mockWriter).close();
}
}
@Test
public void testUnflushedTimer() throws Exception {
TestParams t = new TestParams();
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
2 * 1000l); //2 seconds.
conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 10);
conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 100);
conf.setInt(
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 5);
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
EventWriter mockWriter = null;
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
mockWriter = jheh.getEventWriter();
verify(mockWriter).write(any(HistoryEvent.class));
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
t.taskID, 0, TaskType.MAP, "", null)));
}
handleNextNEvents(jheh, 9);
verify(mockWriter, times(0)).flush();
Thread.sleep(2 * 4 * 1000l); // 4 seconds should be enough. Just be safe.
verify(mockWriter).flush();
} finally {
jheh.stop();
verify(mockWriter).close();
}
}
@Test
public void testBatchedFlushJobEndMultiplier() throws Exception {
TestParams t = new TestParams();
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
60 * 1000l); //2 seconds.
conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 3);
conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 10);
conf.setInt(
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 0);
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
EventWriter mockWriter = null;
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
mockWriter = jheh.getEventWriter();
verify(mockWriter).write(any(HistoryEvent.class));
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
t.taskID, 0, TaskType.MAP, "", null)));
}
queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters())));
handleNextNEvents(jheh, 29);
verify(mockWriter, times(0)).flush();
handleNextNEvents(jheh, 72);
verify(mockWriter, times(4)).flush(); //3 * 30 + 1 for JobFinished
} finally {
jheh.stop();
verify(mockWriter).close();
}
}
private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
jheh.handle(event);
}
private void handleEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event)
throws InterruptedException {
jheh.handle(event);
jheh.handleEvent(jheh.eventQueue.take());
}
private void handleNextNEvents(JHEvenHandlerForTest jheh, int numEvents)
throws InterruptedException {
for (int i = 0; i < numEvents; i++) {
jheh.handleEvent(jheh.eventQueue.take());
}
}
private String setupTestWorkDir() {
File testWorkDir = new File("target", this.getClass().getCanonicalName());
try {
FileContext.getLocalFSFileContext().delete(
new Path(testWorkDir.getAbsolutePath()), true);
return testWorkDir.getAbsolutePath();
} catch (Exception e) {
LOG.warn("Could not cleanup", e);
throw new YarnException("could not cleanup test dir", e);
}
}
private AppContext mockAppContext(JobId jobId) {
AppContext mockContext = mock(AppContext.class);
Job mockJob = mock(Job.class);
when(mockJob.getTotalMaps()).thenReturn(10);
when(mockJob.getTotalReduces()).thenReturn(10);
when(mockJob.getName()).thenReturn("mockjob");
when(mockContext.getJob(jobId)).thenReturn(mockJob);
return mockContext;
}
private class TestParams {
String workDir = setupTestWorkDir();
ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
AppContext mockAppContext = mockAppContext(jobId);
}
}
class JHEvenHandlerForTest extends JobHistoryEventHandler {
private EventWriter eventWriter;
volatile int handleEventCompleteCalls = 0;
volatile int handleEventStartedCalls = 0;
public JHEvenHandlerForTest(AppContext context, int startCount) {
super(context, startCount);
}
@Override
public void start() {
}
@Override
protected EventWriter createEventWriter(Path historyFilePath)
throws IOException {
this.eventWriter = mock(EventWriter.class);
return this.eventWriter;
}
@Override
protected void closeEventWriter(JobId jobId) {
}
public EventWriter getEventWriter() {
return this.eventWriter;
}
}

View File

@ -345,6 +345,29 @@ public void testJobIdNonExist() throws JSONException, Exception {
public void testJobIdInvalid() throws JSONException, Exception {
WebResource r = resource();
try {
r.path("ws").path("v1").path("mapreduce").path("jobs").path("job_foo")
.accept(MediaType.APPLICATION_JSON).get(JSONObject.class);
fail("should have thrown exception on invalid uri");
} catch (UniformInterfaceException ue) {
ClientResponse response = ue.getResponse();
assertEquals(Status.BAD_REQUEST, response.getClientResponseStatus());
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject msg = response.getEntity(JSONObject.class);
JSONObject exception = msg.getJSONObject("RemoteException");
assertEquals("incorrect number of elements", 3, exception.length());
String message = exception.getString("message");
String type = exception.getString("exception");
String classname = exception.getString("javaClassName");
verifyJobIdInvalid(message, type, classname);
}
}
// verify the exception output default is JSON
@Test
public void testJobIdInvalidDefault() throws JSONException, Exception {
WebResource r = resource();
try {
r.path("ws").path("v1").path("mapreduce").path("jobs").path("job_foo")
.get(JSONObject.class);
@ -359,6 +382,41 @@ public void testJobIdInvalid() throws JSONException, Exception {
String message = exception.getString("message");
String type = exception.getString("exception");
String classname = exception.getString("javaClassName");
verifyJobIdInvalid(message, type, classname);
}
}
// test that the exception output works in XML
@Test
public void testJobIdInvalidXML() throws JSONException, Exception {
WebResource r = resource();
try {
r.path("ws").path("v1").path("mapreduce").path("jobs").path("job_foo")
.accept(MediaType.APPLICATION_XML).get(JSONObject.class);
fail("should have thrown exception on invalid uri");
} catch (UniformInterfaceException ue) {
ClientResponse response = ue.getResponse();
assertEquals(Status.BAD_REQUEST, response.getClientResponseStatus());
assertEquals(MediaType.APPLICATION_XML_TYPE, response.getType());
String msg = response.getEntity(String.class);
System.out.println(msg);
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
DocumentBuilder db = dbf.newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(msg));
Document dom = db.parse(is);
NodeList nodes = dom.getElementsByTagName("RemoteException");
Element element = (Element) nodes.item(0);
String message = WebServicesTestUtils.getXmlString(element, "message");
String type = WebServicesTestUtils.getXmlString(element, "exception");
String classname = WebServicesTestUtils.getXmlString(element,
"javaClassName");
verifyJobIdInvalid(message, type, classname);
}
}
private void verifyJobIdInvalid(String message, String type, String classname) {
WebServicesTestUtils.checkStringMatch("exception message",
"For input string: \"foo\"", message);
WebServicesTestUtils.checkStringMatch("exception type",
@ -366,7 +424,6 @@ public void testJobIdInvalid() throws JSONException, Exception {
WebServicesTestUtils.checkStringMatch("exception classname",
"java.lang.NumberFormatException", classname);
}
}
@Test
public void testJobIdInvalidBogus() throws JSONException, Exception {

View File

@ -1033,11 +1033,25 @@ public Path run() throws IOException, InterruptedException {
}
}
private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
JobQueueInfo ret = new JobQueueInfo(queue);
// make sure to convert any children
if (queue.getQueueChildren().size() > 0) {
List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(queue
.getQueueChildren().size());
for (QueueInfo child : queue.getQueueChildren()) {
childQueues.add(getJobQueueInfo(child));
}
ret.setChildren(childQueues);
}
return ret;
}
private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
throws IOException {
JobQueueInfo[] ret = new JobQueueInfo[queues.length];
for (int i = 0; i < queues.length; i++) {
ret[i] = new JobQueueInfo(queues[i]);
ret[i] = getJobQueueInfo(queues[i]);
}
return ret;
}

View File

@ -105,7 +105,7 @@ protected void setChildren(List<JobQueueInfo> children) {
public List<JobQueueInfo> getChildren() {
List<JobQueueInfo> list = new ArrayList<JobQueueInfo>();
for (QueueInfo q : super.getQueueChildren()) {
list.add(new JobQueueInfo(q));
list.add((JobQueueInfo)q);
}
return list;
}

View File

@ -436,6 +436,26 @@ public interface MRJobConfig {
public static final String MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR =
MR_AM_PREFIX + "create-intermediate-jh-base-dir";
public static final String MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS =
MR_AM_PREFIX + "history.max-unflushed-events";
public static final int DEFAULT_MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS =
200;
public static final String MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER =
MR_AM_PREFIX + "history.job-complete-unflushed-multiplier";
public static final int DEFAULT_MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER =
30;
public static final String MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS =
MR_AM_PREFIX + "history.complete-event-flush-timeout";
public static final long DEFAULT_MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS =
30 * 1000l;
public static final String MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold";
public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
50;
public static final String MAPRED_MAP_ADMIN_JAVA_OPTS =
"mapreduce.admin.map.child.java.opts";

View File

@ -49,6 +49,7 @@
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.HistoryInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.yarn.webapp.RemoteExceptionData;
@Singleton
@Provider
@ -64,7 +65,8 @@ public class JAXBContextResolver implements ContextResolver<JAXBContext> {
JobTaskAttemptCounterInfo.class, TaskCounterInfo.class,
JobCounterInfo.class, ReduceTaskAttemptInfo.class, TaskAttemptInfo.class,
TaskAttemptsInfo.class, CounterGroupInfo.class,
TaskCounterGroupInfo.class, AMAttemptInfo.class, AMAttemptsInfo.class };
TaskCounterGroupInfo.class, AMAttemptInfo.class, AMAttemptsInfo.class,
RemoteExceptionData.class };
public JAXBContextResolver() throws Exception {
this.types = new HashSet<Class>(Arrays.asList(cTypes));

View File

@ -392,6 +392,31 @@ public void testJobIdNonExist() throws JSONException, Exception {
public void testJobIdInvalid() throws JSONException, Exception {
WebResource r = resource();
try {
r.path("ws").path("v1").path("history").path("mapreduce").path("jobs")
.path("job_foo").accept(MediaType.APPLICATION_JSON)
.get(JSONObject.class);
fail("should have thrown exception on invalid uri");
} catch (UniformInterfaceException ue) {
ClientResponse response = ue.getResponse();
assertEquals(Status.BAD_REQUEST, response.getClientResponseStatus());
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject msg = response.getEntity(JSONObject.class);
JSONObject exception = msg.getJSONObject("RemoteException");
assertEquals("incorrect number of elements", 3, exception.length());
String message = exception.getString("message");
String type = exception.getString("exception");
String classname = exception.getString("javaClassName");
verifyJobIdInvalid(message, type, classname);
}
}
// verify the exception output default is JSON
@Test
public void testJobIdInvalidDefault() throws JSONException, Exception {
WebResource r = resource();
try {
r.path("ws").path("v1").path("history").path("mapreduce").path("jobs")
.path("job_foo").get(JSONObject.class);
@ -406,6 +431,42 @@ public void testJobIdInvalid() throws JSONException, Exception {
String message = exception.getString("message");
String type = exception.getString("exception");
String classname = exception.getString("javaClassName");
verifyJobIdInvalid(message, type, classname);
}
}
// test that the exception output works in XML
@Test
public void testJobIdInvalidXML() throws JSONException, Exception {
WebResource r = resource();
try {
r.path("ws").path("v1").path("history").path("mapreduce").path("jobs")
.path("job_foo").accept(MediaType.APPLICATION_XML)
.get(JSONObject.class);
fail("should have thrown exception on invalid uri");
} catch (UniformInterfaceException ue) {
ClientResponse response = ue.getResponse();
assertEquals(Status.BAD_REQUEST, response.getClientResponseStatus());
assertEquals(MediaType.APPLICATION_XML_TYPE, response.getType());
String msg = response.getEntity(String.class);
System.out.println(msg);
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
DocumentBuilder db = dbf.newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(msg));
Document dom = db.parse(is);
NodeList nodes = dom.getElementsByTagName("RemoteException");
Element element = (Element) nodes.item(0);
String message = WebServicesTestUtils.getXmlString(element, "message");
String type = WebServicesTestUtils.getXmlString(element, "exception");
String classname = WebServicesTestUtils.getXmlString(element,
"javaClassName");
verifyJobIdInvalid(message, type, classname);
}
}
private void verifyJobIdInvalid(String message, String type, String classname) {
WebServicesTestUtils.checkStringMatch("exception message",
"For input string: \"foo\"", message);
WebServicesTestUtils.checkStringMatch("exception type",
@ -413,7 +474,6 @@ public void testJobIdInvalid() throws JSONException, Exception {
WebServicesTestUtils.checkStringMatch("exception classname",
"java.lang.NumberFormatException", classname);
}
}
@Test
public void testJobIdInvalidBogus() throws JSONException, Exception {

View File

@ -19,12 +19,9 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
@ -33,19 +30,12 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.mortbay.util.ajax.JSON;
import com.google.inject.Singleton;
/**
* Handle webservices jersey exceptions and create json response in the format:
* { "RemoteException" :
* {
* "exception" : <exception type>,
* "javaClassName" : <classname of exception>,
* "message" : <error message from exception>
* }
* }
* Handle webservices jersey exceptions and create json or xml response
* with the ExceptionData.
*/
@Singleton
@Provider
@ -100,16 +90,11 @@ public Response toResponse(Exception e) {
s = Response.Status.INTERNAL_SERVER_ERROR;
}
// convert to json
final Map<String, Object> m = new TreeMap<String, Object>();
m.put("exception", e.getClass().getSimpleName());
m.put("message", e.getMessage());
m.put("javaClassName", e.getClass().getName());
final Map<String, Object> m2 = new TreeMap<String, Object>();
m2.put(RemoteException.class.getSimpleName(), m);
final String js = JSON.toString(m2);
// let jaxb handle marshalling data out in the same format requested
RemoteExceptionData exception = new RemoteExceptionData(e.getClass().getSimpleName(),
e.getMessage(), e.getClass().getName());
return Response.status(s).type(MediaType.APPLICATION_JSON).entity(js)
return Response.status(s).entity(exception)
.build();
}
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.webapp;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
/**
* Contains the exception information from an exception thrown
* by the web service REST API's.
* Fields include:
* exception - exception type
* javaClassName - java class name of the exception
* message - a detailed message explaining the exception
*
*/
@XmlRootElement(name = "RemoteException")
@XmlAccessorType(XmlAccessType.FIELD)
public class RemoteExceptionData {
private String exception;
private String message;
private String javaClassName;
public RemoteExceptionData() {
}
public RemoteExceptionData(String excep, String message, String className) {
this.exception = excep;
this.message = message;
this.javaClassName = className;
}
public String getException() {
return exception;
}
public String getMessage() {
return message;
}
public String getJavaClassName() {
return javaClassName;
}
}

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.webapp.RemoteExceptionData;
@Singleton
@Provider
@ -45,7 +46,8 @@ public class JAXBContextResolver implements ContextResolver<JAXBContext> {
// you have to specify all the dao classes here
private final Class[] cTypes = {AppInfo.class, AppsInfo.class,
ContainerInfo.class, ContainersInfo.class, NodeInfo.class};
ContainerInfo.class, ContainersInfo.class, NodeInfo.class,
RemoteExceptionData.class};
public JAXBContextResolver() throws Exception {
this.types = new HashSet<Class>(Arrays.asList(cTypes));

View File

@ -382,6 +382,80 @@ public void testNodeAppsStateInvalid() throws JSONException, Exception {
String message = exception.getString("message");
String type = exception.getString("exception");
String classname = exception.getString("javaClassName");
verifyStatInvalidException(message, type, classname);
}
}
// verify the exception object default format is JSON
@Test
public void testNodeAppsStateInvalidDefault() throws JSONException, Exception {
WebResource r = resource();
Application app = new MockApp(1);
nmContext.getApplications().put(app.getAppId(), app);
addAppContainers(app);
Application app2 = new MockApp("foo", 1234, 2);
nmContext.getApplications().put(app2.getAppId(), app2);
addAppContainers(app2);
try {
r.path("ws").path("v1").path("node").path("apps")
.queryParam("state", "FOO_STATE").get(JSONObject.class);
fail("should have thrown exception on invalid user query");
} catch (UniformInterfaceException ue) {
ClientResponse response = ue.getResponse();
assertEquals(Status.BAD_REQUEST, response.getClientResponseStatus());
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject msg = response.getEntity(JSONObject.class);
JSONObject exception = msg.getJSONObject("RemoteException");
assertEquals("incorrect number of elements", 3, exception.length());
String message = exception.getString("message");
String type = exception.getString("exception");
String classname = exception.getString("javaClassName");
verifyStatInvalidException(message, type, classname);
}
}
// test that the exception output also returns XML
@Test
public void testNodeAppsStateInvalidXML() throws JSONException, Exception {
WebResource r = resource();
Application app = new MockApp(1);
nmContext.getApplications().put(app.getAppId(), app);
addAppContainers(app);
Application app2 = new MockApp("foo", 1234, 2);
nmContext.getApplications().put(app2.getAppId(), app2);
addAppContainers(app2);
try {
r.path("ws").path("v1").path("node").path("apps")
.queryParam("state", "FOO_STATE").accept(MediaType.APPLICATION_XML)
.get(JSONObject.class);
fail("should have thrown exception on invalid user query");
} catch (UniformInterfaceException ue) {
ClientResponse response = ue.getResponse();
assertEquals(Status.BAD_REQUEST, response.getClientResponseStatus());
assertEquals(MediaType.APPLICATION_XML_TYPE, response.getType());
String msg = response.getEntity(String.class);
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
DocumentBuilder db = dbf.newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(msg));
Document dom = db.parse(is);
NodeList nodes = dom.getElementsByTagName("RemoteException");
Element element = (Element) nodes.item(0);
String message = WebServicesTestUtils.getXmlString(element, "message");
String type = WebServicesTestUtils.getXmlString(element, "exception");
String classname = WebServicesTestUtils.getXmlString(element,
"javaClassName");
verifyStatInvalidException(message, type, classname);
}
}
private void verifyStatInvalidException(String message, String type,
String classname) {
WebServicesTestUtils
.checkStringMatch(
"exception message",
@ -392,7 +466,6 @@ public void testNodeAppsStateInvalid() throws JSONException, Exception {
WebServicesTestUtils.checkStringMatch("exception classname",
"java.lang.IllegalArgumentException", classname);
}
}
@Test
public void testNodeSingleApps() throws JSONException, Exception {

View File

@ -42,11 +42,12 @@
class CapacitySchedulerPage extends RmView {
static final String _Q = ".ui-state-default.ui-corner-all";
static final float WIDTH_F = 0.8f;
static final float Q_MAX_WIDTH = 0.8f;
static final float Q_STATS_POS = Q_MAX_WIDTH + 0.05f;
static final String Q_END = "left:101%";
static final String OVER = "font-size:1px;background:rgba(255, 140, 0, 0.8)";
static final String UNDER = "font-size:1px;background:rgba(50, 205, 50, 0.8)";
static final float EPSILON = 1e-8f;
static final String Q_GIVEN = "left:0%;background:none;border:1px dashed rgba(0,0,0,0.25)";
static final String Q_OVER = "background:rgba(255, 140, 0, 0.8)";
static final String Q_UNDER = "background:rgba(50, 205, 50, 0.8)";
@RequestScoped
static class CSQInfo {
@ -106,18 +107,20 @@ public void render(Block html) {
for (CapacitySchedulerQueueInfo info : subQueues) {
float used = info.getUsedCapacity() / 100;
float set = info.getCapacity() / 100;
float delta = Math.abs(set - used) + 0.001f;
float max = info.getMaxCapacity() / 100;
LI<UL<Hamlet>> li = ul.
li().
a(_Q).$style(width(max * WIDTH_F)).
$title(join("used:", percent(used), " set:", percent(set),
" max:", percent(max))).
//span().$style(Q_END)._(absMaxPct)._().
span().$style(join(width(delta/max), ';',
used > set ? OVER : UNDER, ';',
used > set ? left(set/max) : left(used/max)))._('.')._().
span(".q", info.getQueuePath().substring(5))._();
a(_Q).$style(width(max * Q_MAX_WIDTH)).
$title(join("capacity:", percent(set), " used:", percent(used),
" max capacity:", percent(max))).
span().$style(join(Q_GIVEN, ";font-size:1px;", width(set/max))).
_('.')._().
span().$style(join(width(used*set/max),
";font-size:1px;left:0%;", used > 1 ? Q_OVER : Q_UNDER)).
_('.')._().
span(".q", info.getQueuePath().substring(5))._().
span().$class("qstats").$style(left(Q_STATS_POS)).
_(join(percent(used), " used"))._();
csqinfo.qinfo = info;
if (info.getSubQueues() == null) {
@ -153,7 +156,7 @@ public void render(Block html) {
if (cs == null) {
ul.
li().
a(_Q).$style(width(WIDTH_F)).
a(_Q).$style(width(Q_MAX_WIDTH)).
span().$style(Q_END)._("100% ")._().
span(".q", "default")._()._();
} else {
@ -163,16 +166,26 @@ public void render(Block html) {
csqinfo.qinfo = null;
float used = sinfo.getUsedCapacity() / 100;
float set = sinfo.getCapacity() / 100;
float delta = Math.abs(set - used) + 0.001f;
ul.
li().$style("margin-bottom: 1em").
span().$style("font-weight: bold")._("Legend:")._().
span().$class("qlegend ui-corner-all").$style(Q_GIVEN).
_("Capacity")._().
span().$class("qlegend ui-corner-all").$style(Q_UNDER).
_("Used")._().
span().$class("qlegend ui-corner-all").$style(Q_OVER).
_("Used (over capacity)")._().
span().$class("qlegend ui-corner-all ui-state-default").
_("Max Capacity")._().
_().
li().
a(_Q).$style(width(WIDTH_F)).
a(_Q).$style(width(Q_MAX_WIDTH)).
$title(join("used:", percent(used))).
span().$style(Q_END)._("100%")._().
span().$style(join(width(delta), ';', used > set ? OVER : UNDER,
';', used > set ? left(set) : left(used)))._(".")._().
span().$style(join(width(used), ";left:0%;",
used > 1 ? Q_OVER : Q_UNDER))._(".")._().
span(".q", "root")._().
span().$class("qstats").$style(left(Q_STATS_POS)).
_(join(percent(used), " used"))._().
_(QueueBlock.class)._();
}
ul._()._().
@ -190,6 +203,8 @@ public void render(Block html) {
"#cs a { font-weight: normal; margin: 2px; position: relative }",
"#cs a span { font-weight: normal; font-size: 80% }",
"#cs-wrapper .ui-widget-header { padding: 0.2em 0.5em }",
".qstats { font-weight: normal; font-size: 80%; position: absolute }",
".qlegend { font-weight: normal; padding: 0 1em; margin: 1em }",
"table.info tr th {width: 50%}")._(). // to center info table
script("/static/jt/jquery.jstree.js").
script().$type("text/javascript").

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.UserMetricsInfo;
import org.apache.hadoop.yarn.webapp.RemoteExceptionData;
@Singleton
@Provider
@ -55,7 +56,8 @@ public class JAXBContextResolver implements ContextResolver<JAXBContext> {
CapacitySchedulerQueueInfo.class, FifoSchedulerInfo.class,
SchedulerTypeInfo.class, NodeInfo.class, UserMetricsInfo.class,
CapacitySchedulerInfo.class, ClusterMetricsInfo.class,
SchedulerInfo.class, AppsInfo.class, NodesInfo.class };
SchedulerInfo.class, AppsInfo.class, NodesInfo.class,
RemoteExceptionData.class};
public JAXBContextResolver() throws Exception {
this.types = new HashSet<Class>(Arrays.asList(cTypes));

View File

@ -25,6 +25,7 @@
import javax.xml.bind.annotation.XmlSeeAlso;
import javax.xml.bind.annotation.XmlTransient;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
@XmlRootElement
@ -47,7 +48,7 @@ public class CapacitySchedulerQueueInfo {
protected int numApplications;
protected String usedResources;
protected String queueName;
protected String state;
protected QueueState state;
protected ArrayList<CapacitySchedulerQueueInfo> subQueues;
CapacitySchedulerQueueInfo() {
@ -69,7 +70,7 @@ public class CapacitySchedulerQueueInfo {
numApplications = q.getNumApplications();
usedResources = q.getUsedResources().toString();
queueName = q.getQueueName();
state = q.getState().toString();
state = q.getState();
}
public float getCapacity() {
@ -109,7 +110,7 @@ public String getQueueName() {
}
public String getQueueState() {
return this.state;
return this.state.toString();
}
public String getQueuePath() {

View File

@ -62,6 +62,31 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
private static MockRM rm;
private CapacitySchedulerConfiguration csConf;
private class QueueInfo {
float capacity;
float usedCapacity;
float maxCapacity;
float absoluteCapacity;
float absoluteMaxCapacity;
float utilization;
int numApplications;
String usedResources;
String queueName;
String state;
}
private class LeafQueueInfo extends QueueInfo {
int numActiveApplications;
int numPendingApplications;
int numContainers;
int maxApplications;
int maxApplicationsPerUser;
int maxActiveApplications;
int maxActiveApplicationsPerUser;
int userLimit;
float userLimitFactor;
}
private Injector injector = Guice.createInjector(new ServletModule() {
@Override
protected void configureServlets() {
@ -217,29 +242,51 @@ public void verifyClusterSchedulerXML(NodeList nodes) throws Exception {
public void verifySubQueueXML(Element qElem, String q, float parentAbsCapacity)
throws Exception {
float absCapacity = WebServicesTestUtils.getXmlFloat(qElem, "absoluteCapacity");
verifySubQueueGeneric(q,
WebServicesTestUtils.getXmlFloat(qElem, "usedCapacity"),
WebServicesTestUtils.getXmlFloat(qElem, "capacity"),
WebServicesTestUtils.getXmlFloat(qElem, "maxCapacity"),
absCapacity,
WebServicesTestUtils.getXmlFloat(qElem, "absoluteMaxCapacity"),
parentAbsCapacity,
WebServicesTestUtils.getXmlString(qElem, "queueName"),
WebServicesTestUtils.getXmlString(qElem, "state"));
NodeList queues = qElem.getElementsByTagName("subQueues");
QueueInfo qi = (queues != null) ? new QueueInfo() : new LeafQueueInfo();
qi.capacity = WebServicesTestUtils.getXmlFloat(qElem, "capacity");
qi.usedCapacity =
WebServicesTestUtils.getXmlFloat(qElem, "usedCapacity");
qi.maxCapacity = WebServicesTestUtils.getXmlFloat(qElem, "maxCapacity");
qi.absoluteCapacity = WebServicesTestUtils.getXmlFloat(qElem, "absoluteCapacity");
qi.absoluteMaxCapacity =
WebServicesTestUtils.getXmlFloat(qElem, "absoluteMaxCapacity");
qi.utilization = WebServicesTestUtils.getXmlFloat(qElem, "utilization");
qi.numApplications =
WebServicesTestUtils.getXmlInt(qElem, "numApplications");
qi.usedResources =
WebServicesTestUtils.getXmlString(qElem, "usedResources");
qi.queueName = WebServicesTestUtils.getXmlString(qElem, "queueName");
qi.state = WebServicesTestUtils.getXmlString(qElem, "state");
verifySubQueueGeneric(q, qi, parentAbsCapacity);
if (queues != null) {
for (int j = 0; j < queues.getLength(); j++) {
Element subqElem = (Element) queues.item(j);
String qName = WebServicesTestUtils.getXmlString(subqElem, "queueName");
String q2 = q + "." + qName;
verifySubQueueXML(subqElem, q2, absCapacity);
verifySubQueueXML(subqElem, q2, qi.absoluteCapacity);
}
} else {
verifyLeafQueueGeneric(q,
WebServicesTestUtils.getXmlInt(qElem, "userLimit"),
WebServicesTestUtils.getXmlFloat(qElem, "userLimitFactor"));
LeafQueueInfo lqi = (LeafQueueInfo) qi;
lqi.numActiveApplications =
WebServicesTestUtils.getXmlInt(qElem, "numActiveApplications");
lqi.numPendingApplications =
WebServicesTestUtils.getXmlInt(qElem, "numPendingApplications");
lqi.numContainers =
WebServicesTestUtils.getXmlInt(qElem, "numContainers");
lqi.maxApplications =
WebServicesTestUtils.getXmlInt(qElem, "maxApplications");
lqi.maxApplicationsPerUser =
WebServicesTestUtils.getXmlInt(qElem, "maxApplicationsPerUser");
lqi.maxActiveApplications =
WebServicesTestUtils.getXmlInt(qElem, "maxActiveApplications");
lqi.maxActiveApplicationsPerUser =
WebServicesTestUtils.getXmlInt(qElem, "maxActiveApplicationsPerUser");
lqi.userLimit = WebServicesTestUtils.getXmlInt(qElem, "userLimit");
lqi.userLimitFactor =
WebServicesTestUtils.getXmlFloat(qElem, "userLimitFactor");
verifyLeafQueueGeneric(q, lqi);
}
}
@ -286,16 +333,19 @@ private void verifySubQueue(JSONObject info, String q, float parentAbsCapacity)
}
assertEquals("incorrect number of elements", numExpectedElements, info.length());
float absCapacity = (float) info.getDouble("absoluteCapacity");
QueueInfo qi = isParentQueue ? new QueueInfo() : new LeafQueueInfo();
qi.capacity = (float) info.getDouble("capacity");
qi.usedCapacity = (float) info.getDouble("usedCapacity");
qi.maxCapacity = (float) info.getDouble("maxCapacity");
qi.absoluteCapacity = (float) info.getDouble("absoluteCapacity");
qi.absoluteMaxCapacity = (float) info.getDouble("absoluteMaxCapacity");
qi.utilization = (float) info.getDouble("utilization");
qi.numApplications = info.getInt("numApplications");
qi.usedResources = info.getString("usedResources");
qi.queueName = info.getString("queueName");
qi.state = info.getString("state");
verifySubQueueGeneric(q, (float) info.getDouble("usedCapacity"),
(float) info.getDouble("capacity"),
(float) info.getDouble("maxCapacity"),
absCapacity,
(float) info.getDouble("absoluteMaxCapacity"),
parentAbsCapacity,
info.getString("queueName"),
info.getString("state"));
verifySubQueueGeneric(q, qi, parentAbsCapacity);
if (isParentQueue) {
JSONArray arr = info.getJSONArray("subQueues");
@ -303,50 +353,84 @@ private void verifySubQueue(JSONObject info, String q, float parentAbsCapacity)
for (int i = 0; i < arr.length(); i++) {
JSONObject obj = arr.getJSONObject(i);
String q2 = q + "." + obj.getString("queueName");
verifySubQueue(obj, q2, absCapacity);
verifySubQueue(obj, q2, qi.absoluteCapacity);
}
} else {
verifyLeafQueueGeneric(q, info.getInt("userLimit"),
(float) info.getDouble("userLimitFactor"));
LeafQueueInfo lqi = (LeafQueueInfo) qi;
lqi.numActiveApplications = info.getInt("numActiveApplications");
lqi.numPendingApplications = info.getInt("numPendingApplications");
lqi.numContainers = info.getInt("numContainers");
lqi.maxApplications = info.getInt("maxApplications");
lqi.maxApplicationsPerUser = info.getInt("maxApplicationsPerUser");
lqi.maxActiveApplications = info.getInt("maxActiveApplications");
lqi.maxActiveApplicationsPerUser = info.getInt("maxActiveApplicationsPerUser");
lqi.userLimit = info.getInt("userLimit");
lqi.userLimitFactor = (float) info.getDouble("userLimitFactor");
verifyLeafQueueGeneric(q, lqi);
}
}
private void verifySubQueueGeneric(String q, float usedCapacity,
float capacity, float maxCapacity,
float absCapacity, float absMaxCapacity,
float parentAbsCapacity,
String qname, String state)
throws Exception {
private void verifySubQueueGeneric(String q, QueueInfo info,
float parentAbsCapacity) throws Exception {
String[] qArr = q.split("\\.");
assertTrue("q name invalid: " + q, qArr.length > 1);
String qshortName = qArr[qArr.length - 1];
assertEquals("usedCapacity doesn't match", 0, usedCapacity, 1e-3f);
assertEquals("capacity doesn't match", csConf.getCapacity(q), capacity,
1e-3f);
assertEquals("usedCapacity doesn't match", 0, info.usedCapacity, 1e-3f);
assertEquals("capacity doesn't match", csConf.getCapacity(q),
info.capacity, 1e-3f);
float expectCapacity = csConf.getMaximumCapacity(q);
float expectAbsMaxCapacity = parentAbsCapacity * (maxCapacity/100);
float expectAbsMaxCapacity = parentAbsCapacity * (info.maxCapacity/100);
if (CapacitySchedulerConfiguration.UNDEFINED == expectCapacity) {
expectCapacity = 100;
expectAbsMaxCapacity = 100;
}
assertEquals("maxCapacity doesn't match", expectCapacity, maxCapacity,
1e-3f);
assertEquals("maxCapacity doesn't match", expectCapacity,
info.maxCapacity, 1e-3f);
assertEquals("absoluteCapacity doesn't match",
parentAbsCapacity * (capacity/100), absCapacity, 1e-3f);
parentAbsCapacity * (info.capacity/100), info.absoluteCapacity, 1e-3f);
assertEquals("absoluteMaxCapacity doesn't match",
expectAbsMaxCapacity, absMaxCapacity, 1e-3f);
assertTrue("queueName doesn't match, got: " + qname + " expected: " + q,
qshortName.matches(qname));
expectAbsMaxCapacity, info.absoluteMaxCapacity, 1e-3f);
assertEquals("utilization doesn't match", 0, info.utilization, 1e-3f);
assertEquals("numApplications doesn't match", 0, info.numApplications);
assertTrue("usedResources doesn't match",
info.usedResources.matches("memory: 0"));
assertTrue("queueName doesn't match, got: " + info.queueName
+ " expected: " + q, qshortName.matches(info.queueName));
assertTrue("state doesn't match",
(csConf.getState(q).toString()).matches(state));
(csConf.getState(q).toString()).matches(info.state));
}
private void verifyLeafQueueGeneric(String q, int userLimit,
float userLimitFactor) throws Exception {
assertEquals("userLimit doesn't match", csConf.getUserLimit(q), userLimit);
private void verifyLeafQueueGeneric(String q, LeafQueueInfo info)
throws Exception {
assertEquals("numActiveApplications doesn't match",
0, info.numActiveApplications);
assertEquals("numPendingApplications doesn't match",
0, info.numPendingApplications);
assertEquals("numContainers doesn't match",
0, info.numContainers);
int maxSystemApps = csConf.getMaximumSystemApplications();
int expectedMaxApps = (int)(maxSystemApps * (info.absoluteCapacity/100));
int expectedMaxAppsPerUser =
(int)(expectedMaxApps * (info.userLimit/100.0f) * info.userLimitFactor);
// TODO: would like to use integer comparisons here but can't due to
// roundoff errors in absolute capacity calculations
assertEquals("maxApplications doesn't match",
(float)expectedMaxApps, (float)info.maxApplications, 1.0f);
assertEquals("maxApplicationsPerUser doesn't match",
(float)expectedMaxAppsPerUser,
(float)info.maxApplicationsPerUser, info.userLimitFactor);
assertTrue("maxActiveApplications doesn't match",
info.maxActiveApplications > 0);
assertTrue("maxActiveApplicationsPerUser doesn't match",
info.maxActiveApplicationsPerUser > 0);
assertEquals("userLimit doesn't match", csConf.getUserLimit(q),
info.userLimit);
assertEquals("userLimitFactor doesn't match",
csConf.getUserLimitFactor(q), userLimitFactor, 1e-3f);
csConf.getUserLimitFactor(q), info.userLimitFactor, 1e-3f);
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.StringReader;
@ -404,20 +405,84 @@ public void testNonexistNode() throws JSONException, Exception {
String message = exception.getString("message");
String type = exception.getString("exception");
String classname = exception.getString("javaClassName");
WebServicesTestUtils
.checkStringMatch("exception message",
"java.lang.Exception: nodeId, node_invalid:99, is not found",
message);
WebServicesTestUtils.checkStringMatch("exception type",
"NotFoundException", type);
WebServicesTestUtils.checkStringMatch("exception classname",
"org.apache.hadoop.yarn.webapp.NotFoundException", classname);
verifyNonexistNodeException(message, type, classname);
} finally {
rm.stop();
}
}
// test that the exception output defaults to JSON
@Test
public void testNonexistNodeDefault() throws JSONException, Exception {
rm.registerNode("h1:1234", 5120);
rm.registerNode("h2:1235", 5121);
WebResource r = resource();
try {
r.path("ws").path("v1").path("cluster").path("nodes")
.path("node_invalid:99").get(JSONObject.class);
fail("should have thrown exception on non-existent nodeid");
} catch (UniformInterfaceException ue) {
ClientResponse response = ue.getResponse();
assertEquals(Status.NOT_FOUND, response.getClientResponseStatus());
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject msg = response.getEntity(JSONObject.class);
JSONObject exception = msg.getJSONObject("RemoteException");
assertEquals("incorrect number of elements", 3, exception.length());
String message = exception.getString("message");
String type = exception.getString("exception");
String classname = exception.getString("javaClassName");
verifyNonexistNodeException(message, type, classname);
} finally {
rm.stop();
}
}
// test that the exception output works in XML
@Test
public void testNonexistNodeXML() throws JSONException, Exception {
rm.registerNode("h1:1234", 5120);
rm.registerNode("h2:1235", 5121);
WebResource r = resource();
try {
r.path("ws").path("v1").path("cluster").path("nodes")
.path("node_invalid:99").accept(MediaType.APPLICATION_XML)
.get(JSONObject.class);
fail("should have thrown exception on non-existent nodeid");
} catch (UniformInterfaceException ue) {
ClientResponse response = ue.getResponse();
assertEquals(Status.NOT_FOUND, response.getClientResponseStatus());
assertEquals(MediaType.APPLICATION_XML_TYPE, response.getType());
String msg = response.getEntity(String.class);
System.out.println(msg);
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
DocumentBuilder db = dbf.newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(msg));
Document dom = db.parse(is);
NodeList nodes = dom.getElementsByTagName("RemoteException");
Element element = (Element) nodes.item(0);
String message = WebServicesTestUtils.getXmlString(element, "message");
String type = WebServicesTestUtils.getXmlString(element, "exception");
String classname = WebServicesTestUtils.getXmlString(element,
"javaClassName");
verifyNonexistNodeException(message, type, classname);
} finally {
rm.stop();
}
}
private void verifyNonexistNodeException(String message, String type, String classname) {
assertTrue("exception message incorrect",
"java.lang.Exception: nodeId, node_invalid:99, is not found"
.matches(message));
assertTrue("exception type incorrect", "NotFoundException".matches(type));
assertTrue("exception className incorrect",
"org.apache.hadoop.yarn.webapp.NotFoundException".matches(classname));
}
@Test
public void testInvalidNode() throws JSONException, Exception {
rm.registerNode("h1:1234", 5120);

View File

@ -57,6 +57,7 @@
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
@ -520,12 +521,13 @@ private static void validateTaskAttemptLevelKeyValues(MiniMRCluster mr,
attempt.getHttpPort());
if (attempt.getTaskStatus().equals("SUCCEEDED")) {
String ttHostname = jt.getNode(ttStatus.getHost()).toString();
Node node = jt.getNode(ttStatus.getHost());
String ttHostname = node.getName();
// check if hostname is valid
assertTrue("Host name of task attempt " + attemptId +
assertTrue("Host name : " + attempt.getHostname() + " of task attempt " + attemptId +
" obtained from" +
" history file did not match the expected value",
" history file did not match the expected value " + ttHostname,
ttHostname.equals(attempt.getHostname()));
}
}

View File

@ -17,12 +17,17 @@
*/
package org.apache.hadoop.mapred;
import java.io.OutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URL;
import java.net.HttpURLConnection;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -37,6 +42,10 @@
import org.apache.hadoop.mapreduce.TaskType;
public class TestTaskFail extends TestCase {
private static final Log LOG = LogFactory.getLog(
TestTaskFail.class);
private static String taskLog = "Task attempt log";
static String cleanupLog = "cleanup attempt log";
@ -92,6 +101,29 @@ public void abortTask(TaskAttemptContext context) throws IOException {
}
}
/** access a url, ignoring some IOException such as the page does not exist */
static int getHttpStatusCode(String urlstring, String userName,
String method) throws IOException {
LOG.info("Accessing " + urlstring + " as user " + userName);
URL url = new URL(urlstring + "&user.name=" + userName);
HttpURLConnection connection = (HttpURLConnection)url.openConnection();
connection.setRequestMethod(method);
if (method.equals("POST")) {
String encodedData = "action=kill&user.name=" + userName;
connection.setRequestProperty("Content-Type",
"application/x-www-form-urlencoded");
connection.setRequestProperty("Content-Length",
Integer.toString(encodedData.length()));
connection.setDoOutput(true);
OutputStream os = connection.getOutputStream();
os.write(encodedData.getBytes());
}
connection.connect();
return connection.getResponseCode();
}
public RunningJob launchJob(JobConf conf,
Path inDir,
Path outDir,
@ -142,8 +174,8 @@ private void validateAttempt(TaskInProgress tip, TaskAttemptID attemptId,
String tasklogUrl = TaskLogServlet.getTaskLogUrl("localhost",
String.valueOf(ttStatus.getHttpPort()), attemptId.toString()) +
"&filter=STDERR";
assertEquals(HttpURLConnection.HTTP_OK, TestWebUIAuthorization
.getHttpStatusCode(tasklogUrl, tip.getUser(), "GET"));
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(tasklogUrl, tip.getUser(), "GET"));
if (containsCleanupLog) {
// validate task logs: tasklog should contain both task logs
// and cleanup logs
@ -160,8 +192,8 @@ private void validateAttempt(TaskInProgress tip, TaskAttemptID attemptId,
String cleanupTasklogUrl = TaskLogServlet.getTaskLogUrl("localhost",
String.valueOf(ttStatus.getHttpPort()), attemptId.toString())
+ "&filter=STDERR&cleanup=true";
assertEquals(HttpURLConnection.HTTP_OK, TestWebUIAuthorization
.getHttpStatusCode(cleanupTasklogUrl, tip.getUser(), "GET"));
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(cleanupTasklogUrl, tip.getUser(), "GET"));
// Task-cleanup task should not be scheduled on the node that the task just failed
if (jt.taskTrackers().size() >= 2) {

View File

@ -1,934 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.net.HttpURLConnection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.http.TestHttpServer.DummyFilterInitializer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class TestWebUIAuthorization extends ClusterMapReduceTestCase {
private static final Log LOG = LogFactory.getLog(
TestWebUIAuthorization.class);
// users who submit the jobs
private static final String jobSubmitter = "user1";
private static final String jobSubmitter1 = "user11";
private static final String jobSubmitter2 = "user12";
private static final String jobSubmitter3 = "user13";
// mrOwner starts the cluster
private static String mrOwner = null;
// member of supergroup
private static final String superGroupMember = "user2";
// mrAdmin
private static final String mrAdminUser = "user4";
// Group for mrAdmins
private static final String mrAdminGroup = "admingroup";
// member of mrAdminGroup
private static final String mrAdminGroupMember = "user5";
// admin of "default" queue
private static final String qAdmin = "user3";
// "colleague1" is there in job-view-acls config
private static final String viewColleague = "colleague1";
// "colleague2" is there in job-modify-acls config
private static final String modifyColleague = "colleague2";
// "colleague3" is there in both job-view-acls and job-modify-acls
private static final String viewAndModifyColleague = "colleague3";
// "evilJohn" is not having view/modify access on the jobs
private static final String unauthorizedUser = "evilJohn";
@Override
protected void setUp() throws Exception {
// do not do anything
};
@Override
protected void tearDown() throws Exception {
deleteQueuesConfigFile();
super.tearDown();
}
/** access a url, ignoring some IOException such as the page does not exist */
static int getHttpStatusCode(String urlstring, String userName,
String method) throws IOException {
LOG.info("Accessing " + urlstring + " as user " + userName);
URL url = new URL(urlstring + "&user.name=" + userName);
HttpURLConnection connection = (HttpURLConnection)url.openConnection();
connection.setRequestMethod(method);
if (method.equals("POST")) {
String encodedData = "action=kill&user.name=" + userName;
connection.setRequestProperty("Content-Type",
"application/x-www-form-urlencoded");
connection.setRequestProperty("Content-Length",
Integer.toString(encodedData.length()));
connection.setDoOutput(true);
OutputStream os = connection.getOutputStream();
os.write(encodedData.getBytes());
}
connection.connect();
return connection.getResponseCode();
}
public static class MyGroupsProvider extends ShellBasedUnixGroupsMapping {
static Map<String, List<String>> mapping = new HashMap<String, List<String>>();
@Override
public List<String> getGroups(String user) throws IOException {
return mapping.get(user);
}
}
/**
* Validates the given jsp/servlet against different user names who
* can(or cannot) view the job.
* (1) jobSubmitter can view the job
* (2) mrAdmin and deprecated superGroupMember can view any job
* (3) mrOwner can view any job
* (4) qAdmins of the queue to which job is submitted to can view any job in
* that queue.
* (5) user mentioned in job-view-acl should be able to view the
* job irrespective of job-modify-acl.
* (6) user mentioned in job-modify-acl but not in job-view-acl
* cannot view the job
* (7) other unauthorized users cannot view the job
*/
private void validateViewJob(String url, String method)
throws IOException {
assertEquals("Incorrect return code for job submitter " + jobSubmitter,
HttpURLConnection.HTTP_OK, getHttpStatusCode(url, jobSubmitter,
method));
assertEquals("Incorrect return code for supergroup-member " +
superGroupMember, HttpURLConnection.HTTP_OK,
getHttpStatusCode(url, superGroupMember, method));
assertEquals("Incorrect return code for admin user " +
mrAdminUser, HttpURLConnection.HTTP_OK,
getHttpStatusCode(url, mrAdminUser, method));
assertEquals("Incorrect return code for admingroup-member " +
mrAdminGroupMember, HttpURLConnection.HTTP_OK,
getHttpStatusCode(url, mrAdminGroupMember, method));
assertEquals("Incorrect return code for MR-owner " + mrOwner,
HttpURLConnection.HTTP_OK, getHttpStatusCode(url, mrOwner, method));
assertEquals("Incorrect return code for queue admin " + qAdmin,
HttpURLConnection.HTTP_OK, getHttpStatusCode(url, qAdmin, method));
assertEquals("Incorrect return code for user in job-view-acl " +
viewColleague, HttpURLConnection.HTTP_OK,
getHttpStatusCode(url, viewColleague, method));
assertEquals("Incorrect return code for user in job-view-acl and " +
"job-modify-acl " + viewAndModifyColleague, HttpURLConnection.HTTP_OK,
getHttpStatusCode(url, viewAndModifyColleague, method));
assertEquals("Incorrect return code for user in job-modify-acl " +
modifyColleague, HttpURLConnection.HTTP_UNAUTHORIZED,
getHttpStatusCode(url, modifyColleague, method));
assertEquals("Incorrect return code for unauthorizedUser " +
unauthorizedUser, HttpURLConnection.HTTP_UNAUTHORIZED,
getHttpStatusCode(url, unauthorizedUser, method));
}
/**
* Validates the given jsp/servlet against different user names who
* can(or cannot) modify the job.
* (1) jobSubmitter, mrOwner, qAdmin, mrAdmin and deprecated superGroupMember
* can modify the job.
* But we are not validating this in this method. Let the caller
* explicitly validate this, if needed.
* (2) user mentioned in job-view-acl but not in job-modify-acl cannot
* modify the job
* (3) user mentioned in job-modify-acl (irrespective of job-view-acl)
* can modify the job
* (4) other unauthorized users cannot modify the job
*/
private void validateModifyJob(String url, String method) throws IOException {
assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
getHttpStatusCode(url, viewColleague, method));
assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
getHttpStatusCode(url, unauthorizedUser, method));
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(url, modifyColleague, method));
}
// starts a sleep job with 1 map task that runs for a long time
private Job startSleepJobAsUser(String user, JobConf conf) throws Exception {
final SleepJob sleepJob = new SleepJob();
sleepJob.setConf(conf);
UserGroupInformation jobSubmitterUGI =
UserGroupInformation.createRemoteUser(user);
Job job = jobSubmitterUGI.doAs(new PrivilegedExceptionAction<Job>() {
public Job run() throws Exception {
// Very large sleep job.
Job job = sleepJob.createJob(1, 0, 900000, 1, 0, 0);
job.submit();
return job;
}
});
return job;
}
// Waits till the map task gets started and gets its tipId from map reports
// and returns the tipId
private TaskID getTIPId(MiniMRCluster cluster,
org.apache.hadoop.mapreduce.JobID jobid) throws Exception {
JobClient client = new JobClient(cluster.createJobConf());
JobID jobId = JobID.downgrade(jobid);
TaskReport[] mapReports = null;
TaskID tipId = null;
do { // make sure that the map task is running
Thread.sleep(200);
mapReports = client.getMapTaskReports(jobId);
} while (mapReports.length == 0);
for (TaskReport r : mapReports) {
tipId = r.getTaskID();
break;// because we have only one map
}
return tipId;
}
/**
* Make sure that the given user can do killJob using jobdetails.jsp url
* @param cluster
* @param conf
* @param jtURL
* @param jobTrackerJSP
* @param user
* @throws Exception
*/
private void confirmJobDetailsJSPKillJobAsUser(MiniMRCluster cluster,
JobConf conf, String jtURL, String jobTrackerJSP, String user)
throws Exception {
Job job = startSleepJobAsUser(jobSubmitter, conf);
org.apache.hadoop.mapreduce.JobID jobid = job.getJobID();
getTIPId(cluster, jobid);// wait till the map task is started
// jobDetailsJSP killJob url
String url = jtURL + "/jobdetails.jsp?" +
"action=kill&jobid="+ jobid.toString();
try {
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(url, user, "POST"));
} finally {
if (!job.isComplete()) {
LOG.info("Killing job " + jobid + " from finally block");
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(jobTrackerJSP + "&killJobs=true&jobCheckBox=" +
jobid.toString(), jobSubmitter, "GET"));
}
}
}
static void setupGroupsProvider() throws IOException {
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
MyGroupsProvider.class.getName());
Groups.getUserToGroupsMappingService(conf);
MyGroupsProvider.mapping.put(jobSubmitter, Arrays.asList("group1"));
MyGroupsProvider.mapping.put(viewColleague, Arrays.asList("group2"));
MyGroupsProvider.mapping.put(modifyColleague, Arrays.asList("group1"));
MyGroupsProvider.mapping.put(unauthorizedUser,
Arrays.asList("evilSociety"));
MyGroupsProvider.mapping.put(superGroupMember, Arrays.asList("superGroup"));
MyGroupsProvider.mapping.put(mrAdminGroupMember,
Arrays.asList(mrAdminGroup));
MyGroupsProvider.mapping.put(viewAndModifyColleague,
Arrays.asList("group3"));
MyGroupsProvider.mapping.put(qAdmin, Arrays.asList("group4"));
mrOwner = UserGroupInformation.getCurrentUser().getShortUserName();
MyGroupsProvider.mapping.put(mrOwner, Arrays.asList(
new String[] { "group5", "group6" }));
MyGroupsProvider.mapping.put(jobSubmitter1, Arrays.asList("group7"));
MyGroupsProvider.mapping.put(jobSubmitter2, Arrays.asList("group7"));
MyGroupsProvider.mapping.put(jobSubmitter3, Arrays.asList("group7"));
MyGroupsProvider.mapping.put(mrAdminUser, Arrays.asList("group8"));
}
public void testAuthorizationForJobHistoryPages() throws Exception {
setupGroupsProvider();
Properties props = new Properties();
props.setProperty("hadoop.http.filter.initializers",
DummyFilterInitializer.class.getName());
props.setProperty(MRConfig.MR_ACLS_ENABLED,
String.valueOf(true));
props.setProperty("dfs.permissions.enabled", "false");
props.setProperty("mapred.job.tracker.history.completed.location",
"historyDoneFolderOnHDFS");
props.setProperty(MRJobConfig.SETUP_CLEANUP_NEEDED, "false");
props.setProperty(MRConfig.MR_SUPERGROUP, "superGroup");
props.setProperty(MRConfig.MR_ADMINS, mrAdminUser + " " + mrAdminGroup);
props.setProperty(JTConfig.JT_RETIREJOBS, "true");
String[] queueNames = {"default"};
String[] submitAclStrings = new String[] { jobSubmitter };
String[] adminsAclStrings = new String[] { qAdmin };
startCluster(props, queueNames, submitAclStrings, adminsAclStrings);
MiniMRCluster cluster = getMRCluster();
int infoPort = cluster.getJobTrackerRunner().getJobTrackerInfoPort();
JobConf conf = new JobConf(cluster.createJobConf());
conf.set(MRJobConfig.JOB_ACL_VIEW_JOB, viewColleague + " group3");
// Let us add group1 and group3 to modify-job-acl. So modifyColleague and
// viewAndModifyColleague will be able to modify the job
conf.set(MRJobConfig.JOB_ACL_MODIFY_JOB, " group1,group3");
final SleepJob sleepJob = new SleepJob();
sleepJob.setConf(conf);
UserGroupInformation jobSubmitterUGI =
UserGroupInformation.createRemoteUser(jobSubmitter);
Job job = jobSubmitterUGI.doAs(new PrivilegedExceptionAction<Job>() {
public Job run() throws Exception {
// Very large sleep job.
Job job = sleepJob.createJob(1, 0, 1000, 1, 0, 0);
job.waitForCompletion(true);
return job;
}
});
org.apache.hadoop.mapreduce.JobID jobid = job.getJobID();
// Wait till job retires.
for (int i = 0; i < 10 && !job.isRetired(); i++) {
UtilsForTests.waitFor(1000);
LOG.info("waiting for the job " + jobid + " to retire");
}
assertTrue("Job did not retire", job.isRetired());
String historyFileName = job.getStatus().getHistoryFile();
String jtURL = "http://localhost:" + infoPort;
// validate access of jobdetails_history.jsp
String jobDetailsJSP =
jtURL + "/jobdetailshistory.jsp?logFile=" + historyFileName;
validateViewJob(jobDetailsJSP, "GET");
// validate accesses of jobtaskshistory.jsp
String jobTasksJSP =
jtURL + "/jobtaskshistory.jsp?logFile=" + historyFileName;
String[] taskTypes =
new String[] { "JOb_SETUP", "MAP", "REDUCE", "JOB_CLEANUP" };
String[] states =
new String[] { "all", "SUCCEEDED", "FAILED", "KILLED" };
for (String taskType : taskTypes) {
for (String state : states) {
validateViewJob(jobTasksJSP + "&taskType=" + taskType + "&status="
+ state, "GET");
}
}
JobHistoryParser parser =
new JobHistoryParser(new Path(historyFileName).getFileSystem(conf),
historyFileName);
JobInfo jobInfo = parser.parse();
Map<TaskID, TaskInfo> tipsMap = jobInfo.getAllTasks();
for (TaskID tip : tipsMap.keySet()) {
// validate access of taskdetailshistory.jsp
validateViewJob(jtURL + "/taskdetailshistory.jsp?logFile="
+ historyFileName + "&tipid=" + tip.toString(), "GET");
Map<TaskAttemptID, TaskAttemptInfo> attemptsMap =
tipsMap.get(tip).getAllTaskAttempts();
for (TaskAttemptID attempt : attemptsMap.keySet()) {
// validate access to taskstatshistory.jsp
validateViewJob(jtURL + "/taskstatshistory.jsp?attemptid="
+ attempt.toString() + "&logFile=" + historyFileName, "GET");
// validate access to tasklogs - STDOUT and STDERR. SYSLOGs are not
// generated for the 1 map sleep job in the test case.
String stdoutURL = TaskLogServlet.getTaskLogUrl("localhost",
Integer.toString(attemptsMap.get(attempt).getHttpPort()),
attempt.toString()) + "&filter=" + TaskLog.LogName.STDOUT;
validateViewJob(stdoutURL, "GET");
String stderrURL = TaskLogServlet.getTaskLogUrl("localhost",
Integer.toString(attemptsMap.get(attempt).getHttpPort()),
attempt.toString()) + "&filter=" + TaskLog.LogName.STDERR;
validateViewJob(stderrURL, "GET");
}
}
// For each tip, let us test the effect of deletion of job-acls.xml file and
// deletion of task log dir for each of the attempts of the tip.
// delete job-acls.xml file from the job userlog dir and verify
// if unauthorized users can view task logs of each attempt.
Path jobACLsFilePath = new Path(TaskLog.getJobDir(jobid).toString(),
TaskTracker.jobACLsFile);
assertTrue("Could not delete job-acls.xml file.",
new File(jobACLsFilePath.toUri().getPath()).delete());
for (TaskID tip : tipsMap.keySet()) {
Map<TaskAttemptID, TaskAttemptInfo> attemptsMap =
tipsMap.get(tip).getAllTaskAttempts();
for (TaskAttemptID attempt : attemptsMap.keySet()) {
String stdoutURL = TaskLogServlet.getTaskLogUrl("localhost",
Integer.toString(attemptsMap.get(attempt).getHttpPort()),
attempt.toString()) + "&filter=" + TaskLog.LogName.STDOUT;;
String stderrURL = TaskLogServlet.getTaskLogUrl("localhost",
Integer.toString(attemptsMap.get(attempt).getHttpPort()),
attempt.toString()) + "&filter=" + TaskLog.LogName.STDERR;
// unauthorized users can view task logs of each attempt because
// job-acls.xml file is deleted.
assertEquals("Incorrect return code for " + unauthorizedUser,
HttpURLConnection.HTTP_OK, getHttpStatusCode(stdoutURL,
unauthorizedUser, "GET"));
assertEquals("Incorrect return code for " + unauthorizedUser,
HttpURLConnection.HTTP_OK, getHttpStatusCode(stderrURL,
unauthorizedUser, "GET"));
// delete the whole task log dir of attempt and verify that we get
// correct response code (i.e. HTTP_GONE) when task logs are accessed.
File attemptLogDir = TaskLog.getAttemptDir(
org.apache.hadoop.mapred.TaskAttemptID.downgrade(attempt), false);
FileUtil.fullyDelete(attemptLogDir);
// Try accessing tasklogs - STDOUT and STDERR now(after the whole
// attempt log dir is deleted).
assertEquals("Incorrect return code for " + jobSubmitter,
HttpURLConnection.HTTP_GONE, getHttpStatusCode(stdoutURL,
jobSubmitter, "GET"));
assertEquals("Incorrect return code for " + jobSubmitter,
HttpURLConnection.HTTP_GONE, getHttpStatusCode(stderrURL,
jobSubmitter, "GET"));
}
}
// validate access to analysejobhistory.jsp
String analyseJobHistoryJSP =
jtURL + "/analysejobhistory.jsp?logFile=" + historyFileName;
validateViewJob(analyseJobHistoryJSP, "GET");
// validate access of jobconf_history.jsp
String jobConfJSP =
jtURL + "/jobconf_history.jsp?logFile=" + historyFileName;
validateViewJob(jobConfJSP, "GET");
}
/**
* Creates queues configuration file with the given queues and acls and starts
* cluster with that queues configuration file.
* @param props configuration properties to inject to the mini cluster
* @param queueNames the job queues on the cluster
* @param submitAclStrings acl-submit-job acls for all queues
* @param adminsAclStrings acl-administer-jobs acls for all queues
* @throws Exception
*/
private void startCluster(Properties props, String[] queueNames,
String[] submitAclStrings, String[] adminsAclStrings) throws Exception {
createQueuesConfigFile(queueNames, submitAclStrings, adminsAclStrings);
startCluster(true, props);
}
/**
* Starts a sleep job and tries to kill the job using jobdetails.jsp as
* (1) viewColleague (2) unauthorizedUser (3) modifyColleague
* (4) viewAndModifyColleague (5) mrOwner (6) deprecated superGroupMember
* (7) mrAdmin and (8) jobSubmitter
*
* Validates the given jsp/servlet against different user names who
* can(or cannot) do both view and modify on the job.
* (1) jobSubmitter, mrOwner, mrAdmin and deprecated superGroupMember can do
* both view and modify
* on the job. But we are not validating this in this method. Let the
* caller explicitly validate this, if needed.
* (2) user mentioned in job-view-acls and job-modify-acls can do this
* (3) user mentioned in job-view-acls but not in job-modify-acls cannot
* do this
* (4) user mentioned in job-modify-acls but not in job-view-acls cannot
* do this
* (5) qAdmin cannot do this because he doesn't have view access to the job
* (6) other unauthorized users cannot do this
*
* @throws Exception
*/
private void validateJobDetailsJSPKillJob(MiniMRCluster cluster,
JobConf clusterConf, String jtURL) throws Exception {
JobConf conf = new JobConf(cluster.createJobConf());
conf.set(MRJobConfig.JOB_ACL_VIEW_JOB, viewColleague + " group3");
// Let us add group1 and group3 to modify-job-acl. So modifyColleague and
// viewAndModifyColleague will be able to modify the job
conf.set(MRJobConfig.JOB_ACL_MODIFY_JOB, " group1,group3");
String jobTrackerJSP = jtURL + "/jobtracker.jsp?a=b";
Job job = startSleepJobAsUser(jobSubmitter, conf);
org.apache.hadoop.mapreduce.JobID jobid = job.getJobID();
getTIPId(cluster, jobid);// wait till the map task is started
// jobDetailsJSPKillJobAction url
String url = jtURL + "/jobdetails.jsp?" +
"action=kill&jobid="+ jobid.toString();
try {
assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
getHttpStatusCode(url, viewColleague, "POST"));
assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
getHttpStatusCode(url, unauthorizedUser, "POST"));
assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
getHttpStatusCode(url, modifyColleague, "POST"));
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(url, viewAndModifyColleague, "POST"));
assertTrue("killJob using jobdetails.jsp failed for a job for which "
+ "user has job-view and job-modify permissions", job.isComplete());
} finally {
if (!job.isComplete()) {
LOG.info("Killing job " + jobid + " from finally block");
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(jobTrackerJSP + "&killJobs=true&jobCheckBox=" +
jobid.toString(), jobSubmitter, "GET"));
}
}
// Check if jobSubmitter, mrOwner, superGroupMember and queueAdmins
// can do killJob using jobdetails.jsp url
confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
jobSubmitter);
confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
mrOwner);
confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
superGroupMember);
confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
mrAdminGroupMember);
confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
mrAdminUser);
confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
qAdmin);
}
/**
* Make sure that the given user can do killJob using jobtracker.jsp url
* @param cluster
* @param conf
* @param jtURL
* @param user
* @throws Exception
*/
private void confirmJobTrackerJSPKillJobAsUser(MiniMRCluster cluster,
JobConf conf, String jtURL, String user)
throws Exception {
String jobTrackerJSP = jtURL + "/jobtracker.jsp?a=b";
Job job = startSleepJobAsUser(jobSubmitter, conf);
org.apache.hadoop.mapreduce.JobID jobid = job.getJobID();
getTIPId(cluster, jobid);// wait till the map task is started
// jobTrackerJSP killJob url
String url = jobTrackerJSP +
"&killJobs=true&jobCheckBox=" + jobid.toString();
try {
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(url, user, "POST"));
} finally {
if (!job.isComplete()) {
LOG.info("Killing job " + jobid + " from finally block");
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(jobTrackerJSP + "&killJobs=true&jobCheckBox=" +
jobid.toString(), jobSubmitter, "GET"));
}
}
}
/**
* Make sure that multiple jobs get killed using jobtracker.jsp url when
* user has modify access on only some of those jobs.
* @param cluster
* @param conf
* @param jtURL
* @param user
* @throws Exception
*/
private void validateKillMultipleJobs(MiniMRCluster cluster,
JobConf conf, String jtURL) throws Exception {
String jobTrackerJSP = jtURL + "/jobtracker.jsp?a=b";
// jobTrackerJSP killJob url
String url = jobTrackerJSP + "&killJobs=true";
// view-job-acl doesn't matter for killJob from jobtracker jsp page
conf.set(MRJobConfig.JOB_ACL_VIEW_JOB, " ");
// Let us start 4 jobs as 4 different users(none of these 4 users is
// mrOwner and none of these users is a member of mrAdmin/superGroup). Only
// based on the config MRJobConfig.JOB_ACL_MODIFY_JOB being set here
// and the jobSubmitter, killJob on each of the jobs will be succeeded.
// start 1st job.
// Out of these 4 users, only jobSubmitter can do killJob on 1st job
conf.set(MRJobConfig.JOB_ACL_MODIFY_JOB, " ");
Job job1 = startSleepJobAsUser(jobSubmitter, conf);
org.apache.hadoop.mapreduce.JobID jobid = job1.getJobID();
getTIPId(cluster, jobid);// wait till the map task is started
url = url.concat("&jobCheckBox=" + jobid.toString());
// start 2nd job.
// Out of these 4 users, only jobSubmitter1 can do killJob on 2nd job
Job job2 = startSleepJobAsUser(jobSubmitter1, conf);
jobid = job2.getJobID();
getTIPId(cluster, jobid);// wait till the map task is started
url = url.concat("&jobCheckBox=" + jobid.toString());
// start 3rd job.
// Out of these 4 users, only jobSubmitter2 can do killJob on 3rd job
Job job3 = startSleepJobAsUser(jobSubmitter2, conf);
jobid = job3.getJobID();
getTIPId(cluster, jobid);// wait till the map task is started
url = url.concat("&jobCheckBox=" + jobid.toString());
// start 4rd job.
// Out of these 4 users, jobSubmitter1 and jobSubmitter3
// can do killJob on 4th job
conf.set(MRJobConfig.JOB_ACL_MODIFY_JOB, jobSubmitter1);
Job job4 = startSleepJobAsUser(jobSubmitter3, conf);
jobid = job4.getJobID();
getTIPId(cluster, jobid);// wait till the map task is started
url = url.concat("&jobCheckBox=" + jobid.toString());
try {
// Try killing all the 4 jobs as user jobSubmitter1 who can kill only
// 2nd and 4th jobs. Check if 1st and 3rd jobs are not killed and
// 2nd and 4th jobs got killed
assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
getHttpStatusCode(url, jobSubmitter1, "POST"));
assertFalse("killJob succeeded for a job for which user doesnot "
+ " have job-modify permission", job1.isComplete());
assertFalse("killJob succeeded for a job for which user doesnot "
+ " have job-modify permission", job3.isComplete());
assertTrue("killJob failed for a job for which user has "
+ "job-modify permission", job2.isComplete());
assertTrue("killJob failed for a job for which user has "
+ "job-modify permission", job4.isComplete());
} finally {
// Kill all 4 jobs as user mrOwner(even though some of them
// were already killed)
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(url, mrOwner, "GET"));
}
}
/**
* Run a job and validate if JSPs/Servlets are going through authentication
* and authorization.
* @throws Exception
*/
@Test
public void testWebUIAuthorization() throws Exception {
setupGroupsProvider();
Properties props = new Properties();
props.setProperty("hadoop.http.filter.initializers",
DummyFilterInitializer.class.getName());
props.setProperty(MRConfig.MR_ACLS_ENABLED, String.valueOf(true));
props.setProperty("dfs.permissions.enabled", "false");
props.setProperty(JTConfig.PRIVATE_ACTIONS_KEY, "true");
props.setProperty(MRJobConfig.SETUP_CLEANUP_NEEDED, "false");
props.setProperty(MRConfig.MR_SUPERGROUP, "superGroup");
props.setProperty(MRConfig.MR_ADMINS, mrAdminUser + " " + mrAdminGroup);
String[] queueNames = {"default"};
String[] submitAclStrings = {jobSubmitter + "," + jobSubmitter1 + ","
+ jobSubmitter2 + "," + jobSubmitter3};
String[] adminsAclStrings = new String[]{qAdmin};
startCluster(props, queueNames, submitAclStrings, adminsAclStrings);
MiniMRCluster cluster = getMRCluster();
int infoPort = cluster.getJobTrackerRunner().getJobTrackerInfoPort();
JobConf clusterConf = cluster.createJobConf();
JobConf conf = new JobConf(clusterConf);
conf.set(MRJobConfig.JOB_ACL_VIEW_JOB, viewColleague + " group3");
// Let us add group1 and group3 to modify-job-acl. So modifyColleague and
// viewAndModifyColleague will be able to modify the job
conf.set(MRJobConfig.JOB_ACL_MODIFY_JOB, " group1,group3");
Job job = startSleepJobAsUser(jobSubmitter, conf);
org.apache.hadoop.mapreduce.JobID jobid = job.getJobID();
String jtURL = "http://localhost:" + infoPort;
String jobTrackerJSP = jtURL + "/jobtracker.jsp?a=b";
try {
// Currently, authorization is not done for jobtracker page. So allow
// everyone to view it.
validateJobTrackerJSPAccess(jtURL);
validateJobDetailsJSPAccess(jobid, jtURL);
validateTaskGraphServletAccess(jobid, jtURL);
validateJobTasksJSPAccess(jobid, jtURL);
validateJobConfJSPAccess(jobid, jtURL);
validateJobFailuresJSPAccess(jobid, jtURL);
valiateJobBlacklistedTrackerJSPAccess(jobid, jtURL);
validateJobTrackerJSPSetPriorityAction(jobid, jtURL);
// Wait for the tip to start so as to test task related JSP
TaskID tipId = getTIPId(cluster, jobid);
validateTaskStatsJSPAccess(jobid, jtURL, tipId);
validateTaskDetailsJSPAccess(jobid, jtURL, tipId);
validateJobTrackerJSPKillJobAction(jobid, jtURL);
} finally {
if (!job.isComplete()) { // kill the job(as jobSubmitter) if needed
LOG.info("Killing job " + jobid + " from finally block");
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(jobTrackerJSP + "&killJobs=true&jobCheckBox=" +
jobid.toString(), jobSubmitter, "GET"));
}
}
// validate killJob of jobdetails.jsp
validateJobDetailsJSPKillJob(cluster, clusterConf, jtURL);
// validate killJob of jobtracker.jsp as users viewAndModifyColleague,
// jobSubmitter, mrOwner, mrAdmin and superGroupMember
confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL,
viewAndModifyColleague);
confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL, jobSubmitter);
confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL, mrOwner);
confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL, superGroupMember);
confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL, mrAdminUser);
confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL, mrAdminGroupMember);
confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL, qAdmin);
// validate killing of multiple jobs using jobtracker jsp and check
// if all the jobs which can be killed by user are actually the ones that
// got killed
validateKillMultipleJobs(cluster, conf, jtURL);
}
public void testWebUIAuthorizationForCommonServlets() throws Exception {
setupGroupsProvider();
Properties props = new Properties();
props.setProperty("hadoop.http.filter.initializers",
DummyFilterInitializer.class.getName());
props.setProperty(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
"true");
props.setProperty(MRConfig.MR_SUPERGROUP, "superGroup");
props.setProperty(MRConfig.MR_ADMINS, mrAdminUser + " " + mrAdminGroup);
startCluster(true, props);
validateCommonServlets(getMRCluster());
stopCluster();
}
private void validateCommonServlets(MiniMRCluster cluster)
throws IOException {
int infoPort = cluster.getJobTrackerRunner().getJobTrackerInfoPort();
String jtURL = "http://localhost:" + infoPort;
for (String servlet : new String[] { "logs", "stacks", "logLevel" }) {
String url = jtURL + "/" + servlet;
checkAccessToCommonServlet(url);
}
// validate access to common servlets for TaskTracker.
String ttURL = "http://localhost:"
+ cluster.getTaskTrackerRunner(0).getTaskTracker().getHttpPort();
for (String servlet : new String[] { "logs", "stacks", "logLevel" }) {
String url = ttURL + "/" + servlet;
checkAccessToCommonServlet(url);
}
}
private void checkAccessToCommonServlet(String url) throws IOException {
url = url + "?a=b";
assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(url, mrAdminUser,
"GET"));
assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(url,
mrAdminGroupMember, "GET"));
assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(url,
mrOwner, "GET"));
assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(url,
superGroupMember, "GET"));
// no access for any other user
assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED, getHttpStatusCode(url,
jobSubmitter, "GET"));
}
// validate killJob of jobtracker.jsp
private void validateJobTrackerJSPKillJobAction(
org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
throws IOException {
String jobTrackerJSP = jtURL + "/jobtracker.jsp?a=b";
String jobTrackerJSPKillJobAction = jobTrackerJSP +
"&killJobs=true&jobCheckBox="+ jobid.toString();
validateModifyJob(jobTrackerJSPKillJobAction, "GET");
}
// validate viewing of job of taskdetails.jsp
private void validateTaskDetailsJSPAccess(
org.apache.hadoop.mapreduce.JobID jobid, String jtURL, TaskID tipId)
throws IOException {
String taskDetailsJSP = jtURL + "/taskdetails.jsp?jobid=" +
jobid.toString() + "&tipid=" + tipId;
validateViewJob(taskDetailsJSP, "GET");
}
// validate taskstats.jsp
private void validateTaskStatsJSPAccess(
org.apache.hadoop.mapreduce.JobID jobid, String jtURL, TaskID tipId)
throws IOException {
String taskStatsJSP = jtURL + "/taskstats.jsp?jobid=" +
jobid.toString() + "&tipid=" + tipId;
validateViewJob(taskStatsJSP, "GET");
}
// validate setJobPriority
private void validateJobTrackerJSPSetPriorityAction(
org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
throws IOException {
String jobTrackerJSP = jtURL + "/jobtracker.jsp?a=b";
String jobTrackerJSPSetJobPriorityAction = jobTrackerJSP +
"&changeJobPriority=true&setJobPriority="+"HIGH"+"&jobCheckBox=" +
jobid.toString();
validateModifyJob(jobTrackerJSPSetJobPriorityAction, "GET");
// jobSubmitter, mrOwner, qAdmin, mrAdmin and superGroupMember are not
// validated for
// job-modify permission in validateModifyJob(). So let us do it
// explicitly here
assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
jobTrackerJSPSetJobPriorityAction, jobSubmitter, "GET"));
assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
jobTrackerJSPSetJobPriorityAction, superGroupMember, "GET"));
assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
jobTrackerJSPSetJobPriorityAction, mrAdminUser, "GET"));
assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
jobTrackerJSPSetJobPriorityAction, mrAdminGroupMember, "GET"));
assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
jobTrackerJSPSetJobPriorityAction, qAdmin, "GET"));
assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
jobTrackerJSPSetJobPriorityAction, mrOwner, "GET"));
}
// validate access of jobblacklistedtrackers.jsp
private void valiateJobBlacklistedTrackerJSPAccess(
org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
throws IOException {
String jobBlacklistedTrackersJSP = jtURL +
"/jobblacklistedtrackers.jsp?jobid="+jobid.toString();
validateViewJob(jobBlacklistedTrackersJSP, "GET");
}
// validate access of jobfailures.jsp
private void validateJobFailuresJSPAccess(
org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
throws IOException {
String jobFailuresJSP = jtURL + "/jobfailures.jsp?jobid="+jobid.toString();
validateViewJob(jobFailuresJSP, "GET");
}
// validate access of jobconf.jsp
private void validateJobConfJSPAccess(
org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
throws IOException {
String jobConfJSP = jtURL + "/jobconf.jsp?jobid="+jobid.toString();
validateViewJob(jobConfJSP, "GET");
}
// validate access of jobtasks.jsp
private void validateJobTasksJSPAccess(
org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
throws IOException {
String jobTasksJSP = jtURL + "/jobtasks.jsp?jobid="
+ jobid.toString() + "&type=map&pagenum=1&state=running";
validateViewJob(jobTasksJSP, "GET");
}
// validate access of TaskGraphServlet
private void validateTaskGraphServletAccess(
org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
throws IOException {
String taskGraphServlet = jtURL + "/taskgraph?type=map&jobid="
+ jobid.toString();
validateViewJob(taskGraphServlet, "GET");
taskGraphServlet = jtURL + "/taskgraph?type=reduce&jobid="
+ jobid.toString();
validateViewJob(taskGraphServlet, "GET");
}
// validate access of jobdetails.jsp
private void validateJobDetailsJSPAccess(
org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
throws IOException {
String jobDetailsJSP = jtURL + "/jobdetails.jsp?jobid="
+ jobid.toString();
validateViewJob(jobDetailsJSP, "GET");
}
// validate access of jobtracker.jsp
private void validateJobTrackerJSPAccess(String jtURL)
throws IOException {
String jobTrackerJSP = jtURL + "/jobtracker.jsp?a=b";
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(jobTrackerJSP, jobSubmitter, "GET"));
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(jobTrackerJSP, viewColleague, "GET"));
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(jobTrackerJSP, unauthorizedUser, "GET"));
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(jobTrackerJSP, modifyColleague, "GET"));
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(jobTrackerJSP, viewAndModifyColleague, "GET"));
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(jobTrackerJSP, mrOwner, "GET"));
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(jobTrackerJSP, qAdmin, "GET"));
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(jobTrackerJSP, superGroupMember, "GET"));
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(jobTrackerJSP, mrAdminUser, "GET"));
assertEquals(HttpURLConnection.HTTP_OK,
getHttpStatusCode(jobTrackerJSP, mrAdminGroupMember, "GET"));
}
}