YARN-755. Renamed AllocateResponse.reboot to AllocateResponse.resync. Contributed by Bikas Saha.

svn merge --ignore-ancestry -c 1489295 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1489296 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-06-04 05:55:22 +00:00
parent 3adbeff801
commit 377f3ef85b
13 changed files with 38 additions and 29 deletions

View File

@ -116,7 +116,7 @@ public class LocalContainerAllocator extends RMCommunicator
// continue to attempt to contact the RM. // continue to attempt to contact the RM.
throw e; throw e;
} }
if (allocateResponse.getReboot()) { if (allocateResponse.getResync()) {
LOG.info("Event from RM: shutting down Application Master"); LOG.info("Event from RM: shutting down Application Master");
// This can happen if the RM has been restarted. If it is in that state, // This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up. // this application must clean itself up.

View File

@ -570,7 +570,7 @@ public class RMContainerAllocator extends RMContainerRequestor
// continue to attempt to contact the RM. // continue to attempt to contact the RM.
throw e; throw e;
} }
if (response.getReboot()) { if (response.getResync()) {
// This can happen if the RM has been restarted. If it is in that state, // This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up. // this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(), eventHandler.handle(new JobEvent(this.getJob().getID(),

View File

@ -73,8 +73,8 @@ Release 2.1.0-beta - UNRELEASED
YARN-635. Renamed YarnRemoteException to YarnException. (Siddharth Seth via YARN-635. Renamed YarnRemoteException to YarnException. (Siddharth Seth via
vinodkv) vinodkv)
YARN-756. Move Preemption* records to yarn.api where they really belong. YARN-755. Renamed AllocateResponse.reboot to AllocateResponse.resync. (Bikas
(Jian He via vinodkv) Saha via vinodkv)
NEW FEATURES NEW FEATURES
@ -245,6 +245,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-717. Put object creation factories for Token in the class itself and YARN-717. Put object creation factories for Token in the class itself and
remove useless derivations for specific tokens. (Jian He via vinodkv) remove useless derivations for specific tokens. (Jian He via vinodkv)
YARN-756. Move Preemption* records to yarn.api where they really belong.
(Jian He via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it YARN-512. Log aggregation root directory check is more expensive than it

View File

@ -65,7 +65,7 @@ public abstract class AllocateResponse {
public static AllocateResponse newInstance(int responseId, public static AllocateResponse newInstance(int responseId,
List<ContainerStatus> completedContainers, List<ContainerStatus> completedContainers,
List<Container> allocatedContainers, List<NodeReport> updatedNodes, List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, boolean reboot, int numClusterNodes, Resource availResources, boolean resync, int numClusterNodes,
PreemptionMessage preempt) { PreemptionMessage preempt) {
AllocateResponse response = Records.newRecord(AllocateResponse.class); AllocateResponse response = Records.newRecord(AllocateResponse.class);
response.setNumClusterNodes(numClusterNodes); response.setNumClusterNodes(numClusterNodes);
@ -74,26 +74,32 @@ public abstract class AllocateResponse {
response.setAllocatedContainers(allocatedContainers); response.setAllocatedContainers(allocatedContainers);
response.setUpdatedNodes(updatedNodes); response.setUpdatedNodes(updatedNodes);
response.setAvailableResources(availResources); response.setAvailableResources(availResources);
response.setReboot(reboot); response.setResync(resync);
response.setPreemptionMessage(preempt); response.setPreemptionMessage(preempt);
return response; return response;
} }
/** /**
* Should the <code>ApplicationMaster</code> reboot for being horribly * Should the <code>ApplicationMaster</code> take action because of being
* out-of-sync with the <code>ResourceManager</code> as deigned by * out-of-sync with the <code>ResourceManager</code> as deigned by
* {@link #getResponseId()}? * {@link #getResponseId()}
* This can be due to application errors or because the ResourceManager
* has restarted. The action to be taken by the <code>ApplicationMaster</code>
* is to shutdown without unregistering with the <code>ResourceManager</code>.
* The ResourceManager will start a new attempt. If the application is already
* done when it gets the resync command, then it may choose to shutdown after
* unregistering in which case the ResourceManager will not start a new attempt.
* *
* @return <code>true</code> if the <code>ApplicationMaster</code> should * @return <code>true</code> if the <code>ApplicationMaster</code> should
* reboot, <code>false</code> otherwise * take action, <code>false</code> otherwise
*/ */
@Public @Public
@Stable @Stable
public abstract boolean getReboot(); public abstract boolean getResync();
@Private @Private
@Unstable @Unstable
public abstract void setReboot(boolean reboot); public abstract void setResync(boolean value);
/** /**
* Get the <em>last response id</em>. * Get the <em>last response id</em>.

View File

@ -145,15 +145,15 @@ public class AllocateResponsePBImpl extends AllocateResponse {
} }
@Override @Override
public synchronized boolean getReboot() { public synchronized boolean getResync() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
return (p.getReboot()); return (p.getResync());
} }
@Override @Override
public synchronized void setReboot(boolean reboot) { public synchronized void setResync(boolean resync) {
maybeInitBuilder(); maybeInitBuilder();
builder.setReboot((reboot)); builder.setResync((resync));
} }
@Override @Override

View File

@ -59,7 +59,7 @@ message AllocateRequestProto {
} }
message AllocateResponseProto { message AllocateResponseProto {
optional bool reboot = 1; optional bool resync = 1;
optional int32 response_id = 2; optional int32 response_id = 2;
repeated ContainerProto allocated_containers = 3; repeated ContainerProto allocated_containers = 3;
repeated ContainerStatusProto completed_container_statuses = 4; repeated ContainerStatusProto completed_container_statuses = 4;

View File

@ -331,7 +331,7 @@ public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService
continue; continue;
} }
if (response.getReboot()) { if (response.getResync()) {
handler.onRebootRequest(); handler.onRebootRequest();
LOG.info("Reboot requested. Stopping callback."); LOG.info("Reboot requested. Stopping callback.");
break; break;

View File

@ -189,7 +189,7 @@ public class TestAMRMClientAsync {
final AllocateResponse rebootResponse = createAllocateResponse( final AllocateResponse rebootResponse = createAllocateResponse(
new ArrayList<ContainerStatus>(), new ArrayList<Container>()); new ArrayList<ContainerStatus>(), new ArrayList<Container>());
rebootResponse.setReboot(true); rebootResponse.setResync(true);
when(client.allocate(anyFloat())).thenReturn(rebootResponse); when(client.allocate(anyFloat())).thenReturn(rebootResponse);
AMRMClientAsync<ContainerRequest> asyncClient = AMRMClientAsync<ContainerRequest> asyncClient =

View File

@ -411,7 +411,7 @@ public class BuilderUtils {
response.setAllocatedContainers(allocatedContainers); response.setAllocatedContainers(allocatedContainers);
response.setUpdatedNodes(updatedNodes); response.setUpdatedNodes(updatedNodes);
response.setAvailableResources(availResources); response.setAvailableResources(availResources);
response.setReboot(reboot); response.setResync(reboot);
response.setPreemptionMessage(preempt); response.setPreemptionMessage(preempt);
return response; return response;

View File

@ -90,7 +90,7 @@ public class ApplicationMasterService extends AbstractService implements
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
private final ConcurrentMap<ApplicationAttemptId, AllocateResponse> responseMap = private final ConcurrentMap<ApplicationAttemptId, AllocateResponse> responseMap =
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponse>(); new ConcurrentHashMap<ApplicationAttemptId, AllocateResponse>();
private final AllocateResponse reboot = private final AllocateResponse resync =
recordFactory.newRecordInstance(AllocateResponse.class); recordFactory.newRecordInstance(AllocateResponse.class);
private final RMContext rmContext; private final RMContext rmContext;
@ -98,7 +98,7 @@ public class ApplicationMasterService extends AbstractService implements
super(ApplicationMasterService.class.getName()); super(ApplicationMasterService.class.getName());
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.rScheduler = scheduler; this.rScheduler = scheduler;
this.reboot.setReboot(true); this.resync.setResync(true);
// this.reboot.containers = new ArrayList<Container>(); // this.reboot.containers = new ArrayList<Container>();
this.rmContext = rmContext; this.rmContext = rmContext;
} }
@ -263,7 +263,7 @@ public class ApplicationMasterService extends AbstractService implements
AllocateResponse lastResponse = responseMap.get(appAttemptId); AllocateResponse lastResponse = responseMap.get(appAttemptId);
if (lastResponse == null) { if (lastResponse == null) {
LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
return reboot; return resync;
} }
if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
/* old heartbeat */ /* old heartbeat */
@ -273,7 +273,7 @@ public class ApplicationMasterService extends AbstractService implements
// Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO: // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
// Reboot is not useful since after AM reboots, it will send register and // Reboot is not useful since after AM reboots, it will send register and
// get an exception. Might as well throw an exception here. // get an exception. Might as well throw an exception here.
return reboot; return resync;
} }
// Allow only one thread in AM to do heartbeat at a time. // Allow only one thread in AM to do heartbeat at a time.
@ -344,7 +344,7 @@ public class ApplicationMasterService extends AbstractService implements
String message = "App Attempt removed from the cache during allocate" String message = "App Attempt removed from the cache during allocate"
+ appAttemptId; + appAttemptId;
LOG.error(message); LOG.error(message);
return reboot; return resync;
} }
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());

