diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index c2985d87998..cc97ff43875 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1244,6 +1244,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2677. Fixed 404 for some links from HistoryServer. (Robert Evans via acmurthy) + MAPREDUCE-2937. Ensure reason for application failure is displayed to the + user. (mahadev via acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index e49e17d0fac..3b63a69c2be 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -39,7 +39,7 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.ProtocolSignature; -import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus; import org.apache.hadoop.mapreduce.ClusterMetrics; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobContext; @@ -55,7 +55,6 @@ import org.apache.hadoop.mapreduce.TaskReport; import org.apache.hadoop.mapreduce.TaskTrackerInfo; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; -import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus; import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.mapreduce.protocol.ClientProtocol; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; @@ -81,7 +80,6 @@ import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -104,9 +102,19 @@ public class YARNRunner implements ClientProtocol { * @param conf the configuration object for the client */ public YARNRunner(Configuration conf) { + this(conf, new ResourceMgrDelegate(conf)); + } + + /** + * Similar to {@link #YARNRunner(Configuration)} but allowing injecting + * {@link ResourceMgrDelegate}. Enables mocking and testing. + * @param conf the configuration object for the client + * @param resMgrDelegate the resourcemanager client handle. + */ + public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) { this.conf = new YarnConfiguration(conf); try { - this.resMgrDelegate = new ResourceMgrDelegate(this.conf); + this.resMgrDelegate = resMgrDelegate; this.clientCache = new ClientCache(this.conf, resMgrDelegate); this.defaultFileContext = FileContext.getFileContext(this.conf); @@ -114,7 +122,7 @@ public class YARNRunner implements ClientProtocol { throw new RuntimeException("Error in instantiating YarnClient", ufe); } } - + @Override public void cancelDelegationToken(Token arg0) throws IOException, InterruptedException { @@ -242,7 +250,8 @@ public class YARNRunner implements ClientProtocol { .getApplicationReport(applicationId); if (appMaster == null || appMaster.getState() == ApplicationState.FAILED || appMaster.getState() == ApplicationState.KILLED) { - throw RPCUtil.getRemoteException("failed to run job"); + throw new IOException("Failed to run job : " + + appMaster.getDiagnostics()); } return clientCache.getClient(jobId).getJobStatus(jobId); } @@ -260,7 +269,7 @@ public class YARNRunner implements ClientProtocol { return rsrc; } - private ApplicationSubmissionContext createApplicationSubmissionContext( + public ApplicationSubmissionContext createApplicationSubmissionContext( Configuration jobConf, String jobSubmitDir, Credentials ts) throws IOException { ApplicationSubmissionContext appContext = diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java new file mode 100644 index 00000000000..5a2c29afc58 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.ResourceMgrDelegate; +import org.apache.hadoop.mapred.YARNRunner; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationState; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Test if the jobclient shows enough diagnostics + * on a job failure. + * + */ +public class TestYARNRunner extends TestCase { + private static final Log LOG = LogFactory.getLog(TestYARNRunner.class); + private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + + private YARNRunner yarnRunner; + private ResourceMgrDelegate resourceMgrDelegate; + private Configuration conf; + private ApplicationId appId; + private JobID jobId; + private File testWorkDir = + new File("target", TestYARNRunner.class.getName()); + private ApplicationSubmissionContext submissionContext; + private static final String failString = "Rejected job"; + + @Before + public void setUp() throws Exception { + resourceMgrDelegate = mock(ResourceMgrDelegate.class); + conf = new Configuration(); + yarnRunner = new YARNRunner(conf, resourceMgrDelegate); + yarnRunner = spy(yarnRunner); + submissionContext = mock(ApplicationSubmissionContext.class); + doAnswer( + new Answer() { + @Override + public ApplicationSubmissionContext answer(InvocationOnMock invocation) + throws Throwable { + return submissionContext; + } + } + ).when(yarnRunner).createApplicationSubmissionContext(any(Configuration.class), + any(String.class), any(Credentials.class)); + + appId = recordFactory.newRecordInstance(ApplicationId.class); + appId.setClusterTimestamp(System.currentTimeMillis()); + appId.setId(1); + jobId = TypeConverter.fromYarn(appId); + if (testWorkDir.exists()) { + FileContext.getLocalFSFileContext().delete(new Path(testWorkDir.toString()), true); + } + testWorkDir.mkdirs(); + } + + + @Test + public void testJobSubmissionFailure() throws Exception { + when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))). + thenReturn(appId); + ApplicationReport report = mock(ApplicationReport.class); + when(report.getApplicationId()).thenReturn(appId); + when(report.getDiagnostics()).thenReturn(failString); + when(report.getState()).thenReturn(ApplicationState.FAILED); + when(resourceMgrDelegate.getApplicationReport(appId)).thenReturn(report); + Credentials credentials = new Credentials(); + File jobxml = new File(testWorkDir, "job.xml"); + OutputStream out = new FileOutputStream(jobxml); + conf.writeXml(out); + out.close(); + try { + yarnRunner.submitJob(jobId, testWorkDir.getAbsolutePath().toString(), credentials); + } catch(IOException io) { + LOG.info("Logging exception:", io); + assertTrue(io.getLocalizedMessage().contains(failString)); + } + } +}