Merge r1407704 through r1408926 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1408938 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-11-13 20:21:39 +00:00
commit 2a9f450511
48 changed files with 950 additions and 238 deletions

View File

@ -361,6 +361,9 @@ Release 2.0.3-alpha - Unreleased
HADOOP-8860. Split MapReduce and YARN sections in documentation navigation.
(tomwhite via tucu)
HADOOP-9021. Enforce configured SASL method on the server (daryn via
bobby)
OPTIMIZATIONS
HADOOP-8866. SampleQuantiles#query is O(N^2) instead of O(N). (Andrew Wang
@ -421,6 +424,8 @@ Release 2.0.3-alpha - Unreleased
HADOOP-7115. Add a cache for getpwuid_r and getpwgid_r calls (tucu)
HADOOP-8999. SASL negotiation is flawed (daryn)
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
@ -1140,6 +1145,12 @@ Release 0.23.5 - UNRELEASED
HADOOP-8986. Server$Call object is never released after it is sent (bobby)
HADOOP-9022. Hadoop distcp tool fails to copy file if -m 0 specified
(Jonathan Eagles via bobby)
HADOOP-9025. org.apache.hadoop.tools.TestCopyListing failing (Jonathan
Eagles via jlowe)
Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -184,7 +184,18 @@ public String toString() {
return str;
}
/** Apply a umask to this permission and return a new one */
/**
* Apply a umask to this permission and return a new one.
*
* The umask is used by create, mkdir, and other Hadoop filesystem operations.
* The mode argument for these operations is modified by removing the bits
* which are set in the umask. Thus, the umask limits the permissions which
* newly created files and directories get.
*
* @param umask The umask to use
*
* @return The effective permission
*/
public FsPermission applyUMask(FsPermission umask) {
return new FsPermission(useraction.and(umask.useraction.not()),
groupaction.and(umask.groupaction.not()),

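The new applyUMask javadoc above boils down to clearing every umask bit from the requested mode. A minimal, hedged sketch (the class name UMaskExample is illustrative; FsPermission's short constructor, applyUMask, and toShort are used as shown in the diff):

import org.apache.hadoop.fs.permission.FsPermission;

public class UMaskExample {
  public static void main(String[] args) {
    // A requested mode of 0777 with the common default umask 022:
    FsPermission requested = new FsPermission((short) 0777);
    FsPermission umask = new FsPermission((short) 0022);
    // Each bit set in the umask is removed from the requested mode,
    // so rwxrwxrwx masked by ----w--w- becomes rwxr-xr-x (0755).
    FsPermission effective = requested.applyUMask(umask);
    System.out.println(Integer.toOctalString(effective.toShort())); // prints 755
  }
}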
View File

@ -45,6 +45,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -87,7 +88,9 @@
import org.apache.hadoop.security.SaslRpcServer.SaslDigestCallbackHandler;
import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.security.SaslRpcServer.SaslStatus;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
@ -113,7 +116,7 @@
@InterfaceStability.Evolving
public abstract class Server {
private final boolean authorize;
private boolean isSecurityEnabled;
private EnumSet<AuthMethod> enabledAuthMethods;
private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
public void addTerseExceptions(Class<?>... exceptionClass) {
@ -1217,6 +1220,10 @@ private void saslReadAndProcess(byte[] saslToken) throws IOException,
AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
throw e;
}
if (replyToken == null && authMethod == AuthMethod.PLAIN) {
// client needs at least a response to know if it should use SIMPLE
replyToken = new byte[0];
}
if (replyToken != null) {
if (LOG.isDebugEnabled())
LOG.debug("Will send token of size " + replyToken.length
@ -1334,34 +1341,9 @@ public int readAndProcess() throws IOException, InterruptedException {
if (authMethod == null) {
throw new IOException("Unable to read authentication method");
}
boolean useSaslServer = isSecurityEnabled;
final boolean clientUsingSasl;
switch (authMethod) {
case SIMPLE: { // no sasl for simple
clientUsingSasl = false;
break;
}
case DIGEST: { // always allow tokens if there's a secret manager
useSaslServer |= (secretManager != null);
clientUsingSasl = true;
break;
}
default: {
clientUsingSasl = true;
break;
}
}
if (useSaslServer) {
saslServer = createSaslServer(authMethod);
} else if (clientUsingSasl) { // security is off
doSaslReply(SaslStatus.SUCCESS, new IntWritable(
SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
authMethod = AuthMethod.SIMPLE;
// client has already sent the initial Sasl message and we
// should ignore it. Both client and server should fall back
// to simple auth from now on.
skipInitialSaslHandshake = true;
}
// this may create a SASL server, or switch us into SIMPLE
authMethod = initializeAuthContext(authMethod);
connectionHeaderBuf = null;
connectionHeaderRead = true;
@ -1409,10 +1391,24 @@ public int readAndProcess() throws IOException, InterruptedException {
}
}
private SaslServer createSaslServer(AuthMethod authMethod)
private AuthMethod initializeAuthContext(AuthMethod authMethod)
throws IOException {
try {
return createSaslServerInternal(authMethod);
if (enabledAuthMethods.contains(authMethod)) {
saslServer = createSaslServer(authMethod);
} else if (enabledAuthMethods.contains(AuthMethod.SIMPLE)) {
doSaslReply(SaslStatus.SUCCESS, new IntWritable(
SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
authMethod = AuthMethod.SIMPLE;
// client has already sent the initial Sasl message and we
// should ignore it. Both client and server should fall back
// to simple auth from now on.
skipInitialSaslHandshake = true;
} else {
throw new AccessControlException(
authMethod + " authentication is not enabled."
+ " Available:" + enabledAuthMethods);
}
} catch (IOException ioe) {
final String ioeClass = ioe.getClass().getName();
final String ioeMessage = ioe.getLocalizedMessage();
@ -1425,9 +1421,10 @@ private SaslServer createSaslServer(AuthMethod authMethod)
}
throw ioe;
}
return authMethod;
}
private SaslServer createSaslServerInternal(AuthMethod authMethod)
private SaslServer createSaslServer(AuthMethod authMethod)
throws IOException {
SaslServer saslServer = null;
String hostname = null;
@ -1436,18 +1433,9 @@ private SaslServer createSaslServerInternal(AuthMethod authMethod)
switch (authMethod) {
case SIMPLE: {
throw new AccessControlException("Authorization ("
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION
+ ") is enabled but authentication ("
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
+ ") is configured as simple. Please configure another method "
+ "like kerberos or digest.");
return null; // no sasl for simple
}
case DIGEST: {
if (secretManager == null) {
throw new AccessControlException(
"Server is not configured to do DIGEST authentication.");
}
secretManager.checkAvailableForRead();
hostname = SaslRpcServer.SASL_DEFAULT_REALM;
saslCallback = new SaslDigestCallbackHandler(secretManager, this);
@ -1469,6 +1457,7 @@ private SaslServer createSaslServerInternal(AuthMethod authMethod)
break;
}
default:
// we should never be able to get here
throw new AccessControlException(
"Server does not support SASL " + authMethod);
}
@ -1908,7 +1897,9 @@ protected Server(String bindAddress, int port,
this.authorize =
conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
false);
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
// configure supported authentications
this.enabledAuthMethods = getAuthMethods(secretManager, conf);
// Start the listener here and let it bind to the port
listener = new Listener();
@ -1929,6 +1920,31 @@ protected Server(String bindAddress, int port,
this.exceptionsHandler.addTerseExceptions(StandbyException.class);
}
// get the security type from the conf. implicitly include token support
// if a secret manager is provided, or fail if token is the conf value but
// there is no secret manager
private EnumSet<AuthMethod> getAuthMethods(SecretManager<?> secretManager,
Configuration conf) {
AuthenticationMethod confAuthenticationMethod =
SecurityUtil.getAuthenticationMethod(conf);
EnumSet<AuthMethod> authMethods =
EnumSet.of(confAuthenticationMethod.getAuthMethod());
if (confAuthenticationMethod == AuthenticationMethod.TOKEN) {
if (secretManager == null) {
throw new IllegalArgumentException(AuthenticationMethod.TOKEN +
" authentication requires a secret manager");
}
} else if (secretManager != null) {
LOG.debug(AuthenticationMethod.TOKEN +
" authentication enabled for secret manager");
authMethods.add(AuthenticationMethod.TOKEN.getAuthMethod());
}
LOG.debug("Server accepts auth methods:" + authMethods);
return authMethods;
}
private void closeConnection(Connection connection) {
synchronized (connectionList) {
if (connectionList.remove(connection))
@ -2045,16 +2061,6 @@ Configuration getConf() {
return conf;
}
/** for unit testing only, should be called before server is started */
void disableSecurity() {
this.isSecurityEnabled = false;
}
/** for unit testing only, should be called before server is started */
void enableSecurity() {
this.isSecurityEnabled = true;
}
/** Sets the socket buffer size used for responding to RPCs */
public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }

View File

@ -145,15 +145,13 @@ public boolean saslConnect(InputStream inS, OutputStream outS)
byte[] saslToken = new byte[0];
if (saslClient.hasInitialResponse())
saslToken = saslClient.evaluateChallenge(saslToken);
if (saslToken != null) {
while (saslToken != null) {
outStream.writeInt(saslToken.length);
outStream.write(saslToken, 0, saslToken.length);
outStream.flush();
if (LOG.isDebugEnabled())
LOG.debug("Have sent token of size " + saslToken.length
+ " from initSASLContext.");
}
if (!saslClient.isComplete()) {
readStatus(inStream);
int len = inStream.readInt();
if (len == SaslRpcServer.SWITCH_TO_SIMPLE_AUTH) {
@ -161,32 +159,18 @@ public boolean saslConnect(InputStream inS, OutputStream outS)
LOG.debug("Server asks us to fall back to simple auth.");
saslClient.dispose();
return false;
} else if ((len == 0) && saslClient.isComplete()) {
break;
}
saslToken = new byte[len];
if (LOG.isDebugEnabled())
LOG.debug("Will read input token of size " + saslToken.length
+ " for processing by initSASLContext");
inStream.readFully(saslToken);
}
while (!saslClient.isComplete()) {
saslToken = saslClient.evaluateChallenge(saslToken);
if (saslToken != null) {
if (LOG.isDebugEnabled())
LOG.debug("Will send token of size " + saslToken.length
+ " from initSASLContext.");
outStream.writeInt(saslToken.length);
outStream.write(saslToken, 0, saslToken.length);
outStream.flush();
}
if (!saslClient.isComplete()) {
readStatus(inStream);
saslToken = new byte[inStream.readInt()];
if (LOG.isDebugEnabled())
LOG.debug("Will read input token of size " + saslToken.length
+ " for processing by initSASLContext");
inStream.readFully(saslToken);
}
}
if (!saslClient.isComplete()) { // shouldn't happen
throw new SaslException("Internal negotiation error");
}
if (LOG.isDebugEnabled()) {
LOG.debug("SASL client context established. Negotiated QoP: "

View File

@ -240,6 +240,7 @@ private static synchronized void initUGI(Configuration conf) {
AuthenticationMethod auth = SecurityUtil.getAuthenticationMethod(conf);
switch (auth) {
case SIMPLE:
case TOKEN:
useKerberos = false;
break;
case KERBEROS:

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#banner {
height: 93px;
background: none;
}
#bannerLeft img {
margin-left: 30px;
margin-top: 10px;
}
#bannerRight img {
margin: 17px;
}

View File

@ -0,0 +1,28 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project name="Apache Hadoop ${project.version}">
<skin>
<groupId>org.apache.maven.skins</groupId>
<artifactId>maven-stylus-skin</artifactId>
<version>1.2</version>
</skin>
<body>
<links>
<item name="Apache Hadoop" href="http://hadoop.apache.org/"/>
</links>
</body>
</project>

View File

@ -569,10 +569,13 @@ public void handle(Callback[] callbacks)
private static Pattern KrbFailed =
Pattern.compile(".*Failed on local exception:.* " +
"Failed to specify server's Kerberos principal name.*");
private static Pattern Denied =
Pattern.compile(".*Authorization .* is enabled .*");
private static Pattern NoDigest =
Pattern.compile(".*Server is not configured to do DIGEST auth.*");
private static Pattern Denied(AuthenticationMethod method) {
return Pattern.compile(".*RemoteException.*AccessControlException.*: "
+method.getAuthMethod() + " authentication is not enabled.*");
}
private static Pattern NoTokenAuth =
Pattern.compile(".*IllegalArgumentException: " +
"TOKEN authentication requires a secret manager");
/*
* simple server
@ -604,13 +607,40 @@ public void testSimpleServerWithInvalidTokens() throws Exception {
assertAuthEquals(SIMPLE, getAuthMethod(KERBEROS, SIMPLE, false));
}
/*
* token server
*/
@Test
public void testTokenOnlyServer() throws Exception {
assertAuthEquals(Denied(SIMPLE), getAuthMethod(SIMPLE, TOKEN));
assertAuthEquals(KrbFailed, getAuthMethod(KERBEROS, TOKEN));
}
@Test
public void testTokenOnlyServerWithTokens() throws Exception {
assertAuthEquals(TOKEN, getAuthMethod(SIMPLE, TOKEN, true));
assertAuthEquals(TOKEN, getAuthMethod(KERBEROS, TOKEN, true));
forceSecretManager = false;
assertAuthEquals(NoTokenAuth, getAuthMethod(SIMPLE, TOKEN, true));
assertAuthEquals(NoTokenAuth, getAuthMethod(KERBEROS, TOKEN, true));
}
@Test
public void testTokenOnlyServerWithInvalidTokens() throws Exception {
assertAuthEquals(BadToken, getAuthMethod(SIMPLE, TOKEN, false));
assertAuthEquals(BadToken, getAuthMethod(KERBEROS, TOKEN, false));
forceSecretManager = false;
assertAuthEquals(NoTokenAuth, getAuthMethod(SIMPLE, TOKEN, false));
assertAuthEquals(NoTokenAuth, getAuthMethod(KERBEROS, TOKEN, false));
}
/*
* kerberos server
*/
@Test
public void testKerberosServer() throws Exception {
assertAuthEquals(Denied, getAuthMethod(SIMPLE, KERBEROS));
assertAuthEquals(KrbFailed, getAuthMethod(KERBEROS, KERBEROS));
assertAuthEquals(Denied(SIMPLE), getAuthMethod(SIMPLE, KERBEROS));
assertAuthEquals(KrbFailed, getAuthMethod(KERBEROS, KERBEROS));
}
@Test
@ -620,8 +650,8 @@ public void testKerberosServerWithTokens() throws Exception {
assertAuthEquals(TOKEN, getAuthMethod(KERBEROS, KERBEROS, true));
// can't fall back to simple when using kerberos w/o tokens
forceSecretManager = false;
assertAuthEquals(NoDigest, getAuthMethod(SIMPLE, KERBEROS, true));
assertAuthEquals(NoDigest, getAuthMethod(KERBEROS, KERBEROS, true));
assertAuthEquals(Denied(TOKEN), getAuthMethod(SIMPLE, KERBEROS, true));
assertAuthEquals(Denied(TOKEN), getAuthMethod(KERBEROS, KERBEROS, true));
}
@Test
@ -629,8 +659,8 @@ public void testKerberosServerWithInvalidTokens() throws Exception {
assertAuthEquals(BadToken, getAuthMethod(SIMPLE, KERBEROS, false));
assertAuthEquals(BadToken, getAuthMethod(KERBEROS, KERBEROS, false));
forceSecretManager = false;
assertAuthEquals(NoDigest, getAuthMethod(SIMPLE, KERBEROS, true));
assertAuthEquals(NoDigest, getAuthMethod(KERBEROS, KERBEROS, true));
assertAuthEquals(Denied(TOKEN), getAuthMethod(SIMPLE, KERBEROS, false));
assertAuthEquals(Denied(TOKEN), getAuthMethod(KERBEROS, KERBEROS, false));
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#banner {
height: 93px;
background: none;
}
#bannerLeft img {
margin-left: 30px;
margin-top: 10px;
}
#bannerRight img {
margin: 17px;
}

View File

@ -464,6 +464,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-4048. Use ERROR instead of INFO for volume failure logs.
(Stephen Chu via eli)
HDFS-1322. Document umask in DistributedFileSystem#mkdirs javadocs.
(Colin Patrick McCabe via eli)
OPTIMIZATIONS
BUG FIXES
@ -577,6 +580,12 @@ Release 2.0.3-alpha - Unreleased
HDFS-4162. Some malformed and unquoted HTML strings are returned from
datanode web ui. (Darek Dagit via suresh)
HDFS-4164. fuse_dfs: add -lrt to the compiler command line on Linux.
(Colin Patrick McCabe via eli)
HDFS-3921. NN will prematurely consider blocks missing when entering active
state while still in safe mode. (atm)
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
@ -1971,6 +1980,9 @@ Release 0.23.5 - UNRELEASED
HDFS-3990. NN's health report has severe performance problems (daryn)
HDFS-4181. LeaseManager tries to double remove and prints extra messages
(Kihwal Lee via daryn)
BUG FIXES
HDFS-3829. TestHftpURLTimeouts fails intermittently with JDK7 (Trevor
@ -1985,6 +1997,9 @@ Release 0.23.5 - UNRELEASED
HDFS-4090. getFileChecksum() result incompatible when called against
zero-byte files. (Kihwal Lee via daryn)
HDFS-4172. namenode does not URI-encode parameters when building URI for
datanode request (Derek Dagit via bobby)
Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -509,14 +509,32 @@ public LocatedFileStatus next() throws IOException {
}
/**
* Create a directory with given name and permission, only when
* parent directory exists.
* Create a directory, only when the parent directories exist.
*
* See {@link FsPermission#applyUMask(FsPermission)} for details of how
* the permission is applied.
*
* @param f The path to create
* @param permission The permission. See FsPermission#applyUMask for
* details about how this is used to calculate the
* effective permission.
*/
public boolean mkdir(Path f, FsPermission permission) throws IOException {
statistics.incrementWriteOps(1);
return dfs.mkdirs(getPathName(f), permission, false);
}
/**
* Create a directory and its parent directories.
*
* See {@link FsPermission#applyUMask(FsPermission)} for details of how
* the permission is applied.
*
* @param f The path to create
* @param permission The permission. See FsPermission#applyUMask for
* details about how this is used to calculate the
* effective permission.
*/
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
statistics.incrementWriteOps(1);

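The mkdir/mkdirs javadoc above defers to FsPermission#applyUMask, so even an explicit permission argument is narrowed by the client's configured umask. A hedged usage sketch (the path, the fs.permissions.umask-mode key, and running against the default filesystem are illustrative assumptions, not part of this change):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class MkdirsUMaskExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.permissions.umask-mode", "022"); // assumed umask config key
    FileSystem fs = FileSystem.get(conf);
    // Requesting 777, but with umask 022 the directory is created as 755.
    fs.mkdirs(new Path("/tmp/example-dir"), new FsPermission((short) 0777));
  }
}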
View File

@ -666,13 +666,17 @@ void startActiveServices() throws IOException {
LOG.info("Catching up to latest edits from old active before " +
"taking over writer role in edits logs");
editLogTailer.catchupDuringFailover();
blockManager.setPostponeBlocksFromFuture(false);
LOG.info("Reprocessing replication and invalidation queues");
blockManager.setPostponeBlocksFromFuture(false);
blockManager.getDatanodeManager().markAllDatanodesStale();
blockManager.clearQueues();
blockManager.processAllPendingDNMessages();
blockManager.processMisReplicatedBlocks();
if (!isInSafeMode() ||
(isInSafeMode() && safeMode.isPopulatingReplQueues())) {
LOG.info("Reprocessing replication and invalidation queues");
blockManager.processMisReplicatedBlocks();
}
if (LOG.isDebugEnabled()) {
LOG.debug("NameNode metadata after re-processing " +

View File

@ -135,7 +135,9 @@ synchronized Lease addLease(String holder, String src) {
synchronized void removeLease(Lease lease, String src) {
sortedLeasesByPath.remove(src);
if (!lease.removePath(src)) {
LOG.error(src + " not found in lease.paths (=" + lease.paths + ")");
if (LOG.isDebugEnabled()) {
LOG.debug(src + " not found in lease.paths (=" + lease.paths + ")");
}
}
if (!lease.hasPath()) {
@ -440,11 +442,14 @@ private synchronized void checkLeases() {
oldest.getPaths().toArray(leasePaths);
for(String p : leasePaths) {
try {
if(fsnamesystem.internalReleaseLease(oldest, p, HdfsServerConstants.NAMENODE_LEASE_HOLDER)) {
LOG.info("Lease recovery for " + p + " is complete. File closed.");
removing.add(p);
} else {
LOG.info("Started block recovery " + p + " lease " + oldest);
boolean completed = fsnamesystem.internalReleaseLease(oldest, p,
HdfsServerConstants.NAMENODE_LEASE_HOLDER);
if (LOG.isDebugEnabled()) {
if (completed) {
LOG.debug("Lease recovery for " + p + " is complete. File closed.");
} else {
LOG.debug("Started block recovery " + p + " lease " + oldest);
}
}
} catch (IOException e) {
LOG.error("Cannot release the path " + p + " in the lease "

View File

@ -22,6 +22,12 @@ abstract class BooleanParam extends Param<Boolean, BooleanParam.Domain> {
static final String TRUE = "true";
static final String FALSE = "false";
/** @return the parameter value as a string */
@Override
public String getValueString() {
return value.toString();
}
BooleanParam(final Domain domain, final Boolean value) {
super(domain, value);
}

View File

@ -53,6 +53,11 @@ public String toString() {
return getName() + "=" + toString(value);
}
/** @return the parameter value as a string */
@Override
public String getValueString() {
return toString(value);
}
/** The domain of the parameter. */
static final class Domain<E extends Enum<E>> extends Param.Domain<EnumSet<E>> {

View File

@ -114,6 +114,12 @@ public String toQueryString() {
}
}
/** @return the parameter value as a string */
@Override
public String getValueString() {
return value.toString();
}
HttpOpParam(final Domain<E> domain, final E value) {
super(domain, value);
}

View File

@ -31,6 +31,12 @@ public String toString() {
return getName() + "=" + Domain.toString(getValue());
}
/** @return the parameter value as a string */
@Override
public String getValueString() {
return Domain.toString(getValue());
}
/** The domain of the parameter. */
static final class Domain extends Param.Domain<InetSocketAddress> {
Domain(final String paramName) {

View File

@ -44,6 +44,12 @@ public String toString() {
return getName() + "=" + domain.toString(getValue());
}
/** @return the parameter value as a string */
@Override
public String getValueString() {
return domain.toString(getValue());
}
/** The domain of the parameter. */
static final class Domain extends Param.Domain<Integer> {
/** The radix of the number. */

View File

@ -43,6 +43,12 @@ public String toString() {
return getName() + "=" + domain.toString(getValue());
}
/** @return the parameter value as a string */
@Override
public String getValueString() {
return domain.toString(getValue());
}
/** The domain of the parameter. */
static final class Domain extends Param.Domain<Long> {
/** The radix of the number. */

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.web.resources;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Arrays;
import java.util.Comparator;
@ -32,16 +34,29 @@ public int compare(Param<?, ?> left, Param<?, ?> right) {
}
};
/** Convert the parameters to a sorted String. */
/** Convert the parameters to a sorted String.
*
* @param separator URI parameter separator character
* @param parameters parameters to encode into a string
* @return the encoded URI string
*/
public static String toSortedString(final String separator,
final Param<?, ?>... parameters) {
Arrays.sort(parameters, NAME_CMP);
final StringBuilder b = new StringBuilder();
for(Param<?, ?> p : parameters) {
if (p.getValue() != null) {
b.append(separator).append(p);
try {
for(Param<?, ?> p : parameters) {
if (p.getValue() != null) {
b.append(separator).append(
URLEncoder.encode(p.getName(), "UTF-8")
+ "="
+ URLEncoder.encode(p.getValueString(), "UTF-8"));
}
}
}
} catch (UnsupportedEncodingException e) {
// Sane systems know about UTF-8, so this should never happen.
throw new RuntimeException(e);
}
return b.toString();
}
@ -60,6 +75,9 @@ public final T getValue() {
return value;
}
/** @return the parameter value as a string */
public abstract String getValueString();
/** @return the parameter name. */
public abstract String getName();
@ -101,4 +119,4 @@ public final T parse(final String varName, final String str) {
}
}
}
}
}

View File

@ -44,6 +44,12 @@ public String toString() {
return getName() + "=" + domain.toString(getValue());
}
/** @return the parameter value as a string */
@Override
public final String getValueString() {
return domain.toString(getValue());
}
/** The domain of the parameter. */
static final class Domain extends Param.Domain<Short> {
/** The radix of the number. */

View File

@ -25,6 +25,12 @@ abstract class StringParam extends Param<String, StringParam.Domain> {
super(domain, domain.parse(str));
}
/** @return the parameter value as a string */
@Override
public String getValueString() {
return value;
}
/** The domain of the parameter. */
static final class Domain extends Param.Domain<String> {
/** The pattern defining the domain; null means the domain is unrestricted. */

View File

@ -70,6 +70,7 @@ IF(FUSE_FOUND)
hdfs
m
pthread
rt
)
add_executable(test_fuse_dfs
test/test_fuse_dfs.c

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#banner {
height: 93px;
background: none;
}
#bannerLeft img {
margin-left: 30px;
margin-top: 10px;
}
#bannerRight img {
margin: 17px;
}

View File

@ -0,0 +1,28 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project name="Apache Hadoop ${project.version}">
<skin>
<groupId>org.apache.maven.skins</groupId>
<artifactId>maven-stylus-skin</artifactId>
<version>1.2</version>
</skin>
<body>
<links>
<item name="Apache Hadoop" href="http://hadoop.apache.org/"/>
</links>
</body>
</project>

View File

@ -630,6 +630,32 @@ public Boolean get() {
assertEquals(0L, nn1.getNamesystem().getPendingReplicationBlocks());
}
/**
* Make sure that when we transition to active in safe mode that we don't
* prematurely consider blocks missing just because not all DNs have reported
* yet.
*
* This is a regression test for HDFS-3921.
*/
@Test
public void testNoPopulatingReplQueuesWhenStartingActiveInSafeMode()
throws IOException {
DFSTestUtil.createFile(fs, new Path("/test"), 15*BLOCK_SIZE, (short)3, 1L);
// Stop the DN so that when the NN restarts not all blocks will be reported
// and the NN won't leave safe mode.
cluster.stopDataNode(1);
// Restart the namenode but don't wait for it to hear from all DNs (since
// one DN is deliberately shut down.)
cluster.restartNameNode(0, false);
cluster.transitionToActive(0);
assertTrue(cluster.getNameNode(0).isInSafeMode());
// We shouldn't yet consider any blocks "missing" since we're in startup
// safemode, i.e. not all DNs may have reported.
assertEquals(0, cluster.getNamesystem(0).getMissingBlocksCount());
}
/**
* Print a big banner in the test log to make debug easier.
*/

View File

@ -224,4 +224,14 @@ public void testReplicationParam() {
LOG.info("EXPECTED: " + e);
}
}
@Test
public void testToSortedStringEscapesURICharacters() {
final String sep = "&";
Param<?, ?> ampParam = new TokenArgumentParam("token&ampersand");
Param<?, ?> equalParam = new RenewerParam("renewer=equal");
final String expected = "&renewer=renewer%3Dequal&token=token%26ampersand";
final String actual = Param.toSortedString(sep, equalParam, ampParam);
Assert.assertEquals(expected, actual);
}
}

View File

@ -650,6 +650,16 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED
state (jlowe via bobby)
MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby)
MAPREDUCE-4787. TestJobMonitorAndPrint is broken (Rob Parker via bobby)
MAPREDUCE-4425. Speculation + Fetch failures can lead to a hung job (jlowe
via bobby)
MAPREDUCE-4786. Job End Notification retry interval is 5 milliseconds by
default (Ravi Prakash via bobby)
Release 0.23.4 - UNRELEASED

View File

@ -53,7 +53,7 @@ public class JobEndNotifier implements Configurable {
protected String userUrl;
protected String proxyConf;
protected int numTries; //Number of tries to attempt notification
protected int waitInterval; //Time to wait between retrying notification
protected int waitInterval; //Time (ms) to wait between retrying notification
protected URL urlToNotify; //URL to notify read from the config
protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
@ -71,10 +71,10 @@ public void setConf(Configuration conf) {
, conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, 1)
);
waitInterval = Math.min(
conf.getInt(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, 5)
, conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, 5)
conf.getInt(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, 5000)
, conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, 5000)
);
waitInterval = (waitInterval < 0) ? 5 : waitInterval;
waitInterval = (waitInterval < 0) ? 5000 : waitInterval;
userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL);

View File

@ -712,7 +712,10 @@ protected void scheduleTasks(Set<TaskId> taskIDs) {
* The only entry point to change the Job.
*/
public void handle(JobEvent event) {
LOG.debug("Processing " + event.getJobId() + " of type " + event.getType());
if (LOG.isDebugEnabled()) {
LOG.debug("Processing " + event.getJobId() + " of type "
+ event.getType());
}
try {
writeLock.lock();
JobStateInternal oldState = getInternalState();

View File

@ -22,9 +22,11 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -118,9 +120,18 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
protected Credentials credentials;
protected Token<JobTokenIdentifier> jobToken;
//should be set to one which comes first
//saying COMMIT_PENDING
private TaskAttemptId commitAttempt;
private TaskAttemptId successfulAttempt;
private final Set<TaskAttemptId> failedAttempts;
// Track the finished attempts - successful, failed and killed
private final Set<TaskAttemptId> finishedAttempts;
// counts the number of attempts that are either running or in a state where
// they will come to be running when they get a Container
private int numberUncompletedAttempts = 0;
private final Set<TaskAttemptId> inProgressAttempts;
private boolean historyTaskStartGenerated = false;
@ -182,6 +193,14 @@ TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
TaskEventType.T_ATTEMPT_KILLED,
new KillWaitAttemptKilledTransition())
.addTransition(TaskStateInternal.KILL_WAIT,
EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
TaskEventType.T_ATTEMPT_SUCCEEDED,
new KillWaitAttemptSucceededTransition())
.addTransition(TaskStateInternal.KILL_WAIT,
EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
TaskEventType.T_ATTEMPT_FAILED,
new KillWaitAttemptFailedTransition())
// Ignore-able transitions.
.addTransition(
TaskStateInternal.KILL_WAIT,
@ -189,8 +208,6 @@ TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
EnumSet.of(TaskEventType.T_KILL,
TaskEventType.T_ATTEMPT_LAUNCHED,
TaskEventType.T_ATTEMPT_COMMIT_PENDING,
TaskEventType.T_ATTEMPT_FAILED,
TaskEventType.T_ATTEMPT_SUCCEEDED,
TaskEventType.T_ADD_SPEC_ATTEMPT))
// Transitions from SUCCEEDED state
@ -200,13 +217,15 @@ TaskEventType.T_ATTEMPT_FAILED, new RetroactiveFailureTransition())
.addTransition(TaskStateInternal.SUCCEEDED,
EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED),
TaskEventType.T_ATTEMPT_KILLED, new RetroactiveKilledTransition())
.addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
TaskEventType.T_ATTEMPT_SUCCEEDED,
new AttemptSucceededAtSucceededTransition())
// Ignore-able transitions.
.addTransition(
TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
TaskEventType.T_ATTEMPT_COMMIT_PENDING,
TaskEventType.T_ATTEMPT_LAUNCHED,
TaskEventType.T_ATTEMPT_SUCCEEDED,
TaskEventType.T_KILL))
// Transitions from FAILED state
@ -242,15 +261,6 @@ public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) {
private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
new RecoverdAttemptsComparator();
//should be set to one which comes first
//saying COMMIT_PENDING
private TaskAttemptId commitAttempt;
private TaskAttemptId successfulAttempt;
private int failedAttempts;
private int finishedAttempts;//finish are total of success, failed and killed
@Override
public TaskState getState() {
readLock.lock();
@ -275,6 +285,9 @@ public TaskImpl(JobId jobId, TaskType taskType, int partition,
readLock = readWriteLock.readLock();
writeLock = readWriteLock.writeLock();
this.attempts = Collections.emptyMap();
this.finishedAttempts = new HashSet<TaskAttemptId>(2);
this.failedAttempts = new HashSet<TaskAttemptId>(2);
this.inProgressAttempts = new HashSet<TaskAttemptId>(2);
// This overridable method call is okay in a constructor because we
// have a convention that none of the overrides depends on any
// fields that need initialization.
@ -611,9 +624,9 @@ private void addAndScheduleAttempt() {
taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
}
++numberUncompletedAttempts;
inProgressAttempts.add(attempt.getID());
//schedule the nextAttemptNumber
if (failedAttempts > 0) {
if (failedAttempts.size() > 0) {
eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
TaskAttemptEventType.TA_RESCHEDULE));
} else {
@ -788,12 +801,14 @@ private static class AttemptSucceededTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
@Override
public void transition(TaskImpl task, TaskEvent event) {
TaskTAttemptEvent taskTAttemptEvent = (TaskTAttemptEvent) event;
TaskAttemptId taskAttemptId = taskTAttemptEvent.getTaskAttemptID();
task.handleTaskAttemptCompletion(
((TaskTAttemptEvent) event).getTaskAttemptID(),
taskAttemptId,
TaskAttemptCompletionEventStatus.SUCCEEDED);
task.finishedAttempts++;
--task.numberUncompletedAttempts;
task.successfulAttempt = ((TaskTAttemptEvent) event).getTaskAttemptID();
task.finishedAttempts.add(taskAttemptId);
task.inProgressAttempts.remove(taskAttemptId);
task.successfulAttempt = taskAttemptId;
task.eventHandler.handle(new JobTaskEvent(
task.taskId, TaskState.SUCCEEDED));
LOG.info("Task succeeded with attempt " + task.successfulAttempt);
@ -824,11 +839,13 @@ private static class AttemptKilledTransition implements
SingleArcTransition<TaskImpl, TaskEvent> {
@Override
public void transition(TaskImpl task, TaskEvent event) {
TaskAttemptId taskAttemptId =
((TaskTAttemptEvent) event).getTaskAttemptID();
task.handleTaskAttemptCompletion(
((TaskTAttemptEvent) event).getTaskAttemptID(),
taskAttemptId,
TaskAttemptCompletionEventStatus.KILLED);
task.finishedAttempts++;
--task.numberUncompletedAttempts;
task.finishedAttempts.add(taskAttemptId);
task.inProgressAttempts.remove(taskAttemptId);
if (task.successfulAttempt == null) {
task.addAndScheduleAttempt();
}
@ -840,15 +857,25 @@ private static class KillWaitAttemptKilledTransition implements
MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
protected TaskStateInternal finalState = TaskStateInternal.KILLED;
protected final TaskAttemptCompletionEventStatus taCompletionEventStatus;
public KillWaitAttemptKilledTransition() {
this(TaskAttemptCompletionEventStatus.KILLED);
}
public KillWaitAttemptKilledTransition(
TaskAttemptCompletionEventStatus taCompletionEventStatus) {
this.taCompletionEventStatus = taCompletionEventStatus;
}
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
task.handleTaskAttemptCompletion(
((TaskTAttemptEvent) event).getTaskAttemptID(),
TaskAttemptCompletionEventStatus.KILLED);
task.finishedAttempts++;
TaskAttemptId taskAttemptId =
((TaskTAttemptEvent) event).getTaskAttemptID();
task.handleTaskAttemptCompletion(taskAttemptId, taCompletionEventStatus);
task.finishedAttempts.add(taskAttemptId);
// check whether all attempts are finished
if (task.finishedAttempts == task.attempts.size()) {
if (task.finishedAttempts.size() == task.attempts.size()) {
if (task.historyTaskStartGenerated) {
TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null,
finalState, null); // TODO JH verify failedAttempt null
@ -867,43 +894,57 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
}
}
private static class KillWaitAttemptSucceededTransition extends
KillWaitAttemptKilledTransition {
public KillWaitAttemptSucceededTransition() {
super(TaskAttemptCompletionEventStatus.SUCCEEDED);
}
}
private static class KillWaitAttemptFailedTransition extends
KillWaitAttemptKilledTransition {
public KillWaitAttemptFailedTransition() {
super(TaskAttemptCompletionEventStatus.FAILED);
}
}
private static class AttemptFailedTransition implements
MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
task.failedAttempts++;
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
TaskAttemptId taskAttemptId = castEvent.getTaskAttemptID();
task.failedAttempts.add(taskAttemptId);
if (taskAttemptId.equals(task.commitAttempt)) {
task.commitAttempt = null;
}
TaskAttempt attempt = task.attempts.get(castEvent.getTaskAttemptID());
TaskAttempt attempt = task.attempts.get(taskAttemptId);
if (attempt.getAssignedContainerMgrAddress() != null) {
//container was assigned
task.eventHandler.handle(new ContainerFailedEvent(attempt.getID(),
attempt.getAssignedContainerMgrAddress()));
}
task.finishedAttempts++;
if (task.failedAttempts < task.maxAttempts) {
task.finishedAttempts.add(taskAttemptId);
if (task.failedAttempts.size() < task.maxAttempts) {
task.handleTaskAttemptCompletion(
((TaskTAttemptEvent) event).getTaskAttemptID(),
taskAttemptId,
TaskAttemptCompletionEventStatus.FAILED);
// we don't need a new event if we already have a spare
if (--task.numberUncompletedAttempts == 0
task.inProgressAttempts.remove(taskAttemptId);
if (task.inProgressAttempts.size() == 0
&& task.successfulAttempt == null) {
task.addAndScheduleAttempt();
}
} else {
task.handleTaskAttemptCompletion(
((TaskTAttemptEvent) event).getTaskAttemptID(),
taskAttemptId,
TaskAttemptCompletionEventStatus.TIPFAILED);
TaskTAttemptEvent ev = (TaskTAttemptEvent) event;
TaskAttemptId taId = ev.getTaskAttemptID();
if (task.historyTaskStartGenerated) {
TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),
TaskStateInternal.FAILED, taId);
TaskStateInternal.FAILED, taskAttemptId);
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
taskFailedEvent));
} else {
@ -927,14 +968,14 @@ private static class RetroactiveFailureTransition
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
if (event instanceof TaskTAttemptEvent) {
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
!castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
// don't allow a different task attempt to override a previous
// succeeded state
return TaskStateInternal.SUCCEEDED;
}
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
!castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
// don't allow a different task attempt to override a previous
// succeeded state
task.finishedAttempts.add(castEvent.getTaskAttemptID());
task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
return TaskStateInternal.SUCCEEDED;
}
// a successful REDUCE task should not be overridden
@ -953,7 +994,7 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
// believe that there's no redundancy.
unSucceed(task);
// fake increase in Uncomplete attempts for super.transition
++task.numberUncompletedAttempts;
task.inProgressAttempts.add(castEvent.getTaskAttemptID());
return super.transition(task, event);
}
@ -976,6 +1017,8 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
!attemptId.equals(task.successfulAttempt)) {
// don't allow a different task attempt to override a previous
// succeeded state
task.finishedAttempts.add(castEvent.getTaskAttemptID());
task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
return TaskStateInternal.SUCCEEDED;
}
}
@ -1006,6 +1049,16 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
}
}
private static class AttemptSucceededAtSucceededTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
@Override
public void transition(TaskImpl task, TaskEvent event) {
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
task.finishedAttempts.add(castEvent.getTaskAttemptID());
task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
}
}
private static class KillNewTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
@Override
@ -1045,7 +1098,7 @@ public void transition(TaskImpl task, TaskEvent event) {
(attempt, "Task KILL is received. Killing attempt!");
}
task.numberUncompletedAttempts = 0;
task.inProgressAttempts.clear();
}
}

View File

@ -54,6 +54,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
@ -63,6 +64,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
@ -243,6 +245,39 @@ public Job submit(Configuration conf) throws Exception {
return job;
}
public void waitForInternalState(JobImpl job,
JobStateInternal finalState) throws Exception {
int timeoutSecs = 0;
JobStateInternal iState = job.getInternalState();
while (!finalState.equals(iState) && timeoutSecs++ < 20) {
System.out.println("Job Internal State is : " + iState
+ " Waiting for Internal state : " + finalState);
Thread.sleep(500);
iState = job.getInternalState();
}
System.out.println("Task Internal State is : " + iState);
Assert.assertEquals("Task Internal state is not correct (timedout)",
finalState, iState);
}
public void waitForInternalState(TaskImpl task,
TaskStateInternal finalState) throws Exception {
int timeoutSecs = 0;
TaskReport report = task.getReport();
TaskStateInternal iState = task.getInternalState();
while (!finalState.equals(iState) && timeoutSecs++ < 20) {
System.out.println("Task Internal State is : " + iState
+ " Waiting for Internal state : " + finalState + " progress : "
+ report.getProgress());
Thread.sleep(500);
report = task.getReport();
iState = task.getInternalState();
}
System.out.println("Task Internal State is : " + iState);
Assert.assertEquals("Task Internal state is not correct (timedout)",
finalState, iState);
}
public void waitForInternalState(TaskAttemptImpl attempt,
TaskAttemptStateInternal finalState) throws Exception {
int timeoutSecs = 0;

View File

@ -55,22 +55,22 @@ private void testNumRetries(Configuration conf) {
//Test maximum retry interval is capped by
//MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL
private void testWaitInterval(Configuration conf) {
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "5");
conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "1");
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "5000");
conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "1000");
setConf(conf);
Assert.assertTrue("Expected waitInterval to be 1, but was " + waitInterval,
waitInterval == 1);
Assert.assertTrue("Expected waitInterval to be 1000, but was "
+ waitInterval, waitInterval == 1000);
conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "10");
conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "10000");
setConf(conf);
Assert.assertTrue("Expected waitInterval to be 5, but was " + waitInterval,
waitInterval == 5);
Assert.assertTrue("Expected waitInterval to be 5000, but was "
+ waitInterval, waitInterval == 5000);
//Test negative numbers are set to default
conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "-10");
setConf(conf);
Assert.assertTrue("Expected waitInterval to be 5, but was " + waitInterval,
waitInterval == 5);
Assert.assertTrue("Expected waitInterval to be 5000, but was "
+ waitInterval, waitInterval == 5000);
}
private void testProxyConfiguration(Configuration conf) {
@ -125,17 +125,28 @@ protected boolean notifyURLOnce() {
public void testNotifyRetries() throws InterruptedException {
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, "http://nonexistent");
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "3");
conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "3");
conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "3000");
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "3000");
JobReport jobReport = Mockito.mock(JobReport.class);
long startTime = System.currentTimeMillis();
this.notificationCount = 0;
this.setConf(conf);
this.notify(jobReport);
long endTime = System.currentTimeMillis();
Assert.assertEquals("Only 1 try was expected but was : "
+ this.notificationCount, this.notificationCount, 1);
Assert.assertTrue("Should have taken more than 5 seconds it took "
+ (endTime - startTime), endTime - startTime > 5000);
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "3");
conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "3");
conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "3000");
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "3000");
startTime = System.currentTimeMillis();
this.notificationCount = 0;
this.setConf(conf);
this.notify(jobReport);
endTime = System.currentTimeMillis();
Assert.assertEquals("Only 3 retries were expected but was : "
+ this.notificationCount, this.notificationCount, 3);
Assert.assertTrue("Should have taken more than 9 seconds it took "

View File

@ -25,12 +25,15 @@
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@ -39,12 +42,18 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.junit.Test;
/**
* Tests the state machine with respect to Job/Task/TaskAttempt kill scenarios.
*
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestKill {
@Test
@ -131,6 +140,80 @@ public void testKillTask() throws Exception {
iter.next().getReport().getTaskAttemptState());
}
@Test
public void testKillTaskWait() throws Exception {
final Dispatcher dispatcher = new AsyncDispatcher() {
private TaskAttemptEvent cachedKillEvent;
@Override
protected void dispatch(Event event) {
if (event instanceof TaskAttemptEvent) {
TaskAttemptEvent killEvent = (TaskAttemptEvent) event;
if (killEvent.getType() == TaskAttemptEventType.TA_KILL) {
TaskAttemptId taID = killEvent.getTaskAttemptID();
if (taID.getTaskId().getTaskType() == TaskType.REDUCE
&& taID.getTaskId().getId() == 0 && taID.getId() == 0) {
// Task is asking the reduce TA to kill itself. 'Create' a race
// condition. Make the task succeed and then inform the task that
// TA has succeeded. Once Task gets the TA succeeded event at
// KILL_WAIT, then relay the actual kill signal to TA
super.dispatch(new TaskAttemptEvent(taID,
TaskAttemptEventType.TA_DONE));
super.dispatch(new TaskAttemptEvent(taID,
TaskAttemptEventType.TA_CONTAINER_CLEANED));
super.dispatch(new TaskTAttemptEvent(taID,
TaskEventType.T_ATTEMPT_SUCCEEDED));
this.cachedKillEvent = killEvent;
return;
}
}
} else if (event instanceof TaskEvent) {
TaskEvent taskEvent = (TaskEvent) event;
if (taskEvent.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED
&& this.cachedKillEvent != null) {
// When the TA comes and reports that it is done, send the
// cachedKillEvent
super.dispatch(this.cachedKillEvent);
return;
}
}
super.dispatch(event);
}
};
MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) {
@Override
public Dispatcher createDispatcher() {
return dispatcher;
}
};
Job job = app.submit(new Configuration());
JobId jobId = app.getJobId();
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask = it.next();
Task reduceTask = it.next();
app.waitForState(mapTask, TaskState.RUNNING);
app.waitForState(reduceTask, TaskState.RUNNING);
TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next();
app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
// Finish map
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
mapAttempt.getID(),
TaskAttemptEventType.TA_DONE));
app.waitForState(mapTask, TaskState.SUCCEEDED);
// Now kill the job
app.getContext().getEventHandler()
.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
}
@Test
public void testKillTaskAttempt() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);

View File

@ -141,7 +141,6 @@ private class MockTaskAttemptImpl extends TaskAttemptImpl {
private float progress = 0;
private TaskAttemptState state = TaskAttemptState.NEW;
private TaskAttemptId attemptId;
private TaskType taskType;
public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
@ -152,14 +151,11 @@ public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
AppContext appContext, TaskType taskType) {
super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
dataLocations, committer, jobToken, credentials, clock, appContext);
attemptId = Records.newRecord(TaskAttemptId.class);
attemptId.setId(id);
attemptId.setTaskId(taskId);
this.taskType = taskType;
}
public TaskAttemptId getAttemptId() {
return attemptId;
return getID();
}
@Override
@ -561,4 +557,49 @@ public void testCommitAfterSucceeds() {
mockTask = createMockTask(TaskType.REDUCE);
runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_COMMIT_PENDING);
}
@Test
public void testSpeculativeMapFetchFailure() {
// Setup a scenario where speculative task wins, first attempt killed
mockTask = createMockTask(TaskType.MAP);
runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
assertEquals(2, taskAttempts.size());
// speculative attempt retroactively fails from fetch failures
mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
TaskEventType.T_ATTEMPT_FAILED));
assertTaskScheduledState();
assertEquals(3, taskAttempts.size());
}
@Test
public void testSpeculativeMapMultipleSucceedFetchFailure() {
// Setup a scenario where speculative task wins, first attempt succeeds
mockTask = createMockTask(TaskType.MAP);
runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_SUCCEEDED);
assertEquals(2, taskAttempts.size());
// speculative attempt retroactively fails from fetch failures
mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
TaskEventType.T_ATTEMPT_FAILED));
assertTaskScheduledState();
assertEquals(3, taskAttempts.size());
}
@Test
public void testSpeculativeMapFailedFetchFailure() {
// Setup a scenario where speculative task wins, first attempt succeeds
mockTask = createMockTask(TaskType.MAP);
runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
assertEquals(2, taskAttempts.size());
// speculative attempt retroactively fails from fetch failures
mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
TaskEventType.T_ATTEMPT_FAILED));
assertTaskScheduledState();
assertEquals(3, taskAttempts.size());
}
}

View File

@ -51,7 +51,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<log4j.configuration>file:///${project.parent.basedir}/../src/test/log4j.properties</log4j.configuration>
<log4j.configuration>file:///${project.basedir}/src/test/resources/log4j.properties</log4j.configuration>
</systemPropertyVariables>
</configuration>
</plugin>

View File

@ -610,36 +610,6 @@
</description>
</property>
<!-- Job Notification Configuration -->
<!--
<property>
<name>mapreduce.job.end-notification.url</name>
<value>http://localhost:8080/jobstatus.php?jobId=$jobId&amp;jobStatus=$jobStatus</value>
<description>Indicates url which will be called on completion of job to inform
end status of job.
User can give at most 2 variables with URI : $jobId and $jobStatus.
If they are present in URI, then they will be replaced by their
respective values.
</description>
</property>
-->
<property>
<name>mapreduce.job.end-notification.retry.attempts</name>
<value>0</value>
<description>Indicates how many times hadoop should attempt to contact the
notification URL </description>
</property>
<property>
<name>mapreduce.job.end-notification.retry.interval</name>
<value>30000</value>
<description>Indicates time in milliseconds between notification URL retry
calls</description>
</property>
<property>
<name>mapreduce.job.queuename</name>
<value>default</value>
@ -802,6 +772,34 @@
</description>
</property>
<!-- Job Notification Configuration -->
<property>
<name>mapreduce.job.end-notification.url</name>
<!--<value>http://localhost:8080/jobstatus.php?jobId=$jobId&amp;jobStatus=$jobStatus</value>-->
<description>Indicates url which will be called on completion of job to inform
end status of job.
User can give at most 2 variables with URI : $jobId and $jobStatus.
If they are present in URI, then they will be replaced by their
respective values.
</description>
</property>
<property>
<name>mapreduce.job.end-notification.retry.attempts</name>
<value>0</value>
<description>The number of times the submitter of the job wants to retry job
end notification if it fails. This is capped by
mapreduce.job.end-notification.max.attempts</description>
</property>
<property>
<name>mapreduce.job.end-notification.retry.interval</name>
<value>1000</value>
<description>The number of milliseconds the submitter of the job wants to
wait before job end notification is retried if it fails. This is capped by
mapreduce.job.end-notification.max.retry.interval</description>
</property>
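(Illustrative note, not part of this patch.) The notification properties above work together: the URL may carry the $jobId and $jobStatus sentinels, and the per-job retry.attempts and retry.interval values are capped by the administrator-set mapreduce.job.end-notification.max.attempts and mapreduce.job.end-notification.max.retry.interval. The following is a minimal Java sketch of that behaviour; class, method, and parameter names are hypothetical and this is not the real JobEndNotifier code.

import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical sketch: sentinel substitution plus retry settings capped by the
// administrator maximums. Uses only standard JDK calls.
public class JobEndNotifierSketch {
  public static void sendNotification(String urlTemplate, String jobId, String jobStatus,
      int retryAttempts, int maxAttempts,
      long retryIntervalMs, long maxRetryIntervalMs) throws Exception {
    // Replace the two supported sentinels with their runtime values.
    String target = urlTemplate.replace("$jobId", jobId)
                               .replace("$jobStatus", jobStatus);
    // User-supplied settings may not exceed the final, admin-set maximums.
    int attempts = Math.min(retryAttempts, maxAttempts);
    long intervalMs = Math.min(retryIntervalMs, maxRetryIntervalMs);
    for (int i = 0; i <= attempts; i++) {  // one try plus 'attempts' retries
      HttpURLConnection conn =
          (HttpURLConnection) new URL(target).openConnection();
      if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
        return;  // notification delivered
      }
      if (i < attempts) {
        Thread.sleep(intervalMs);  // wait before the next retry
      }
    }
  }
}

Under these assumptions, the default retry.attempts of 0 means the notification is attempted exactly once and never retried.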
<property>
<name>mapreduce.job.end-notification.max.attempts</name>
<value>5</value>
@ -815,36 +813,12 @@
<property>
<name>mapreduce.job.end-notification.max.retry.interval</name>
<value>5</value>
<value>5000</value>
<final>true</final>
<description>The maximum amount of time (in seconds) to wait before retrying
job end notification. Cluster administrators can set this to limit how long
the Application Master waits before exiting. Must be marked as final to
prevent users from overriding this.</description>
</property>
<property>
<name>mapreduce.job.end-notification.url</name>
<value></value>
<description>The URL to send job end notification. It may contain sentinels
$jobId and $jobStatus which will be replaced with jobId and jobStatus.
</description>
</property>
<property>
<name>mapreduce.job.end-notification.retry.attempts</name>
<value>5</value>
<description>The number of times the submitter of the job wants to retry job
end notification if it fails. This is capped by
mapreduce.job.end-notification.max.attempts</description>
</property>
<property>
<name>mapreduce.job.end-notification.retry.interval</name>
<value>1</value>
<description>The number of seconds the submitter of the job wants to wait
before job end notification is retried if it fails. This is capped by
mapreduce.job.end-notification.max.retry.interval</description>
<description>The maximum amount of time (in milliseconds) to wait before
retrying job end notification. Cluster administrators can set this to
limit how long the Application Master waits before exiting. Must be marked
as final to prevent users from overriding this.</description>
</property>
<property>

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#banner {
height: 93px;
background: none;
}
#bannerLeft img {
margin-left: 30px;
margin-top: 10px;
}
#bannerRight img {
margin: 17px;
}

View File

@ -0,0 +1,28 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project name="Apache Hadoop ${project.version}">
<skin>
<groupId>org.apache.maven.skins</groupId>
<artifactId>maven-stylus-skin</artifactId>
<version>1.2</version>
</skin>
<body>
<links>
<item name="Apache Hadoop" href="http://hadoop.apache.org/"/>
</links>
</body>
</project>

View File

@ -274,7 +274,7 @@ public int getMaxMaps() {
* @param maxMaps - Number of maps
*/
public void setMaxMaps(int maxMaps) {
this.maxMaps = maxMaps;
this.maxMaps = Math.max(maxMaps, 1);
}
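(Illustrative usage, not part of this patch.) With the Math.max clamp above, asking for zero maps (for example "-m 0" on the distcp command line) now degrades to a single map task instead of an unrunnable copy job. A small sketch using the DistCpOptions constructor and accessors that appear elsewhere in this change:

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCpOptions;

public class MaxMapsClampSketch {
  public static void main(String[] args) {
    List<Path> srcPaths = Arrays.asList(new Path("/tmp/in"));
    Path target = new Path("/tmp/out");
    DistCpOptions options = new DistCpOptions(srcPaths, target);
    options.setMaxMaps(0);  // roughly what OptionsParser produces for "-m 0"
    // The setter clamps the value, so at least one map task is always requested.
    System.out.println(options.getMaxMaps());  // prints 1
  }
}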
/** Get the map bandwidth in MB

View File

@ -131,8 +131,8 @@ public void testDuplicates() {
fs = FileSystem.get(getConf());
List<Path> srcPaths = new ArrayList<Path>();
srcPaths.add(new Path("/tmp/in/*/*"));
TestDistCpUtils.createFile(fs, "/tmp/in/1.txt");
TestDistCpUtils.createFile(fs, "/tmp/in/src/1.txt");
TestDistCpUtils.createFile(fs, "/tmp/in/src1/1.txt");
TestDistCpUtils.createFile(fs, "/tmp/in/src2/1.txt");
Path target = new Path("/tmp/out");
Path listingFile = new Path("/tmp/list");
DistCpOptions options = new DistCpOptions(srcPaths, target);

View File

@ -275,6 +275,13 @@ public void testParseMaps() {
"hdfs://localhost:8020/target/"});
Assert.assertEquals(options.getMaxMaps(), 1);
options = OptionsParser.parse(new String[] {
"-m",
"0",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
Assert.assertEquals(options.getMaxMaps(), 1);
try {
OptionsParser.parse(new String[] {
"-m",

View File

@ -216,6 +216,9 @@ Release 0.23.5 - UNRELEASED
YARN-206. TestApplicationCleanup.testContainerCleanup occasionally fails.
(jlowe via jeagles)
YARN-212. NM state machine ignores an APPLICATION_CONTAINER_FINISHED event
when it shouldn't (Nathan Roberts via jlowe)
Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -143,6 +143,9 @@ ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
ApplicationEventType.FINISH_APPLICATION,
new AppFinishTriggeredTransition())
.addTransition(ApplicationState.INITING, ApplicationState.INITING,
ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
CONTAINER_DONE_TRANSITION)
.addTransition(ApplicationState.INITING, ApplicationState.INITING,
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
new AppLogInitDoneTransition())

View File

@ -277,6 +277,8 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
// From DONE
.addTransition(ContainerState.DONE, ContainerState.DONE,
ContainerEventType.KILL_CONTAINER)
.addTransition(ContainerState.DONE, ContainerState.DONE,
ContainerEventType.INIT_CONTAINER)
.addTransition(ContainerState.DONE, ContainerState.DONE,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)

View File

@ -155,6 +155,60 @@ public void testAppRunningAfterContainersComplete() {
}
}
/**
* Finished containers properly tracked when the only container finishes in APP_INITING
*/
@Test
public void testContainersCompleteDuringAppInit1() {
WrappedApplication wa = null;
try {
wa = new WrappedApplication(3, 314159265358979L, "yak", 1);
wa.initApplication();
wa.initContainer(-1);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
wa.containerFinished(0);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
wa.applicationInited();
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
assertEquals(0, wa.app.getContainers().size());
} finally {
if (wa != null)
wa.finished();
}
}
/**
* Finished containers properly tracked when 1 of several containers finishes in APP_INITING
*/
@Test
public void testContainersCompleteDuringAppInit2() {
WrappedApplication wa = null;
try {
wa = new WrappedApplication(3, 314159265358979L, "yak", 3);
wa.initApplication();
wa.initContainer(-1);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
wa.containerFinished(0);
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
wa.applicationInited();
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
assertEquals(2, wa.app.getContainers().size());
wa.containerFinished(1);
wa.containerFinished(2);
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
assertEquals(0, wa.app.getContainers().size());
} finally {
if (wa != null)
wa.finished();
}
}
@Test
@SuppressWarnings("unchecked")
public void testAppFinishedOnRunningContainers() {

View File

@ -56,6 +56,8 @@
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
@ -65,6 +67,8 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
@ -208,6 +212,32 @@ public void testCleanupOnSuccess() throws Exception {
}
}
}
@Test
@SuppressWarnings("unchecked") // mocked generic
public void testInitWhileDone() throws Exception {
WrappedContainer wc = null;
try {
wc = new WrappedContainer(6, 314159265358979L, 4344, "yak");
wc.initContainer();
wc.localizeResources();
wc.launchContainer();
reset(wc.localizerBus);
wc.containerSuccessful();
wc.containerResourcesCleanup();
assertEquals(ContainerState.DONE, wc.c.getContainerState());
// Now in DONE, issue INIT
wc.initContainer();
// Verify still in DONE
assertEquals(ContainerState.DONE, wc.c.getContainerState());
verifyCleanupCall(wc);
}
finally {
if (wc != null) {
wc.finished();
}
}
}
@Test
@SuppressWarnings("unchecked") // mocked generic
@ -506,6 +536,8 @@ private class WrappedContainer {
final EventHandler<ContainersLauncherEvent> launcherBus;
final EventHandler<ContainersMonitorEvent> monitorBus;
final EventHandler<AuxServicesEvent> auxBus;
final EventHandler<ApplicationEvent> appBus;
final EventHandler<LogHandlerEvent> LogBus;
final ContainerLaunchContext ctxt;
final ContainerId cId;
@ -527,10 +559,14 @@ private class WrappedContainer {
launcherBus = mock(EventHandler.class);
monitorBus = mock(EventHandler.class);
auxBus = mock(EventHandler.class);
appBus = mock(EventHandler.class);
LogBus = mock(EventHandler.class);
dispatcher.register(LocalizationEventType.class, localizerBus);
dispatcher.register(ContainersLauncherEventType.class, launcherBus);
dispatcher.register(ContainersMonitorEventType.class, monitorBus);
dispatcher.register(AuxServicesEventType.class, auxBus);
dispatcher.register(ApplicationEventType.class, appBus);
dispatcher.register(LogHandlerEventType.class, LogBus);
this.user = user;
ctxt = mock(ContainerLaunchContext.class);
@ -654,6 +690,11 @@ public void containerSuccessful() {
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
drainDispatcherEvents();
}
public void containerResourcesCleanup() {
c.handle(new ContainerEvent(cId,
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
drainDispatcherEvents();
}
public void containerFailed(int exitCode) {
c.handle(new ContainerExitEvent(cId,

View File

@ -319,6 +319,7 @@ public void testMultipleAppsLogAggregation() throws Exception {
this.user, null,
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
dispatcher.await();
ApplicationEvent expectedInitEvents[] = new ApplicationEvent[]{
new ApplicationEvent(
application1,