MAPREDUCE-4074. Client continuously retries to RM When RM goes down before launching Application Master (xieguiming via tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1327972 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2012-04-19 14:44:57 +00:00
parent 3edc40e377
commit 97ae5b675f
5 changed files with 106 additions and 9 deletions

View File

@ -367,6 +367,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4160. some mrv1 ant tests fail with timeout - due to 4156 MAPREDUCE-4160. some mrv1 ant tests fail with timeout - due to 4156
(tgraves) (tgraves)
MAPREDUCE-4074. Client continuously retries to RM When RM goes down
before launching Application Master (xieguiming via tgraves)
Release 0.23.2 - UNRELEASED Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -325,6 +325,13 @@ public interface MRJobConfig {
MR_PREFIX + "client-am.ipc.max-retries"; MR_PREFIX + "client-am.ipc.max-retries";
public static final int DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 3; public static final int DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 3;
/**
* The number of client retries to the RM/HS/AM before throwing exception.
*/
public static final String MR_CLIENT_MAX_RETRIES =
MR_PREFIX + "client.max-retries";
public static final int DEFAULT_MR_CLIENT_MAX_RETRIES = 3;
/** The staging directory for map reduce.*/ /** The staging directory for map reduce.*/
public static final String MR_AM_STAGING_DIR = public static final String MR_AM_STAGING_DIR =
MR_AM_PREFIX+"staging-dir"; MR_AM_PREFIX+"staging-dir";

View File

@ -1250,6 +1250,13 @@
to the RM to fetch Application Status.</description> to the RM to fetch Application Status.</description>
</property> </property>
<property>
<name>yarn.app.mapreduce.client.max-retries</name>
<value>3</value>
<description>The number of client retries to the RM/HS/AM before
throwing exception. This is a layer above the ipc.</description>
</property>
<!-- jobhistory properties --> <!-- jobhistory properties -->
<property> <property>

View File

@ -282,7 +282,7 @@ public class ClientServiceDelegate {
} }
private synchronized Object invoke(String method, Class argClass, private synchronized Object invoke(String method, Class argClass,
Object args) throws YarnRemoteException { Object args) throws IOException {
Method methodOb = null; Method methodOb = null;
try { try {
methodOb = MRClientProtocol.class.getMethod(method, argClass); methodOb = MRClientProtocol.class.getMethod(method, argClass);
@ -291,7 +291,11 @@ public class ClientServiceDelegate {
} catch (NoSuchMethodException e) { } catch (NoSuchMethodException e) {
throw new YarnException("Method name mismatch", e); throw new YarnException("Method name mismatch", e);
} }
while (true) { int maxRetries = this.conf.getInt(
MRJobConfig.MR_CLIENT_MAX_RETRIES,
MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
IOException lastException = null;
while (maxRetries > 0) {
try { try {
return methodOb.invoke(getProxy(), args); return methodOb.invoke(getProxy(), args);
} catch (YarnRemoteException yre) { } catch (YarnRemoteException yre) {
@ -308,13 +312,21 @@ public class ClientServiceDelegate {
" retrying..", e.getTargetException()); " retrying..", e.getTargetException());
// Force reconnection by setting the proxy to null. // Force reconnection by setting the proxy to null.
realProxy = null; realProxy = null;
// HS/AMS shut down
maxRetries--;
lastException = new IOException(e.getMessage());
} catch (Exception e) { } catch (Exception e) {
LOG.debug("Failed to contact AM/History for job " + jobId LOG.debug("Failed to contact AM/History for job " + jobId
+ " Will retry..", e); + " Will retry..", e);
// Force reconnection by setting the proxy to null. // Force reconnection by setting the proxy to null.
realProxy = null; realProxy = null;
// RM shutdown
maxRetries--;
lastException = new IOException(e.getMessage());
} }
} }
throw lastException;
} }
public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException, public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
@ -364,7 +376,7 @@ public class ClientServiceDelegate {
return result; return result;
} }
public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException { public JobStatus getJobStatus(JobID oldJobID) throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID); TypeConverter.toYarn(oldJobID);
GetJobReportRequest request = GetJobReportRequest request =
@ -390,7 +402,7 @@ public class ClientServiceDelegate {
} }
public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType) public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
throws YarnRemoteException, YarnRemoteException { throws IOException{
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID); TypeConverter.toYarn(oldJobID);
GetTaskReportsRequest request = GetTaskReportsRequest request =
@ -407,7 +419,7 @@ public class ClientServiceDelegate {
} }
public boolean killTask(TaskAttemptID taskAttemptID, boolean fail) public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
throws YarnRemoteException { throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
= TypeConverter.toYarn(taskAttemptID); = TypeConverter.toYarn(taskAttemptID);
if (fail) { if (fail) {
@ -423,7 +435,7 @@ public class ClientServiceDelegate {
} }
public boolean killJob(JobID oldJobID) public boolean killJob(JobID oldJobID)
throws YarnRemoteException { throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId org.apache.hadoop.mapreduce.v2.api.records.JobId jobId
= TypeConverter.toYarn(oldJobID); = TypeConverter.toYarn(oldJobID);
KillJobRequest killRequest = recordFactory.newRecordInstance(KillJobRequest.class); KillJobRequest killRequest = recordFactory.newRecordInstance(KillJobRequest.class);

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -122,8 +123,7 @@ public class TestClientServiceDelegate {
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class); MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow( when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
new RuntimeException("1")).thenThrow(new RuntimeException("2")) new RuntimeException("1")).thenThrow(new RuntimeException("2"))
.thenThrow(new RuntimeException("3"))
.thenReturn(getJobReportResponse()); .thenReturn(getJobReportResponse());
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
@ -135,7 +135,7 @@ public class TestClientServiceDelegate {
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertNotNull(jobStatus); Assert.assertNotNull(jobStatus);
verify(historyServerProxy, times(4)).getJobReport( verify(historyServerProxy, times(3)).getJobReport(
any(GetJobReportRequest.class)); any(GetJobReportRequest.class));
} }
@ -312,6 +312,74 @@ public class TestClientServiceDelegate {
any(String.class)); any(String.class));
} }
@Test
public void testRMDownForJobStatusBeforeGetAMReport() throws IOException {
Configuration conf = new YarnConfiguration();
testRMDownForJobStatusBeforeGetAMReport(conf,
MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
}
@Test
public void testRMDownForJobStatusBeforeGetAMReportWithRetryTimes()
throws IOException {
Configuration conf = new YarnConfiguration();
conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 2);
testRMDownForJobStatusBeforeGetAMReport(conf, conf.getInt(
MRJobConfig.MR_CLIENT_MAX_RETRIES,
MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES));
}
@Test
public void testRMDownRestoreForJobStatusBeforeGetAMReport()
throws IOException {
Configuration conf = new YarnConfiguration();
conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 3);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
!isAMReachableFromClient);
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
when(historyServerProxy.getJobReport(any(GetJobReportRequest.class)))
.thenReturn(getJobReportResponse());
ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced1"))).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced2"))).thenReturn(getFinishedApplicationReport());
ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
conf, rmDelegate, oldJobId, historyServerProxy);
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
verify(rmDelegate, times(3)).getApplicationReport(any(ApplicationId.class));
Assert.assertNotNull(jobStatus);
}
private void testRMDownForJobStatusBeforeGetAMReport(Configuration conf,
int noOfRetries) throws YarnRemoteException {
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
!isAMReachableFromClient);
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced1"))).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced2"))).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced3")));
ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
conf, rmDelegate, oldJobId, historyServerProxy);
try {
clientServiceDelegate.getJobStatus(oldJobId);
Assert.fail("It should throw exception after retries");
} catch (IOException e) {
System.out.println("fail to get job status,and e=" + e.toString());
}
verify(rmDelegate, times(noOfRetries)).getApplicationReport(
any(ApplicationId.class));
}
private GetJobReportRequest getJobReportRequest() { private GetJobReportRequest getJobReportRequest() {
GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class); GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
request.setJobId(jobId); request.setJobId(jobId);