diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 00fd8ed8fd2..00267a72ef5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1345,6 +1345,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3006. Fixed MapReduce AM to exit only after properly writing out history file. (vinodkv) + MAPREDUCE-2925. Fixed Yarn+MR client code to behave saner with completed + jobs. (Devaraj K via vinodkv) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java index 50ffadeb84e..c615111f665 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java @@ -23,6 +23,7 @@ import java.security.PrivilegedAction; import java.util.HashMap; import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -74,8 +75,11 @@ public class ClientCache { private MRClientProtocol instantiateHistoryProxy() throws IOException { - final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS, - JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS); + final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS); + if (StringUtils.isEmpty(serviceAddr)) { + LOG.info("HistoryServer is not configured."); + return null; + } LOG.info("Connecting to HistoryServer at: " + serviceAddr); final Configuration myConf = new Configuration(conf); myConf.setClass(YarnConfiguration.YARN_SECURITY_INFO, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java index f0ee71504d4..605c44e5ed9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java @@ -63,7 +63,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; -import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.RPCUtil; @@ -86,6 +85,7 @@ class ClientServiceDelegate { private boolean forceRefresh; private MRClientProtocol realProxy = null; private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + private static String UNKNOWN_USER = "Unknown User"; ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm, JobID jobId, MRClientProtocol historyServerProxy) { @@ -126,7 +126,12 @@ class ClientServiceDelegate { // and redirect to the history server. ApplicationReport application = rm.getApplicationReport(appId); String serviceAddr = null; - while (ApplicationState.RUNNING.equals(application.getState())) { + while (application == null || ApplicationState.RUNNING.equals(application.getState())) { + if (application == null) { + LOG.info("Could not get Job info from RM for job " + jobId + + ". Redirecting to job history server."); + return checkAndGetHSProxy(UNKNOWN_USER, JobState.NEW); + } try { if (application.getHost() == null || "".equals(application.getHost())) { LOG.debug("AM not assigned to Job. Waiting to get the AM ..."); @@ -163,6 +168,11 @@ class ClientServiceDelegate { throw new YarnException(e1); } application = rm.getApplicationReport(appId); + if (application == null) { + LOG.info("Could not get Job info from RM for job " + jobId + + ". Redirecting to job history server."); + return checkAndGetHSProxy(UNKNOWN_USER, JobState.RUNNING); + } } catch (InterruptedException e) { LOG.warn("getProxy() call interruped", e); throw new YarnException(e); @@ -176,7 +186,7 @@ class ClientServiceDelegate { String user = application.getUser(); if (user == null) { - throw new YarnRemoteExceptionPBImpl("User is not set in the application report"); + throw RPCUtil.getRemoteException("User is not set in the application report"); } if (application.getState() == ApplicationState.NEW || application.getState() == ApplicationState.SUBMITTED) { @@ -199,11 +209,19 @@ class ClientServiceDelegate { if (application.getState() == ApplicationState.SUCCEEDED) { LOG.info("Application state is completed. " + "Redirecting to job history server"); - realProxy = historyServerProxy; + realProxy = checkAndGetHSProxy(user, JobState.SUCCEEDED); } return realProxy; } + private MRClientProtocol checkAndGetHSProxy(String user, JobState state) { + if (null == historyServerProxy) { + LOG.warn("Job History Server is not configured."); + return getNotRunningJob(user, state); + } + return historyServerProxy; + } + private void instantiateAMProxy(final String serviceAddr) throws IOException { UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr); @@ -236,11 +254,14 @@ class ClientServiceDelegate { try { return methodOb.invoke(getProxy(), args); } catch (YarnRemoteException yre) { - LOG.warn("Exception thrown by remote end."); - LOG.warn(RPCUtil.toString(yre)); + LOG.warn("Exception thrown by remote end.", yre); throw yre; } catch (InvocationTargetException e) { - //TODO Finite # of errors before giving up? + if (e.getTargetException() instanceof YarnRemoteException) { + LOG.warn("Exception thrown by remote end.", e + .getTargetException()); + throw (YarnRemoteException) e.getTargetException(); + } LOG.info("Failed to contact AM/History for job " + jobId + " Will retry..", e.getTargetException()); forceRefresh = true; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java new file mode 100644 index 00000000000..b7fd6c9475a --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse; +import org.apache.hadoop.mapreduce.v2.api.records.JobReport; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Test; + +/** + * Tests for ClientServiceDelegate.java + */ + +public class TestClientServiceDelegate { + private JobID oldJobId = JobID.forName("job_1315895242400_2"); + private org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter + .toYarn(oldJobId); + + @Test + public void testUnknownAppInRM() throws Exception { + MRClientProtocol historyServerProxy = mock(MRClientProtocol.class); + when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn( + getJobReportResponse()); + ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate( + historyServerProxy, getRMDelegate()); + + JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); + Assert.assertNotNull(jobStatus); + } + + @Test + public void testRemoteExceptionFromHistoryServer() throws Exception { + + MRClientProtocol historyServerProxy = mock(MRClientProtocol.class); + when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow( + RPCUtil.getRemoteException("Job ID doesnot Exist")); + + ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); + when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId())) + .thenReturn(null); + + ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate( + historyServerProxy, rm); + + try { + clientServiceDelegate.getJobStatus(oldJobId); + Assert.fail("Invoke should throw exception after retries."); + } catch (YarnRemoteException e) { + Assert.assertEquals("Job ID doesnot Exist", e.getMessage()); + } + } + + @Test + public void testRetriesOnConnectionFailure() throws Exception { + + MRClientProtocol historyServerProxy = mock(MRClientProtocol.class); + when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow( + new RuntimeException("1")).thenThrow(new RuntimeException("2")) + .thenThrow(new RuntimeException("3")) + .thenReturn(getJobReportResponse()); + + ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); + when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId())) + .thenReturn(null); + + ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate( + historyServerProxy, rm); + + JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); + Assert.assertNotNull(jobStatus); + } + + @Test + public void testHistoryServerNotConfigured() throws Exception { + //RM doesn't have app report and job History Server is not configured + ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate( + null, getRMDelegate()); + JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); + Assert.assertEquals("Unknown User", jobStatus.getUsername()); + Assert.assertEquals(JobStatus.State.PREP, jobStatus.getState()); + + //RM has app report and job History Server is not configured + ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); + ApplicationReport applicationReport = getApplicationReport(); + when(rm.getApplicationReport(jobId.getAppId())).thenReturn( + applicationReport); + + clientServiceDelegate = getClientServiceDelegate(null, rm); + jobStatus = clientServiceDelegate.getJobStatus(oldJobId); + Assert.assertEquals(applicationReport.getUser(), jobStatus.getUsername()); + Assert.assertEquals(JobStatus.State.SUCCEEDED, jobStatus.getState()); + } + + private GetJobReportRequest getJobReportRequest() { + GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class); + request.setJobId(jobId); + return request; + } + + private GetJobReportResponse getJobReportResponse() { + GetJobReportResponse jobReportResponse = Records + .newRecord(GetJobReportResponse.class); + JobReport jobReport = Records.newRecord(JobReport.class); + jobReport.setJobId(jobId); + jobReport.setJobState(JobState.SUCCEEDED); + jobReportResponse.setJobReport(jobReport); + return jobReportResponse; + } + + private ApplicationReport getApplicationReport() { + ApplicationReport applicationReport = Records + .newRecord(ApplicationReport.class); + applicationReport.setState(ApplicationState.SUCCEEDED); + applicationReport.setUser("root"); + return applicationReport; + } + + private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException { + ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); + when(rm.getApplicationReport(jobId.getAppId())).thenReturn(null); + return rm; + } + + private ClientServiceDelegate getClientServiceDelegate( + MRClientProtocol historyServerProxy, ResourceMgrDelegate rm) { + Configuration conf = new YarnConfiguration(); + conf.set(MRConfig.FRAMEWORK_NAME, "yarn"); + ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate( + conf, rm, oldJobId, historyServerProxy); + return clientServiceDelegate; + } + +}