MAPREDUCE-3186. User jobs are getting hanged if the Resource manager process goes down and comes up while job is getting executed. (Eric Payne via mahadev) - Merging r1190122 from trunk
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1190123 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0b0404822c
commit
e8b692c4f6
|
@ -1763,6 +1763,9 @@ Release 0.23.0 - Unreleased
|
|||
MAPREDUCE-3282. bin/mapred job -list throws exception. (acmurthy via
|
||||
mahadev)
|
||||
|
||||
MAPREDUCE-3186. User jobs are getting hanged if the Resource manager process goes down
|
||||
and comes up while job is getting executed. (Eric Payne via mahadev)
|
||||
|
||||
Release 0.22.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -23,19 +23,23 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.JobCounter;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
|
@ -57,8 +61,10 @@ public class LocalContainerAllocator extends RMCommunicator
|
|||
LogFactory.getLog(LocalContainerAllocator.class);
|
||||
|
||||
private final EventHandler eventHandler;
|
||||
private final ApplicationId appID;
|
||||
// private final ApplicationId appID;
|
||||
private AtomicInteger containerCount = new AtomicInteger();
|
||||
private long retryInterval;
|
||||
private long retrystartTime;
|
||||
|
||||
private final RecordFactory recordFactory =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
|
@ -67,7 +73,19 @@ public class LocalContainerAllocator extends RMCommunicator
|
|||
AppContext context) {
|
||||
super(clientService, context);
|
||||
this.eventHandler = context.getEventHandler();
|
||||
this.appID = context.getApplicationID();
|
||||
// this.appID = context.getApplicationID();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Configuration conf) {
|
||||
super.init(conf);
|
||||
retryInterval =
|
||||
getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
|
||||
MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
|
||||
// Init startTime to current time. If all goes well, it will be reset after
|
||||
// first attempt to contact RM.
|
||||
retrystartTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -77,10 +95,32 @@ public class LocalContainerAllocator extends RMCommunicator
|
|||
.getApplicationProgress(), new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>());
|
||||
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
|
||||
AMResponse response = allocateResponse.getAMResponse();
|
||||
AMResponse response;
|
||||
try {
|
||||
response = allocateResponse.getAMResponse();
|
||||
// Reset retry count if no exception occurred.
|
||||
retrystartTime = System.currentTimeMillis();
|
||||
} catch (Exception e) {
|
||||
// This can happen when the connection to the RM has gone down. Keep
|
||||
// re-trying until the retryInterval has expired.
|
||||
if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
|
||||
eventHandler.handle(new JobEvent(this.getJob().getID(),
|
||||
JobEventType.INTERNAL_ERROR));
|
||||
throw new YarnException("Could not contact RM after " +
|
||||
retryInterval + " milliseconds.");
|
||||
}
|
||||
// Throw this up to the caller, which may decide to ignore it and
|
||||
// continue to attempt to contact the RM.
|
||||
throw e;
|
||||
}
|
||||
if (response.getReboot()) {
|
||||
// TODO
|
||||
LOG.info("Event from RM: shutting down Application Master");
|
||||
// This can happen if the RM has been restarted. If it is in that state,
|
||||
// this application must clean itself up.
|
||||
eventHandler.handle(new JobEvent(this.getJob().getID(),
|
||||
JobEventType.INTERNAL_ERROR));
|
||||
throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
|
||||
this.getContext().getApplicationID());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -233,6 +233,9 @@ public abstract class RMCommunicator extends AbstractService {
|
|||
Thread.sleep(rmPollInterval);
|
||||
try {
|
||||
heartbeat();
|
||||
} catch (YarnException e) {
|
||||
LOG.error("Error communicating with RM: " + e.getMessage() , e);
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
LOG.error("ERROR IN CONTACTING RM. ", e);
|
||||
// TODO: for other exceptions
|
||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssigned
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.api.records.AMResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -128,6 +129,8 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
private float maxReduceRampupLimit = 0;
|
||||
private float maxReducePreemptionLimit = 0;
|
||||
private float reduceSlowStart = 0;
|
||||
private long retryInterval;
|
||||
private long retrystartTime;
|
||||
|
||||
public RMContainerAllocator(ClientService clientService, AppContext context) {
|
||||
super(clientService, context);
|
||||
|
@ -146,6 +149,11 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
|
||||
MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
|
||||
RackResolver.init(conf);
|
||||
retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
|
||||
MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
|
||||
// Init startTime to current time. If all goes well, it will be reset after
|
||||
// first attempt to contact RM.
|
||||
retrystartTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -429,11 +437,41 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
" rackLocalAssigned:" + rackLocalAssigned +
|
||||
" availableResources(headroom):" + getAvailableResources();
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private List<Container> getResources() throws Exception {
|
||||
int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null
|
||||
AMResponse response = makeRemoteRequest();
|
||||
AMResponse response;
|
||||
/*
|
||||
* If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
|
||||
* milliseconds before aborting. During this interval, AM will still try
|
||||
* to contact the RM.
|
||||
*/
|
||||
try {
|
||||
response = makeRemoteRequest();
|
||||
// Reset retry count if no exception occurred.
|
||||
retrystartTime = System.currentTimeMillis();
|
||||
} catch (Exception e) {
|
||||
// This can happen when the connection to the RM has gone down. Keep
|
||||
// re-trying until the retryInterval has expired.
|
||||
if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
|
||||
eventHandler.handle(new JobEvent(this.getJob().getID(),
|
||||
JobEventType.INTERNAL_ERROR));
|
||||
throw new YarnException("Could not contact RM after " +
|
||||
retryInterval + " milliseconds.");
|
||||
}
|
||||
// Throw this up to the caller, which may decide to ignore it and
|
||||
// continue to attempt to contact the RM.
|
||||
throw e;
|
||||
}
|
||||
if (response.getReboot()) {
|
||||
// This can happen if the RM has been restarted. If it is in that state,
|
||||
// this application must clean itself up.
|
||||
eventHandler.handle(new JobEvent(this.getJob().getID(),
|
||||
JobEventType.INTERNAL_ERROR));
|
||||
throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
|
||||
this.getContext().getApplicationID());
|
||||
}
|
||||
int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
|
||||
List<Container> newContainers = response.getAllocatedContainers();
|
||||
List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
|
||||
|
|
|
@ -403,6 +403,15 @@ public interface MRJobConfig {
|
|||
MR_AM_PREFIX + "scheduler.heartbeat.interval-ms";
|
||||
public static final int DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = 2000;
|
||||
|
||||
/**
|
||||
* If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
|
||||
* milliseconds before aborting. During this interval, AM will still try
|
||||
* to contact the RM.
|
||||
*/
|
||||
public static final String MR_AM_TO_RM_WAIT_INTERVAL_MS =
|
||||
MR_AM_PREFIX + "scheduler.connection.wait.interval-ms";
|
||||
public static final int DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS = 360000;
|
||||
|
||||
/**
|
||||
* Boolean. Create the base dirs in the JobHistoryEventHandler
|
||||
* Set to false for multi-user clusters. This is an internal config that
|
||||
|
|
Loading…
Reference in New Issue