MAPREDUCE-4797. LocalContainerAllocator can loop forever trying to contact the RM (jlowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1409525 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-11-14 23:02:29 +00:00
parent 397c261433
commit 316c23dd7e
3 changed files with 112 additions and 3 deletions

View File

@ -664,6 +664,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4517. Too many INFO messages written out during AM to RM heartbeat MAPREDUCE-4517. Too many INFO messages written out during AM to RM heartbeat
(Jason Lowe via tgraves) (Jason Lowe via tgraves)
MAPREDUCE-4797. LocalContainerAllocator can loop forever trying to contact
the RM (jlowe via bobby)
Release 0.23.4 - UNRELEASED Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.mapreduce.v2.app.local; package org.apache.hadoop.mapreduce.v2.app.local;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -62,7 +61,6 @@ public class LocalContainerAllocator extends RMCommunicator
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
private final EventHandler eventHandler; private final EventHandler eventHandler;
private AtomicInteger containerCount = new AtomicInteger();
private long retryInterval; private long retryInterval;
private long retrystartTime; private long retrystartTime;
private String nmHost; private String nmHost;
@ -102,9 +100,9 @@ protected synchronized void heartbeat() throws Exception {
this.applicationAttemptId, this.lastResponseID, super this.applicationAttemptId, this.lastResponseID, super
.getApplicationProgress(), new ArrayList<ResourceRequest>(), .getApplicationProgress(), new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()); new ArrayList<ContainerId>());
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
AMResponse response; AMResponse response;
try { try {
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
response = allocateResponse.getAMResponse(); response = allocateResponse.getAMResponse();
// Reset retry count if no exception occurred. // Reset retry count if no exception occurred.
retrystartTime = System.currentTimeMillis(); retrystartTime = System.currentTimeMillis();

View File

@ -0,0 +1,108 @@
package org.apache.hadoop.mapreduce.v2.app.local;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Assert;
import org.junit.Test;
public class TestLocalContainerAllocator {
@Test
public void testRMConnectionRetry() throws Exception {
// verify the connection exception is thrown
// if we haven't exhausted the retry interval
Configuration conf = new Configuration();
LocalContainerAllocator lca = new StubbedLocalContainerAllocator();
lca.init(conf);
lca.start();
try {
lca.heartbeat();
Assert.fail("heartbeat was supposed to throw");
} catch (YarnRemoteException e) {
// YarnRemoteException is expected
} finally {
lca.stop();
}
// verify YarnException is thrown when the retry interval has expired
conf.setLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0);
lca = new StubbedLocalContainerAllocator();
lca.init(conf);
lca.start();
try {
lca.heartbeat();
Assert.fail("heartbeat was supposed to throw");
} catch (YarnException e) {
// YarnException is expected
} finally {
lca.stop();
}
}
private static class StubbedLocalContainerAllocator
extends LocalContainerAllocator {
public StubbedLocalContainerAllocator() {
super(mock(ClientService.class), createAppContext(),
"nmhost", 1, 2, null);
}
@Override
protected void register() {
}
@Override
protected void startAllocatorThread() {
allocatorThread = new Thread();
}
@Override
protected AMRMProtocol createSchedulerProxy() {
AMRMProtocol scheduler = mock(AMRMProtocol.class);
try {
when(scheduler.allocate(isA(AllocateRequest.class)))
.thenThrow(RPCUtil.getRemoteException(new IOException("forcefail")));
} catch (YarnRemoteException e) {
}
return scheduler;
}
private static AppContext createAppContext() {
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
ApplicationAttemptId attemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
Job job = mock(Job.class);
@SuppressWarnings("rawtypes")
EventHandler eventHandler = mock(EventHandler.class);
AppContext ctx = mock(AppContext.class);
when(ctx.getApplicationID()).thenReturn(appId);
when(ctx.getApplicationAttemptId()).thenReturn(attemptId);
when(ctx.getJob(isA(JobId.class))).thenReturn(job);
when(ctx.getClusterInfo()).thenReturn(
new ClusterInfo(BuilderUtils.newResource(1024), BuilderUtils
.newResource(10240)));
when(ctx.getEventHandler()).thenReturn(eventHandler);
return ctx;
}
}
}