Merge -r 1166965:1166966 from trunk to branch-0.23 to fix MAPREDUCE-2937.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1166967 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-09-09 02:15:20 +00:00
parent 4ffba53aca
commit 778e65aa74
3 changed files with 143 additions and 7 deletions

View File

@ -1231,6 +1231,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2677. Fixed 404 for some links from HistoryServer. (Robert Evans MAPREDUCE-2677. Fixed 404 for some links from HistoryServer. (Robert Evans
via acmurthy) via acmurthy)
MAPREDUCE-2937. Ensure reason for application failure is displayed to the
user. (mahadev via acmurthy)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -39,7 +39,7 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.ClusterMetrics; import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
@ -55,7 +55,6 @@ import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskTrackerInfo; import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol; import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
@ -81,7 +80,6 @@ import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
@ -104,9 +102,19 @@ public class YARNRunner implements ClientProtocol {
* @param conf the configuration object for the client * @param conf the configuration object for the client
*/ */
public YARNRunner(Configuration conf) { public YARNRunner(Configuration conf) {
this(conf, new ResourceMgrDelegate(conf));
}
/**
* Similar to {@link #YARNRunner(Configuration)} but allowing injecting
* {@link ResourceMgrDelegate}. Enables mocking and testing.
* @param conf the configuration object for the client
* @param resMgrDelegate the resourcemanager client handle.
*/
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
this.conf = new YarnConfiguration(conf); this.conf = new YarnConfiguration(conf);
try { try {
this.resMgrDelegate = new ResourceMgrDelegate(this.conf); this.resMgrDelegate = resMgrDelegate;
this.clientCache = new ClientCache(this.conf, this.clientCache = new ClientCache(this.conf,
resMgrDelegate); resMgrDelegate);
this.defaultFileContext = FileContext.getFileContext(this.conf); this.defaultFileContext = FileContext.getFileContext(this.conf);
@ -114,7 +122,7 @@ public class YARNRunner implements ClientProtocol {
throw new RuntimeException("Error in instantiating YarnClient", ufe); throw new RuntimeException("Error in instantiating YarnClient", ufe);
} }
} }
@Override @Override
public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0) public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
throws IOException, InterruptedException { throws IOException, InterruptedException {
@ -242,7 +250,8 @@ public class YARNRunner implements ClientProtocol {
.getApplicationReport(applicationId); .getApplicationReport(applicationId);
if (appMaster == null || appMaster.getState() == ApplicationState.FAILED if (appMaster == null || appMaster.getState() == ApplicationState.FAILED
|| appMaster.getState() == ApplicationState.KILLED) { || appMaster.getState() == ApplicationState.KILLED) {
throw RPCUtil.getRemoteException("failed to run job"); throw new IOException("Failed to run job : " +
appMaster.getDiagnostics());
} }
return clientCache.getClient(jobId).getJobStatus(jobId); return clientCache.getClient(jobId).getJobStatus(jobId);
} }
@ -260,7 +269,7 @@ public class YARNRunner implements ClientProtocol {
return rsrc; return rsrc;
} }
private ApplicationSubmissionContext createApplicationSubmissionContext( public ApplicationSubmissionContext createApplicationSubmissionContext(
Configuration jobConf, Configuration jobConf,
String jobSubmitDir, Credentials ts) throws IOException { String jobSubmitDir, Credentials ts) throws IOException {
ApplicationSubmissionContext appContext = ApplicationSubmissionContext appContext =

View File

@ -0,0 +1,124 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ResourceMgrDelegate;
import org.apache.hadoop.mapred.YARNRunner;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Test if the jobclient shows enough diagnostics
* on a job failure.
*
*/
public class TestYARNRunner extends TestCase {
private static final Log LOG = LogFactory.getLog(TestYARNRunner.class);
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private YARNRunner yarnRunner;
private ResourceMgrDelegate resourceMgrDelegate;
private Configuration conf;
private ApplicationId appId;
private JobID jobId;
private File testWorkDir =
new File("target", TestYARNRunner.class.getName());
private ApplicationSubmissionContext submissionContext;
private static final String failString = "Rejected job";
@Before
public void setUp() throws Exception {
resourceMgrDelegate = mock(ResourceMgrDelegate.class);
conf = new Configuration();
yarnRunner = new YARNRunner(conf, resourceMgrDelegate);
yarnRunner = spy(yarnRunner);
submissionContext = mock(ApplicationSubmissionContext.class);
doAnswer(
new Answer<ApplicationSubmissionContext>() {
@Override
public ApplicationSubmissionContext answer(InvocationOnMock invocation)
throws Throwable {
return submissionContext;
}
}
).when(yarnRunner).createApplicationSubmissionContext(any(Configuration.class),
any(String.class), any(Credentials.class));
appId = recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(System.currentTimeMillis());
appId.setId(1);
jobId = TypeConverter.fromYarn(appId);
if (testWorkDir.exists()) {
FileContext.getLocalFSFileContext().delete(new Path(testWorkDir.toString()), true);
}
testWorkDir.mkdirs();
}
@Test
public void testJobSubmissionFailure() throws Exception {
when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))).
thenReturn(appId);
ApplicationReport report = mock(ApplicationReport.class);
when(report.getApplicationId()).thenReturn(appId);
when(report.getDiagnostics()).thenReturn(failString);
when(report.getState()).thenReturn(ApplicationState.FAILED);
when(resourceMgrDelegate.getApplicationReport(appId)).thenReturn(report);
Credentials credentials = new Credentials();
File jobxml = new File(testWorkDir, "job.xml");
OutputStream out = new FileOutputStream(jobxml);
conf.writeXml(out);
out.close();
try {
yarnRunner.submitJob(jobId, testWorkDir.getAbsolutePath().toString(), credentials);
} catch(IOException io) {
LOG.info("Logging exception:", io);
assertTrue(io.getLocalizedMessage().contains(failString));
}
}
}