Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1224965 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 2011-12-27 18:18:36 +00:00
commit a500ba50c2
84 changed files with 2204 additions and 260 deletions

View File

@ -86,7 +86,7 @@
</init-param>
<init-param>
<param-name>kerberos.keytab</param-name>
<param-value>/tmp/alfredo.keytab</param-value>
<param-value>/tmp/my.keytab</param-value>
</init-param>
<init-param>
<param-name>token.validity</param-name>

View File

@ -459,7 +459,7 @@ public void testDoFilterAuthenticated() throws Exception {
AuthenticationToken token = new AuthenticationToken("u", "p", "t");
token.setExpires(System.currentTimeMillis() + 1000);
Signer signer = new Signer("alfredo".getBytes());
Signer signer = new Signer("secret".getBytes());
String tokenSigned = signer.sign(token.toString());
Cookie cookie = new Cookie(AuthenticatedURL.AUTH_COOKIE, tokenSigned);
@ -504,7 +504,7 @@ public void testDoFilterAuthenticatedExpired() throws Exception {
AuthenticationToken token = new AuthenticationToken("u", "p", DummyAuthenticationHandler.TYPE);
token.setExpires(System.currentTimeMillis() - 1000);
Signer signer = new Signer("alfredo".getBytes());
Signer signer = new Signer("secret".getBytes());
String tokenSigned = signer.sign(token.toString());
Cookie cookie = new Cookie(AuthenticatedURL.AUTH_COOKIE, tokenSigned);
@ -564,7 +564,7 @@ public void testDoFilterAuthenticatedInvalidType() throws Exception {
AuthenticationToken token = new AuthenticationToken("u", "p", "invalidtype");
token.setExpires(System.currentTimeMillis() + 1000);
Signer signer = new Signer("alfredo".getBytes());
Signer signer = new Signer("secret".getBytes());
String tokenSigned = signer.sign(token.toString());
Cookie cookie = new Cookie(AuthenticatedURL.AUTH_COOKIE, tokenSigned);

View File

@ -189,6 +189,9 @@ Release 0.23.1 - Unreleased
HADOOP-7912. test-patch should run eclipse:eclipse to verify that it does
not break again. (Robert Joseph Evans via tomwhite)
HADOOP-7890. Redirect hadoop script's deprecation message to stderr.
(Koji Knoguchi via mahadev)
OPTIMIZATIONS
BUG FIXES
@ -220,6 +223,8 @@ Release 0.23.1 - Unreleased
HADOOP-7914. Remove the duplicated declaration of hadoop-hdfs test-jar in
hadoop-project/pom.xml. (szetszwo)
HADOOP-7837. no NullAppender in the log4j config. (eli)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -51,9 +51,9 @@ COMMAND=$1
case $COMMAND in
#hdfs commands
namenode|secondarynamenode|datanode|dfs|dfsadmin|fsck|balancer)
echo "DEPRECATED: Use of this script to execute hdfs command is deprecated."
echo "Instead use the hdfs command for it."
echo ""
echo "DEPRECATED: Use of this script to execute hdfs command is deprecated." 1>&2
echo "Instead use the hdfs command for it." 1>&2
echo "" 1>&2
#try to locate hdfs and if present, delegate to it.
if [ -f "${HADOOP_HDFS_HOME}"/bin/hdfs ]; then
exec "${HADOOP_HDFS_HOME}"/bin/hdfs $*

View File

@ -19,6 +19,9 @@ log4j.rootLogger=${hadoop.root.logger}, EventCounter
# Logging Threshold
log4j.threshold=ALL
# Null Appender
log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender
#
# Daily Rolling File Appender
#

View File

@ -52,7 +52,7 @@
<p>
If a custom authentication mechanism is required for the HTTP web-consoles, it is possible
to implement a plugin to support the alternate authentication mechanism (refer to
Hadoop Alfredo for details on writing an <code>AuthenticatorHandler</code>).
Hadoop hadoop-auth for details on writing an <code>AuthenticatorHandler</code>).
</p>
<p>
The next section describes how to configure Hadoop HTTP web-consoles to require user

View File

@ -29,7 +29,7 @@
import java.util.Map;
/**
* Initializes Alfredo AuthenticationFilter which provides support for
* Initializes hadoop-auth AuthenticationFilter which provides support for
* Kerberos HTTP SPNEGO authentication.
* <p/>
* It enables anonymous access, simple/pseudo and Kerberos HTTP SPNEGO
@ -48,9 +48,9 @@ public class AuthenticationFilterInitializer extends FilterInitializer {
static final String SIGNATURE_SECRET_FILE = AuthenticationFilter.SIGNATURE_SECRET + ".file";
/**
* Initializes Alfredo AuthenticationFilter.
* Initializes hadoop-auth AuthenticationFilter.
* <p/>
* Propagates to Alfredo AuthenticationFilter configuration all Hadoop
* Propagates to hadoop-auth AuthenticationFilter configuration all Hadoop
* configuration properties prefixed with "hadoop.http.authentication."
*
* @param container The filter container

View File

@ -28,6 +28,9 @@ log4j.rootLogger=${hadoop.root.logger}, EventCounter
# Logging Threshold
log4j.threshold=ALL
# Null Appender
log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender
#
# Daily Rolling File Appender
#

View File

@ -0,0 +1,12 @@
<FindBugsFilter>
<Match>
<Class name="org.apache.hadoop.lib.service.instrumentation.InstrumentationService" />
<Method name="getToAdd" />
<Bug pattern="UL_UNRELEASED_LOCK" />
</Match>
<Match>
<Class name="org.apache.hadoop.fs.http.server.HttpFSServerWebApp" />
<Method name="destroy" />
<Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD" />
</Match>
</FindBugsFilter>

View File

@ -384,6 +384,13 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<configuration>
<excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml</excludeFilterFile>
</configuration>
</plugin>
</plugins>
</build>

View File

@ -0,0 +1 @@
hadoop httpfs secret

View File

@ -98,6 +98,8 @@ public class HttpFSFileSystem extends FileSystem {
public static final String SET_REPLICATION_JSON = "boolean";
public static final String UPLOAD_CONTENT_TYPE= "application/octet-stream";
public static enum FILE_TYPE {
FILE, DIRECTORY, SYMLINK;
@ -459,7 +461,7 @@ private FSDataOutputStream uploadData(String method, Path f, Map<String, String>
String location = conn.getHeaderField("Location");
if (location != null) {
conn = getConnection(new URL(location), method);
conn.setRequestProperty("Content-Type", "application/octet-stream");
conn.setRequestProperty("Content-Type", UPLOAD_CONTENT_TYPE);
try {
OutputStream os = new BufferedOutputStream(conn.getOutputStream(), bufferSize);
return new HttpFSDataOutputStream(conn, os, expectedStatus, statistics);

View File

@ -21,18 +21,23 @@
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import javax.servlet.FilterConfig;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Map;
import java.util.Properties;
/**
* Subclass of Alfredo's <code>AuthenticationFilter</code> that obtains its configuration
* Subclass of hadoop-auth <code>AuthenticationFilter</code> that obtains its configuration
* from HttpFSServer's server configuration.
*/
public class AuthFilter extends AuthenticationFilter {
private static final String CONF_PREFIX = "httpfs.authentication.";
private static final String SIGNATURE_SECRET_FILE = SIGNATURE_SECRET + ".file";
/**
* Returns the Alfredo configuration from HttpFSServer's configuration.
* Returns the hadoop-auth configuration from HttpFSServer's configuration.
* <p/>
* It returns all HttpFSServer's configuration properties prefixed with
* <code>httpfs.authentication</code>. The <code>httpfs.authentication</code>
@ -41,7 +46,7 @@ public class AuthFilter extends AuthenticationFilter {
* @param configPrefix parameter not used.
* @param filterConfig parameter not used.
*
* @return Alfredo configuration read from HttpFSServer's configuration.
* @return hadoop-auth configuration read from HttpFSServer's configuration.
*/
@Override
protected Properties getConfiguration(String configPrefix, FilterConfig filterConfig) {
@ -57,6 +62,25 @@ protected Properties getConfiguration(String configPrefix, FilterConfig filterCo
props.setProperty(name, value);
}
}
String signatureSecretFile = props.getProperty(SIGNATURE_SECRET_FILE, null);
if (signatureSecretFile == null) {
throw new RuntimeException("Undefined property: " + SIGNATURE_SECRET_FILE);
}
try {
StringBuilder secret = new StringBuilder();
Reader reader = new FileReader(signatureSecretFile);
int c = reader.read();
while (c > -1) {
secret.append((char)c);
c = reader.read();
}
reader.close();
props.setProperty(AuthenticationFilter.SIGNATURE_SECRET, secret.toString());
} catch (IOException ex) {
throw new RuntimeException("Could not read HttpFS signature secret file: " + signatureSecretFile);
}
return props;
}

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.http.server;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.net.InetAddress;
import java.util.HashSet;
import java.util.Set;
/**
* Filter that enforces the content-type to be application/octet-stream for
* POST and PUT requests.
*/
public class CheckUploadContentTypeFilter implements Filter {
private static final Set<String> UPLOAD_OPERATIONS = new HashSet<String>();
static {
UPLOAD_OPERATIONS.add(HttpFSFileSystem.PostOpValues.APPEND.toString());
UPLOAD_OPERATIONS.add(HttpFSFileSystem.PutOpValues.CREATE.toString());
}
/**
* Initializes the filter.
* <p/>
* This implementation is a NOP.
*
* @param config filter configuration.
*
* @throws ServletException thrown if the filter could not be initialized.
*/
@Override
public void init(FilterConfig config) throws ServletException {
}
/**
* Enforces the content-type to be application/octet-stream for
* POST and PUT requests.
*
* @param request servlet request.
* @param response servlet response.
* @param chain filter chain.
*
* @throws IOException thrown if an IO error occurs.
* @throws ServletException thrown if a servlet error occurs.
*/
@Override
public void doFilter(ServletRequest request, ServletResponse response,
FilterChain chain)
throws IOException, ServletException {
boolean contentTypeOK = true;
HttpServletRequest httpReq = (HttpServletRequest) request;
HttpServletResponse httpRes = (HttpServletResponse) response;
String method = httpReq.getMethod();
if (method.equals("PUT") || method.equals("POST")) {
String op = httpReq.getParameter(HttpFSFileSystem.OP_PARAM);
if (op != null && UPLOAD_OPERATIONS.contains(op.toUpperCase())) {
if ("true".equalsIgnoreCase(httpReq.getParameter(HttpFSParams.DataParam.NAME))) {
String contentType = httpReq.getContentType();
contentTypeOK =
HttpFSFileSystem.UPLOAD_CONTENT_TYPE.equalsIgnoreCase(contentType);
}
}
}
if (contentTypeOK) {
chain.doFilter(httpReq, httpRes);
}
else {
httpRes.sendError(HttpServletResponse.SC_BAD_REQUEST,
"Data upload requests must have content-type set to '" +
HttpFSFileSystem.UPLOAD_CONTENT_TYPE + "'");
}
}
/**
* Destroys the filter.
* <p/>
* This implementation is a NOP.
*/
@Override
public void destroy() {
}
}
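
Seen from the client side, the filter above only accepts uploads whose Content-Type is application/octet-stream, which is what HttpFSFileSystem.uploadData() sets earlier in this diff. A minimal standalone request sketch follows; the endpoint URL, port 14000 and the op/data/user.name query parameters are illustrative assumptions rather than values confirmed by this commit.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class UploadContentTypeExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical HttpFS endpoint; adjust host, port, path and parameters to the actual deployment.
    URL url = new URL(
        "http://localhost:14000/webhdfs/v1/tmp/hello.txt?op=CREATE&data=true&user.name=alice");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("PUT");
    conn.setDoOutput(true);
    // Without this header, CheckUploadContentTypeFilter answers 400 Bad Request.
    conn.setRequestProperty("Content-Type", "application/octet-stream");
    OutputStream os = conn.getOutputStream();
    os.write("hello httpfs".getBytes("UTF-8"));
    os.close();
    System.out.println("HTTP status: " + conn.getResponseCode());
    conn.disconnect();
  }
}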

View File

@ -111,7 +111,7 @@ private static String format(ERROR error, Object... args) {
}
template = sb.deleteCharAt(0).toString();
}
return error + ": " + MessageFormat.format(error.getTemplate(), args);
return error + ": " + MessageFormat.format(template, args);
}
/**

View File

@ -42,7 +42,10 @@ public InputStreamEntity(InputStream is) {
@Override
public void write(OutputStream os) throws IOException {
is.skip(offset);
long skipped = is.skip(offset);
if (skipped < offset) {
throw new IOException("Requested offset beyond stream size");
}
if (len == -1) {
IOUtils.copyBytes(is, os, 4096, true);
} else {

View File

@ -69,6 +69,19 @@
</description>
</property>
<property>
<name>httpfs.authentication.signature.secret.file</name>
<value>${httpfs.config.dir}/httpfs-signature.secret</value>
<description>
File containing the secret to sign HttpFS hadoop-auth cookies.
This file should be readable only by the system user running HttpFS service.
If multiple HttpFS servers are used in a load-balancer/round-robin fashion,
they should share the secret file.
</description>
</property>
<property>
<name>httpfs.authentication.type</name>
<value>simple</value>

View File

@ -60,6 +60,11 @@
<filter-class>org.apache.hadoop.lib.servlet.HostnameFilter</filter-class>
</filter>
<filter>
<filter-name>checkUploadContentType</filter-name>
<filter-class>org.apache.hadoop.fs.http.server.CheckUploadContentTypeFilter</filter-class>
</filter>
<filter>
<filter-name>fsReleaseFilter</filter-name>
<filter-class>org.apache.hadoop.fs.http.server.HttpFSReleaseFilter</filter-class>
@ -80,6 +85,11 @@
<url-pattern>*</url-pattern>
</filter-mapping>
<filter-mapping>
<filter-name>checkUploadContentType</filter-name>
<url-pattern>*</url-pattern>
</filter-mapping>
<filter-mapping>
<filter-name>fsReleaseFilter</filter-name>
<url-pattern>*</url-pattern>

View File

@ -45,9 +45,11 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Writer;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
@ -63,6 +65,11 @@ private void createHttpFSServer() throws Exception {
Assert.assertTrue(new File(homeDir, "temp").mkdir());
HttpFSServerWebApp.setHomeDirForCurrentThread(homeDir.getAbsolutePath());
File secretFile = new File(new File(homeDir, "conf"), "secret");
Writer w = new FileWriter(secretFile);
w.write("secret");
w.close();
String fsDefaultName = TestHdfsHelper.getHdfsConf().get("fs.default.name");
Configuration conf = new Configuration(false);
conf.set("httpfs.hadoop.conf:fs.default.name", fsDefaultName);
@ -70,6 +77,7 @@ private void createHttpFSServer() throws Exception {
.getHadoopProxyUserGroups());
conf.set("httpfs.proxyuser." + HadoopUsersConfTestHelper.getHadoopProxyUser() + ".hosts", HadoopUsersConfTestHelper
.getHadoopProxyUserHosts());
conf.set("httpfs.authentication.signature.secret.file", secretFile.getAbsolutePath());
File hoopSite = new File(new File(homeDir, "conf"), "httpfs-site.xml");
OutputStream os = new FileOutputStream(hoopSite);
conf.writeXml(os);

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.http.server;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
import org.junit.Test;
import org.mockito.Mockito;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
public class TestCheckUploadContentTypeFilter {
@Test
public void putUpload() throws Exception {
test("PUT", HttpFSFileSystem.PutOpValues.CREATE.toString(), "application/octet-stream", true, false);
}
@Test
public void postUpload() throws Exception {
test("POST", HttpFSFileSystem.PostOpValues.APPEND.toString(), "APPLICATION/OCTET-STREAM", true, false);
}
@Test
public void putUploadWrong() throws Exception {
test("PUT", HttpFSFileSystem.PutOpValues.CREATE.toString(), "plain/text", false, false);
test("PUT", HttpFSFileSystem.PutOpValues.CREATE.toString(), "plain/text", true, true);
}
@Test
public void postUploadWrong() throws Exception {
test("POST", HttpFSFileSystem.PostOpValues.APPEND.toString(), "plain/text", false, false);
test("POST", HttpFSFileSystem.PostOpValues.APPEND.toString(), "plain/text", true, true);
}
@Test
public void getOther() throws Exception {
test("GET", HttpFSFileSystem.GetOpValues.GETHOMEDIR.toString(), "plain/text", false, false);
}
@Test
public void putOther() throws Exception {
test("PUT", HttpFSFileSystem.PutOpValues.MKDIRS.toString(), "plain/text", false, false);
}
private void test(String method, String operation, String contentType,
boolean upload, boolean error) throws Exception {
HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
Mockito.reset(request);
Mockito.when(request.getMethod()).thenReturn(method);
Mockito.when(request.getParameter(HttpFSFileSystem.OP_PARAM)).thenReturn(operation);
Mockito.when(request.getParameter(HttpFSParams.DataParam.NAME)).
thenReturn(Boolean.toString(upload));
Mockito.when(request.getContentType()).thenReturn(contentType);
FilterChain chain = Mockito.mock(FilterChain.class);
Filter filter = new CheckUploadContentTypeFilter();
filter.doFilter(request, response, chain);
if (error) {
Mockito.verify(response).sendError(Mockito.eq(HttpServletResponse.SC_BAD_REQUEST),
Mockito.contains("Data upload"));
}
else {
Mockito.verify(chain).doFilter(request, response);
}
}
}

View File

@ -20,8 +20,10 @@
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.lib.service.security.DummyGroupMapping;
import org.apache.hadoop.test.HFSTestCase;
import org.apache.hadoop.test.HadoopUsersConfTestHelper;
import org.apache.hadoop.test.TestDir;
@ -37,8 +39,10 @@
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Writer;
import java.net.HttpURLConnection;
import java.net.URL;
import java.text.MessageFormat;
@ -63,9 +67,16 @@ private void createHttpFSServer() throws Exception {
Assert.assertTrue(new File(homeDir, "temp").mkdir());
HttpFSServerWebApp.setHomeDirForCurrentThread(homeDir.getAbsolutePath());
File secretFile = new File(new File(homeDir, "conf"), "secret");
Writer w = new FileWriter(secretFile);
w.write("secret");
w.close();
String fsDefaultName = TestHdfsHelper.getHdfsConf().get("fs.default.name");
Configuration conf = new Configuration(false);
conf.set("httpfs.hadoop.conf:fs.default.name", fsDefaultName);
conf.set("httpfs.groups." + CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, DummyGroupMapping.class.getName());
conf.set("httpfs.authentication.signature.secret.file", secretFile.getAbsolutePath());
File hoopSite = new File(new File(homeDir, "conf"), "httpfs-site.xml");
OutputStream os = new FileOutputStream(hoopSite);
conf.writeXml(os);

View File

@ -0,0 +1,34 @@
package org.apache.hadoop.lib.service.security;
import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.test.HadoopUsersConfTestHelper;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class DummyGroupMapping implements GroupMappingServiceProvider {
@Override
@SuppressWarnings("unchecked")
public List<String> getGroups(String user) throws IOException {
if (user.equals("root")) {
return Arrays.asList("admin");
}
else if (user.equals("nobody")) {
return Arrays.asList("nobody");
} else {
String[] groups = HadoopUsersConfTestHelper.getHadoopUserGroups(user);
return (groups != null) ? Arrays.asList(groups) : Collections.EMPTY_LIST;
}
}
@Override
public void cacheGroupsRefresh() throws IOException {
}
@Override
public void cacheGroupsAdd(List<String> groups) throws IOException {
}
}

View File

@ -11,5 +11,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#

View File

@ -180,6 +180,17 @@ Trunk (unreleased changes)
HDFS-2658. HttpFS introduced 70 javadoc warnings. (tucu)
HDFS-2646. Hadoop HttpFS introduced 4 findbug warnings. (tucu)
HDFS-2657. TestHttpFSServer and TestServerWebApp are failing on trunk.
(tucu)
HttpFS server should check that upload requests have correct
content-type. (tucu)
HDFS-2707. HttpFS should read the hadoop-auth secret from a file
instead inline from the configuration. (tucu)
Release 0.23.1 - UNRELEASED
INCOMPATIBLE CHANGES
@ -283,6 +294,9 @@ Release 0.23.1 - UNRELEASED
HDFS-2553. Fix BlockPoolSliceScanner spinning in a tight loop (Uma
Maheswara Rao G via todd)
HDFS-2706. Use configuration for blockInvalidateLimit if it is set.
(szetszwo)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -155,8 +155,10 @@ public class DatanodeManager {
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval
+ 10 * 1000 * heartbeatIntervalSeconds;
this.blockInvalidateLimit = Math.max(20*(int)(heartbeatIntervalSeconds),
final int blockInvalidateLimit = Math.max(20*(int)(heartbeatIntervalSeconds),
DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
this.blockInvalidateLimit = conf.getInt(
DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit);
LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
+ "=" + this.blockInvalidateLimit);
}
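
A quick replay of the fallback arithmetic introduced above: the invalidate limit now comes from dfs.block.invalidate.limit when it is configured, otherwise from the value computed below. The 3-second heartbeat interval and the default limit of 1000 are assumptions for illustration, not values read from this diff.

public class BlockInvalidateLimitExample {
  public static void main(String[] args) {
    long heartbeatIntervalSeconds = 3; // assumed dfs.heartbeat.interval default
    int invalidateLimitDefault = 1000; // assumed DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT

    // Same expression as in DatanodeManager: scale with the heartbeat interval,
    // but never drop below the compiled-in default.
    int fallback = Math.max(20 * (int) heartbeatIntervalSeconds, invalidateLimitDefault);
    System.out.println("fallback limit = " + fallback); // 1000 with these numbers

    // After this change, conf.getInt(DFS_BLOCK_INVALIDATE_LIMIT_KEY, fallback) is consulted,
    // so an explicit dfs.block.invalidate.limit in hdfs-site.xml overrides the fallback.
  }
}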

View File

@ -12,6 +12,9 @@ Trunk (unreleased changes)
(Plamen Jeliazkov via shv)
IMPROVEMENTS
MAPREDUCE-3597. [Rumen] Rumen should provide APIs to access all the
job-history related information.
MAPREDUCE-3375. [Gridmix] Memory Emulation system tests.
(Vinay Thota via amarrk)
@ -47,6 +50,9 @@ Trunk (unreleased changes)
PB and Avro can all use it (Sanjay)
BUG FIXES
MAPREDUCE-3349. Log rack-name in JobHistory for unsuccessful tasks.
(Devaraj K and Amar Kamat via amarrk)
MAPREDUCE-3412. Fix 'ant docs'. (amarrk)
MAPREDUCE-3346. [Rumen] LoggedTaskAttempt#getHostName() returns null.
@ -161,8 +167,14 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3518. mapred queue -info <queue> -showJobs throws NPE.
(Jonathan Eagles via mahadev)
MAPREDUCE-3391. Making a trivial change to correct a log message in
DistributedShell app's AM. (Subroto Sanyal via vinodkv)
OPTIMIZATIONS
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
Vavilapalli via sseth)
BUG FIXES
MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob
@ -331,7 +343,23 @@ Release 0.23.1 - Unreleased
before the job started, so that it works properly with oozie throughout
the job execution. (Robert Joseph Evans via vinodkv)
MAPREDUCE-3579. ConverterUtils shouldn't include a port in a path from a url without a port. (atm via harsh)
MAPREDUCE-3579. ConverterUtils shouldn't include a port in a path from a url
without a port. (atm via harsh)
MAPREDUCE-3563. Fixed LocalJobRunner to work correctly with new mapreduce
apis. (acmurthy)
MAPREDUCE-3376. Fixed Task to ensure it passes reporter to combiners using
old MR api. (Subroto Sanyal via acmurthy)
MAPREDUCE-3339. Fixed MR AM to stop considering node blacklisting after the
number of nodes blacklisted crosses a threshold. (Siddharth Seth via vinodkv)
MAPREDUCE-3588. Fixed bin/yarn which was broken by MAPREDUCE-3366 so that
yarn daemons can start. (Arun C Murthy via vinodkv)
MAPREDUCE-3586. Modified CompositeService to avoid duplicate stop operations
thereby solving race conditions in MR AM shutdown. (vinodkv)
Release 0.23.0 - 2011-11-01

View File

@ -29,9 +29,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.mapreduce.JobContext;
@ -41,11 +41,11 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;

View File

@ -20,7 +20,6 @@
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
@ -35,13 +34,14 @@
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
@SuppressWarnings({ "rawtypes", "deprecation" })
public class MapTaskAttemptImpl extends TaskAttemptImpl {
private final TaskSplitMetaInfo splitInfo;
public MapTaskAttemptImpl(TaskId taskId, int attempt,
EventHandler eventHandler, Path jobFile,
int partition, TaskSplitMetaInfo splitInfo, Configuration conf,
int partition, TaskSplitMetaInfo splitInfo, JobConf conf,
TaskAttemptListener taskAttemptListener,
OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {

View File

@ -20,7 +20,6 @@
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
@ -34,14 +33,14 @@
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
@SuppressWarnings({ "rawtypes", "deprecation" })
public class ReduceTaskAttemptImpl extends TaskAttemptImpl {
private final int numMapTasks;
public ReduceTaskAttemptImpl(TaskId id, int attempt,
EventHandler eventHandler, Path jobFile, int partition,
int numMapTasks, Configuration conf,
int numMapTasks, JobConf conf,
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {

View File

@ -54,7 +54,6 @@
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@ -110,6 +109,7 @@
/** Implementation of Job interface. Maintains the state machines of Job.
* The read and write calls use ReadWriteLock for concurrency.
*/
@SuppressWarnings({ "rawtypes", "deprecation" })
public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
EventHandler<JobEvent> {
@ -154,7 +154,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
// Can then replace task-level uber counters (MR-2424) with job-level ones
// sent from LocalContainerLauncher, and eventually including a count of
// of uber-AM attempts (probably sent from MRAppMaster).
public Configuration conf;
public JobConf conf;
//fields initialized in init
private FileSystem fs;
@ -371,7 +371,7 @@ public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
this.applicationAttemptId = applicationAttemptId;
this.jobId = jobId;
this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
this.conf = conf;
this.conf = new JobConf(conf);
this.metrics = metrics;
this.clock = clock;
this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
@ -979,7 +979,7 @@ public JobState transition(JobImpl job, JobEvent event) {
job.oldJobId);
} else {
job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
new JobConf(job.conf), job.oldJobId);
job.conf, job.oldJobId);
}
long inputLength = 0;

View File

@ -21,8 +21,8 @@
import java.util.Collection;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
@ -31,20 +31,20 @@
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
@SuppressWarnings({ "rawtypes", "deprecation" })
public class MapTaskImpl extends TaskImpl {
private final TaskSplitMetaInfo taskSplitMetaInfo;
public MapTaskImpl(JobId jobId, int partition, EventHandler eventHandler,
Path remoteJobConfFile, Configuration conf,
Path remoteJobConfFile, JobConf conf,
TaskSplitMetaInfo taskSplitMetaInfo,
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,

View File

@ -21,8 +21,8 @@
import java.util.Collection;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
@ -30,19 +30,20 @@
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
@SuppressWarnings({ "rawtypes", "deprecation" })
public class ReduceTaskImpl extends TaskImpl {
private final int numMapTasks;
public ReduceTaskImpl(JobId jobId, int partition,
EventHandler eventHandler, Path jobFile, Configuration conf,
EventHandler eventHandler, Path jobFile, JobConf conf,
int numMapTasks, TaskAttemptListener taskAttemptListener,
OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,

View File

@ -125,6 +125,7 @@
/**
* Implementation of TaskAttempt interface.
*/
@SuppressWarnings({ "rawtypes", "deprecation" })
public abstract class TaskAttemptImpl implements
org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
EventHandler<TaskAttemptEvent> {
@ -135,10 +136,9 @@ public abstract class TaskAttemptImpl implements
private static final int REDUCE_MEMORY_MB_DEFAULT = 1024;
private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
protected final Configuration conf;
protected final JobConf conf;
protected final Path jobFile;
protected final int partition;
@SuppressWarnings("rawtypes")
protected final EventHandler eventHandler;
private final TaskAttemptId attemptId;
private final Clock clock;
@ -445,9 +445,9 @@ TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
.getProperty("line.separator");
public TaskAttemptImpl(TaskId taskId, int i,
@SuppressWarnings("rawtypes") EventHandler eventHandler,
EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
Configuration conf, String[] dataLocalHosts, OutputCommitter committer,
JobConf conf, String[] dataLocalHosts, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {
oldJobId = TypeConverter.fromYarn(taskId.getJobId());
@ -926,6 +926,8 @@ private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
: taskAttempt.containerNodeId.getHost(),
taskAttempt.containerNodeId == null ? -1
: taskAttempt.containerNodeId.getPort(),
taskAttempt.nodeRackName == null ? "UNKNOWN"
: taskAttempt.nodeRackName,
StringUtils.join(
LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt
.getProgressSplitBlock().burst());
@ -1197,7 +1199,7 @@ public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
@SuppressWarnings("deprecation")
TaskAttemptContext taskContext =
new TaskAttemptContextImpl(new JobConf(taskAttempt.conf),
new TaskAttemptContextImpl(taskAttempt.conf,
TypeConverter.fromYarn(taskAttempt.attemptId));
taskAttempt.eventHandler.handle(new TaskCleanupEvent(
taskAttempt.attemptId,

View File

@ -31,8 +31,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@ -50,8 +50,6 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@ -66,6 +64,8 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.Clock;
@ -81,11 +81,12 @@
/**
* Implementation of Task interface.
*/
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
private static final Log LOG = LogFactory.getLog(TaskImpl.class);
protected final Configuration conf;
protected final JobConf conf;
protected final Path jobFile;
protected final OutputCommitter committer;
protected final int partition;
@ -225,7 +226,7 @@ public TaskState getState() {
}
public TaskImpl(JobId jobId, TaskType taskType, int partition,
EventHandler eventHandler, Path remoteJobConfFile, Configuration conf,
EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,

View File

@ -68,6 +68,7 @@ public abstract class RMCommunicator extends AbstractService {
protected ApplicationAttemptId applicationAttemptId;
private AtomicBoolean stopped;
protected Thread allocatorThread;
@SuppressWarnings("rawtypes")
protected EventHandler eventHandler;
protected AMRMProtocol scheduler;
private final ClientService clientService;

View File

@ -479,12 +479,16 @@ private List<Container> getResources() throws Exception {
//something changed
recalculateReduceSchedule = true;
}
List<Container> allocatedContainers = new ArrayList<Container>();
for (Container cont : newContainers) {
allocatedContainers.add(cont);
if (LOG.isDebugEnabled()) {
for (Container cont : newContainers) {
LOG.debug("Received new Container :" + cont);
}
}
//Called on each allocation. Will know about newly blacklisted/added hosts.
computeIgnoreBlacklisting();
for (ContainerStatus cont : finishedContainers) {
LOG.info("Received completed container " + cont);
TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());

View File

@ -18,15 +18,15 @@
package org.apache.hadoop.mapreduce.v2.app.rm;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -35,6 +35,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
@ -47,6 +48,7 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.BuilderUtils;
/**
* Keeps the data structures to send container requests to RM.
*/
@ -74,9 +76,15 @@ public abstract class RMContainerRequestor extends RMCommunicator {
private final Set<ContainerId> release = new TreeSet<ContainerId>();
private boolean nodeBlacklistingEnabled;
private int blacklistDisablePercent;
private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
private int blacklistedNodeCount = 0;
private int lastClusterNmCount = 0;
private int clusterNmCount = 0;
private int maxTaskFailuresPerNode;
private final Map<String, Integer> nodeFailures = new HashMap<String, Integer>();
private final Set<String> blacklistedNodes = new HashSet<String>();
private final Set<String> blacklistedNodes = Collections
.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
public RMContainerRequestor(ClientService clientService, AppContext context) {
super(clientService, context);
@ -122,7 +130,17 @@ public void init(Configuration conf) {
LOG.info("nodeBlacklistingEnabled:" + nodeBlacklistingEnabled);
maxTaskFailuresPerNode =
conf.getInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3);
blacklistDisablePercent =
conf.getInt(
MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT);
LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
throw new YarnException("Invalid blacklistDisablePercent: "
+ blacklistDisablePercent
+ ". Should be an integer between 0 and 100 or -1 to disabled");
}
LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
}
protected AMResponse makeRemoteRequest() throws YarnRemoteException {
@ -134,19 +152,49 @@ protected AMResponse makeRemoteRequest() throws YarnRemoteException {
AMResponse response = allocateResponse.getAMResponse();
lastResponseID = response.getResponseId();
availableResources = response.getAvailableResources();
lastClusterNmCount = clusterNmCount;
clusterNmCount = allocateResponse.getNumClusterNodes();
LOG.info("getResources() for " + applicationId + ":" + " ask="
+ ask.size() + " release= " + release.size() +
" newContainers=" + response.getAllocatedContainers().size() +
" finishedContainers=" +
response.getCompletedContainersStatuses().size() +
" resourcelimit=" + availableResources);
" resourcelimit=" + availableResources +
"knownNMs=" + clusterNmCount);
ask.clear();
release.clear();
return response;
}
// May be incorrect if there are multiple NodeManagers running on a single host.
// knownNodeCount is based on node managers, not hosts; blacklisting is
// currently based on hosts.
protected void computeIgnoreBlacklisting() {
if (blacklistDisablePercent != -1
&& (blacklistedNodeCount != blacklistedNodes.size() ||
clusterNmCount != lastClusterNmCount)) {
blacklistedNodeCount = blacklistedNodes.size();
if (clusterNmCount == 0) {
LOG.info("KnownNode Count at 0. Not computing ignoreBlacklisting");
return;
}
int val = (int) ((float) blacklistedNodes.size() / clusterNmCount * 100);
if (val >= blacklistDisablePercent) {
if (ignoreBlacklisting.compareAndSet(false, true)) {
LOG.info("Ignore blacklisting set to true. Known: " + clusterNmCount
+ ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
}
} else {
if (ignoreBlacklisting.compareAndSet(true, false)) {
LOG.info("Ignore blacklisting set to false. Known: " + clusterNmCount
+ ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
}
}
}
}
protected void containerFailedOnHost(String hostName) {
if (!nodeBlacklistingEnabled) {
return;
@ -161,8 +209,10 @@ protected void containerFailedOnHost(String hostName) {
LOG.info(failures + " failures on node " + hostName);
if (failures >= maxTaskFailuresPerNode) {
blacklistedNodes.add(hostName);
//Even if blacklisting is ignored, continue to remove the host from
// the request table. The RM may have additional nodes it can allocate on.
LOG.info("Blacklisted host " + hostName);
//remove all the requests corresponding to this hostname
for (Map<String, Map<Resource, ResourceRequest>> remoteRequests
: remoteRequestsTable.values()){
@ -316,7 +366,7 @@ protected void release(ContainerId containerId) {
}
protected boolean isNodeBlacklisted(String hostname) {
if (!nodeBlacklistingEnabled) {
if (!nodeBlacklistingEnabled || ignoreBlacklisting.get()) {
return false;
}
return blacklistedNodes.contains(hostname);
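
The computeIgnoreBlacklisting() method above turns blacklisting off once the blacklisted share of the cluster reaches blacklistDisablePercent, and re-enables it when that share drops again. A standalone replay of that integer arithmetic, using the 33% threshold configured in the testIgnoreBlacklisting case later in this diff and an assumed cluster of 10 NodeManagers:

public class IgnoreBlacklistingExample {
  public static void main(String[] args) {
    int blacklistDisablePercent = 33; // threshold used by testIgnoreBlacklisting
    int clusterNmCount = 10;          // assumed number of registered NodeManagers

    for (int blacklisted = 0; blacklisted <= 4; blacklisted++) {
      // Same integer-percentage computation as computeIgnoreBlacklisting().
      int val = (int) ((float) blacklisted / clusterNmCount * 100);
      boolean ignore = val >= blacklistDisablePercent;
      System.out.println(blacklisted + " blacklisted of " + clusterNmCount
          + " -> " + val + "% -> ignoreBlacklisting=" + ignore);
    }
    // With these numbers the switch flips at 4 blacklisted hosts (40% >= 33%).
  }
}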

View File

@ -119,6 +119,10 @@ public MRApp(int maps, int reduces, boolean autoComplete, String testName, boole
this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
}
@Override
protected void downloadTokensAndSetupUGI(Configuration conf) {
}
private static ApplicationAttemptId getApplicationAttemptId(
ApplicationId applicationId, int startCount) {
ApplicationAttemptId applicationAttemptId =

View File

@ -30,7 +30,6 @@
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -169,7 +168,7 @@ public void stop() {
}
public void benchmark1() throws Exception {
int maps = 900;
int maps = 100000;
int reduces = 100;
System.out.println("Running benchmark with maps:"+maps +
" reduces:"+reduces);

View File

@ -488,6 +488,8 @@ public void testBlackListedNodes() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
conf.setInt(
MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
@ -580,6 +582,175 @@ public void testBlackListedNodes() throws Exception {
}
}
@Test
public void testIgnoreBlacklisting() throws Exception {
LOG.info("Running testIgnoreBlacklisting");
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
conf.setInt(
MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, 33);
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
DrainDispatcher dispatcher =
(DrainDispatcher) rm.getRMContext().getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
dispatcher.await();
MockNM[] nodeManagers = new MockNM[10];
int nmNum = 0;
List<TaskAttemptContainerAssignedEvent> assigned = null;
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
nodeManagers[0].nodeHeartbeat(true);
dispatcher.await();
ApplicationAttemptId appAttemptId =
app.getCurrentAppAttempt().getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
dispatcher.await();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false));
MyContainerAllocator allocator =
new MyContainerAllocator(rm, conf, appAttemptId, mockJob);
// Known=1, blacklisted=0, ignore should be false - assign first container
assigned =
getContainerOnHost(jobId, 1, 1024, new String[] { "h1" },
nodeManagers[0], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
LOG.info("Failing container _1 on H1 (Node should be blacklisted and"
+ " ignore blacklisting enabled");
// Send events to blacklist nodes h1 and h2
ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
allocator.sendFailure(f1);
// Test single node.
// Known=1, blacklisted=1, ignore should be true - assign 1
assigned =
getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
nodeManagers[0], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
// Known=2, blacklisted=1, ignore should be true - assign 1 anyway.
assigned =
getContainerOnHost(jobId, 3, 1024, new String[] { "h2" },
nodeManagers[1], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
// Known=3, blacklisted=1, ignore should be true - assign 1 anyway.
assigned =
getContainerOnHost(jobId, 4, 1024, new String[] { "h3" },
nodeManagers[2], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
// Known=3, blacklisted=1, ignore should be true - assign 1
assigned =
getContainerOnHost(jobId, 5, 1024, new String[] { "h1" },
nodeManagers[0], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
// Known=4, blacklisted=1, ignore should be false - assign 1 anyway
assigned =
getContainerOnHost(jobId, 6, 1024, new String[] { "h4" },
nodeManagers[3], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
// Test blacklisting re-enabled.
// Known=4, blacklisted=1, ignore should be false - no assignment on h1
assigned =
getContainerOnHost(jobId, 7, 1024, new String[] { "h1" },
nodeManagers[0], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// RMContainerRequestor would have created a replacement request.
// Blacklist h2
ContainerFailedEvent f2 = createFailEvent(jobId, 3, "h2", false);
allocator.sendFailure(f2);
// Test ignore blacklisting re-enabled
// Known=4, blacklisted=2, ignore should be true. Should assign 2
// containers.
assigned =
getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
nodeManagers[0], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
// Known=4, blacklisted=2, ignore should be true.
assigned =
getContainerOnHost(jobId, 9, 1024, new String[] { "h2" },
nodeManagers[1], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
// Test blacklist while ignore blacklisting enabled
ContainerFailedEvent f3 = createFailEvent(jobId, 4, "h3", false);
allocator.sendFailure(f3);
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
// Known=5, blacklisted=3, ignore should be true.
assigned =
getContainerOnHost(jobId, 10, 1024, new String[] { "h3" },
nodeManagers[2], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
// Assign on 5 more nodes - to re-enable blacklisting
for (int i = 0; i < 5; i++) {
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
assigned =
getContainerOnHost(jobId, 11 + i, 1024,
new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i],
dispatcher, allocator);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
}
// Test h3 (blacklisted while ignoring blacklisting) is blacklisted.
assigned =
getContainerOnHost(jobId, 20, 1024, new String[] { "h3" },
nodeManagers[2], dispatcher, allocator);
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
}
private MockNM registerNodeManager(int i, MyResourceManager rm,
DrainDispatcher dispatcher) throws Exception {
MockNM nm = rm.registerNode("h" + (i + 1) + ":1234", 10240);
dispatcher.await();
return nm;
}
private
List<TaskAttemptContainerAssignedEvent> getContainerOnHost(JobId jobId,
int taskAttemptId, int memory, String[] hosts, MockNM mockNM,
DrainDispatcher dispatcher, MyContainerAllocator allocator)
throws Exception {
ContainerRequestEvent reqEvent =
createReq(jobId, taskAttemptId, memory, hosts);
allocator.sendRequest(reqEvent);
// Send the request to the RM
List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
dispatcher.await();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// Heartbeat from the required nodeManager
mockNM.nodeHeartbeat(true);
dispatcher.await();
assigned = allocator.schedule();
dispatcher.await();
return assigned;
}
@Test
public void testBlackListedNodesWithSchedulingToThatNode() throws Exception {
LOG.info("Running testBlackListedNodesWithSchedulingToThatNode");
@ -587,6 +758,8 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
conf.setInt(
MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
MyResourceManager rm = new MyResourceManager(conf);
rm.start();

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -29,7 +29,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task;
@ -60,11 +59,12 @@
import org.junit.Before;
import org.junit.Test;
@SuppressWarnings({ "rawtypes", "deprecation" })
public class TestTaskImpl {
private static final Log LOG = LogFactory.getLog(TestTaskImpl.class);
private Configuration conf;
private JobConf conf;
private TaskAttemptListener taskAttemptListener;
private OutputCommitter committer;
private Token<JobTokenIdentifier> jobToken;
@ -91,9 +91,8 @@ private class MockTaskImpl extends TaskImpl {
private int taskAttemptCounter = 0;
@SuppressWarnings("rawtypes")
public MockTaskImpl(JobId jobId, int partition,
EventHandler eventHandler, Path remoteJobConfFile, Configuration conf,
EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
@ -132,10 +131,9 @@ private class MockTaskAttemptImpl extends TaskAttemptImpl {
private TaskAttemptState state = TaskAttemptState.NEW;
private TaskAttemptId attemptId;
@SuppressWarnings("rawtypes")
public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
Configuration conf, OutputCommitter committer,
JobConf conf, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {
super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
@ -175,7 +173,6 @@ public TaskAttemptState getState() {
private class MockTask extends Task {
@Override
@SuppressWarnings("deprecation")
public void run(JobConf job, TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
return;
@ -195,7 +192,7 @@ public void setup() {
++startCount;
conf = new Configuration();
conf = new JobConf();
taskAttemptListener = mock(TaskAttemptListener.class);
committer = mock(OutputCommitter.class);
jobToken = (Token<JobTokenIdentifier>) mock(Token.class);

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
@ -52,11 +53,13 @@
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
/** Implements MapReduce locally, in-process, for debugging. */
@InterfaceAudience.Private
@ -304,12 +307,45 @@ protected ExecutorService createMapExecutor(int numMapTasks) {
return executor;
}
private org.apache.hadoop.mapreduce.OutputCommitter
createOutputCommitter(boolean newApiCommitter, JobID jobId, Configuration conf) throws Exception {
org.apache.hadoop.mapreduce.OutputCommitter committer = null;
LOG.info("OutputCommitter set in config "
+ conf.get("mapred.output.committer.class"));
if (newApiCommitter) {
org.apache.hadoop.mapreduce.TaskID taskId =
new org.apache.hadoop.mapreduce.TaskID(jobId, TaskType.MAP, 0);
org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID =
new org.apache.hadoop.mapreduce.TaskAttemptID(taskId, 0);
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new TaskAttemptContextImpl(conf, taskAttemptID);
OutputFormat outputFormat =
ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), conf);
committer = outputFormat.getOutputCommitter(taskContext);
} else {
committer = ReflectionUtils.newInstance(conf.getClass(
"mapred.output.committer.class", FileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class), conf);
}
LOG.info("OutputCommitter is " + committer.getClass().getName());
return committer;
}
@Override
public void run() {
JobID jobId = profile.getJobID();
JobContext jContext = new JobContextImpl(job, jobId);
OutputCommitter outputCommitter = job.getOutputCommitter();
org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;
try {
outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);
} catch (Exception e) {
LOG.info("Failed to createOutputCommitter", e);
return;
}
try {
TaskSplitMetaInfo[] taskSplitMetaInfos =
SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import static org.junit.Assert.*;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Random;
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestLocalModeWithNewApis {
public static final Log LOG =
LogFactory.getLog(TestLocalModeWithNewApis.class);
Configuration conf;
@Before
public void setUp() throws Exception {
conf = new Configuration();
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
}
@After
public void tearDown() throws Exception {
}
@Test
public void testNewApis() throws Exception {
Random r = new Random(System.currentTimeMillis());
Path tmpBaseDir = new Path("/tmp/wc-" + r.nextInt());
final Path inDir = new Path(tmpBaseDir, "input");
final Path outDir = new Path(tmpBaseDir, "output");
String input = "The quick brown fox\nhas many silly\nred fox sox\n";
FileSystem inFs = inDir.getFileSystem(conf);
FileSystem outFs = outDir.getFileSystem(conf);
outFs.delete(outDir, true);
if (!inFs.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
}
{
DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
file.writeBytes(input);
file.close();
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(TestLocalModeWithNewApis.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, inDir);
FileOutputFormat.setOutputPath(job, outDir);
assertEquals(job.waitForCompletion(true), true);
String output = readOutput(outDir, conf);
assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
"quick\t1\nred\t1\nsilly\t1\nsox\t1\n", output);
outFs.delete(tmpBaseDir, true);
}
static String readOutput(Path outDir, Configuration conf)
throws IOException {
FileSystem fs = outDir.getFileSystem(conf);
StringBuffer result = new StringBuffer();
Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
new Utils.OutputFileUtils.OutputFilesFilter()));
for (Path outputFile : fileList) {
LOG.info("Path: " + outputFile);
BufferedReader file =
new BufferedReader(new InputStreamReader(fs.open(outputFile)));
String line = file.readLine();
while (line != null) {
result.append(line);
result.append("\n");
line = file.readLine();
}
file.close();
}
return result.toString();
}
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
}

View File

@ -175,6 +175,7 @@
{"name": "taskType", "type": "string"},
{"name": "taskStatus", "type": "string"},
{"name": "finishTime", "type": "long"},
{"name": "rackname", "type": "string"},
{"name": "hostname", "type": "string"},
{"name": "state", "type": "string"},
{"name": "counters", "type": "JhCounters"}
@ -202,6 +203,7 @@
{"name": "finishTime", "type": "long"},
{"name": "hostname", "type": "string"},
{"name": "port", "type": "int"},
{"name": "rackname", "type": "string"},
{"name": "status", "type": "string"},
{"name": "error", "type": "string"},
{"name": "clockSplits", "type": { "type": "array", "items": "int"}},

View File

@ -1458,11 +1458,11 @@ protected void combine(RawKeyValueIterator kvIter,
try {
CombineValuesIterator<K,V> values =
new CombineValuesIterator<K,V>(kvIter, comparator, keyClass,
valueClass, job, Reporter.NULL,
valueClass, job, reporter,
inputCounter);
while (values.more()) {
combiner.reduce(values.getKey(), values, combineCollector,
Reporter.NULL);
reporter);
values.nextKey();
}
} finally {
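Note: with the change above the combiner runs against the live task Reporter instead of Reporter.NULL, so progress and counter updates made inside reduce() now actually reach the task (TestMRAppWithCombiner below asserts exactly this). A sketch of an old-API combiner that relies on that behaviour; the counter group and name are made up:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class CountingCombiner extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    // previously dropped silently under Reporter.NULL; now recorded against the task
    reporter.incrCounter("combine", "keys", 1);
    output.collect(key, new IntWritable(sum));
  }
}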

View File

@ -348,8 +348,14 @@ public interface MRJobConfig {
/** Enable blacklisting of nodes in the job.*/
public static final String MR_AM_JOB_NODE_BLACKLISTING_ENABLE =
MR_AM_PREFIX + "job.node.blacklisting.enable";
MR_AM_PREFIX + "job.node-blacklisting.enable";
/** Ignore blacklisting if a certain percentage of nodes have been blacklisted */
public static final String MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT =
MR_AM_PREFIX + "job.node-blacklisting.ignore-threshold-node-percent";
public static final int DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT =
33;
/** Enable job recovery.*/
public static final String MR_AM_JOB_RECOVERY_ENABLE =
MR_AM_PREFIX + "job.recovery.enable";
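Note: a sketch of wiring the new AM node-blacklisting knobs through job configuration; the literal key strings are left to the constants above, so the rename to job.node-blacklisting.* stays an implementation detail:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
// stop honouring the blacklist once this percentage of the cluster has been blacklisted
conf.setInt(MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
    MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT);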

View File

@ -224,7 +224,7 @@ private void handleMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
attemptInfo.counters = event.getCounters();
attemptInfo.hostname = event.getHostname();
attemptInfo.port = event.getPort();
attemptInfo.rackname = event.getRackname();
attemptInfo.rackname = event.getRackName();
}
private void handleTaskAttemptFailedEvent(
@ -237,6 +237,7 @@ private void handleTaskAttemptFailedEvent(
attemptInfo.status = event.getTaskStatus();
attemptInfo.hostname = event.getHostname();
attemptInfo.port = event.getPort();
attemptInfo.rackname = event.getRackName();
attemptInfo.shuffleFinishTime = event.getFinishTime();
attemptInfo.sortFinishTime = event.getFinishTime();
attemptInfo.mapFinishTime = event.getFinishTime();

View File

@ -68,7 +68,10 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
datum.finishTime = finishTime;
datum.hostname = new Utf8(hostname);
datum.port = port;
datum.rackname = new Utf8(rackName);
// This is needed for reading old jh files
if (rackName != null) {
datum.rackname = new Utf8(rackName);
}
datum.state = new Utf8(state);
datum.counters = EventWriter.toAvro(counters);
@ -139,8 +142,12 @@ public TaskType getTaskType() {
public String getHostname() { return datum.hostname.toString(); }
/** Get the tracker rpc port */
public int getPort() { return datum.port; }
/** Get the rack name */
public String getRackname() { return datum.rackname.toString(); }
public String getRackName() {
return datum.rackname == null ? null : datum.rackname.toString();
}
/** Get the state string */
public String getState() { return datum.state.toString(); }
/** Get the counters */
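Note: because history files written before this change carry no rackname field, getRackName() can now return null. A minimal helper sketch for consumers of parsed events (method shown in isolation):

import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;

static String attemptLocation(MapAttemptFinishedEvent event) {
  String rack = event.getRackName();   // null when replaying pre-change history files
  String host = event.getHostname();
  return rack == null ? host : rack + "/" + host;
}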

View File

@ -69,7 +69,9 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
datum.finishTime = finishTime;
datum.hostname = new Utf8(hostname);
datum.port = port;
datum.rackname = new Utf8(rackName);
if (rackName != null) {
datum.rackname = new Utf8(rackName);
}
datum.state = new Utf8(state);
datum.counters = EventWriter.toAvro(counters);
@ -142,8 +144,12 @@ public TaskType getTaskType() {
public String getHostname() { return datum.hostname.toString(); }
/** Get the tracker rpc port */
public int getPort() { return datum.port; }
/** Get the rack name of the node where the attempt ran */
public String getRackName() { return datum.rackname.toString(); }
public String getRackName() {
return datum.rackname == null ? null : datum.rackname.toString();
}
/** Get the state string */
public String getState() { return datum.state.toString(); }
/** Get the counters for the attempt */

View File

@ -51,13 +51,16 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
*/
public TaskAttemptFinishedEvent(TaskAttemptID id,
TaskType taskType, String taskStatus,
long finishTime,
long finishTime, String rackName,
String hostname, String state, Counters counters) {
datum.taskid = new Utf8(id.getTaskID().toString());
datum.attemptId = new Utf8(id.toString());
datum.taskType = new Utf8(taskType.name());
datum.taskStatus = new Utf8(taskStatus);
datum.finishTime = finishTime;
if (rackName != null) {
datum.rackname = new Utf8(rackName);
}
datum.hostname = new Utf8(hostname);
datum.state = new Utf8(state);
datum.counters = EventWriter.toAvro(counters);
@ -86,6 +89,12 @@ public TaskType getTaskType() {
public long getFinishTime() { return datum.finishTime; }
/** Get the host where the attempt executed */
public String getHostname() { return datum.hostname.toString(); }
/** Get the rackname where the attempt executed */
public String getRackName() {
return datum.rackname == null ? null : datum.rackname.toString();
}
/** Get the state string */
public String getState() { return datum.state.toString(); }
/** Get the counters for the attempt */

View File

@ -47,6 +47,7 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
* @param finishTime Finish time of the attempt
* @param hostname Name of the host where the attempt executed
* @param port rpc port for the tracker
* @param rackName Name of the rack where the attempt executed
* @param error Error string
* @param allSplits the "splits", or a pixelated graph of various
* measurable worker node state variables against progress.
@ -55,14 +56,17 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
*/
public TaskAttemptUnsuccessfulCompletionEvent
(TaskAttemptID id, TaskType taskType,
String status, long finishTime,
String hostname, int port, String error,
int[][] allSplits) {
String status, long finishTime,
String hostname, int port, String rackName,
String error, int[][] allSplits) {
datum.taskid = new Utf8(id.getTaskID().toString());
datum.taskType = new Utf8(taskType.name());
datum.attemptId = new Utf8(id.toString());
datum.finishTime = finishTime;
datum.hostname = new Utf8(hostname);
if (rackName != null) {
datum.rackname = new Utf8(rackName);
}
datum.port = port;
datum.error = new Utf8(error);
datum.status = new Utf8(status);
@ -99,7 +103,7 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
(TaskAttemptID id, TaskType taskType,
String status, long finishTime,
String hostname, String error) {
this(id, taskType, status, finishTime, hostname, -1, error, null);
this(id, taskType, status, finishTime, hostname, -1, null, error, null);
}
TaskAttemptUnsuccessfulCompletionEvent() {}
@ -125,6 +129,12 @@ public TaskAttemptID getTaskAttemptId() {
public String getHostname() { return datum.hostname.toString(); }
/** Get the rpc port for the host where the attempt executed */
public int getPort() { return datum.port; }
/** Get the rack name of the node where the attempt ran */
public String getRackName() {
return datum.rackname == null ? null : datum.rackname.toString();
}
/** Get the error string */
public String getError() { return datum.error.toString(); }
/** Get the task status */

View File

@ -233,7 +233,7 @@ public void commitTask(TaskAttemptContext context)
" directory of task: " + attemptId + " - " + workPath);
}
LOG.info("Saved output of task '" + attemptId + "' to " +
outputPath);
jobOutputPath);
}
}
}

View File

@ -44,10 +44,13 @@
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
@ -62,10 +65,12 @@
public class TestJobHistoryParsing {
private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class);
private static final String RACK_NAME = "/MyRackName";
public static class MyResolver implements DNSToSwitchMapping {
@Override
public List<String> resolve(List<String> names) {
return Arrays.asList(new String[]{"/MyRackName"});
return Arrays.asList(new String[]{RACK_NAME});
}
}
@ -172,7 +177,7 @@ public void testHistoryParsing() throws Exception {
// Verify rack-name
Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
.getRackname(), "/MyRackName");
.getRackname(), RACK_NAME);
}
}
@ -217,9 +222,89 @@ public void testHistoryParsing() throws Exception {
Assert.assertEquals("Status does not match", "SUCCEEDED",
jobSummaryElements.get("status"));
}
@Test
public void testHistoryParsingForFailedAttempts() throws Exception {
Configuration conf = new Configuration();
conf
.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf);
MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this.getClass().getName(),
true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
app.waitForState(job, JobState.SUCCEEDED);
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
String jobhistoryDir = JobHistoryUtils
.getHistoryIntermediateDoneDirForUser(conf);
JobHistory jobHistory = new JobHistory();
jobHistory.init(conf);
JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId)
.getJobIndexInfo();
String jobhistoryFileName = FileNameIndexUtils
.getDoneFileName(jobIndexInfo);
Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
FSDataInputStream in = null;
FileContext fc = null;
try {
fc = FileContext.getFileContext(conf);
in = fc.open(fc.makeQualified(historyFilePath));
} catch (IOException ioe) {
LOG.info("Can not open history file: " + historyFilePath, ioe);
throw (new Exception("Can not open History File"));
}
JobHistoryParser parser = new JobHistoryParser(in);
JobInfo jobInfo = parser.parse();
int noOffailedAttempts = 0;
Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
for (Task task : job.getTasks().values()) {
TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID()));
for (TaskAttempt taskAttempt : task.getAttempts().values()) {
TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
TypeConverter.fromYarn((taskAttempt.getID())));
// Verify rack-name for all task attempts
Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
.getRackname(), RACK_NAME);
if (taskAttemptInfo.getTaskStatus().equals("FAILED")) {
noOffailedAttempts++;
}
}
}
Assert.assertEquals("No of Failed tasks doesn't match.", 2, noOffailedAttempts);
}
static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {
public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
}
@SuppressWarnings("unchecked")
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
}
}
}
public static void main(String[] args) throws Exception {
TestJobHistoryParsing t = new TestJobHistoryParsing();
t.testHistoryParsing();
t.testHistoryParsingForFailedAttempts();
}
}

View File

@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.CustomOutputCommitter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@SuppressWarnings("deprecation")
public class TestMRAppWithCombiner {
protected static MiniMRYarnCluster mrCluster;
private static Configuration conf = new Configuration();
private static FileSystem localFs;
private static final Log LOG = LogFactory.getLog(TestMRAppWithCombiner.class);
static {
try {
localFs = FileSystem.getLocal(conf);
} catch (IOException io) {
throw new RuntimeException("problem getting local fs", io);
}
}
@BeforeClass
public static void setup() throws IOException {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
if (mrCluster == null) {
mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName(), 3);
Configuration conf = new Configuration();
mrCluster.init(conf);
mrCluster.start();
}
// Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
// work around the absent public dist cache.
localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR),
TestMRJobs.APP_JAR);
localFs.setPermission(TestMRJobs.APP_JAR, new FsPermission("700"));
}
@AfterClass
public static void tearDown() {
if (mrCluster != null) {
mrCluster.stop();
mrCluster = null;
}
}
@Test
public void testCombinerShouldUpdateTheReporter() throws Exception {
JobConf conf = new JobConf(mrCluster.getConfig());
int numMaps = 5;
int numReds = 2;
Path in = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
"testCombinerShouldUpdateTheReporter-in");
Path out = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
"testCombinerShouldUpdateTheReporter-out");
createInputOutPutFolder(in, out, numMaps);
conf.setJobName("test-job-with-combiner");
conf.setMapperClass(IdentityMapper.class);
conf.setCombinerClass(MyCombinerToCheckReporter.class);
//conf.setJarByClass(MyCombinerToCheckReporter.class);
conf.setReducerClass(IdentityReducer.class);
DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf);
conf.setOutputCommitter(CustomOutputCommitter.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(conf, in);
FileOutputFormat.setOutputPath(conf, out);
conf.setNumMapTasks(numMaps);
conf.setNumReduceTasks(numReds);
runJob(conf);
}
static void createInputOutPutFolder(Path inDir, Path outDir, int numMaps)
throws Exception {
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outDir)) {
fs.delete(outDir, true);
}
if (!fs.exists(inDir)) {
fs.mkdirs(inDir);
}
String input = "The quick brown fox\n" + "has many silly\n"
+ "red fox sox\n";
for (int i = 0; i < numMaps; ++i) {
DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
file.writeBytes(input);
file.close();
}
}
static boolean runJob(JobConf conf) throws Exception {
JobClient jobClient = new JobClient(conf);
RunningJob job = jobClient.submitJob(conf);
return jobClient.monitorAndPrintJob(conf, job);
}
class MyCombinerToCheckReporter<K, V> extends IdentityReducer<K, V> {
public void reduce(K key, Iterator<V> values, OutputCollector<K, V> output,
Reporter reporter) throws IOException {
if (Reporter.NULL == reporter) {
Assert.fail("A valid Reporter should have been used but Reporter.NULL was used");
}
}
}
}

View File

@ -141,6 +141,8 @@ if [ -d "$YARN_HOME/build/tools" ]; then
CLASSPATH=${CLASSPATH}:$YARN_HOME/build/tools
fi
CLASSPATH=${CLASSPATH}:$YARN_HOME/share/hadoop/mapreduce/*
CLASSPATH=${CLASSPATH}:$YARN_HOME/share/hadoop/mapreduce/lib/*
# so that filenames w/ spaces are handled correctly in loops below
IFS=

View File

@ -61,4 +61,17 @@ public interface AllocateResponse {
@Private
@Unstable
public abstract void setAMResponse(AMResponse amResponse);
/**
* Get the number of hosts available on the cluster.
* @return the available host count.
*/
@Public
@Stable
public int getNumClusterNodes();
@Private
@Unstable
public void setNumClusterNodes(int numNodes);
}
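Note: a fragment showing an ApplicationMaster reading the new field off the allocate response; req, resourceManager and LOG are assumed to be the AllocateRequest, AMRMProtocol proxy and logger already present in an AM such as the distributed-shell one in this patch:

AllocateResponse resp = resourceManager.allocate(req);
int clusterNodes = resp.getNumClusterNodes();   // populated by the RM since this change
AMResponse amResp = resp.getAMResponse();
LOG.info("cluster nodes=" + clusterNodes
    + ", newly allocated=" + amResp.getAllocatedContainers().size());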

View File

@ -29,7 +29,8 @@
public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto> implements AllocateResponse {
public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
implements AllocateResponse {
AllocateResponseProto proto = AllocateResponseProto.getDefaultInstance();
AllocateResponseProto.Builder builder = null;
boolean viaProto = false;
@ -95,7 +96,20 @@ public void setAMResponse(AMResponse aMResponse) {
builder.clearAMResponse();
this.amResponse = aMResponse;
}
@Override
public int getNumClusterNodes() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
return p.getNumClusterNodes();
}
@Override
public void setNumClusterNodes(int numNodes) {
maybeInitBuilder();
builder.setNumClusterNodes(numNodes);
}
private AMResponsePBImpl convertFromProtoFormat(AMResponseProto p) {
return new AMResponsePBImpl(p);
}
@ -103,7 +117,4 @@ private AMResponsePBImpl convertFromProtoFormat(AMResponseProto p) {
private AMResponseProto convertToProtoFormat(AMResponse t) {
return ((AMResponsePBImpl)t).getProto();
}
}

View File

@ -59,6 +59,7 @@ message AllocateRequestProto {
message AllocateResponseProto {
optional AMResponseProto AM_response = 1;
optional int32 num_cluster_nodes = 2;
}

View File

@ -184,7 +184,7 @@ public class ApplicationMaster {
private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
// Launch threads
private List<Thread> launchThreads = new ArrayList<Thread>();
private List<Thread> launchThreads = new ArrayList<Thread>();
/**
* @param args Command line args
@ -194,7 +194,7 @@ public static void main(String[] args) {
try {
ApplicationMaster appMaster = new ApplicationMaster();
LOG.info("Initializing ApplicationMaster");
boolean doRun = appMaster.init(args);
boolean doRun = appMaster.init(args);
if (!doRun) {
System.exit(0);
}
@ -202,14 +202,14 @@ public static void main(String[] args) {
} catch (Throwable t) {
LOG.fatal("Error running ApplicationMaster", t);
System.exit(1);
}
}
if (result) {
LOG.info("Application Master completed successfully. exiting");
System.exit(0);
}
else {
LOG.info("Application Master failed. exiting");
System.exit(2);
System.exit(2);
}
}
@ -218,7 +218,7 @@ public static void main(String[] args) {
*/
private void dumpOutDebugInfo() {
LOG.info("Dump debug output");
LOG.info("Dump debug output");
Map<String, String> envs = System.getenv();
for (Map.Entry<String, String> env : envs.entrySet()) {
LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
@ -277,7 +277,7 @@ public boolean init(String[] args) throws ParseException, IOException {
if (args.length == 0) {
printUsage(opts);
throw new IllegalArgumentException("No args specified for application master to initialize");
}
}
if (cliParser.hasOption("help")) {
printUsage(opts);
@ -297,8 +297,8 @@ public boolean init(String[] args) throws ParseException, IOException {
appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
}
else {
throw new IllegalArgumentException("Application Attempt Id not set in the environment");
}
throw new IllegalArgumentException("Application Attempt Id not set in the environment");
}
} else {
ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV));
appAttemptID = containerId.getApplicationAttemptId();
@ -338,11 +338,11 @@ public boolean init(String[] args) throws ParseException, IOException {
if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
shellScriptPathTimestamp = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
}
if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
shellScriptPathTimestamp = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
}
if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
shellScriptPathLen = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
shellScriptPathLen = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
}
if (!shellScriptPath.isEmpty()
@ -351,7 +351,7 @@ public boolean init(String[] args) throws ParseException, IOException {
LOG.error("Illegal values in env for shell script path"
+ ", path=" + shellScriptPath
+ ", len=" + shellScriptPathLen
+ ", timestamp=" + shellScriptPathTimestamp);
+ ", timestamp=" + shellScriptPathTimestamp);
throw new IllegalArgumentException("Illegal values in env for shell script path");
}
}
@ -368,7 +368,7 @@ public boolean init(String[] args) throws ParseException, IOException {
* @param opts Parsed command line options
*/
private void printUsage(Options opts) {
new HelpFormatter().printHelp("ApplicationMaster", opts);
new HelpFormatter().printHelp("ApplicationMaster", opts);
}
/**
@ -378,7 +378,7 @@ private void printUsage(Options opts) {
public boolean run() throws YarnRemoteException {
LOG.info("Starting ApplicationMaster");
// Connect to ResourceManager
// Connect to ResourceManager
resourceManager = connectToRM();
// Setup local RPC Server to accept status requests directly from clients
@ -395,7 +395,7 @@ public boolean run() throws YarnRemoteException {
// A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be
// a multiple of the min value and cannot exceed the max.
// If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
// If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
if (containerMemory < minMem) {
LOG.info("Container memory specified below min threshold of cluster. Using min value."
+ ", specified=" + containerMemory
@ -409,14 +409,14 @@ else if (containerMemory > maxMem) {
containerMemory = maxMem;
}
// Setup heartbeat emitter
// Setup heartbeat emitter
// TODO poll RM every now and then with an empty request to let RM know that we are alive
// The heartbeat interval after which an AM is timed out by the RM is defined by a config setting:
// RM_AM_EXPIRY_INTERVAL_MS with default defined by DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
// The allocate calls to the RM count as heartbeats so, for now, this additional heartbeat emitter
// is not required.
// Setup ask for containers from RM
// Setup ask for containers from RM
// Send request for containers to RM
// Until we get our fully allocated quota, we keep on polling RM for containers
// Keep looping until all the containers are launched and shell script executed on them
@ -426,7 +426,7 @@ else if (containerMemory > maxMem) {
while (numCompletedContainers.get() < numTotalContainers
&& !appDone) {
loopCounter++;
loopCounter++;
// log current state
LOG.info("Current application state: loop=" + loopCounter
@ -435,7 +435,7 @@ else if (containerMemory > maxMem) {
+ ", requested=" + numRequestedContainers
+ ", completed=" + numCompletedContainers
+ ", failed=" + numFailedContainers
+ ", currentAllocated=" + numAllocatedContainers);
+ ", currentAllocated=" + numAllocatedContainers);
// Sleep before each loop when asking RM for containers
// to avoid flooding RM with spurious requests when it
@ -444,7 +444,7 @@ else if (containerMemory > maxMem) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.info("Sleep interrupted " + e.getMessage());
LOG.info("Sleep interrupted " + e.getMessage());
}
// No. of containers to request
@ -457,14 +457,14 @@ else if (containerMemory > maxMem) {
// Setup request to be sent to RM to allocate containers
List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
if (askCount > 0) {
ResourceRequest containerAsk = setupContainerAskForRM(askCount);
ResourceRequest containerAsk = setupContainerAskForRM(askCount);
resourceReq.add(containerAsk);
}
// Send the request to RM
LOG.info("Asking RM for containers"
+ ", askCount=" + askCount);
AMResponse amResp = sendContainerAskToRM(resourceReq);
AMResponse amResp =sendContainerAskToRM(resourceReq);
// Retrieve list of allocated containers from the response
List<Container> allocatedContainers = amResp.getAllocatedContainers();
@ -478,10 +478,10 @@ else if (containerMemory > maxMem) {
+ ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+ ", containerState" + allocatedContainer.getState()
+ ", containerResourceMemory" + allocatedContainer.getResource().getMemory());
// + ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString());
//+ ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString());
LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer);
Thread launchThread = new Thread(runnableLaunchContainer);
Thread launchThread = new Thread(runnableLaunchContainer);
// launch and start the container on a separate thread to keep the main thread unblocked
// as all containers may not be allocated at one go.
@ -492,14 +492,14 @@ else if (containerMemory > maxMem) {
// Check what the current available resources in the cluster are
// TODO should we do anything if the available resources are not enough?
Resource availableResources = amResp.getAvailableResources();
LOG.info("Current available resources in the cluster " + availableResources);
LOG.info("Current available resources in the cluster " + availableResources);
// Check the completed containers
// Check the completed containers
List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
for (ContainerStatus containerStatus : completedContainers) {
for (ContainerStatus containerStatus : completedContainers) {
LOG.info("Got container status for containerID= " + containerStatus.getContainerId()
+ ", state=" + containerStatus.getState()
+ ", state=" + containerStatus.getState()
+ ", exitStatus=" + containerStatus.getExitStatus()
+ ", diagnostics=" + containerStatus.getDiagnostics());
@ -514,7 +514,7 @@ else if (containerMemory > maxMem) {
// shell script failed
// counts as completed
numCompletedContainers.incrementAndGet();
numFailedContainers.incrementAndGet();
numFailedContainers.incrementAndGet();
}
else {
// something else bad happened
@ -541,15 +541,15 @@ else if (containerMemory > maxMem) {
LOG.info("Current application state: loop=" + loopCounter
+ ", appDone=" + appDone
+ ", total=" + numTotalContainers
+ ", total=" + numTotalContainers
+ ", requested=" + numRequestedContainers
+ ", completed=" + numCompletedContainers
+ ", failed=" + numFailedContainers
+ ", currentAllocated=" + numAllocatedContainers);
+ ", currentAllocated=" + numAllocatedContainers);
// TODO
// Add a timeout handling layer
// for misbehaving shell commands
// for misbehaving shell commands
}
// Join all launched threads
@ -561,7 +561,7 @@ else if (containerMemory > maxMem) {
} catch (InterruptedException e) {
LOG.info("Exception thrown in thread join: " + e.getMessage());
e.printStackTrace();
}
}
}
// When the application completes, it should send a finish application signal
@ -610,10 +610,11 @@ public LaunchContainerRunnable(Container lcontainer) {
* Helper function to connect to CM
*/
private void connectToCM() {
String cmIpPortStr = container.getNodeId().getHost() + ":"
+ container.getNodeId().getPort();
InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
LOG.info("Connecting to ResourceManager at " + cmIpPortStr);
LOG.debug("Connecting to ContainerManager for containerid=" + container.getId());
String cmIpPortStr = container.getNodeId().getHost() + ":"
+ container.getNodeId().getPort();
InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
LOG.info("Connecting to ContainerManager at " + cmIpPortStr);
this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf));
}
@ -626,7 +627,6 @@ private void connectToCM() {
*/
public void run() {
// Connect to ContainerManager
LOG.info("Connecting to container manager for containerid=" + container.getId());
connectToCM();
LOG.info("Setting up container launch container for containerid=" + container.getId());
@ -654,7 +654,7 @@ public void run() {
if (!shellScriptPath.isEmpty()) {
LocalResource shellRsrc = Records.newRecord(LocalResource.class);
shellRsrc.setType(LocalResourceType.FILE);
shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
try {
shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath)));
} catch (URISyntaxException e) {
@ -664,17 +664,17 @@ public void run() {
// A failure scenario on bad input such as invalid shell script path
// We know we cannot continue launching the container
// so we should release it.
// so we should release it.
// TODO
numCompletedContainers.incrementAndGet();
numFailedContainers.incrementAndGet();
return;
return;
}
shellRsrc.setTimestamp(shellScriptPathTimestamp);
shellRsrc.setSize(shellScriptPathLen);
localResources.put(ExecShellStringPath, shellRsrc);
}
ctx.setLocalResources(localResources);
}
ctx.setLocalResources(localResources);
// Set the necessary command to execute on the allocated container
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
@ -686,7 +686,7 @@ public void run() {
vargs.add(ExecShellStringPath);
}
// Set args for the shell command if any
// Set args for the shell command if any
vargs.add(shellArgs);
// Add log redirect params
// TODO
@ -722,19 +722,19 @@ public void run() {
// Left commented out as the shell scripts are short lived
// and we are relying on the status for completed containers from RM to detect status
// GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class);
// statusReq.setContainerId(container.getId());
// GetContainerStatusResponse statusResp;
// try {
// statusResp = cm.getContainerStatus(statusReq);
// LOG.info("Container Status"
// + ", id=" + container.getId()
// + ", status=" +statusResp.getStatus());
// } catch (YarnRemoteException e) {
// e.printStackTrace();
// }
}
}
// GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class);
// statusReq.setContainerId(container.getId());
// GetContainerStatusResponse statusResp;
//try {
//statusResp = cm.getContainerStatus(statusReq);
// LOG.info("Container Status"
// + ", id=" + container.getId()
// + ", status=" +statusResp.getStatus());
//} catch (YarnRemoteException e) {
//e.printStackTrace();
//}
}
}
/**
* Connect to the Resource Manager
@ -744,25 +744,25 @@ private AMRMProtocol connectToRM() {
YarnConfiguration yarnConf = new YarnConfiguration(conf);
InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
LOG.info("Connecting to ResourceManager at " + rmAddress);
return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));
}
}
/**
* Register the Application Master to the Resource Manager
* @return the registration response from the RM
* @throws YarnRemoteException
*/
private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException {
RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException {
RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
// set the required info into the registration request:
// application attempt id,
// host on which the app master is running
// rpc port on which the app master accepts requests from the client
// tracking url for the app master
appMasterRequest.setApplicationAttemptId(appAttemptID);
appMasterRequest.setApplicationAttemptId(appAttemptID);
appMasterRequest.setHost(appMasterHostname);
appMasterRequest.setRpcPort(appMasterRpcPort);
appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
@ -792,7 +792,7 @@ private ResourceRequest setupContainerAskForRM(int numContainers) {
Priority pri = Records.newRecord(Priority.class);
// TODO - what is the range for priority? how to decide?
pri.setPriority(requestPriority);
request.setPriority(pri);
request.setPriority(pri);
// Set up resource type requirements
// For now, only memory is supported so we set memory requirements
@ -810,7 +810,7 @@ private ResourceRequest setupContainerAskForRM(int numContainers) {
* @throws YarnRemoteException
*/
private AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainers)
throws YarnRemoteException {
throws YarnRemoteException {
AllocateRequest req = Records.newRecord(AllocateRequest.class);
req.setResponseId(rmRequestID.incrementAndGet());
req.setApplicationAttemptId(appAttemptID);
@ -830,7 +830,7 @@ private AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainer
LOG.info("Released container, id=" + id.getId());
}
AllocateResponse resp = resourceManager.allocate(req);
return resp.getAMResponse();
AllocateResponse resp = resourceManager.allocate(req);
return resp.getAMResponse();
}
}

View File

@ -75,7 +75,7 @@ public void run() {
try {
event = eventQueue.take();
} catch(InterruptedException ie) {
LOG.info("AsyncDispatcher thread interrupted", ie);
LOG.warn("AsyncDispatcher thread interrupted", ie);
return;
}
if (event != null) {
@ -114,8 +114,10 @@ public void stop() {
@SuppressWarnings("unchecked")
protected void dispatch(Event event) {
//all events go thru this loop
LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+ event.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+ event.toString());
}
Class<? extends Enum> type = event.getType().getDeclaringClass();
@ -131,12 +133,11 @@ protected void dispatch(Event event) {
}
}
@SuppressWarnings("unchecked")
@Override
@SuppressWarnings("rawtypes")
public void register(Class<? extends Enum> eventType,
EventHandler handler) {
/* check to see if we have a listener registered */
@SuppressWarnings("unchecked")
EventHandler<Event> registeredHandler = (EventHandler<Event>)
eventDispatchers.get(eventType);
LOG.info("Registering " + eventType + " for " + handler.getClass());
@ -170,7 +171,7 @@ public void handle(Event event) {
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.info("Very low remaining capacity in the event-queue: "
LOG.warn("Very low remaining capacity in the event-queue: "
+ remCapacity);
}
try {
@ -186,7 +187,6 @@ public void handle(Event event) {
* are interested in the event.
* @param <T> the type of event these multiple handlers are interested in.
*/
@SuppressWarnings("rawtypes")
static class MultiListenerHandler implements EventHandler<Event> {
List<EventHandler<Event>> listofHandlers;

View File

@ -81,6 +81,10 @@ public synchronized void start() {
}
public synchronized void stop() {
if (this.getServiceState() == STATE.STOPPED) {
// The base composite-service is already stopped, don't do anything again.
return;
}
if (serviceList.size() > 0) {
stop(serviceList.size() - 1);
}
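Note: stop() is now a no-op once the composite service has reached STOPPED, so callers such as shutdown hooks can invoke it unconditionally (the test below exercises the double-stop). A sketch, with the service name made up:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.service.CompositeService;

final CompositeService daemon = new CompositeService("my-daemon") { };
daemon.init(new Configuration());
daemon.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
  @Override
  public void run() {
    daemon.stop();   // safe even if the daemon already stopped itself
  }
});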

View File

@ -88,6 +88,14 @@ public void testCallSequence() {
((NUM_OF_SERVICES - 1) - i), services[i].getCallSequenceNumber());
}
// Try to stop again. This should be a no-op.
serviceManager.stop();
// Verify that stop() call sequence numbers for every service don't change.
for (int i = 0; i < NUM_OF_SERVICES; i++) {
assertEquals("For " + services[i]
+ " service, stop() call sequence number should have been ",
((NUM_OF_SERVICES - 1) - i), services[i].getCallSequenceNumber());
}
}
@Test
@ -153,7 +161,7 @@ public void testServiceStop() {
serviceManager.start();
// Start the composite service
// Stop the composite service
try {
serviceManager.stop();
} catch (YarnException e) {

View File

@ -285,6 +285,7 @@ public AllocateResponse allocate(AllocateRequest request)
response.setAvailableResources(allocation.getResourceLimit());
responseMap.put(appAttemptId, response);
allocateResponse.setAMResponse(response);
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
return allocateResponse;
}
}

View File

@ -79,6 +79,14 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
@Stable
public Resource getMaximumResourceCapability();
/**
* Get the number of nodes available in the cluster.
* @return the number of available nodes.
*/
@Public
@Stable
public int getNumClusterNodes();
/**
* The main api between the ApplicationMaster and the Scheduler.
* The ApplicationMaster is updating his future resource requirements

View File

@ -159,6 +159,7 @@ public Resource getMaximumResourceCapability() {
return maximumAllocation;
}
@Override
public synchronized int getNumClusterNodes() {
return numNodeManagers;
}

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@ -36,7 +35,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.Lock;
@ -179,6 +177,11 @@ public Resource getMinimumResourceCapability() {
return minimumAllocation;
}
@Override
public int getNumClusterNodes() {
return nodes.size();
}
@Override
public Resource getMaximumResourceCapability() {
return maximumAllocation;

View File

@ -2671,7 +2671,9 @@ public synchronized boolean completedTask(TaskInProgress tip,
// Update jobhistory
TaskTrackerStatus ttStatus =
this.jobtracker.getTaskTrackerStatus(status.getTaskTracker());
String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
Node node = jobtracker.getNode(ttStatus.getHost());
String trackerHostname = node.getName();
String trackerRackName = node.getParent().getName();
TaskType taskType = getTaskType(tip);
TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(
@ -2685,7 +2687,7 @@ public synchronized boolean completedTask(TaskInProgress tip,
MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent(
statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
status.getMapFinishTime(),
status.getFinishTime(), trackerHostname, -1, "",
status.getFinishTime(), trackerHostname, -1, trackerRackName,
status.getStateString(),
new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
tip.getSplits(statusAttemptID).burst()
@ -2698,7 +2700,7 @@ public synchronized boolean completedTask(TaskInProgress tip,
statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
status.getShuffleFinishTime(),
status.getSortFinishTime(), status.getFinishTime(),
trackerHostname, -1, "", status.getStateString(),
trackerHostname, -1, trackerRackName, status.getStateString(),
new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
tip.getSplits(statusAttemptID).burst()
);
@ -3208,7 +3210,7 @@ private void failedTask(TaskInProgress tip, TaskAttemptID taskid,
(taskid,
taskType, taskStatus.getRunState().toString(),
finishTime,
taskTrackerHostName, -1, diagInfo,
taskTrackerHostName, -1, null, diagInfo,
splits.burst());
jobHistory.logEvent(tue, taskid.getJobID());

View File

@ -83,7 +83,7 @@ private static void testFailedKilledEventsForTypes(EventType expected,
for (TaskType t : types) {
TaskAttemptUnsuccessfulCompletionEvent tauce =
new TaskAttemptUnsuccessfulCompletionEvent
(id, t, state, 0L, "", -1, "", NULL_SPLITS_ARRAY);
(id, t, state, 0L, "", -1, "", "", NULL_SPLITS_ARRAY);
assertEquals(expected, tauce.getEventType());
}
}
@ -132,7 +132,8 @@ private static void testFinishedEventsForTypes(EventType expected,
for (TaskType t : types) {
TaskAttemptFinishedEvent tafe =
new TaskAttemptFinishedEvent(id, t,
TaskStatus.State.SUCCEEDED.toString(), 0L, "", "", new Counters());
TaskStatus.State.SUCCEEDED.toString(), 0L, "", "", "",
new Counters());
assertEquals(expected, tafe.getEventType());
}
}

View File

@ -26,6 +26,8 @@
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
@ -42,6 +44,7 @@
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup.MyOutputFormat;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
@ -49,6 +52,9 @@
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.tools.rumen.TraceBuilder.MyOptions;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@ -57,6 +63,8 @@
import static org.junit.Assert.*;
public class TestRumenJobTraces {
private static final Log LOG = LogFactory.getLog(TestRumenJobTraces.class);
@Test
public void testSmallTrace() throws Exception {
performSingleTest("sample-job-tracker-logs.gz",
@ -232,11 +240,21 @@ public void testHadoop20JHParser() throws Exception {
parser = new Hadoop20JHParser(ris);
ArrayList<String> seenEvents = new ArrayList<String>(150);
getHistoryEvents(parser, seenEvents, null); // get events into seenEvents
// this is the same as the one in the input history file
String jobId = "job_200904211745_0002";
JobBuilder builder = new JobBuilder(jobId);
// get events into seenEvents
getHistoryEvents(parser, seenEvents, builder);
// Validate the events seen by history parser from
// history file v20-single-input-log.gz
validateSeenHistoryEvents(seenEvents, goldLines);
ParsedJob parsedJob = builder.build();
// validate the obtainXXX api of ParsedJob, ParsedTask and
// ParsedTaskAttempt
validateParsedJob(parsedJob, 20, 1, true);
} finally {
if (parser != null) {
parser.close();
@ -246,8 +264,10 @@ public void testHadoop20JHParser() throws Exception {
}
/**
* Validate the parsing of given history file name. Also validate the history
* file name suffixed with old/stale file suffix.
* Validate the parsing of given history file name.
*
* TODO: Also validate the history file name suffixed with old/stale file
* suffix.
* @param jhFileName job history file path
* @param jid JobID
*/
@ -257,13 +277,7 @@ private void validateHistoryFileNameParsing(Path jhFileName,
JobID.forName(JobHistoryUtils.extractJobID(jhFileName.getName()));
assertEquals("TraceBuilder failed to parse the current JH filename"
+ jhFileName, jid, extractedJID);
// test jobhistory filename with old/stale file suffix
jhFileName = jhFileName.suffix(JobHistory.getOldFileSuffix("123"));
extractedJID =
JobID.forName(JobHistoryUtils.extractJobID(jhFileName.getName()));
assertEquals("TraceBuilder failed to parse the current JH filename"
+ "(old-suffix):" + jhFileName,
jid, extractedJID);
//TODO test jobhistory filename with old/stale file suffix
}
/**
@ -318,8 +332,9 @@ public void testJobHistoryFilenameParsing() throws IOException {
.makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
// Check if current jobhistory filenames are detected properly
Path jhFilename = org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils
.getStagingJobHistoryFile(rootInputDir, jid.toString(), 1);
JobId jobId = TypeConverter.toYarn(jid);
JobIndexInfo info = new JobIndexInfo(0L, 0L, "", "", jobId, 0, 0, "");
Path jhFilename = new Path(FileNameIndexUtils.getDoneFileName(info));
validateHistoryFileNameParsing(jhFilename, jid);
// Check if Pre21 V1 jophistory file names are detected properly
@ -583,9 +598,11 @@ public void testCurrentJHParser() throws Exception {
// validate resource usage metrics
// get the job counters
Counters counters = job.getTaskReports(TaskType.MAP)[0].getTaskCounters();
// get the parsed job
ParsedJob parsedJob = builder.build();
// get the logged job
LoggedJob loggedJob = builder.build();
LoggedJob loggedJob = parsedJob;
// get the logged attempts
LoggedTaskAttempt attempt =
loggedJob.getMapTasks().get(0).getAttempts().get(0);
@ -599,6 +616,10 @@ public void testCurrentJHParser() throws Exception {
counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).getValue(),
counters.findCounter(TaskCounter.COMMITTED_HEAP_BYTES).getValue(),
true);
// validate the obtainXXX api of ParsedJob, ParsedTask and
// ParsedTaskAttempt
validateParsedJob(parsedJob, 1, 1, false);
} finally {
// stop the MR cluster
mrCluster.shutdown();
@ -615,6 +636,142 @@ public void testCurrentJHParser() throws Exception {
}
}
/**
* Verify if the obtainXXX methods of {@link ParsedJob}, {@link ParsedTask}
* and {@link ParsedTaskAttempt} give valid info
*/
private void validateParsedJob(ParsedJob parsedJob, int numMaps,
int numReduces, boolean pre21JobHistory) {
validateParsedJobAPI(parsedJob, numMaps, numReduces, pre21JobHistory);
List<ParsedTask> maps = parsedJob.obtainMapTasks();
for (ParsedTask task : maps) {
validateParsedTask(task);
}
List<ParsedTask> reduces = parsedJob.obtainReduceTasks();
for (ParsedTask task : reduces) {
validateParsedTask(task);
}
List<ParsedTask> others = parsedJob.obtainOtherTasks();
for (ParsedTask task : others) {
validateParsedTask(task);
}
}
/** Verify if the obtainXXX methods of {@link ParsedJob} give valid info */
private void validateParsedJobAPI(ParsedJob parsedJob, int numMaps,
int numReduces, boolean pre21JobHistory) {
LOG.info("Validating ParsedJob.obtainXXX api... for "
+ parsedJob.getJobID());
assertNotNull("Job acls in ParsedJob is null",
parsedJob.obtainJobAcls());
assertNotNull("Job conf path in ParsedJob is null",
parsedJob.obtainJobConfpath());
assertNotNull("Map Counters in ParsedJob is null",
parsedJob.obtainMapCounters());
assertNotNull("Reduce Counters in ParsedJob is null",
parsedJob.obtainReduceCounters());
assertNotNull("Total Counters in ParsedJob is null",
parsedJob.obtainTotalCounters());
assertNotNull("Map Tasks List in ParsedJob is null",
parsedJob.obtainMapTasks());
assertNotNull("Reduce Tasks List in ParsedJob is null",
parsedJob.obtainReduceTasks());
assertNotNull("Other Tasks List in ParsedJob is null",
parsedJob.obtainOtherTasks());
// the expected numbers of map and reduce tasks should be there
assertEquals("Number of map tasks in ParsedJob is wrong",
numMaps, parsedJob.obtainMapTasks().size());
assertEquals("Number of reduce tasks in ParsedJob is wrong",
numReduces, parsedJob.obtainReduceTasks().size());
// old hadoop20 version history files don't have job-level-map-counters and
// job-level-reduce-counters. Only total counters exist there.
assertTrue("Total Counters in ParsedJob is empty",
parsedJob.obtainTotalCounters().size() > 0);
if (!pre21JobHistory) {
assertTrue("Map Counters in ParsedJob is empty",
parsedJob.obtainMapCounters().size() > 0);
assertTrue("Reduce Counters in ParsedJob is empty",
parsedJob.obtainReduceCounters().size() > 0);
}
}
/**
* Verify if the obtainXXX methods of {@link ParsedTask} and
* {@link ParsedTaskAttempt} give valid info
*/
private void validateParsedTask(ParsedTask parsedTask) {
validateParsedTaskAPI(parsedTask);
List<ParsedTaskAttempt> attempts = parsedTask.obtainTaskAttempts();
for (ParsedTaskAttempt attempt : attempts) {
validateParsedTaskAttemptAPI(attempt);
}
}
/** Verify if the obtainXXX methods of {@link ParsedTask} give valid info */
private void validateParsedTaskAPI(ParsedTask parsedTask) {
LOG.info("Validating ParsedTask.obtainXXX api... for "
+ parsedTask.getTaskID());
assertNotNull("Task counters in ParsedTask is null",
parsedTask.obtainCounters());
if (parsedTask.getTaskStatus()
== Pre21JobHistoryConstants.Values.SUCCESS) {
// task counters should not be empty
assertTrue("Task counters in ParsedTask is empty",
parsedTask.obtainCounters().size() > 0);
assertNull("Diagnostic-info is non-null for a succeeded task",
parsedTask.obtainDiagnosticInfo());
assertNull("Failed-due-to-attemptId is non-null for a succeeded task",
parsedTask.obtainFailedDueToAttemptId());
} else {
assertNotNull("Diagnostic-info is null for a failed task",
parsedTask.obtainDiagnosticInfo());
assertNotNull("Failed-due-to-attemptId is null for a failed task",
parsedTask.obtainFailedDueToAttemptId());
}
List<ParsedTaskAttempt> attempts = parsedTask.obtainTaskAttempts();
assertNotNull("TaskAttempts list in ParsedTask is null", attempts);
assertTrue("TaskAttempts list in ParsedTask is empty",
attempts.size() > 0);
}
/**
* Verify if the obtainXXX methods of {@link ParsedTaskAttempt} give
* valid info
*/
private void validateParsedTaskAttemptAPI(
ParsedTaskAttempt parsedTaskAttempt) {
LOG.info("Validating ParsedTaskAttempt.obtainXXX api... for "
+ parsedTaskAttempt.getAttemptID());
assertNotNull("Counters in ParsedTaskAttempt is null",
parsedTaskAttempt.obtainCounters());
if (parsedTaskAttempt.getResult()
== Pre21JobHistoryConstants.Values.SUCCESS) {
assertTrue("Counters in ParsedTaskAttempt is empty",
parsedTaskAttempt.obtainCounters().size() > 0);
assertNull("Diagnostic-info is non-null for a succeeded taskAttempt",
parsedTaskAttempt.obtainDiagnosticInfo());
} else {
assertNotNull("Diagnostic-info is null for a failed taskAttempt",
parsedTaskAttempt.obtainDiagnosticInfo());
}
assertNotNull("TrackerName in ParsedTaskAttempt is null",
parsedTaskAttempt.obtainTrackerName());
assertNotNull("http-port info in ParsedTaskAttempt is null",
parsedTaskAttempt.obtainHttpPort());
assertNotNull("Shuffle-port info in ParsedTaskAttempt is null",
parsedTaskAttempt.obtainShufflePort());
}
@Test
public void testJobConfigurationParser() throws Exception {
@ -932,18 +1089,18 @@ public void testTopologyBuilder() throws Exception {
subject.process(new TaskAttemptFinishedEvent(TaskAttemptID
.forName("attempt_200904211745_0003_m_000004_0"), TaskType
.valueOf("MAP"), "STATUS", 1234567890L,
"/194\\.6\\.134\\.64/cluster50261\\.secondleveldomain\\.com",
"/194\\.6\\.134\\.64", "cluster50261\\.secondleveldomain\\.com",
"SUCCESS", null));
subject.process(new TaskAttemptUnsuccessfulCompletionEvent
(TaskAttemptID.forName("attempt_200904211745_0003_m_000004_1"),
TaskType.valueOf("MAP"), "STATUS", 1234567890L,
"/194\\.6\\.134\\.80/cluster50262\\.secondleveldomain\\.com",
-1, "MACHINE_EXPLODED", splits));
"cluster50262\\.secondleveldomain\\.com",
-1, "/194\\.6\\.134\\.80", "MACHINE_EXPLODED", splits));
subject.process(new TaskAttemptUnsuccessfulCompletionEvent
(TaskAttemptID.forName("attempt_200904211745_0003_m_000004_2"),
TaskType.valueOf("MAP"), "STATUS", 1234567890L,
"/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com",
-1, "MACHINE_EXPLODED", splits));
"cluster50263\\.secondleveldomain\\.com",
-1, "/194\\.6\\.134\\.80", "MACHINE_EXPLODED", splits));
subject.process(new TaskStartedEvent(TaskID
.forName("task_200904211745_0003_m_000004"), 1234567890L, TaskType
.valueOf("MAP"),

View File

@ -5,6 +5,9 @@
"children" : [ {
"name" : "cluster50213\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50235\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50226\\.secondleveldomain\\.com",
"children" : null
@ -20,6 +23,9 @@
}, {
"name" : "cluster50231\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50223\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50232\\.secondleveldomain\\.com",
"children" : null
@ -98,12 +104,18 @@
}, {
"name" : "cluster1236\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1232\\.secondleveldomain\\.com",
"children" : null
} ]
}, {
"name" : "194\\.6\\.134\\.64",
"children" : [ {
"name" : "cluster50317\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50283\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50292\\.secondleveldomain\\.com",
"children" : null
@ -146,6 +158,9 @@
}, {
"name" : "cluster50316\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50303\\.secondleveldomain\\.com",
"children" : null
} ]
}, {
"name" : "194\\.6\\.129\\.128",
@ -431,6 +446,9 @@
}, {
"name" : "cluster50120\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50132\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50130\\.secondleveldomain\\.com",
"children" : null
@ -566,9 +584,15 @@
}, {
"name" : "cluster50166\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50173\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50170\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50189\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50179\\.secondleveldomain\\.com",
"children" : null
@ -578,6 +602,21 @@
"children" : [ {
"name" : "cluster1283\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1295\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1302\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1294\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1310\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1305\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1299\\.secondleveldomain\\.com",
"children" : null
@ -587,20 +626,14 @@
}, {
"name" : "cluster1288\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1302\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1294\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1289\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1315\\.secondleveldomain\\.com",
"name" : "cluster1314\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1305\\.secondleveldomain\\.com",
"name" : "cluster1315\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1316\\.secondleveldomain\\.com",
@ -662,6 +695,9 @@
}, {
"name" : "cluster3054\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster3064\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster3077\\.secondleveldomain\\.com",
"children" : null
@ -695,6 +731,9 @@
"children" : [ {
"name" : "cluster50468\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50445\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50476\\.secondleveldomain\\.com",
"children" : null
@ -785,6 +824,9 @@
}, {
"name" : "cluster50493\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50511\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50510\\.secondleveldomain\\.com",
"children" : null
@ -1100,6 +1142,9 @@
}, {
"name" : "cluster1907\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1917\\.secondleveldomain\\.com",
"children" : null
} ]
}, {
"name" : "192\\.30\\.63\\.192",
@ -1223,6 +1268,9 @@
}, {
"name" : "cluster1446\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1440\\.secondleveldomain\\.com",
"children" : null
} ]
}, {
"name" : "194\\.6\\.132\\.128",
@ -1238,6 +1286,9 @@
}, {
"name" : "cluster50025\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50024\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50021\\.secondleveldomain\\.com",
"children" : null
@ -1292,6 +1343,9 @@
}, {
"name" : "cluster50348\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50346\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50325\\.secondleveldomain\\.com",
"children" : null
@ -1379,6 +1433,9 @@
}, {
"name" : "cluster1662\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1647\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1649\\.secondleveldomain\\.com",
"children" : null
@ -1430,6 +1487,9 @@
}, {
"name" : "cluster1503\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1514\\.secondleveldomain\\.com",
"children" : null
} ]
}, {
"name" : "194\\.6\\.129\\.0",
@ -1439,6 +1499,9 @@
}, {
"name" : "cluster50539\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50533\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50530\\.secondleveldomain\\.com",
"children" : null
@ -1475,6 +1538,9 @@
}, {
"name" : "cluster50418\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50406\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster50411\\.secondleveldomain\\.com",
"children" : null
@ -1527,6 +1593,9 @@
}, {
"name" : "194\\.6\\.128\\.64",
"children" : [ {
"name" : "cluster1613\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1639\\.secondleveldomain\\.com",
"children" : null
}, {
@ -1574,6 +1643,9 @@
}, {
"name" : "cluster1602\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1627\\.secondleveldomain\\.com",
"children" : null
} ]
}, {
"name" : "194\\.6\\.132\\.192",
@ -1661,6 +1733,9 @@
}, {
"name" : "cluster1736\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1735\\.secondleveldomain\\.com",
"children" : null
}, {
"name" : "cluster1722\\.secondleveldomain\\.com",
"children" : null

View File

@ -1308,6 +1308,8 @@ private void processMapAttemptLine(ParsedLine line) {
if (host != null) {
attempt.setHostName(host.getNodeName(), host.getRackName());
attempt.setLocation(host.makeLoggedLocation());
} else {
attempt.setHostName(hostName, null);
}
List<LoggedLocation> locs = task.getPreferredLocations();
@ -1470,9 +1472,13 @@ private void processReduceAttemptLine(ParsedLine line) {
}
}
ParsedHost host = getAndRecordParsedHost(hostName);
if (host != null) {
attempt.setHostName(host.getNodeName(), host.getRackName());
if (hostName != null) {
ParsedHost host = getAndRecordParsedHost(hostName);
if (host != null) {
attempt.setHostName(host.getNodeName(), host.getRackName());
} else {
attempt.setHostName(hostName, null);
}
}
if (attemptID != null) {

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobFinished;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
@ -45,14 +46,15 @@
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinished;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFailed;
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinished;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
import org.apache.hadoop.tools.rumen.datatypes.JobProperties;
import org.apache.hadoop.util.StringUtils;
/**
@ -67,16 +69,16 @@ public class JobBuilder {
private boolean finalized = false;
private LoggedJob result = new LoggedJob();
private ParsedJob result = new ParsedJob();
private Map<String, LoggedTask> mapTasks = new HashMap<String, LoggedTask>();
private Map<String, LoggedTask> reduceTasks =
new HashMap<String, LoggedTask>();
private Map<String, LoggedTask> otherTasks =
new HashMap<String, LoggedTask>();
private Map<String, ParsedTask> mapTasks = new HashMap<String, ParsedTask>();
private Map<String, ParsedTask> reduceTasks =
new HashMap<String, ParsedTask>();
private Map<String, ParsedTask> otherTasks =
new HashMap<String, ParsedTask>();
private Map<String, LoggedTaskAttempt> attempts =
new HashMap<String, LoggedTaskAttempt>();
private Map<String, ParsedTaskAttempt> attempts =
new HashMap<String, ParsedTaskAttempt>();
private Map<ParsedHost, ParsedHost> allHosts =
new HashMap<ParsedHost, ParsedHost>();
@ -123,7 +125,7 @@ public String getJobID() {
public void process(HistoryEvent event) {
if (finalized) {
throw new IllegalStateException(
"JobBuilder.process(HistoryEvent event) called after LoggedJob built");
"JobBuilder.process(HistoryEvent event) called after ParsedJob built");
}
// these are in lexicographical order by class name.
@ -229,12 +231,16 @@ private void maybeSetJobReduceMB(Integer megabytes) {
public void process(Properties conf) {
if (finalized) {
throw new IllegalStateException(
"JobBuilder.process(Properties conf) called after LoggedJob built");
"JobBuilder.process(Properties conf) called after ParsedJob built");
}
// TODO: remove this once the deprecated APIs in LoggedJob are removed
result.setQueue(extract(conf, JobConfPropertyNames.QUEUE_NAMES
.getCandidates(), "default"));
String queue = extract(conf, JobConfPropertyNames.QUEUE_NAMES
.getCandidates(), null);
// set the queue name if present
if (queue != null) {
result.setQueue(queue);
}
result.setJobName(extract(conf, JobConfPropertyNames.JOB_NAMES
.getCandidates(), null));
@ -252,9 +258,9 @@ public void process(Properties conf) {
* Request the builder to build the final object. Once called, the
* {@link JobBuilder} would accept no more events or job-conf properties.
*
* @return Parsed {@link LoggedJob} object.
* @return Parsed {@link ParsedJob} object.
*/
public LoggedJob build() {
public ParsedJob build() {
// The main job here is to build CDFs and manage the conf
finalized = true;
@ -416,7 +422,7 @@ private static Values getPre21Value(String name) {
}
private void processTaskUpdatedEvent(TaskUpdatedEvent event) {
LoggedTask task = getTask(event.getTaskId().toString());
ParsedTask task = getTask(event.getTaskId().toString());
if (task == null) {
return;
}
@ -424,7 +430,7 @@ private void processTaskUpdatedEvent(TaskUpdatedEvent event) {
}
private void processTaskStartedEvent(TaskStartedEvent event) {
LoggedTask task =
ParsedTask task =
getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), true);
task.setStartTime(event.getStartTime());
task.setPreferredLocations(preferredLocationForSplits(event
@ -432,7 +438,7 @@ private void processTaskStartedEvent(TaskStartedEvent event) {
}
private void processTaskFinishedEvent(TaskFinishedEvent event) {
LoggedTask task =
ParsedTask task =
getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), false);
if (task == null) {
return;
@ -443,18 +449,22 @@ private void processTaskFinishedEvent(TaskFinishedEvent event) {
}
private void processTaskFailedEvent(TaskFailedEvent event) {
LoggedTask task =
ParsedTask task =
getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), false);
if (task == null) {
return;
}
task.setFinishTime(event.getFinishTime());
task.setTaskStatus(getPre21Value(event.getTaskStatus()));
TaskFailed t = (TaskFailed)(event.getDatum());
task.putDiagnosticInfo(t.error.toString());
task.putFailedDueToAttemptId(t.failedDueToAttempt.toString());
// No counters in TaskFailedEvent
}
private void processTaskAttemptUnsuccessfulCompletionEvent(
TaskAttemptUnsuccessfulCompletionEvent event) {
LoggedTaskAttempt attempt =
ParsedTaskAttempt attempt =
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
event.getTaskAttemptId().toString());
@ -463,10 +473,11 @@ private void processTaskAttemptUnsuccessfulCompletionEvent(
}
attempt.setResult(getPre21Value(event.getTaskStatus()));
ParsedHost parsedHost = getAndRecordParsedHost(event.getHostname());
if (parsedHost != null) {
attempt.setLocation(parsedHost.makeLoggedLocation());
attempt.setHostName(event.getHostname(), event.getRackName());
ParsedHost pHost =
getAndRecordParsedHost(event.getRackName(), event.getHostname());
if (pHost != null) {
attempt.setLocation(pHost.makeLoggedLocation());
}
attempt.setFinishTime(event.getFinishTime());
@ -475,28 +486,37 @@ private void processTaskAttemptUnsuccessfulCompletionEvent(
attempt.arraySetCpuUsages(event.getCpuUsages());
attempt.arraySetVMemKbytes(event.getVMemKbytes());
attempt.arraySetPhysMemKbytes(event.getPhysMemKbytes());
TaskAttemptUnsuccessfulCompletion t =
(TaskAttemptUnsuccessfulCompletion) (event.getDatum());
attempt.putDiagnosticInfo(t.error.toString());
// No counters in TaskAttemptUnsuccessfulCompletionEvent
}
private void processTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
LoggedTaskAttempt attempt =
ParsedTaskAttempt attempt =
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
event.getTaskAttemptId().toString());
if (attempt == null) {
return;
}
attempt.setStartTime(event.getStartTime());
attempt.putTrackerName(event.getTrackerName());
attempt.putHttpPort(event.getHttpPort());
attempt.putShufflePort(event.getShufflePort());
}
private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
LoggedTaskAttempt attempt =
ParsedTaskAttempt attempt =
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
event.getAttemptId().toString());
if (attempt == null) {
return;
}
attempt.setResult(getPre21Value(event.getTaskStatus()));
attempt.setLocation(getAndRecordParsedHost(event.getHostname())
.makeLoggedLocation());
ParsedHost pHost = getAndRecordParsedHost(event.getRackName(), event.getHostname());
if (pHost != null) {
attempt.setLocation(pHost.makeLoggedLocation());
}
attempt.setFinishTime(event.getFinishTime());
attempt
.incorporateCounters(((TaskAttemptFinished) event.getDatum()).counters);
@ -504,7 +524,7 @@ private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
private void processReduceAttemptFinishedEvent(
ReduceAttemptFinishedEvent event) {
LoggedTaskAttempt attempt =
ParsedTaskAttempt attempt =
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
event.getAttemptId().toString());
if (attempt == null) {
@ -512,6 +532,11 @@ private void processReduceAttemptFinishedEvent(
}
attempt.setResult(getPre21Value(event.getTaskStatus()));
attempt.setHostName(event.getHostname(), event.getRackName());
ParsedHost pHost =
getAndRecordParsedHost(event.getRackName(), event.getHostname());
if (pHost != null) {
attempt.setLocation(pHost.makeLoggedLocation());
}
// XXX There may be redundant location info available in the event.
// We might consider extracting it from this event. Currently this
@ -528,15 +553,21 @@ private void processReduceAttemptFinishedEvent(
}
private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
LoggedTaskAttempt attempt =
ParsedTaskAttempt attempt =
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
event.getAttemptId().toString());
if (attempt == null) {
return;
}
attempt.setResult(getPre21Value(event.getTaskStatus()));
attempt.setHostName(event.getHostname(), event.getRackname());
attempt.setHostName(event.getHostname(), event.getRackName());
ParsedHost pHost =
getAndRecordParsedHost(event.getRackName(), event.getHostname());
if (pHost != null) {
attempt.setLocation(pHost.makeLoggedLocation());
}
// XXX There may be redundant location info available in the event.
// We might consider extracting it from this event. Currently this
// is redundant, but making this will add future-proofing.
@ -554,6 +585,7 @@ private void processJobUnsuccessfulCompletionEvent(
result.setOutcome(Pre21JobHistoryConstants.Values
.valueOf(event.getStatus()));
result.setFinishTime(event.getFinishTime());
// No counters in JobUnsuccessfulCompletionEvent
}
private void processJobSubmittedEvent(JobSubmittedEvent event) {
@ -561,8 +593,14 @@ private void processJobSubmittedEvent(JobSubmittedEvent event) {
result.setJobName(event.getJobName());
result.setUser(event.getUserName());
result.setSubmitTime(event.getSubmitTime());
// job queue name is set when conf file is processed.
// See JobBuilder.process(Properties) method for details.
result.putJobConfPath(event.getJobConfPath());
result.putJobAcls(event.getJobAcls());
// set the queue name if present
String queue = event.getJobQueueName();
if (queue != null) {
result.setQueue(queue);
}
}
private void processJobStatusChangedEvent(JobStatusChangedEvent event) {
@ -589,10 +627,19 @@ private void processJobFinishedEvent(JobFinishedEvent event) {
result.setFinishTime(event.getFinishTime());
result.setJobID(jobID);
result.setOutcome(Values.SUCCESS);
JobFinished job = (JobFinished)event.getDatum();
Map<String, Long> countersMap =
JobHistoryUtils.extractCounters(job.totalCounters);
result.putTotalCounters(countersMap);
countersMap = JobHistoryUtils.extractCounters(job.mapCounters);
result.putMapCounters(countersMap);
countersMap = JobHistoryUtils.extractCounters(job.reduceCounters);
result.putReduceCounters(countersMap);
}
private LoggedTask getTask(String taskIDname) {
LoggedTask result = mapTasks.get(taskIDname);
private ParsedTask getTask(String taskIDname) {
ParsedTask result = mapTasks.get(taskIDname);
if (result != null) {
return result;
@ -616,9 +663,9 @@ private LoggedTask getTask(String taskIDname) {
* if true, we can create a task.
* @return
*/
private LoggedTask getOrMakeTask(TaskType type, String taskIDname,
private ParsedTask getOrMakeTask(TaskType type, String taskIDname,
boolean allowCreate) {
Map<String, LoggedTask> taskMap = otherTasks;
Map<String, ParsedTask> taskMap = otherTasks;
List<LoggedTask> tasks = this.result.getOtherTasks();
switch (type) {
@ -636,10 +683,10 @@ private LoggedTask getOrMakeTask(TaskType type, String taskIDname,
// no code
}
LoggedTask result = taskMap.get(taskIDname);
ParsedTask result = taskMap.get(taskIDname);
if (result == null && allowCreate) {
result = new LoggedTask();
result = new ParsedTask();
result.setTaskType(getPre21Value(type.toString()));
result.setTaskID(taskIDname);
taskMap.put(taskIDname, result);
@ -649,13 +696,13 @@ private LoggedTask getOrMakeTask(TaskType type, String taskIDname,
return result;
}
private LoggedTaskAttempt getOrMakeTaskAttempt(TaskType type,
private ParsedTaskAttempt getOrMakeTaskAttempt(TaskType type,
String taskIDName, String taskAttemptName) {
LoggedTask task = getOrMakeTask(type, taskIDName, false);
LoggedTaskAttempt result = attempts.get(taskAttemptName);
ParsedTask task = getOrMakeTask(type, taskIDName, false);
ParsedTaskAttempt result = attempts.get(taskAttemptName);
if (result == null && task != null) {
result = new LoggedTaskAttempt();
result = new ParsedTaskAttempt();
result.setAttemptID(taskAttemptName);
attempts.put(taskAttemptName, result);
task.getAttempts().add(result);
@ -665,7 +712,19 @@ private LoggedTaskAttempt getOrMakeTaskAttempt(TaskType type,
}
private ParsedHost getAndRecordParsedHost(String hostName) {
ParsedHost result = ParsedHost.parse(hostName);
return getAndRecordParsedHost(null, hostName);
}
private ParsedHost getAndRecordParsedHost(String rackName, String hostName) {
ParsedHost result = null;
if (rackName == null) {
// for old (pre-23) job history files where hostname was represented as
// /rackname/hostname
result = ParsedHost.parse(hostName);
} else {
// for new (post-23) job history files
result = new ParsedHost(rackName, hostName);
}
if (result != null) {
ParsedHost canonicalResult = allHosts.get(result);

View File

@ -18,10 +18,15 @@
package org.apache.hadoop.tools.rumen;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.jobhistory.JhCounter;
import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup;
import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
/**
@ -143,4 +148,21 @@ static boolean isJobConfXml(String fileName) {
String jobId = extractJobIDFromConfFileName(fileName);
return jobId != null;
}
/**
* Extract counters from the given JhCounters object into a map.
* @param counters the JhCounters object to extract counters from
* @return the map of counter names to counter values
*/
static Map<String, Long> extractCounters(JhCounters counters) {
Map<String, Long> countersMap = new HashMap<String, Long>();
if (counters != null) {
for (JhCounterGroup group : counters.groups) {
for (JhCounter counter : group.counts) {
countersMap.put(counter.name.toString(), counter.value);
}
}
}
return countersMap;
}
}
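(Usage sketch, not part of the patch: this mirrors how JobBuilder, above, feeds Avro counter records into extractCounters. The event variable and the counter name are illustrative only.)
// Flatten the job-level total counters of a JobFinishedEvent into a map.
JobFinished datum = (JobFinished) jobFinishedEvent.getDatum();   // 'jobFinishedEvent' assumed in scope
Map<String, Long> totals = JobHistoryUtils.extractCounters(datum.totalCounters);
Long hdfsBytesRead = totals.get("HDFS_BYTES_READ");              // null if that counter is absent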

View File

@ -360,6 +360,10 @@ void setRelativeTime(long relativeTime) {
this.relativeTime = relativeTime;
}
/**
* @return the job queue name if it is available in the job history file or
* the job history conf file; returns null otherwise.
*/
public QueueName getQueue() {
return queue;
}

View File

@ -71,11 +71,17 @@ public static ParsedHost parse(String name) {
return new ParsedHost(matcher.group(1), matcher.group(2));
}
private String process(String name) {
return name == null
? null
: name.startsWith("/") ? name.substring(1) : name;
}
public ParsedHost(LoggedLocation loc) {
List<NodeName> coordinates = loc.getLayers();
rackName = coordinates.get(0).getRackName();
nodeName = coordinates.get(1).getHostName();
rackName = process(coordinates.get(0).getRackName());
nodeName = process(coordinates.get(1).getHostName());
}
LoggedLocation makeLoggedLocation() {
@ -101,8 +107,8 @@ public String getRackName() {
// expects the broadest name first
ParsedHost(String rackName, String nodeName) {
this.rackName = rackName;
this.nodeName = nodeName;
this.rackName = process(rackName);
this.nodeName = process(nodeName);
}
@Override

View File

@ -0,0 +1,179 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
*
*/
package org.apache.hadoop.tools.rumen;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.security.authorize.AccessControlList;
/**
* This is a wrapper class around {@link LoggedJob}. It also provides the
* extra information about the job obtained from job history that is not
* written to the JSON trace file.
*/
public class ParsedJob extends LoggedJob {
private static final Log LOG = LogFactory.getLog(ParsedJob.class);
private Map<String, Long> totalCountersMap = new HashMap<String, Long>();
private Map<String, Long> mapCountersMap = new HashMap<String, Long>();
private Map<String, Long> reduceCountersMap = new HashMap<String, Long>();
private String jobConfPath;
private Map<JobACL, AccessControlList> jobAcls;
ParsedJob() {
}
ParsedJob(String jobID) {
super();
setJobID(jobID);
}
/** Set the job total counters */
void putTotalCounters(Map<String, Long> totalCounters) {
this.totalCountersMap = totalCounters;
}
/**
* @return the job total counters
*/
public Map<String, Long> obtainTotalCounters() {
return totalCountersMap;
}
/** Set the job level map tasks' counters */
void putMapCounters(Map<String, Long> mapCounters) {
this.mapCountersMap = mapCounters;
}
/**
* @return the job level map tasks' counters
*/
public Map<String, Long> obtainMapCounters() {
return mapCountersMap;
}
/** Set the job level reduce tasks' counters */
void putReduceCounters(Map<String, Long> reduceCounters) {
this.reduceCountersMap = reduceCounters;
}
/**
* @return the job level reduce tasks' counters
*/
public Map<String, Long> obtainReduceCounters() {
return reduceCountersMap;
}
/** Set the job conf path in staging dir on hdfs */
void putJobConfPath(String confPath) {
jobConfPath = confPath;
}
/**
* @return the job conf path in staging dir on hdfs
*/
public String obtainJobConfpath() {
return jobConfPath;
}
/** Set the job acls */
void putJobAcls(Map<JobACL, AccessControlList> acls) {
jobAcls = acls;
}
/**
* @return the job acls
*/
public Map<JobACL, AccessControlList> obtainJobAcls() {
return jobAcls;
}
/**
* @return the list of map tasks of this job
*/
public List<ParsedTask> obtainMapTasks() {
List<LoggedTask> tasks = super.getMapTasks();
return convertTasks(tasks);
}
/**
* @return the list of reduce tasks of this job
*/
public List<ParsedTask> obtainReduceTasks() {
List<LoggedTask> tasks = super.getReduceTasks();
return convertTasks(tasks);
}
/**
* @return the list of other tasks of this job
*/
public List<ParsedTask> obtainOtherTasks() {
List<LoggedTask> tasks = super.getOtherTasks();
return convertTasks(tasks);
}
/** Since this list of {@link LoggedTask} objects is actually a list
* of {@link ParsedTask} objects, cast each element accordingly.
* @return the list of {@link ParsedTask} objects
*/
private List<ParsedTask> convertTasks(List<LoggedTask> tasks) {
List<ParsedTask> result = new ArrayList<ParsedTask>();
for (LoggedTask t : tasks) {
if (t instanceof ParsedTask) {
result.add((ParsedTask)t);
} else {
throw new RuntimeException("Unexpected type of tasks in the list...");
}
}
return result;
}
/** Dump the extra info of ParsedJob */
void dumpParsedJob() {
LOG.info("ParsedJob details:" + obtainTotalCounters() + ";"
+ obtainMapCounters() + ";" + obtainReduceCounters()
+ "\n" + obtainJobConfpath() + "\n" + obtainJobAcls()
+ ";Q=" + (getQueue() == null ? "null" : getQueue().getValue()));
List<ParsedTask> maps = obtainMapTasks();
for (ParsedTask task : maps) {
task.dumpParsedTask();
}
List<ParsedTask> reduces = obtainReduceTasks();
for (ParsedTask task : reduces) {
task.dumpParsedTask();
}
List<ParsedTask> others = obtainOtherTasks();
for (ParsedTask task : others) {
task.dumpParsedTask();
}
}
}
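(Hypothetical consumer of the ParsedJob API above, not part of the patch; it uses only the public obtainXXX accessors and assumes the ParsedJob was produced elsewhere, e.g. by JobBuilder.build().)
package org.apache.hadoop.tools.rumen;   // same package as ParsedJob, purely for illustration
import java.util.Map;
public class ParsedJobSummary {
  public static void summarize(ParsedJob job) {
    // Job-level counters gathered from the JobFinished event.
    Map<String, Long> totals = job.obtainTotalCounters();
    System.out.println(totals.size() + " job-level total counters");
    // Extra per-task info that never reaches the JSON trace.
    for (ParsedTask map : job.obtainMapTasks()) {
      if (map.obtainDiagnosticInfo() != null) {
        System.out.println(map.getTaskID() + " failed due to attempt "
            + map.obtainFailedDueToAttemptId() + ": "
            + map.obtainDiagnosticInfo());
      }
    }
    System.out.println("conf path: " + job.obtainJobConfpath()
        + ", acls: " + job.obtainJobAcls());
  }
}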

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
/**
* This is a wrapper class around {@link LoggedTask}. It also provides the
* extra information about the task obtained from job history that is not
* written to the JSON trace file.
*/
public class ParsedTask extends LoggedTask {
private static final Log LOG = LogFactory.getLog(ParsedTask.class);
private String diagnosticInfo;
private String failedDueToAttempt;
private Map<String, Long> countersMap = new HashMap<String, Long>();
ParsedTask() {
super();
}
public void incorporateCounters(JhCounters counters) {
Map<String, Long> countersMap =
JobHistoryUtils.extractCounters(counters);
putCounters(countersMap);
super.incorporateCounters(counters);
}
/** Set the task counters */
public void putCounters(Map<String, Long> counters) {
this.countersMap = counters;
}
/**
* @return the task counters
*/
public Map<String, Long> obtainCounters() {
return countersMap;
}
/** Set the task diagnostic-info */
public void putDiagnosticInfo(String msg) {
diagnosticInfo = msg;
}
/**
* @return the diagnostic-info of this task.
* If the task is successful, returns null.
*/
public String obtainDiagnosticInfo() {
return diagnosticInfo;
}
/**
* Set the failed-due-to-attemptId info of this task.
*/
public void putFailedDueToAttemptId(String attempt) {
failedDueToAttempt = attempt;
}
/**
* @return the failed-due-to-attemptId info of this task.
* If the task is successful, returns null.
*/
public String obtainFailedDueToAttemptId() {
return failedDueToAttempt;
}
List<ParsedTaskAttempt> obtainTaskAttempts() {
List<LoggedTaskAttempt> attempts = getAttempts();
return convertTaskAttempts(attempts);
}
List<ParsedTaskAttempt> convertTaskAttempts(
List<LoggedTaskAttempt> attempts) {
List<ParsedTaskAttempt> result = new ArrayList<ParsedTaskAttempt>();
for (LoggedTaskAttempt t : attempts) {
if (t instanceof ParsedTaskAttempt) {
result.add((ParsedTaskAttempt)t);
} else {
throw new RuntimeException(
"Unexpected type of taskAttempts in the list...");
}
}
return result;
}
/** Dump the extra info of ParsedTask */
void dumpParsedTask() {
LOG.info("ParsedTask details:" + obtainCounters()
+ "\n" + obtainFailedDueToAttemptId()
+ "\nPreferred Locations are:");
List<LoggedLocation> loc = getPreferredLocations();
for (LoggedLocation l : loc) {
LOG.info(l.getLayers() + ";" + l.toString());
}
List<ParsedTaskAttempt> attempts = obtainTaskAttempts();
for (ParsedTaskAttempt attempt : attempts) {
attempt.dumpParsedTaskAttempt();
}
}
}

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
/**
* This is a wrapper class around {@link LoggedTaskAttempt}. It also
* provides the extra information about the task attempt obtained from
* job history that is not written to the JSON trace file.
*/
public class ParsedTaskAttempt extends LoggedTaskAttempt {
private static final Log LOG = LogFactory.getLog(ParsedTaskAttempt.class);
private String diagnosticInfo;
private String trackerName;
private Integer httpPort, shufflePort;
private Map<String, Long> countersMap = new HashMap<String, Long>();
ParsedTaskAttempt() {
super();
}
/** incorporate event counters */
public void incorporateCounters(JhCounters counters) {
Map<String, Long> countersMap =
JobHistoryUtils.extractCounters(counters);
putCounters(countersMap);
super.incorporateCounters(counters);
}
/** Set the task attempt counters */
public void putCounters(Map<String, Long> counters) {
this.countersMap = counters;
}
/**
* @return the task attempt counters
*/
public Map<String, Long> obtainCounters() {
return countersMap;
}
/** Set the task attempt diagnostic-info */
public void putDiagnosticInfo(String msg) {
diagnosticInfo = msg;
}
/**
* @return the diagnostic-info of this task attempt.
* If the attempt is successful, returns null.
*/
public String obtainDiagnosticInfo() {
return diagnosticInfo;
}
void putTrackerName(String trackerName) {
this.trackerName = trackerName;
}
public String obtainTrackerName() {
return trackerName;
}
void putHttpPort(int port) {
httpPort = port;
}
/**
* @return http port if set. Returns null otherwise.
*/
public Integer obtainHttpPort() {
return httpPort;
}
void putShufflePort(int port) {
shufflePort = port;
}
/**
* @return shuffle port if set. Returns null otherwise.
*/
public Integer obtainShufflePort() {
return shufflePort;
}
/** Dump the extra info of ParsedTaskAttempt */
void dumpParsedTaskAttempt() {
LOG.info("ParsedTaskAttempt details:" + obtainCounters()
+ ";DiagnosticInfo=" + obtainDiagnosticInfo() + "\n"
+ obtainTrackerName() + ";" + obtainHttpPort() + ";"
+ obtainShufflePort() + ";rack=" + getHostName().getRackName()
+ ";host=" + getHostName().getHostName());
}
}
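(Another hypothetical reader, this time of the attempt-level extras above; the attempt is assumed to come from a ParsedTask inside the rumen package, since ParsedTask.obtainTaskAttempts() is package-private.)
// Report where an attempt ran; the ports are null when no
// TaskAttemptStartedEvent was seen for the attempt.
static void describe(ParsedTaskAttempt attempt) {
  System.out.println(attempt.getAttemptID()
      + " ran on tracker " + attempt.obtainTrackerName()
      + " (http port " + attempt.obtainHttpPort()
      + ", shuffle port " + attempt.obtainShufflePort() + ")");
}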

View File

@ -108,9 +108,12 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
TaskAttempt20LineEventEmitter that =
(TaskAttempt20LineEventEmitter) thatg;
ParsedHost pHost = ParsedHost.parse(hostName);
return new TaskAttemptFinishedEvent(taskAttemptID,
that.originalTaskType, status, Long.parseLong(finishTime),
hostName, state, maybeParseCounters(counters));
pHost.getRackName(), pHost.getNodeName(), state,
maybeParseCounters(counters));
}
return null;
@ -138,10 +141,19 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
TaskAttempt20LineEventEmitter that =
(TaskAttempt20LineEventEmitter) thatg;
ParsedHost pHost = ParsedHost.parse(hostName);
String rackName = null;
// Earlier versions of MR logged only hostnames (without rackname) for
// unsuccessful attempts
if (pHost != null) {
rackName = pHost.getRackName();
hostName = pHost.getNodeName();
}
return new TaskAttemptUnsuccessfulCompletionEvent
(taskAttemptID,
that.originalTaskType, status, Long.parseLong(finishTime),
hostName, -1, error, null);
hostName, -1, rackName, error, null);
}
return null;
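(Illustration, not part of the patch, of the two pre-21 log shapes this fallback handles; hostnames are invented.)
// Shape 1: the attempt line carried "/rack/host" -- parse() succeeds, so the
// emitted event gets a real rack name and a bare hostname.
ParsedHost full =
    ParsedHost.parse("/194.6.134.80/cluster50262.secondleveldomain.com");
// Shape 2: some earlier MR versions logged only the hostname for unsuccessful
// attempts -- parse() is expected to return null here (hence the null check
// above), rackName stays null, and the raw hostname is passed through as-is.
ParsedHost bare = ParsedHost.parse("cluster50262.secondleveldomain.com");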

View File

@ -25,6 +25,8 @@
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
/**
@ -46,6 +48,10 @@ public void process(HistoryEvent event) {
processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
} else if (event instanceof TaskStartedEvent) {
processTaskStartedEvent((TaskStartedEvent) event);
} else if (event instanceof MapAttemptFinishedEvent) {
processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
} else if (event instanceof ReduceAttemptFinishedEvent) {
processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
}
// I do NOT expect these if statements to be exhaustive.
@ -78,15 +84,40 @@ private void processTaskStartedEvent(TaskStartedEvent event) {
private void processTaskAttemptUnsuccessfulCompletionEvent(
TaskAttemptUnsuccessfulCompletionEvent event) {
recordParsedHost(event.getHostname());
recordParsedHost(event.getHostname(), event.getRackName());
}
private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
recordParsedHost(event.getHostname());
recordParsedHost(event.getHostname(), event.getRackName());
}
private void recordParsedHost(String hostName) {
ParsedHost result = ParsedHost.parse(hostName);
private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
recordParsedHost(event.getHostname(), event.getRackName());
}
private void processReduceAttemptFinishedEvent(ReduceAttemptFinishedEvent event) {
recordParsedHost(event.getHostname(), event.getRackName());
}
private void recordParsedHost(String hostName, String rackName) {
if (hostName == null) {
return;
}
ParsedHost result = null;
if (rackName == null) {
result = ParsedHost.parse(hostName);
} else {
result = new ParsedHost(rackName, hostName);
}
if (result != null && !allHosts.contains(result)) {
allHosts.add(result);
}
}
private void recordParsedHost(String nodeName) {
ParsedHost result = ParsedHost.parse(nodeName);
if (result != null && !allHosts.contains(result)) {
allHosts.add(result);