diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 4416a152b80..2d7f17d4cbd 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -361,6 +361,9 @@ Release 2.0.3-alpha - Unreleased HADOOP-8860. Split MapReduce and YARN sections in documentation navigation. (tomwhite via tucu) + HADOOP-9021. Enforce configured SASL method on the server (daryn via + bobby) + OPTIMIZATIONS HADOOP-8866. SampleQuantiles#query is O(N^2) instead of O(N). (Andrew Wang @@ -421,6 +424,8 @@ Release 2.0.3-alpha - Unreleased HADOOP-7115. Add a cache for getpwuid_r and getpwgid_r calls (tucu) + HADOOP-8999. SASL negotiation is flawed (daryn) + Release 2.0.2-alpha - 2012-09-07 INCOMPATIBLE CHANGES @@ -1140,6 +1145,12 @@ Release 0.23.5 - UNRELEASED HADOOP-8986. Server$Call object is never released after it is sent (bobby) + HADOOP-9022. Hadoop distcp tool fails to copy file if -m 0 specified + (Jonathan Eagles vai bobby) + + HADOOP-9025. org.apache.hadoop.tools.TestCopyListing failing (Jonathan + Eagles via jlowe) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/FsPermission.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/FsPermission.java index 972a410b53d..a909edfd927 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/FsPermission.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/FsPermission.java @@ -184,7 +184,18 @@ public String toString() { return str; } - /** Apply a umask to this permission and return a new one */ + /** + * Apply a umask to this permission and return a new one. + * + * The umask is used by create, mkdir, and other Hadoop filesystem operations. + * The mode argument for these operations is modified by removing the bits + * which are set in the umask. Thus, the umask limits the permissions which + * newly created files and directories get. + * + * @param umask The umask to use + * + * @return The effective permission + */ public FsPermission applyUMask(FsPermission umask) { return new FsPermission(useraction.and(umask.useraction.not()), groupaction.and(umask.groupaction.not()), diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 60155e71ce8..eb735ff9a79 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -45,6 +45,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -87,7 +88,9 @@ import org.apache.hadoop.security.SaslRpcServer.SaslDigestCallbackHandler; import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler; import org.apache.hadoop.security.SaslRpcServer.SaslStatus; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.authentication.util.KerberosName; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.PolicyProvider; @@ -113,7 +116,7 @@ @InterfaceStability.Evolving public abstract class Server { private final boolean authorize; - private boolean isSecurityEnabled; + private EnumSet enabledAuthMethods; private ExceptionsHandler exceptionsHandler = new ExceptionsHandler(); public void addTerseExceptions(Class... exceptionClass) { @@ -1217,6 +1220,10 @@ private void saslReadAndProcess(byte[] saslToken) throws IOException, AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser); throw e; } + if (replyToken == null && authMethod == AuthMethod.PLAIN) { + // client needs at least response to know if it should use SIMPLE + replyToken = new byte[0]; + } if (replyToken != null) { if (LOG.isDebugEnabled()) LOG.debug("Will send token of size " + replyToken.length @@ -1334,34 +1341,9 @@ public int readAndProcess() throws IOException, InterruptedException { if (authMethod == null) { throw new IOException("Unable to read authentication method"); } - boolean useSaslServer = isSecurityEnabled; - final boolean clientUsingSasl; - switch (authMethod) { - case SIMPLE: { // no sasl for simple - clientUsingSasl = false; - break; - } - case DIGEST: { // always allow tokens if there's a secret manager - useSaslServer |= (secretManager != null); - clientUsingSasl = true; - break; - } - default: { - clientUsingSasl = true; - break; - } - } - if (useSaslServer) { - saslServer = createSaslServer(authMethod); - } else if (clientUsingSasl) { // security is off - doSaslReply(SaslStatus.SUCCESS, new IntWritable( - SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null); - authMethod = AuthMethod.SIMPLE; - // client has already sent the initial Sasl message and we - // should ignore it. Both client and server should fall back - // to simple auth from now on. - skipInitialSaslHandshake = true; - } + + // this may create a SASL server, or switch us into SIMPLE + authMethod = initializeAuthContext(authMethod); connectionHeaderBuf = null; connectionHeaderRead = true; @@ -1409,10 +1391,24 @@ public int readAndProcess() throws IOException, InterruptedException { } } - private SaslServer createSaslServer(AuthMethod authMethod) + private AuthMethod initializeAuthContext(AuthMethod authMethod) throws IOException { try { - return createSaslServerInternal(authMethod); + if (enabledAuthMethods.contains(authMethod)) { + saslServer = createSaslServer(authMethod); + } else if (enabledAuthMethods.contains(AuthMethod.SIMPLE)) { + doSaslReply(SaslStatus.SUCCESS, new IntWritable( + SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null); + authMethod = AuthMethod.SIMPLE; + // client has already sent the initial Sasl message and we + // should ignore it. Both client and server should fall back + // to simple auth from now on. + skipInitialSaslHandshake = true; + } else { + throw new AccessControlException( + authMethod + " authentication is not enabled." + + " Available:" + enabledAuthMethods); + } } catch (IOException ioe) { final String ioeClass = ioe.getClass().getName(); final String ioeMessage = ioe.getLocalizedMessage(); @@ -1425,9 +1421,10 @@ private SaslServer createSaslServer(AuthMethod authMethod) } throw ioe; } + return authMethod; } - private SaslServer createSaslServerInternal(AuthMethod authMethod) + private SaslServer createSaslServer(AuthMethod authMethod) throws IOException { SaslServer saslServer = null; String hostname = null; @@ -1436,18 +1433,9 @@ private SaslServer createSaslServerInternal(AuthMethod authMethod) switch (authMethod) { case SIMPLE: { - throw new AccessControlException("Authorization (" - + CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION - + ") is enabled but authentication (" - + CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION - + ") is configured as simple. Please configure another method " - + "like kerberos or digest."); + return null; // no sasl for simple } case DIGEST: { - if (secretManager == null) { - throw new AccessControlException( - "Server is not configured to do DIGEST authentication."); - } secretManager.checkAvailableForRead(); hostname = SaslRpcServer.SASL_DEFAULT_REALM; saslCallback = new SaslDigestCallbackHandler(secretManager, this); @@ -1469,6 +1457,7 @@ private SaslServer createSaslServerInternal(AuthMethod authMethod) break; } default: + // we should never be able to get here throw new AccessControlException( "Server does not support SASL " + authMethod); } @@ -1908,7 +1897,9 @@ protected Server(String bindAddress, int port, this.authorize = conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false); - this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled(); + + // configure supported authentications + this.enabledAuthMethods = getAuthMethods(secretManager, conf); // Start the listener here and let it bind to the port listener = new Listener(); @@ -1929,6 +1920,31 @@ protected Server(String bindAddress, int port, this.exceptionsHandler.addTerseExceptions(StandbyException.class); } + // get the security type from the conf. implicitly include token support + // if a secret manager is provided, or fail if token is the conf value but + // there is no secret manager + private EnumSet getAuthMethods(SecretManager secretManager, + Configuration conf) { + AuthenticationMethod confAuthenticationMethod = + SecurityUtil.getAuthenticationMethod(conf); + EnumSet authMethods = + EnumSet.of(confAuthenticationMethod.getAuthMethod()); + + if (confAuthenticationMethod == AuthenticationMethod.TOKEN) { + if (secretManager == null) { + throw new IllegalArgumentException(AuthenticationMethod.TOKEN + + " authentication requires a secret manager"); + } + } else if (secretManager != null) { + LOG.debug(AuthenticationMethod.TOKEN + + " authentication enabled for secret manager"); + authMethods.add(AuthenticationMethod.TOKEN.getAuthMethod()); + } + + LOG.debug("Server accepts auth methods:" + authMethods); + return authMethods; + } + private void closeConnection(Connection connection) { synchronized (connectionList) { if (connectionList.remove(connection)) @@ -2045,16 +2061,6 @@ Configuration getConf() { return conf; } - /** for unit testing only, should be called before server is started */ - void disableSecurity() { - this.isSecurityEnabled = false; - } - - /** for unit testing only, should be called before server is started */ - void enableSecurity() { - this.isSecurityEnabled = true; - } - /** Sets the socket buffer size used for responding to RPCs */ public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java index 8365fa7ccd0..c0e1a0b52d9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java @@ -145,15 +145,13 @@ public boolean saslConnect(InputStream inS, OutputStream outS) byte[] saslToken = new byte[0]; if (saslClient.hasInitialResponse()) saslToken = saslClient.evaluateChallenge(saslToken); - if (saslToken != null) { + while (saslToken != null) { outStream.writeInt(saslToken.length); outStream.write(saslToken, 0, saslToken.length); outStream.flush(); if (LOG.isDebugEnabled()) LOG.debug("Have sent token of size " + saslToken.length + " from initSASLContext."); - } - if (!saslClient.isComplete()) { readStatus(inStream); int len = inStream.readInt(); if (len == SaslRpcServer.SWITCH_TO_SIMPLE_AUTH) { @@ -161,32 +159,18 @@ public boolean saslConnect(InputStream inS, OutputStream outS) LOG.debug("Server asks us to fall back to simple auth."); saslClient.dispose(); return false; + } else if ((len == 0) && saslClient.isComplete()) { + break; } saslToken = new byte[len]; if (LOG.isDebugEnabled()) LOG.debug("Will read input token of size " + saslToken.length + " for processing by initSASLContext"); inStream.readFully(saslToken); - } - - while (!saslClient.isComplete()) { saslToken = saslClient.evaluateChallenge(saslToken); - if (saslToken != null) { - if (LOG.isDebugEnabled()) - LOG.debug("Will send token of size " + saslToken.length - + " from initSASLContext."); - outStream.writeInt(saslToken.length); - outStream.write(saslToken, 0, saslToken.length); - outStream.flush(); - } - if (!saslClient.isComplete()) { - readStatus(inStream); - saslToken = new byte[inStream.readInt()]; - if (LOG.isDebugEnabled()) - LOG.debug("Will read input token of size " + saslToken.length - + " for processing by initSASLContext"); - inStream.readFully(saslToken); - } + } + if (!saslClient.isComplete()) { // shouldn't happen + throw new SaslException("Internal negotiation error"); } if (LOG.isDebugEnabled()) { LOG.debug("SASL client context established. Negotiated QoP: " diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java index f630d695a90..bc82decd622 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java @@ -240,6 +240,7 @@ private static synchronized void initUGI(Configuration conf) { AuthenticationMethod auth = SecurityUtil.getAuthenticationMethod(conf); switch (auth) { case SIMPLE: + case TOKEN: useKerberos = false; break; case KERBEROS: diff --git a/hadoop-common-project/hadoop-common/src/site/resources/css/site.css b/hadoop-common-project/hadoop-common/src/site/resources/css/site.css new file mode 100644 index 00000000000..f830baafa8c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/site/resources/css/site.css @@ -0,0 +1,30 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#banner { + height: 93px; + background: none; +} + +#bannerLeft img { + margin-left: 30px; + margin-top: 10px; +} + +#bannerRight img { + margin: 17px; +} + diff --git a/hadoop-common-project/hadoop-common/src/site/site.xml b/hadoop-common-project/hadoop-common/src/site/site.xml new file mode 100644 index 00000000000..1296cea12cf --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/site/site.xml @@ -0,0 +1,28 @@ + + + + + org.apache.maven.skins + maven-stylus-skin + 1.2 + + + + + + + + + diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java index be3064d20b3..158e48a9376 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java @@ -569,10 +569,13 @@ public void handle(Callback[] callbacks) private static Pattern KrbFailed = Pattern.compile(".*Failed on local exception:.* " + "Failed to specify server's Kerberos principal name.*"); - private static Pattern Denied = - Pattern.compile(".*Authorization .* is enabled .*"); - private static Pattern NoDigest = - Pattern.compile(".*Server is not configured to do DIGEST auth.*"); + private static Pattern Denied(AuthenticationMethod method) { + return Pattern.compile(".*RemoteException.*AccessControlException.*: " + +method.getAuthMethod() + " authentication is not enabled.*"); + } + private static Pattern NoTokenAuth = + Pattern.compile(".*IllegalArgumentException: " + + "TOKEN authentication requires a secret manager"); /* * simple server @@ -604,13 +607,40 @@ public void testSimpleServerWithInvalidTokens() throws Exception { assertAuthEquals(SIMPLE, getAuthMethod(KERBEROS, SIMPLE, false)); } + /* + * token server + */ + @Test + public void testTokenOnlyServer() throws Exception { + assertAuthEquals(Denied(SIMPLE), getAuthMethod(SIMPLE, TOKEN)); + assertAuthEquals(KrbFailed, getAuthMethod(KERBEROS, TOKEN)); + } + + @Test + public void testTokenOnlyServerWithTokens() throws Exception { + assertAuthEquals(TOKEN, getAuthMethod(SIMPLE, TOKEN, true)); + assertAuthEquals(TOKEN, getAuthMethod(KERBEROS, TOKEN, true)); + forceSecretManager = false; + assertAuthEquals(NoTokenAuth, getAuthMethod(SIMPLE, TOKEN, true)); + assertAuthEquals(NoTokenAuth, getAuthMethod(KERBEROS, TOKEN, true)); + } + + @Test + public void testTokenOnlyServerWithInvalidTokens() throws Exception { + assertAuthEquals(BadToken, getAuthMethod(SIMPLE, TOKEN, false)); + assertAuthEquals(BadToken, getAuthMethod(KERBEROS, TOKEN, false)); + forceSecretManager = false; + assertAuthEquals(NoTokenAuth, getAuthMethod(SIMPLE, TOKEN, false)); + assertAuthEquals(NoTokenAuth, getAuthMethod(KERBEROS, TOKEN, false)); + } + /* * kerberos server */ @Test public void testKerberosServer() throws Exception { - assertAuthEquals(Denied, getAuthMethod(SIMPLE, KERBEROS)); - assertAuthEquals(KrbFailed, getAuthMethod(KERBEROS, KERBEROS)); + assertAuthEquals(Denied(SIMPLE), getAuthMethod(SIMPLE, KERBEROS)); + assertAuthEquals(KrbFailed, getAuthMethod(KERBEROS, KERBEROS)); } @Test @@ -620,8 +650,8 @@ public void testKerberosServerWithTokens() throws Exception { assertAuthEquals(TOKEN, getAuthMethod(KERBEROS, KERBEROS, true)); // can't fallback to simple when using kerberos w/o tokens forceSecretManager = false; - assertAuthEquals(NoDigest, getAuthMethod(SIMPLE, KERBEROS, true)); - assertAuthEquals(NoDigest, getAuthMethod(KERBEROS, KERBEROS, true)); + assertAuthEquals(Denied(TOKEN), getAuthMethod(SIMPLE, KERBEROS, true)); + assertAuthEquals(Denied(TOKEN), getAuthMethod(KERBEROS, KERBEROS, true)); } @Test @@ -629,8 +659,8 @@ public void testKerberosServerWithInvalidTokens() throws Exception { assertAuthEquals(BadToken, getAuthMethod(SIMPLE, KERBEROS, false)); assertAuthEquals(BadToken, getAuthMethod(KERBEROS, KERBEROS, false)); forceSecretManager = false; - assertAuthEquals(NoDigest, getAuthMethod(SIMPLE, KERBEROS, true)); - assertAuthEquals(NoDigest, getAuthMethod(KERBEROS, KERBEROS, true)); + assertAuthEquals(Denied(TOKEN), getAuthMethod(SIMPLE, KERBEROS, false)); + assertAuthEquals(Denied(TOKEN), getAuthMethod(KERBEROS, KERBEROS, false)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/site/resources/css/site.css b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/site/resources/css/site.css new file mode 100644 index 00000000000..f830baafa8c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/site/resources/css/site.css @@ -0,0 +1,30 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#banner { + height: 93px; + background: none; +} + +#bannerLeft img { + margin-left: 30px; + margin-top: 10px; +} + +#bannerRight img { + margin: 17px; +} + diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 2e407129b86..c01fdd441f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -464,6 +464,9 @@ Release 2.0.3-alpha - Unreleased HDFS-4048. Use ERROR instead of INFO for volume failure logs. (Stephen Chu via eli) + HDFS-1322. Document umask in DistributedFileSystem#mkdirs javadocs. + (Colin Patrick McCabe via eli) + OPTIMIZATIONS BUG FIXES @@ -577,6 +580,12 @@ Release 2.0.3-alpha - Unreleased HDFS-4162. Some malformed and unquoted HTML strings are returned from datanode web ui. (Darek Dagit via suresh) + HDFS-4164. fuse_dfs: add -lrt to the compiler command line on Linux. + (Colin Patrick McCabe via eli) + + HDFS-3921. NN will prematurely consider blocks missing when entering active + state while still in safe mode. (atm) + Release 2.0.2-alpha - 2012-09-07 INCOMPATIBLE CHANGES @@ -1971,6 +1980,9 @@ Release 0.23.5 - UNRELEASED HDFS-3990. NN's health report has severe performance problems (daryn) + HDFS-4181. LeaseManager tries to double remove and prints extra messages + (Kihwal Lee via daryn) + BUG FIXES HDFS-3829. TestHftpURLTimeouts fails intermittently with JDK7 (Trevor @@ -1985,6 +1997,9 @@ Release 0.23.5 - UNRELEASED HDFS-4090. getFileChecksum() result incompatible when called against zero-byte files. (Kihwal Lee via daryn) + HDFS-4172. namenode does not URI-encode parameters when building URI for + datanode request (Derek Dagit via bobby) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index f45e86b5ea8..3524acbe5ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -509,14 +509,32 @@ public LocatedFileStatus next() throws IOException { } /** - * Create a directory with given name and permission, only when - * parent directory exists. + * Create a directory, only when the parent directories exist. + * + * See {@link FsPermission#applyUMask(FsPermission)} for details of how + * the permission is applied. + * + * @param f The path to create + * @param permission The permission. See FsPermission#applyUMask for + * details about how this is used to calculate the + * effective permission. */ public boolean mkdir(Path f, FsPermission permission) throws IOException { statistics.incrementWriteOps(1); return dfs.mkdirs(getPathName(f), permission, false); } + /** + * Create a directory and its parent directories. + * + * See {@link FsPermission#applyUMask(FsPermission)} for details of how + * the permission is applied. + * + * @param f The path to create + * @param permission The permission. See FsPermission#applyUMask for + * details about how this is used to calculate the + * effective permission. + */ @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException { statistics.incrementWriteOps(1); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index d86e5c7276e..38d5f2621c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -666,13 +666,17 @@ void startActiveServices() throws IOException { LOG.info("Catching up to latest edits from old active before " + "taking over writer role in edits logs"); editLogTailer.catchupDuringFailover(); - blockManager.setPostponeBlocksFromFuture(false); - LOG.info("Reprocessing replication and invalidation queues"); + blockManager.setPostponeBlocksFromFuture(false); blockManager.getDatanodeManager().markAllDatanodesStale(); blockManager.clearQueues(); blockManager.processAllPendingDNMessages(); - blockManager.processMisReplicatedBlocks(); + + if (!isInSafeMode() || + (isInSafeMode() && safeMode.isPopulatingReplQueues())) { + LOG.info("Reprocessing replication and invalidation queues"); + blockManager.processMisReplicatedBlocks(); + } if (LOG.isDebugEnabled()) { LOG.debug("NameNode metadata after re-processing " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java index 4dbee88d6fa..13fff598f46 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java @@ -135,7 +135,9 @@ synchronized Lease addLease(String holder, String src) { synchronized void removeLease(Lease lease, String src) { sortedLeasesByPath.remove(src); if (!lease.removePath(src)) { - LOG.error(src + " not found in lease.paths (=" + lease.paths + ")"); + if (LOG.isDebugEnabled()) { + LOG.debug(src + " not found in lease.paths (=" + lease.paths + ")"); + } } if (!lease.hasPath()) { @@ -440,11 +442,14 @@ private synchronized void checkLeases() { oldest.getPaths().toArray(leasePaths); for(String p : leasePaths) { try { - if(fsnamesystem.internalReleaseLease(oldest, p, HdfsServerConstants.NAMENODE_LEASE_HOLDER)) { - LOG.info("Lease recovery for " + p + " is complete. File closed."); - removing.add(p); - } else { - LOG.info("Started block recovery " + p + " lease " + oldest); + boolean completed = fsnamesystem.internalReleaseLease(oldest, p, + HdfsServerConstants.NAMENODE_LEASE_HOLDER); + if (LOG.isDebugEnabled()) { + if (completed) { + LOG.debug("Lease recovery for " + p + " is complete. File closed."); + } else { + LOG.debug("Started block recovery " + p + " lease " + oldest); + } } } catch (IOException e) { LOG.error("Cannot release the path " + p + " in the lease " diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/BooleanParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/BooleanParam.java index 14dfdf5334e..3437a0c97a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/BooleanParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/BooleanParam.java @@ -22,6 +22,12 @@ abstract class BooleanParam extends Param { static final String TRUE = "true"; static final String FALSE = "false"; + /** @return the parameter value as a string */ + @Override + public String getValueString() { + return value.toString(); + } + BooleanParam(final Domain domain, final Boolean value) { super(domain, value); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/EnumSetParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/EnumSetParam.java index feb4128e519..a983d4314dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/EnumSetParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/EnumSetParam.java @@ -53,6 +53,11 @@ public String toString() { return getName() + "=" + toString(value); } + /** @return the parameter value as a string */ + @Override + public String getValueString() { + return toString(value); + } /** The domain of the parameter. */ static final class Domain> extends Param.Domain> { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java index 9765f67996c..1d029ec65cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java @@ -114,6 +114,12 @@ public String toQueryString() { } } + /** @return the parameter value as a string */ + @Override + public String getValueString() { + return value.toString(); + } + HttpOpParam(final Domain domain, final E value) { super(domain, value); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java index 9879ba3032c..f7c09d17641 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java @@ -31,6 +31,12 @@ public String toString() { return getName() + "=" + Domain.toString(getValue()); } + /** @return the parameter value as a string */ + @Override + public String getValueString() { + return Domain.toString(getValue()); + } + /** The domain of the parameter. */ static final class Domain extends Param.Domain { Domain(final String paramName) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/IntegerParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/IntegerParam.java index b80b1a254aa..452552f713d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/IntegerParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/IntegerParam.java @@ -44,6 +44,12 @@ public String toString() { return getName() + "=" + domain.toString(getValue()); } + /** @return the parameter value as a string */ + @Override + public String getValueString() { + return domain.toString(getValue()); + } + /** The domain of the parameter. */ static final class Domain extends Param.Domain { /** The radix of the number. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/LongParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/LongParam.java index 6f102e1c9f3..afb881429a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/LongParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/LongParam.java @@ -43,6 +43,12 @@ public String toString() { return getName() + "=" + domain.toString(getValue()); } + /** @return the parameter value as a string */ + @Override + public String getValueString() { + return domain.toString(getValue()); + } + /** The domain of the parameter. */ static final class Domain extends Param.Domain { /** The radix of the number. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/Param.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/Param.java index b5fd1da241b..79a831b94a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/Param.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/Param.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.web.resources; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; import java.util.Arrays; import java.util.Comparator; @@ -32,16 +34,29 @@ public int compare(Param left, Param right) { } }; - /** Convert the parameters to a sorted String. */ + /** Convert the parameters to a sorted String. + * + * @param separator URI parameter separator character + * @param parameters parameters to encode into a string + * @return the encoded URI string + */ public static String toSortedString(final String separator, final Param... parameters) { Arrays.sort(parameters, NAME_CMP); final StringBuilder b = new StringBuilder(); - for(Param p : parameters) { - if (p.getValue() != null) { - b.append(separator).append(p); + try { + for(Param p : parameters) { + if (p.getValue() != null) { + b.append(separator).append( + URLEncoder.encode(p.getName(), "UTF-8") + + "=" + + URLEncoder.encode(p.getValueString(), "UTF-8")); + } } - } + } catch (UnsupportedEncodingException e) { + // Sane systems know about UTF-8, so this should never happen. + throw new RuntimeException(e); + } return b.toString(); } @@ -60,6 +75,9 @@ public final T getValue() { return value; } + /** @return the parameter value as a string */ + public abstract String getValueString(); + /** @return the parameter name. */ public abstract String getName(); @@ -101,4 +119,4 @@ public final T parse(final String varName, final String str) { } } } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ShortParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ShortParam.java index c1749cf18eb..bb42223dd2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ShortParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ShortParam.java @@ -44,6 +44,12 @@ public String toString() { return getName() + "=" + domain.toString(getValue()); } + /** @return the parameter value as a string */ + @Override + public final String getValueString() { + return domain.toString(getValue()); + } + /** The domain of the parameter. */ static final class Domain extends Param.Domain { /** The radix of the number. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/StringParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/StringParam.java index d4303f14079..1c9fbe401da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/StringParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/StringParam.java @@ -25,6 +25,12 @@ abstract class StringParam extends Param { super(domain, domain.parse(str)); } + /** @return the parameter value as a string */ + @Override + public String getValueString() { + return value; + } + /** The domain of the parameter. */ static final class Domain extends Param.Domain { /** The pattern defining the domain; null . */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt index f3a398667b5..39828dcd66c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt @@ -70,6 +70,7 @@ IF(FUSE_FOUND) hdfs m pthread + rt ) add_executable(test_fuse_dfs test/test_fuse_dfs.c diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/resources/css/site.css b/hadoop-hdfs-project/hadoop-hdfs/src/site/resources/css/site.css new file mode 100644 index 00000000000..f830baafa8c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/resources/css/site.css @@ -0,0 +1,30 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#banner { + height: 93px; + background: none; +} + +#bannerLeft img { + margin-left: 30px; + margin-top: 10px; +} + +#bannerRight img { + margin: 17px; +} + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/site.xml b/hadoop-hdfs-project/hadoop-hdfs/src/site/site.xml new file mode 100644 index 00000000000..1296cea12cf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/site.xml @@ -0,0 +1,28 @@ + + + + + org.apache.maven.skins + maven-stylus-skin + 1.2 + + + + + + + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java index f385ea43a2a..75e410d5a6b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java @@ -630,6 +630,32 @@ public Boolean get() { assertEquals(0L, nn1.getNamesystem().getPendingReplicationBlocks()); } + /** + * Make sure that when we transition to active in safe mode that we don't + * prematurely consider blocks missing just because not all DNs have reported + * yet. + * + * This is a regression test for HDFS-3921. + */ + @Test + public void testNoPopulatingReplQueuesWhenStartingActiveInSafeMode() + throws IOException { + DFSTestUtil.createFile(fs, new Path("/test"), 15*BLOCK_SIZE, (short)3, 1L); + + // Stop the DN so that when the NN restarts not all blocks wil be reported + // and the NN won't leave safe mode. + cluster.stopDataNode(1); + // Restart the namenode but don't wait for it to hear from all DNs (since + // one DN is deliberately shut down.) + cluster.restartNameNode(0, false); + cluster.transitionToActive(0); + + assertTrue(cluster.getNameNode(0).isInSafeMode()); + // We shouldn't yet consider any blocks "missing" since we're in startup + // safemode, i.e. not all DNs may have reported. + assertEquals(0, cluster.getNamesystem(0).getMissingBlocksCount()); + } + /** * Print a big banner in the test log to make debug easier. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/resources/TestParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/resources/TestParam.java index 646edd42825..a6825f3ec83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/resources/TestParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/resources/TestParam.java @@ -224,4 +224,14 @@ public void testReplicationParam() { LOG.info("EXPECTED: " + e); } } + + @Test + public void testToSortedStringEscapesURICharacters() { + final String sep = "&"; + Param ampParam = new TokenArgumentParam("token&ersand"); + Param equalParam = new RenewerParam("renewer=equal"); + final String expected = "&renewer=renewer%3Dequal&token=token%26ampersand"; + final String actual = Param.toSortedString(sep, equalParam, ampParam); + Assert.assertEquals(expected, actual); + } } diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 985cd28a6f6..961dc88675a 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -650,6 +650,16 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED state (jlowe via bobby) + + MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby) + + MAPREDUCE-4787. TestJobMonitorAndPrint is broken (Rob Parker via bobby) + + MAPREDUCE-4425. Speculation + Fetch failures can lead to a hung job (jlowe + via bobby) + + MAPREDUCE-4786. Job End Notification retry interval is 5 milliseconds by + default (Ravi Prakash via bobby) Release 0.23.4 - UNRELEASED diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java index ca7f625c279..518305f9589 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java @@ -53,7 +53,7 @@ public class JobEndNotifier implements Configurable { protected String userUrl; protected String proxyConf; protected int numTries; //Number of tries to attempt notification - protected int waitInterval; //Time to wait between retrying notification + protected int waitInterval; //Time (ms) to wait between retrying notification protected URL urlToNotify; //URL to notify read from the config protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification @@ -71,10 +71,10 @@ public void setConf(Configuration conf) { , conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, 1) ); waitInterval = Math.min( - conf.getInt(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, 5) - , conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, 5) + conf.getInt(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, 5000) + , conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, 5000) ); - waitInterval = (waitInterval < 0) ? 5 : waitInterval; + waitInterval = (waitInterval < 0) ? 5000 : waitInterval; userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 19488b2e81e..b46ee466ae2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -712,7 +712,10 @@ protected void scheduleTasks(Set taskIDs) { * The only entry point to change the Job. */ public void handle(JobEvent event) { - LOG.debug("Processing " + event.getJobId() + " of type " + event.getType()); + if (LOG.isDebugEnabled()) { + LOG.debug("Processing " + event.getJobId() + " of type " + + event.getType()); + } try { writeLock.lock(); JobStateInternal oldState = getInternalState(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index e9be7d18fc2..e2ebeb554cf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -22,9 +22,11 @@ import java.util.Collections; import java.util.Comparator; import java.util.EnumSet; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -118,9 +120,18 @@ public abstract class TaskImpl implements Task, EventHandler { protected Credentials credentials; protected Token jobToken; + //should be set to one which comes first + //saying COMMIT_PENDING + private TaskAttemptId commitAttempt; + + private TaskAttemptId successfulAttempt; + + private final Set failedAttempts; + // Track the finished attempts - successful, failed and killed + private final Set finishedAttempts; // counts the number of attempts that are either running or in a state where // they will come to be running when they get a Container - private int numberUncompletedAttempts = 0; + private final Set inProgressAttempts; private boolean historyTaskStartGenerated = false; @@ -182,6 +193,14 @@ TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition()) EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED), TaskEventType.T_ATTEMPT_KILLED, new KillWaitAttemptKilledTransition()) + .addTransition(TaskStateInternal.KILL_WAIT, + EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED), + TaskEventType.T_ATTEMPT_SUCCEEDED, + new KillWaitAttemptSucceededTransition()) + .addTransition(TaskStateInternal.KILL_WAIT, + EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED), + TaskEventType.T_ATTEMPT_FAILED, + new KillWaitAttemptFailedTransition()) // Ignore-able transitions. .addTransition( TaskStateInternal.KILL_WAIT, @@ -189,8 +208,6 @@ TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition()) EnumSet.of(TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_LAUNCHED, TaskEventType.T_ATTEMPT_COMMIT_PENDING, - TaskEventType.T_ATTEMPT_FAILED, - TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ADD_SPEC_ATTEMPT)) // Transitions from SUCCEEDED state @@ -200,13 +217,15 @@ TaskEventType.T_ATTEMPT_FAILED, new RetroactiveFailureTransition()) .addTransition(TaskStateInternal.SUCCEEDED, EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED), TaskEventType.T_ATTEMPT_KILLED, new RetroactiveKilledTransition()) + .addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED, + TaskEventType.T_ATTEMPT_SUCCEEDED, + new AttemptSucceededAtSucceededTransition()) // Ignore-able transitions. .addTransition( TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED, EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT, TaskEventType.T_ATTEMPT_COMMIT_PENDING, TaskEventType.T_ATTEMPT_LAUNCHED, - TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_KILL)) // Transitions from FAILED state @@ -242,15 +261,6 @@ public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) { private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR = new RecoverdAttemptsComparator(); - //should be set to one which comes first - //saying COMMIT_PENDING - private TaskAttemptId commitAttempt; - - private TaskAttemptId successfulAttempt; - - private int failedAttempts; - private int finishedAttempts;//finish are total of success, failed and killed - @Override public TaskState getState() { readLock.lock(); @@ -275,6 +285,9 @@ public TaskImpl(JobId jobId, TaskType taskType, int partition, readLock = readWriteLock.readLock(); writeLock = readWriteLock.writeLock(); this.attempts = Collections.emptyMap(); + this.finishedAttempts = new HashSet(2); + this.failedAttempts = new HashSet(2); + this.inProgressAttempts = new HashSet(2); // This overridable method call is okay in a constructor because we // have a convention that none of the overrides depends on any // fields that need initialization. @@ -611,9 +624,9 @@ private void addAndScheduleAttempt() { taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId(); } - ++numberUncompletedAttempts; + inProgressAttempts.add(attempt.getID()); //schedule the nextAttemptNumber - if (failedAttempts > 0) { + if (failedAttempts.size() > 0) { eventHandler.handle(new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_RESCHEDULE)); } else { @@ -788,12 +801,14 @@ private static class AttemptSucceededTransition implements SingleArcTransition { @Override public void transition(TaskImpl task, TaskEvent event) { + TaskTAttemptEvent taskTAttemptEvent = (TaskTAttemptEvent) event; + TaskAttemptId taskAttemptId = taskTAttemptEvent.getTaskAttemptID(); task.handleTaskAttemptCompletion( - ((TaskTAttemptEvent) event).getTaskAttemptID(), + taskAttemptId, TaskAttemptCompletionEventStatus.SUCCEEDED); - task.finishedAttempts++; - --task.numberUncompletedAttempts; - task.successfulAttempt = ((TaskTAttemptEvent) event).getTaskAttemptID(); + task.finishedAttempts.add(taskAttemptId); + task.inProgressAttempts.remove(taskAttemptId); + task.successfulAttempt = taskAttemptId; task.eventHandler.handle(new JobTaskEvent( task.taskId, TaskState.SUCCEEDED)); LOG.info("Task succeeded with attempt " + task.successfulAttempt); @@ -824,11 +839,13 @@ private static class AttemptKilledTransition implements SingleArcTransition { @Override public void transition(TaskImpl task, TaskEvent event) { + TaskAttemptId taskAttemptId = + ((TaskTAttemptEvent) event).getTaskAttemptID(); task.handleTaskAttemptCompletion( - ((TaskTAttemptEvent) event).getTaskAttemptID(), + taskAttemptId, TaskAttemptCompletionEventStatus.KILLED); - task.finishedAttempts++; - --task.numberUncompletedAttempts; + task.finishedAttempts.add(taskAttemptId); + task.inProgressAttempts.remove(taskAttemptId); if (task.successfulAttempt == null) { task.addAndScheduleAttempt(); } @@ -840,15 +857,25 @@ private static class KillWaitAttemptKilledTransition implements MultipleArcTransition { protected TaskStateInternal finalState = TaskStateInternal.KILLED; + protected final TaskAttemptCompletionEventStatus taCompletionEventStatus; + + public KillWaitAttemptKilledTransition() { + this(TaskAttemptCompletionEventStatus.KILLED); + } + + public KillWaitAttemptKilledTransition( + TaskAttemptCompletionEventStatus taCompletionEventStatus) { + this.taCompletionEventStatus = taCompletionEventStatus; + } @Override public TaskStateInternal transition(TaskImpl task, TaskEvent event) { - task.handleTaskAttemptCompletion( - ((TaskTAttemptEvent) event).getTaskAttemptID(), - TaskAttemptCompletionEventStatus.KILLED); - task.finishedAttempts++; + TaskAttemptId taskAttemptId = + ((TaskTAttemptEvent) event).getTaskAttemptID(); + task.handleTaskAttemptCompletion(taskAttemptId, taCompletionEventStatus); + task.finishedAttempts.add(taskAttemptId); // check whether all attempts are finished - if (task.finishedAttempts == task.attempts.size()) { + if (task.finishedAttempts.size() == task.attempts.size()) { if (task.historyTaskStartGenerated) { TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null, finalState, null); // TODO JH verify failedAttempt null @@ -867,43 +894,57 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) { } } + private static class KillWaitAttemptSucceededTransition extends + KillWaitAttemptKilledTransition { + public KillWaitAttemptSucceededTransition() { + super(TaskAttemptCompletionEventStatus.SUCCEEDED); + } + } + + private static class KillWaitAttemptFailedTransition extends + KillWaitAttemptKilledTransition { + public KillWaitAttemptFailedTransition() { + super(TaskAttemptCompletionEventStatus.FAILED); + } + } + private static class AttemptFailedTransition implements MultipleArcTransition { @Override public TaskStateInternal transition(TaskImpl task, TaskEvent event) { - task.failedAttempts++; TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; - if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) { + TaskAttemptId taskAttemptId = castEvent.getTaskAttemptID(); + task.failedAttempts.add(taskAttemptId); + if (taskAttemptId.equals(task.commitAttempt)) { task.commitAttempt = null; } - TaskAttempt attempt = task.attempts.get(castEvent.getTaskAttemptID()); + TaskAttempt attempt = task.attempts.get(taskAttemptId); if (attempt.getAssignedContainerMgrAddress() != null) { //container was assigned task.eventHandler.handle(new ContainerFailedEvent(attempt.getID(), attempt.getAssignedContainerMgrAddress())); } - task.finishedAttempts++; - if (task.failedAttempts < task.maxAttempts) { + task.finishedAttempts.add(taskAttemptId); + if (task.failedAttempts.size() < task.maxAttempts) { task.handleTaskAttemptCompletion( - ((TaskTAttemptEvent) event).getTaskAttemptID(), + taskAttemptId, TaskAttemptCompletionEventStatus.FAILED); // we don't need a new event if we already have a spare - if (--task.numberUncompletedAttempts == 0 + task.inProgressAttempts.remove(taskAttemptId); + if (task.inProgressAttempts.size() == 0 && task.successfulAttempt == null) { task.addAndScheduleAttempt(); } } else { task.handleTaskAttemptCompletion( - ((TaskTAttemptEvent) event).getTaskAttemptID(), + taskAttemptId, TaskAttemptCompletionEventStatus.TIPFAILED); - TaskTAttemptEvent ev = (TaskTAttemptEvent) event; - TaskAttemptId taId = ev.getTaskAttemptID(); if (task.historyTaskStartGenerated) { TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(), - TaskStateInternal.FAILED, taId); + TaskStateInternal.FAILED, taskAttemptId); task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), taskFailedEvent)); } else { @@ -927,14 +968,14 @@ private static class RetroactiveFailureTransition @Override public TaskStateInternal transition(TaskImpl task, TaskEvent event) { - if (event instanceof TaskTAttemptEvent) { - TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; - if (task.getInternalState() == TaskStateInternal.SUCCEEDED && - !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) { - // don't allow a different task attempt to override a previous - // succeeded state - return TaskStateInternal.SUCCEEDED; - } + TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; + if (task.getInternalState() == TaskStateInternal.SUCCEEDED && + !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) { + // don't allow a different task attempt to override a previous + // succeeded state + task.finishedAttempts.add(castEvent.getTaskAttemptID()); + task.inProgressAttempts.remove(castEvent.getTaskAttemptID()); + return TaskStateInternal.SUCCEEDED; } // a successful REDUCE task should not be overridden @@ -953,7 +994,7 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) { // believe that there's no redundancy. unSucceed(task); // fake increase in Uncomplete attempts for super.transition - ++task.numberUncompletedAttempts; + task.inProgressAttempts.add(castEvent.getTaskAttemptID()); return super.transition(task, event); } @@ -976,6 +1017,8 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) { !attemptId.equals(task.successfulAttempt)) { // don't allow a different task attempt to override a previous // succeeded state + task.finishedAttempts.add(castEvent.getTaskAttemptID()); + task.inProgressAttempts.remove(castEvent.getTaskAttemptID()); return TaskStateInternal.SUCCEEDED; } } @@ -1006,6 +1049,16 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) { } } + private static class AttemptSucceededAtSucceededTransition + implements SingleArcTransition { + @Override + public void transition(TaskImpl task, TaskEvent event) { + TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; + task.finishedAttempts.add(castEvent.getTaskAttemptID()); + task.inProgressAttempts.remove(castEvent.getTaskAttemptID()); + } + } + private static class KillNewTransition implements SingleArcTransition { @Override @@ -1045,7 +1098,7 @@ public void transition(TaskImpl task, TaskEvent event) { (attempt, "Task KILL is received. Killing attempt!"); } - task.numberUncompletedAttempts = 0; + task.inProgressAttempts.clear(); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 2fe3dcf1547..67400846426 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal; +import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; @@ -63,6 +64,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl; +import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskImpl; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; @@ -243,6 +245,39 @@ public Job submit(Configuration conf) throws Exception { return job; } + public void waitForInternalState(JobImpl job, + JobStateInternal finalState) throws Exception { + int timeoutSecs = 0; + JobStateInternal iState = job.getInternalState(); + while (!finalState.equals(iState) && timeoutSecs++ < 20) { + System.out.println("Job Internal State is : " + iState + + " Waiting for Internal state : " + finalState); + Thread.sleep(500); + iState = job.getInternalState(); + } + System.out.println("Task Internal State is : " + iState); + Assert.assertEquals("Task Internal state is not correct (timedout)", + finalState, iState); + } + + public void waitForInternalState(TaskImpl task, + TaskStateInternal finalState) throws Exception { + int timeoutSecs = 0; + TaskReport report = task.getReport(); + TaskStateInternal iState = task.getInternalState(); + while (!finalState.equals(iState) && timeoutSecs++ < 20) { + System.out.println("Task Internal State is : " + iState + + " Waiting for Internal state : " + finalState + " progress : " + + report.getProgress()); + Thread.sleep(500); + report = task.getReport(); + iState = task.getInternalState(); + } + System.out.println("Task Internal State is : " + iState); + Assert.assertEquals("Task Internal state is not correct (timedout)", + finalState, iState); + } + public void waitForInternalState(TaskAttemptImpl attempt, TaskAttemptStateInternal finalState) throws Exception { int timeoutSecs = 0; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java index 6d92d0de730..65acc623c36 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java @@ -55,22 +55,22 @@ private void testNumRetries(Configuration conf) { //Test maximum retry interval is capped by //MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL private void testWaitInterval(Configuration conf) { - conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "5"); - conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "1"); + conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "5000"); + conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "1000"); setConf(conf); - Assert.assertTrue("Expected waitInterval to be 1, but was " + waitInterval, - waitInterval == 1); + Assert.assertTrue("Expected waitInterval to be 1000, but was " + + waitInterval, waitInterval == 1000); - conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "10"); + conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "10000"); setConf(conf); - Assert.assertTrue("Expected waitInterval to be 5, but was " + waitInterval, - waitInterval == 5); + Assert.assertTrue("Expected waitInterval to be 5000, but was " + + waitInterval, waitInterval == 5000); //Test negative numbers are set to default conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "-10"); setConf(conf); - Assert.assertTrue("Expected waitInterval to be 5, but was " + waitInterval, - waitInterval == 5); + Assert.assertTrue("Expected waitInterval to be 5000, but was " + + waitInterval, waitInterval == 5000); } private void testProxyConfiguration(Configuration conf) { @@ -125,17 +125,28 @@ protected boolean notifyURLOnce() { public void testNotifyRetries() throws InterruptedException { Configuration conf = new Configuration(); conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, "http://nonexistent"); - conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "3"); - conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "3"); - conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "3000"); - conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "3000"); JobReport jobReport = Mockito.mock(JobReport.class); - + long startTime = System.currentTimeMillis(); this.notificationCount = 0; this.setConf(conf); this.notify(jobReport); long endTime = System.currentTimeMillis(); + Assert.assertEquals("Only 1 try was expected but was : " + + this.notificationCount, this.notificationCount, 1); + Assert.assertTrue("Should have taken more than 5 seconds it took " + + (endTime - startTime), endTime - startTime > 5000); + + conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "3"); + conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "3"); + conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "3000"); + conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "3000"); + + startTime = System.currentTimeMillis(); + this.notificationCount = 0; + this.setConf(conf); + this.notify(jobReport); + endTime = System.currentTimeMillis(); Assert.assertEquals("Only 3 retries were expected but was : " + this.notificationCount, this.notificationCount, 3); Assert.assertTrue("Should have taken more than 9 seconds it took " diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java index 3533c285300..f9bc0338705 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java @@ -25,12 +25,15 @@ import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; @@ -39,12 +42,18 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; import org.junit.Test; /** * Tests the state machine with respect to Job/Task/TaskAttempt kill scenarios. * */ +@SuppressWarnings({"unchecked", "rawtypes"}) public class TestKill { @Test @@ -131,6 +140,80 @@ public void testKillTask() throws Exception { iter.next().getReport().getTaskAttemptState()); } + @Test + public void testKillTaskWait() throws Exception { + final Dispatcher dispatcher = new AsyncDispatcher() { + private TaskAttemptEvent cachedKillEvent; + @Override + protected void dispatch(Event event) { + if (event instanceof TaskAttemptEvent) { + TaskAttemptEvent killEvent = (TaskAttemptEvent) event; + if (killEvent.getType() == TaskAttemptEventType.TA_KILL) { + TaskAttemptId taID = killEvent.getTaskAttemptID(); + if (taID.getTaskId().getTaskType() == TaskType.REDUCE + && taID.getTaskId().getId() == 0 && taID.getId() == 0) { + // Task is asking the reduce TA to kill itself. 'Create' a race + // condition. Make the task succeed and then inform the task that + // TA has succeeded. Once Task gets the TA succeeded event at + // KILL_WAIT, then relay the actual kill signal to TA + super.dispatch(new TaskAttemptEvent(taID, + TaskAttemptEventType.TA_DONE)); + super.dispatch(new TaskAttemptEvent(taID, + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + super.dispatch(new TaskTAttemptEvent(taID, + TaskEventType.T_ATTEMPT_SUCCEEDED)); + this.cachedKillEvent = killEvent; + return; + } + } + } else if (event instanceof TaskEvent) { + TaskEvent taskEvent = (TaskEvent) event; + if (taskEvent.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED + && this.cachedKillEvent != null) { + // When the TA comes and reports that it is done, send the + // cachedKillEvent + super.dispatch(this.cachedKillEvent); + return; + } + + } + super.dispatch(event); + } + }; + MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) { + @Override + public Dispatcher createDispatcher() { + return dispatcher; + } + }; + Job job = app.submit(new Configuration()); + JobId jobId = app.getJobId(); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size()); + Iterator it = job.getTasks().values().iterator(); + Task mapTask = it.next(); + Task reduceTask = it.next(); + app.waitForState(mapTask, TaskState.RUNNING); + app.waitForState(reduceTask, TaskState.RUNNING); + TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next(); + app.waitForState(mapAttempt, TaskAttemptState.RUNNING); + TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next(); + app.waitForState(reduceAttempt, TaskAttemptState.RUNNING); + + // Finish map + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + mapAttempt.getID(), + TaskAttemptEventType.TA_DONE)); + app.waitForState(mapTask, TaskState.SUCCEEDED); + + // Now kill the job + app.getContext().getEventHandler() + .handle(new JobEvent(jobId, JobEventType.JOB_KILL)); + + app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED); + } + @Test public void testKillTaskAttempt() throws Exception { final CountDownLatch latch = new CountDownLatch(1); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java index 87f93c676e8..5e416d99e83 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java @@ -141,7 +141,6 @@ private class MockTaskAttemptImpl extends TaskAttemptImpl { private float progress = 0; private TaskAttemptState state = TaskAttemptState.NEW; - private TaskAttemptId attemptId; private TaskType taskType; public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler, @@ -152,14 +151,11 @@ public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler, AppContext appContext, TaskType taskType) { super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf, dataLocations, committer, jobToken, credentials, clock, appContext); - attemptId = Records.newRecord(TaskAttemptId.class); - attemptId.setId(id); - attemptId.setTaskId(taskId); this.taskType = taskType; } public TaskAttemptId getAttemptId() { - return attemptId; + return getID(); } @Override @@ -561,4 +557,49 @@ public void testCommitAfterSucceeds() { mockTask = createMockTask(TaskType.REDUCE); runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_COMMIT_PENDING); } + + @Test + public void testSpeculativeMapFetchFailure() { + // Setup a scenario where speculative task wins, first attempt killed + mockTask = createMockTask(TaskType.MAP); + runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED); + assertEquals(2, taskAttempts.size()); + + // speculative attempt retroactively fails from fetch failures + mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(), + TaskEventType.T_ATTEMPT_FAILED)); + + assertTaskScheduledState(); + assertEquals(3, taskAttempts.size()); + } + + @Test + public void testSpeculativeMapMultipleSucceedFetchFailure() { + // Setup a scenario where speculative task wins, first attempt succeeds + mockTask = createMockTask(TaskType.MAP); + runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_SUCCEEDED); + assertEquals(2, taskAttempts.size()); + + // speculative attempt retroactively fails from fetch failures + mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(), + TaskEventType.T_ATTEMPT_FAILED)); + + assertTaskScheduledState(); + assertEquals(3, taskAttempts.size()); + } + + @Test + public void testSpeculativeMapFailedFetchFailure() { + // Setup a scenario where speculative task wins, first attempt succeeds + mockTask = createMockTask(TaskType.MAP); + runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED); + assertEquals(2, taskAttempts.size()); + + // speculative attempt retroactively fails from fetch failures + mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(), + TaskEventType.T_ATTEMPT_FAILED)); + + assertTaskScheduledState(); + assertEquals(3, taskAttempts.size()); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml index dbe27370bd1..b84ca7db754 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml @@ -51,7 +51,7 @@ maven-surefire-plugin - file:///${project.parent.basedir}/../src/test/log4j.properties + file:///${project.basedir}/src/test/resources/log4j.properties diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 8852d3980b9..85330457aea 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -610,36 +610,6 @@ - - - - - - mapreduce.job.end-notification.retry.attempts - 0 - Indicates how many times hadoop should attempt to contact the - notification URL - - - - mapreduce.job.end-notification.retry.interval - 30000 - Indicates time in milliseconds between notification URL retry - calls - - - mapreduce.job.queuename default @@ -802,6 +772,34 @@ + + + mapreduce.job.end-notification.url + + Indicates url which will be called on completion of job to inform + end status of job. + User can give at most 2 variables with URI : $jobId and $jobStatus. + If they are present in URI, then they will be replaced by their + respective values. + + + + + mapreduce.job.end-notification.retry.attempts + 0 + The number of times the submitter of the job wants to retry job + end notification if it fails. This is capped by + mapreduce.job.end-notification.max.attempts + + + + mapreduce.job.end-notification.retry.interval + 1000 + The number of milliseconds the submitter of the job wants to + wait before job end notification is retried if it fails. This is capped by + mapreduce.job.end-notification.max.retry.interval + + mapreduce.job.end-notification.max.attempts 5 @@ -815,36 +813,12 @@ mapreduce.job.end-notification.max.retry.interval - 5 + 5000 true - The maximum amount of time (in seconds) to wait before retrying - job end notification. Cluster administrators can set this to limit how long - the Application Master waits before exiting. Must be marked as final to - prevent users from overriding this. - - - - mapreduce.job.end-notification.url - - The URL to send job end notification. It may contain sentinels - $jobId and $jobStatus which will be replaced with jobId and jobStatus. - - - - - mapreduce.job.end-notification.retry.attempts - 5 - The number of times the submitter of the job wants to retry job - end notification if it fails. This is capped by - mapreduce.job.end-notification.max.attempts - - - - mapreduce.job.end-notification.retry.interval - 1 - The number of seconds the submitter of the job wants to wait - before job end notification is retried if it fails. This is capped by - mapreduce.job.end-notification.max.retry.interval + The maximum amount of time (in milliseconds) to wait before + retrying job end notification. Cluster administrators can set this to + limit how long the Application Master waits before exiting. Must be marked + as final to prevent users from overriding this. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/resources/css/site.css b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/resources/css/site.css new file mode 100644 index 00000000000..f830baafa8c --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/resources/css/site.css @@ -0,0 +1,30 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#banner { + height: 93px; + background: none; +} + +#bannerLeft img { + margin-left: 30px; + margin-top: 10px; +} + +#bannerRight img { + margin: 17px; +} + diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/site.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/site.xml new file mode 100644 index 00000000000..1296cea12cf --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/site.xml @@ -0,0 +1,28 @@ + + + + + org.apache.maven.skins + maven-stylus-skin + 1.2 + + + + + + + + + diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index 87c3e8a1d57..f605767e1cc 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -274,7 +274,7 @@ public int getMaxMaps() { * @param maxMaps - Number of maps */ public void setMaxMaps(int maxMaps) { - this.maxMaps = maxMaps; + this.maxMaps = Math.max(maxMaps, 1); } /** Get the map bandwidth in MB diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java index 32909818301..a0dfec82204 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java @@ -131,8 +131,8 @@ public void testDuplicates() { fs = FileSystem.get(getConf()); List srcPaths = new ArrayList(); srcPaths.add(new Path("/tmp/in/*/*")); - TestDistCpUtils.createFile(fs, "/tmp/in/1.txt"); - TestDistCpUtils.createFile(fs, "/tmp/in/src/1.txt"); + TestDistCpUtils.createFile(fs, "/tmp/in/src1/1.txt"); + TestDistCpUtils.createFile(fs, "/tmp/in/src2/1.txt"); Path target = new Path("/tmp/out"); Path listingFile = new Path("/tmp/list"); DistCpOptions options = new DistCpOptions(srcPaths, target); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java index 5ab345c7246..fb6e29212eb 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java @@ -275,6 +275,13 @@ public void testParseMaps() { "hdfs://localhost:8020/target/"}); Assert.assertEquals(options.getMaxMaps(), 1); + options = OptionsParser.parse(new String[] { + "-m", + "0", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertEquals(options.getMaxMaps(), 1); + try { OptionsParser.parse(new String[] { "-m", diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 03d293aa6ae..38285936b02 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -216,6 +216,9 @@ Release 0.23.5 - UNRELEASED YARN-206. TestApplicationCleanup.testContainerCleanup occasionally fails. (jlowe via jeagles) + YARN-212. NM state machine ignores an APPLICATION_CONTAINER_FINISHED event + when it shouldn't (Nathan Roberts via jlowe) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 491cbe48de8..85f7da1d08c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -143,6 +143,9 @@ ApplicationEventType.INIT_APPLICATION, new AppInitTransition()) ApplicationState.APPLICATION_RESOURCES_CLEANINGUP), ApplicationEventType.FINISH_APPLICATION, new AppFinishTriggeredTransition()) + .addTransition(ApplicationState.INITING, ApplicationState.INITING, + ApplicationEventType.APPLICATION_CONTAINER_FINISHED, + CONTAINER_DONE_TRANSITION) .addTransition(ApplicationState.INITING, ApplicationState.INITING, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED, new AppLogInitDoneTransition()) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index c9802080854..b4752ff8f5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -277,6 +277,8 @@ ContainerEventType.KILL_CONTAINER, new KillTransition()) // From DONE .addTransition(ContainerState.DONE, ContainerState.DONE, ContainerEventType.KILL_CONTAINER) + .addTransition(ContainerState.DONE, ContainerState.DONE, + ContainerEventType.INIT_CONTAINER) .addTransition(ContainerState.DONE, ContainerState.DONE, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index 3d742cb696f..3dd8d2765a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -155,6 +155,60 @@ public void testAppRunningAfterContainersComplete() { } } + /** + * Finished containers properly tracked when only container finishes in APP_INITING + */ + @Test + public void testContainersCompleteDuringAppInit1() { + WrappedApplication wa = null; + try { + wa = new WrappedApplication(3, 314159265358979L, "yak", 1); + wa.initApplication(); + wa.initContainer(-1); + assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); + + wa.containerFinished(0); + assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); + + wa.applicationInited(); + assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); + assertEquals(0, wa.app.getContainers().size()); + } finally { + if (wa != null) + wa.finished(); + } + } + + /** + * Finished containers properly tracked when 1 of several containers finishes in APP_INITING + */ + @Test + public void testContainersCompleteDuringAppInit2() { + WrappedApplication wa = null; + try { + wa = new WrappedApplication(3, 314159265358979L, "yak", 3); + wa.initApplication(); + wa.initContainer(-1); + assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); + + wa.containerFinished(0); + + assertEquals(ApplicationState.INITING, wa.app.getApplicationState()); + + wa.applicationInited(); + assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); + assertEquals(2, wa.app.getContainers().size()); + + wa.containerFinished(1); + wa.containerFinished(2); + assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState()); + assertEquals(0, wa.app.getContainers().size()); + } finally { + if (wa != null) + wa.finished(); + } + } + @Test @SuppressWarnings("unchecked") public void testAppFinishedOnRunningContainers() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index cb7c19dc2fa..f4c48bad306 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; @@ -65,6 +67,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; @@ -208,6 +212,32 @@ public void testCleanupOnSuccess() throws Exception { } } } + + @Test + @SuppressWarnings("unchecked") // mocked generic + public void testInitWhileDone() throws Exception { + WrappedContainer wc = null; + try { + wc = new WrappedContainer(6, 314159265358979L, 4344, "yak"); + wc.initContainer(); + wc.localizeResources(); + wc.launchContainer(); + reset(wc.localizerBus); + wc.containerSuccessful(); + wc.containerResourcesCleanup(); + assertEquals(ContainerState.DONE, wc.c.getContainerState()); + // Now in DONE, issue INIT + wc.initContainer(); + // Verify still in DONE + assertEquals(ContainerState.DONE, wc.c.getContainerState()); + verifyCleanupCall(wc); + } + finally { + if (wc != null) { + wc.finished(); + } + } + } @Test @SuppressWarnings("unchecked") // mocked generic @@ -506,6 +536,8 @@ private class WrappedContainer { final EventHandler launcherBus; final EventHandler monitorBus; final EventHandler auxBus; + final EventHandler appBus; + final EventHandler LogBus; final ContainerLaunchContext ctxt; final ContainerId cId; @@ -527,10 +559,14 @@ private class WrappedContainer { launcherBus = mock(EventHandler.class); monitorBus = mock(EventHandler.class); auxBus = mock(EventHandler.class); + appBus = mock(EventHandler.class); + LogBus = mock(EventHandler.class); dispatcher.register(LocalizationEventType.class, localizerBus); dispatcher.register(ContainersLauncherEventType.class, launcherBus); dispatcher.register(ContainersMonitorEventType.class, monitorBus); dispatcher.register(AuxServicesEventType.class, auxBus); + dispatcher.register(ApplicationEventType.class, appBus); + dispatcher.register(LogHandlerEventType.class, LogBus); this.user = user; ctxt = mock(ContainerLaunchContext.class); @@ -654,6 +690,11 @@ public void containerSuccessful() { ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)); drainDispatcherEvents(); } + public void containerResourcesCleanup() { + c.handle(new ContainerEvent(cId, + ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)); + drainDispatcherEvents(); + } public void containerFailed(int exitCode) { c.handle(new ContainerExitEvent(cId, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 74a3858a5d0..aad7b845c92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -319,6 +319,7 @@ public void testMultipleAppsLogAggregation() throws Exception { this.user, null, ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls)); + dispatcher.await(); ApplicationEvent expectedInitEvents[] = new ApplicationEvent[]{ new ApplicationEvent( application1,