svn merge -c 1409525 FIXES: MAPREDUCE-4797. LocalContainerAllocator can loop forever trying to contact the RM (jlowe via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1409528 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d0a66f405f
commit
3b9bb65473
|
@ -516,6 +516,9 @@ Release 0.23.5 - UNRELEASED
|
|||
|
||||
MAPREDUCE-4517. Too many INFO messages written out during AM to RM heartbeat
|
||||
(Jason Lowe via tgraves)
|
||||
|
||||
MAPREDUCE-4797. LocalContainerAllocator can loop forever trying to contact
|
||||
the RM (jlowe via bobby)
|
||||
|
||||
Release 0.23.4 - UNRELEASED
|
||||
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.apache.hadoop.mapreduce.v2.app.local;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -62,7 +61,6 @@ public class LocalContainerAllocator extends RMCommunicator
|
|||
|
||||
@SuppressWarnings("rawtypes")
|
||||
private final EventHandler eventHandler;
|
||||
private AtomicInteger containerCount = new AtomicInteger();
|
||||
private long retryInterval;
|
||||
private long retrystartTime;
|
||||
private String nmHost;
|
||||
|
@ -102,9 +100,9 @@ public class LocalContainerAllocator extends RMCommunicator
|
|||
this.applicationAttemptId, this.lastResponseID, super
|
||||
.getApplicationProgress(), new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>());
|
||||
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
|
||||
AMResponse response;
|
||||
try {
|
||||
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
|
||||
response = allocateResponse.getAMResponse();
|
||||
// Reset retry count if no exception occurred.
|
||||
retrystartTime = System.currentTimeMillis();
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
package org.apache.hadoop.mapreduce.v2.app.local;
|
||||
|
||||
import static org.mockito.Matchers.isA;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.yarn.ClusterInfo;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.api.AMRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestLocalContainerAllocator {
|
||||
|
||||
@Test
|
||||
public void testRMConnectionRetry() throws Exception {
|
||||
// verify the connection exception is thrown
|
||||
// if we haven't exhausted the retry interval
|
||||
Configuration conf = new Configuration();
|
||||
LocalContainerAllocator lca = new StubbedLocalContainerAllocator();
|
||||
lca.init(conf);
|
||||
lca.start();
|
||||
try {
|
||||
lca.heartbeat();
|
||||
Assert.fail("heartbeat was supposed to throw");
|
||||
} catch (YarnRemoteException e) {
|
||||
// YarnRemoteException is expected
|
||||
} finally {
|
||||
lca.stop();
|
||||
}
|
||||
|
||||
// verify YarnException is thrown when the retry interval has expired
|
||||
conf.setLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0);
|
||||
lca = new StubbedLocalContainerAllocator();
|
||||
lca.init(conf);
|
||||
lca.start();
|
||||
try {
|
||||
lca.heartbeat();
|
||||
Assert.fail("heartbeat was supposed to throw");
|
||||
} catch (YarnException e) {
|
||||
// YarnException is expected
|
||||
} finally {
|
||||
lca.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private static class StubbedLocalContainerAllocator
|
||||
extends LocalContainerAllocator {
|
||||
|
||||
public StubbedLocalContainerAllocator() {
|
||||
super(mock(ClientService.class), createAppContext(),
|
||||
"nmhost", 1, 2, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void register() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void startAllocatorThread() {
|
||||
allocatorThread = new Thread();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AMRMProtocol createSchedulerProxy() {
|
||||
AMRMProtocol scheduler = mock(AMRMProtocol.class);
|
||||
try {
|
||||
when(scheduler.allocate(isA(AllocateRequest.class)))
|
||||
.thenThrow(RPCUtil.getRemoteException(new IOException("forcefail")));
|
||||
} catch (YarnRemoteException e) {
|
||||
}
|
||||
return scheduler;
|
||||
}
|
||||
|
||||
private static AppContext createAppContext() {
|
||||
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
|
||||
ApplicationAttemptId attemptId =
|
||||
BuilderUtils.newApplicationAttemptId(appId, 1);
|
||||
Job job = mock(Job.class);
|
||||
@SuppressWarnings("rawtypes")
|
||||
EventHandler eventHandler = mock(EventHandler.class);
|
||||
AppContext ctx = mock(AppContext.class);
|
||||
when(ctx.getApplicationID()).thenReturn(appId);
|
||||
when(ctx.getApplicationAttemptId()).thenReturn(attemptId);
|
||||
when(ctx.getJob(isA(JobId.class))).thenReturn(job);
|
||||
when(ctx.getClusterInfo()).thenReturn(
|
||||
new ClusterInfo(BuilderUtils.newResource(1024), BuilderUtils
|
||||
.newResource(10240)));
|
||||
when(ctx.getEventHandler()).thenReturn(eventHandler);
|
||||
return ctx;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue