MAPREDUCE-5488. Changed MR client to keep trying to reach the application when it sees that an attempt's AM is down. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1524856 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-09-19 22:35:12 +00:00
parent 6b1f5073a7
commit af78fd729c
6 changed files with 91 additions and 11 deletions

View File

@ -193,6 +193,9 @@ Release 2.2.0 - UNRELEASED
MAPREDUCE-5504. mapred queue -info inconsistent with types (Kousuke Saruta MAPREDUCE-5504. mapred queue -info inconsistent with types (Kousuke Saruta
via tgraves) via tgraves)
MAPREDUCE-5488. Changed MR client to keep trying to reach the application
when it sees that an attempt's AM is down. (Jian He via vinodkv)
Release 2.1.1-beta - 2013-09-23 Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -497,6 +497,12 @@
<Bug pattern="IS2_INCONSISTENT_SYNC" /> <Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match> </Match>
<Match>
<Class name="org.apache.hadoop.mapred.ClientServiceDelegate" />
<Method name="invoke" />
<Bug pattern="SWL_SLEEP_WITH_LOCK_HELD" />
</Match>
<Match> <Match>
<Class name="org.apache.hadoop.mapreduce.util.ProcessTree" /> <Class name="org.apache.hadoop.mapreduce.util.ProcessTree" />
<Method name="sendSignal" /> <Method name="sendSignal" />

View File

@ -357,7 +357,7 @@ public interface MRJobConfig {
public static final int DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 3; public static final int DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 3;
/** /**
* The number of client retries to the RM/HS/AM before throwing exception. * The number of client retries to the RM/HS before throwing exception.
*/ */
public static final String MR_CLIENT_MAX_RETRIES = public static final String MR_CLIENT_MAX_RETRIES =
MR_PREFIX + "client.max-retries"; MR_PREFIX + "client.max-retries";

View File

@ -982,7 +982,7 @@
<property> <property>
<name>yarn.app.mapreduce.client-am.ipc.max-retries</name> <name>yarn.app.mapreduce.client-am.ipc.max-retries</name>
<value>1</value> <value>3</value>
<description>The number of client retries to the AM - before reconnecting <description>The number of client retries to the AM - before reconnecting
to the RM to fetch Application Status.</description> to the RM to fetch Application Status.</description>
</property> </property>
@ -990,7 +990,7 @@
<property> <property>
<name>yarn.app.mapreduce.client.max-retries</name> <name>yarn.app.mapreduce.client.max-retries</name>
<value>3</value> <value>3</value>
<description>The number of client retries to the RM/HS/AM before <description>The number of client retries to the RM/HS before
throwing exception. This is a layer above the ipc.</description> throwing exception. This is a layer above the ipc.</description>
</property> </property>

View File

@ -26,6 +26,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -77,6 +78,8 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
public class ClientServiceDelegate { public class ClientServiceDelegate {
private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class); private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
private static final String UNAVAILABLE = "N/A"; private static final String UNAVAILABLE = "N/A";
@ -93,7 +96,8 @@ public class ClientServiceDelegate {
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static String UNKNOWN_USER = "Unknown User"; private static String UNKNOWN_USER = "Unknown User";
private String trackingUrl; private String trackingUrl;
private AtomicBoolean usingAMProxy = new AtomicBoolean(false);
private int maxClientRetry;
private boolean amAclDisabledStatusLogged = false; private boolean amAclDisabledStatusLogged = false;
public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm, public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
@ -287,6 +291,7 @@ public class ClientServiceDelegate {
MRClientProtocol proxy = MRClientProtocol proxy =
(MRClientProtocol) rpc.getProxy(MRClientProtocol.class, (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
serviceAddr, conf); serviceAddr, conf);
usingAMProxy.set(true);
LOG.trace("Connected to ApplicationMaster at: " + serviceAddr); LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
return proxy; return proxy;
} }
@ -301,13 +306,15 @@ public class ClientServiceDelegate {
} catch (NoSuchMethodException e) { } catch (NoSuchMethodException e) {
throw new YarnRuntimeException("Method name mismatch", e); throw new YarnRuntimeException("Method name mismatch", e);
} }
int maxRetries = this.conf.getInt( maxClientRetry = this.conf.getInt(
MRJobConfig.MR_CLIENT_MAX_RETRIES, MRJobConfig.MR_CLIENT_MAX_RETRIES,
MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES); MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
IOException lastException = null; IOException lastException = null;
while (maxRetries > 0) { while (maxClientRetry > 0) {
MRClientProtocol MRClientProxy = null;
try { try {
return methodOb.invoke(getProxy(), args); MRClientProxy = getProxy();
return methodOb.invoke(MRClientProxy, args);
} catch (InvocationTargetException e) { } catch (InvocationTargetException e) {
// Will not throw out YarnException anymore // Will not throw out YarnException anymore
LOG.debug("Failed to contact AM/History for job " + jobId + LOG.debug("Failed to contact AM/History for job " + jobId +
@ -315,22 +322,44 @@ public class ClientServiceDelegate {
// Force reconnection by setting the proxy to null. // Force reconnection by setting the proxy to null.
realProxy = null; realProxy = null;
// HS/AMS shut down // HS/AMS shut down
maxRetries--; // if it's AM shut down, do not decrement maxClientRetry as we wait for
// AM to be restarted.
if (!usingAMProxy.get()) {
maxClientRetry--;
}
usingAMProxy.set(false);
lastException = new IOException(e.getTargetException()); lastException = new IOException(e.getTargetException());
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
LOG.warn("ClientServiceDelegate invoke call interrupted", ie);
throw new YarnRuntimeException(ie);
}
} catch (Exception e) { } catch (Exception e) {
LOG.debug("Failed to contact AM/History for job " + jobId LOG.debug("Failed to contact AM/History for job " + jobId
+ " Will retry..", e); + " Will retry..", e);
// Force reconnection by setting the proxy to null. // Force reconnection by setting the proxy to null.
realProxy = null; realProxy = null;
// RM shutdown // RM shutdown
maxRetries--; maxClientRetry--;
lastException = new IOException(e.getMessage()); lastException = new IOException(e.getMessage());
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
LOG.warn("ClientServiceDelegate invoke call interrupted", ie);
throw new YarnRuntimeException(ie);
}
} }
} }
throw lastException; throw lastException;
} }
/**
 * Exposes the current value of the {@code maxClientRetry} field so that
 * tests can inspect it. Intended for test code only.
 *
 * @return the current value of {@code maxClientRetry}
 */
@VisibleForTesting
public int getMaxClientRetry() {
  return maxClientRetry;
}
public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException, public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
InterruptedException { InterruptedException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0); org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);

