YARN-755. Renamed AllocateResponse.reboot to AllocateResponse.resync. Contributed by Bikas Saha.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1489295 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-06-04 05:53:47 +00:00
parent 28aabe0b2b
commit 978012b9b6
13 changed files with 38 additions and 29 deletions

View File

@ -116,7 +116,7 @@ protected synchronized void heartbeat() throws Exception {
// continue to attempt to contact the RM. // continue to attempt to contact the RM.
throw e; throw e;
} }
if (allocateResponse.getReboot()) { if (allocateResponse.getResync()) {
LOG.info("Event from RM: shutting down Application Master"); LOG.info("Event from RM: shutting down Application Master");
// This can happen if the RM has been restarted. If it is in that state, // This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up. // this application must clean itself up.

View File

@ -570,7 +570,7 @@ private List<Container> getResources() throws Exception {
// continue to attempt to contact the RM. // continue to attempt to contact the RM.
throw e; throw e;
} }
if (response.getReboot()) { if (response.getResync()) {
// This can happen if the RM has been restarted. If it is in that state, // This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up. // this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(), eventHandler.handle(new JobEvent(this.getJob().getID(),

View File

@ -93,8 +93,8 @@ Release 2.1.0-beta - UNRELEASED
YARN-635. Renamed YarnRemoteException to YarnException. (Siddharth Seth via YARN-635. Renamed YarnRemoteException to YarnException. (Siddharth Seth via
vinodkv) vinodkv)
YARN-756. Move Preemption* records to yarn.api where they really belong. YARN-755. Renamed AllocateResponse.reboot to AllocateResponse.resync. (Bikas
(Jian He via vinodkv) Saha via vinodkv)
NEW FEATURES NEW FEATURES
@ -265,6 +265,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-717. Put object creation factories for Token in the class itself and YARN-717. Put object creation factories for Token in the class itself and
remove useless derivations for specific tokens. (Jian He via vinodkv) remove useless derivations for specific tokens. (Jian He via vinodkv)
YARN-756. Move Preemption* records to yarn.api where they really belong.
(Jian He via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it YARN-512. Log aggregation root directory check is more expensive than it

View File

@ -65,7 +65,7 @@ public abstract class AllocateResponse {
public static AllocateResponse newInstance(int responseId, public static AllocateResponse newInstance(int responseId,
List<ContainerStatus> completedContainers, List<ContainerStatus> completedContainers,
List<Container> allocatedContainers, List<NodeReport> updatedNodes, List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, boolean reboot, int numClusterNodes, Resource availResources, boolean resync, int numClusterNodes,
PreemptionMessage preempt) { PreemptionMessage preempt) {
AllocateResponse response = Records.newRecord(AllocateResponse.class); AllocateResponse response = Records.newRecord(AllocateResponse.class);
response.setNumClusterNodes(numClusterNodes); response.setNumClusterNodes(numClusterNodes);
@ -74,26 +74,32 @@ public static AllocateResponse newInstance(int responseId,
response.setAllocatedContainers(allocatedContainers); response.setAllocatedContainers(allocatedContainers);
response.setUpdatedNodes(updatedNodes); response.setUpdatedNodes(updatedNodes);
response.setAvailableResources(availResources); response.setAvailableResources(availResources);
response.setReboot(reboot); response.setResync(resync);
response.setPreemptionMessage(preempt); response.setPreemptionMessage(preempt);
return response; return response;
} }
/** /**
* Should the <code>ApplicationMaster</code> reboot for being horribly * Should the <code>ApplicationMaster</code> take action because of being
* out-of-sync with the <code>ResourceManager</code> as deigned by * out-of-sync with the <code>ResourceManager</code> as deigned by
* {@link #getResponseId()}? * {@link #getResponseId()}
* This can be due to application errors or because the ResourceManager
* has restarted. The action to be taken by the <code>ApplicationMaster</code>
* is to shutdown without unregistering with the <code>ResourceManager</code>.
* The ResourceManager will start a new attempt. If the application is already
* done when it gets the resync command, then it may choose to shutdown after
* unregistering in which case the ResourceManager will not start a new attempt.
* *
* @return <code>true</code> if the <code>ApplicationMaster</code> should * @return <code>true</code> if the <code>ApplicationMaster</code> should
* reboot, <code>false</code> otherwise * take action, <code>false</code> otherwise
*/ */
@Public @Public
@Stable @Stable
public abstract boolean getReboot(); public abstract boolean getResync();
@Private @Private
@Unstable @Unstable
public abstract void setReboot(boolean reboot); public abstract void setResync(boolean value);
/** /**
* Get the <em>last response id</em>. * Get the <em>last response id</em>.

View File

@ -145,15 +145,15 @@ private synchronized void maybeInitBuilder() {
} }
@Override @Override
public synchronized boolean getReboot() { public synchronized boolean getResync() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
return (p.getReboot()); return (p.getResync());
} }
@Override @Override
public synchronized void setReboot(boolean reboot) { public synchronized void setResync(boolean resync) {
maybeInitBuilder(); maybeInitBuilder();
builder.setReboot((reboot)); builder.setResync((resync));
} }
@Override @Override

View File

@ -59,7 +59,7 @@ message AllocateRequestProto {
} }
message AllocateResponseProto { message AllocateResponseProto {
optional bool reboot = 1; optional bool resync = 1;
optional int32 response_id = 2; optional int32 response_id = 2;
repeated ContainerProto allocated_containers = 3; repeated ContainerProto allocated_containers = 3;
repeated ContainerStatusProto completed_container_statuses = 4; repeated ContainerStatusProto completed_container_statuses = 4;

View File

@ -331,7 +331,7 @@ public void run() {
continue; continue;
} }
if (response.getReboot()) { if (response.getResync()) {
handler.onRebootRequest(); handler.onRebootRequest();
LOG.info("Reboot requested. Stopping callback."); LOG.info("Reboot requested. Stopping callback.");
break; break;

View File

@ -189,7 +189,7 @@ public void testAMRMClientAsyncReboot() throws Exception {
final AllocateResponse rebootResponse = createAllocateResponse( final AllocateResponse rebootResponse = createAllocateResponse(
new ArrayList<ContainerStatus>(), new ArrayList<Container>()); new ArrayList<ContainerStatus>(), new ArrayList<Container>());
rebootResponse.setReboot(true); rebootResponse.setResync(true);
when(client.allocate(anyFloat())).thenReturn(rebootResponse); when(client.allocate(anyFloat())).thenReturn(rebootResponse);
AMRMClientAsync<ContainerRequest> asyncClient = AMRMClientAsync<ContainerRequest> asyncClient =

View File

@ -411,7 +411,7 @@ public static AllocateResponse newAllocateResponse(int responseId,
response.setAllocatedContainers(allocatedContainers); response.setAllocatedContainers(allocatedContainers);
response.setUpdatedNodes(updatedNodes); response.setUpdatedNodes(updatedNodes);
response.setAvailableResources(availResources); response.setAvailableResources(availResources);
response.setReboot(reboot); response.setResync(reboot);
response.setPreemptionMessage(preempt); response.setPreemptionMessage(preempt);
return response; return response;

View File

@ -90,7 +90,7 @@ public class ApplicationMasterService extends AbstractService implements
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
private final ConcurrentMap<ApplicationAttemptId, AllocateResponse> responseMap = private final ConcurrentMap<ApplicationAttemptId, AllocateResponse> responseMap =
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponse>(); new ConcurrentHashMap<ApplicationAttemptId, AllocateResponse>();
private final AllocateResponse reboot = private final AllocateResponse resync =
recordFactory.newRecordInstance(AllocateResponse.class); recordFactory.newRecordInstance(AllocateResponse.class);
private final RMContext rmContext; private final RMContext rmContext;
@ -98,7 +98,7 @@ public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
super(ApplicationMasterService.class.getName()); super(ApplicationMasterService.class.getName());
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.rScheduler = scheduler; this.rScheduler = scheduler;
this.reboot.setReboot(true); this.resync.setResync(true);
// this.reboot.containers = new ArrayList<Container>(); // this.reboot.containers = new ArrayList<Container>();
this.rmContext = rmContext; this.rmContext = rmContext;
} }
@ -263,7 +263,7 @@ public AllocateResponse allocate(AllocateRequest request)
AllocateResponse lastResponse = responseMap.get(appAttemptId); AllocateResponse lastResponse = responseMap.get(appAttemptId);
if (lastResponse == null) { if (lastResponse == null) {
LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
return reboot; return resync;
} }
if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
/* old heartbeat */ /* old heartbeat */
@ -273,7 +273,7 @@ public AllocateResponse allocate(AllocateRequest request)
// Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO: // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
// Reboot is not useful since after AM reboots, it will send register and // Reboot is not useful since after AM reboots, it will send register and
// get an exception. Might as well throw an exception here. // get an exception. Might as well throw an exception here.
return reboot; return resync;
} }
// Allow only one thread in AM to do heartbeat at a time. // Allow only one thread in AM to do heartbeat at a time.
@ -344,7 +344,7 @@ public AllocateResponse allocate(AllocateRequest request)
String message = "App Attempt removed from the cache during allocate" String message = "App Attempt removed from the cache during allocate"
+ appAttemptId; + appAttemptId;
LOG.error(message); LOG.error(message);
return reboot; return resync;
} }
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());

View File

@ -250,7 +250,7 @@ public void testRMRestart() throws Exception {
AllocateResponse allocResponse = am1.allocate( AllocateResponse allocResponse = am1.allocate(
new ArrayList<ResourceRequest>(), new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()); new ArrayList<ContainerId>());
Assert.assertTrue(allocResponse.getReboot()); Assert.assertTrue(allocResponse.getResync());
// NM should be rebooted on heartbeat, even first heartbeat for nm2 // NM should be rebooted on heartbeat, even first heartbeat for nm2
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);

View File

@ -82,7 +82,7 @@ public void testARRMResponseId() throws Exception {
AllocateResponse response = amService.allocate(allocateRequest); AllocateResponse response = amService.allocate(allocateRequest);
Assert.assertEquals(1, response.getResponseId()); Assert.assertEquals(1, response.getResponseId());
Assert.assertFalse(response.getReboot()); Assert.assertFalse(response.getResync());
allocateRequest = AllocateRequest.newInstance(attempt allocateRequest = AllocateRequest.newInstance(attempt
.getAppAttemptId(), response.getResponseId(), 0F, null, null); .getAppAttemptId(), response.getResponseId(), 0F, null, null);
@ -96,6 +96,6 @@ public void testARRMResponseId() throws Exception {
allocateRequest = AllocateRequest.newInstance(attempt allocateRequest = AllocateRequest.newInstance(attempt
.getAppAttemptId(), 0, 0F, null, null); .getAppAttemptId(), 0, 0F, null, null);
response = amService.allocate(allocateRequest); response = amService.allocate(allocateRequest);
Assert.assertTrue(response.getReboot()); Assert.assertTrue(response.getResync());
} }
} }

View File

@ -208,7 +208,7 @@ public void testMasterKeyRollOver() throws Exception {
AllocateRequest allocateRequest = AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class); Records.newRecord(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(applicationAttemptId); allocateRequest.setApplicationAttemptId(applicationAttemptId);
Assert.assertFalse(rmClient.allocate(allocateRequest).getReboot()); Assert.assertFalse(rmClient.allocate(allocateRequest).getResync());
// Simulate a master-key-roll-over // Simulate a master-key-roll-over
ApplicationTokenSecretManager appTokenSecretManager = ApplicationTokenSecretManager appTokenSecretManager =
@ -224,7 +224,7 @@ public void testMasterKeyRollOver() throws Exception {
rmClient = createRMClient(rm, conf, rpc, currentUser); rmClient = createRMClient(rm, conf, rpc, currentUser);
allocateRequest = Records.newRecord(AllocateRequest.class); allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(applicationAttemptId); allocateRequest.setApplicationAttemptId(applicationAttemptId);
Assert.assertFalse(rmClient.allocate(allocateRequest).getReboot()); Assert.assertFalse(rmClient.allocate(allocateRequest).getResync());
} finally { } finally {
rm.stop(); rm.stop();
if (rmClient != null) { if (rmClient != null) {