View File

@ -250,7 +250,7 @@ public class TestRMRestart {
AllocateResponse allocResponse = am1.allocate( AllocateResponse allocResponse = am1.allocate(
new ArrayList<ResourceRequest>(), new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()); new ArrayList<ContainerId>());
Assert.assertTrue(allocResponse.getReboot()); Assert.assertTrue(allocResponse.getResync());
// NM should be rebooted on heartbeat, even first heartbeat for nm2 // NM should be rebooted on heartbeat, even first heartbeat for nm2
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);

View File

@ -82,7 +82,7 @@ public class TestAMRMRPCResponseId {
AllocateResponse response = amService.allocate(allocateRequest); AllocateResponse response = amService.allocate(allocateRequest);
Assert.assertEquals(1, response.getResponseId()); Assert.assertEquals(1, response.getResponseId());
Assert.assertFalse(response.getReboot()); Assert.assertFalse(response.getResync());
allocateRequest = AllocateRequest.newInstance(attempt allocateRequest = AllocateRequest.newInstance(attempt
.getAppAttemptId(), response.getResponseId(), 0F, null, null); .getAppAttemptId(), response.getResponseId(), 0F, null, null);
@ -96,6 +96,6 @@ public class TestAMRMRPCResponseId {
allocateRequest = AllocateRequest.newInstance(attempt allocateRequest = AllocateRequest.newInstance(attempt
.getAppAttemptId(), 0, 0F, null, null); .getAppAttemptId(), 0, 0F, null, null);
response = amService.allocate(allocateRequest); response = amService.allocate(allocateRequest);
Assert.assertTrue(response.getReboot()); Assert.assertTrue(response.getResync());
} }
} }

View File

@ -208,7 +208,7 @@ public class TestApplicationTokens {
AllocateRequest allocateRequest = AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class); Records.newRecord(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(applicationAttemptId); allocateRequest.setApplicationAttemptId(applicationAttemptId);
Assert.assertFalse(rmClient.allocate(allocateRequest).getReboot()); Assert.assertFalse(rmClient.allocate(allocateRequest).getResync());
// Simulate a master-key-roll-over // Simulate a master-key-roll-over
ApplicationTokenSecretManager appTokenSecretManager = ApplicationTokenSecretManager appTokenSecretManager =
@ -224,7 +224,7 @@ public class TestApplicationTokens {
rmClient = createRMClient(rm, conf, rpc, currentUser); rmClient = createRMClient(rm, conf, rpc, currentUser);
allocateRequest = Records.newRecord(AllocateRequest.class); allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(applicationAttemptId); allocateRequest.setApplicationAttemptId(applicationAttemptId);
Assert.assertFalse(rmClient.allocate(allocateRequest).getReboot()); Assert.assertFalse(rmClient.allocate(allocateRequest).getResync());
} finally { } finally {
rm.stop(); rm.stop();
if (rmClient != null) { if (rmClient != null) {