View File

@ -140,6 +140,48 @@ public class TestClientServiceDelegate {
any(GetJobReportRequest.class)); any(GetJobReportRequest.class));
} }
/**
 * Checks that repeated failures while talking to the AM proxy are retried
 * and leave the client retry count at its configured value.
 */
@Test
public void testRetriesOnAMConnectionFailures() throws Exception {
  // The scenario is only exercised when the client may contact the AM.
  if (isAMReachableFromClient) {
    final ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
    when(rmDelegate.getApplicationReport(
        TypeConverter.toYarn(oldJobId).getAppId()))
        .thenReturn(getRunningApplicationReport("am1", 78));

    // The first four getJobReport calls throw; the fifth one succeeds.
    final MRClientProtocol flakyAm = mock(MRClientProtocol.class);
    when(flakyAm.getJobReport(any(GetJobReportRequest.class)))
        .thenThrow(new RuntimeException("11"))
        .thenThrow(new RuntimeException("22"))
        .thenThrow(new RuntimeException("33"))
        .thenThrow(new RuntimeException("44"))
        .thenReturn(getJobReportResponse());

    final Configuration yarnConf = new YarnConfiguration();
    yarnConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
    yarnConf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
        !isAMReachableFromClient);

    final ClientServiceDelegate delegate =
        new ClientServiceDelegate(yarnConf, rmDelegate, oldJobId, null) {
          @Override
          MRClientProtocol instantiateAMProxy(
              final InetSocketAddress serviceAddr) throws IOException {
            // Run the real implementation first, then hand back the mock.
            super.instantiateAMProxy(serviceAddr);
            return flakyAm;
          }
        };

    final int configuredRetries = yarnConf.getInt(
        MRJobConfig.MR_CLIENT_MAX_RETRIES,
        MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);

    final JobStatus status = delegate.getJobStatus(oldJobId);
    Assert.assertNotNull(status);

    // The retry count must still equal the configured value.
    Assert.assertEquals(configuredRetries, delegate.getMaxClientRetry());
    verify(flakyAm, times(5)).getJobReport(any(GetJobReportRequest.class));
  }
}
@Test @Test
public void testHistoryServerNotConfigured() throws Exception { public void testHistoryServerNotConfigured() throws Exception {
//RM doesn't have app report and job History Server is not configured //RM doesn't have app report and job History Server is not configured