Merge r1490470 from trunk to branch-2 for YARN-759. Create Command enum in AllocateResponse (bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1490473 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-06-06 23:43:38 +00:00
parent 99aff64f1a
commit 485c04d0a4
17 changed files with 170 additions and 60 deletions

View File

@ -116,14 +116,23 @@ public class LocalContainerAllocator extends RMCommunicator
// continue to attempt to contact the RM.
throw e;
}
if (allocateResponse.getResync()) {
LOG.info("Event from RM: shutting down Application Master");
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.JOB_AM_REBOOT));
throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
this.getContext().getApplicationID());
if (allocateResponse.getAMCommand() != null) {
switch(allocateResponse.getAMCommand()) {
case AM_RESYNC:
case AM_SHUTDOWN:
LOG.info("Event from RM: shutting down Application Master");
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.JOB_AM_REBOOT));
throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
this.getContext().getApplicationID());
default:
String msg =
"Unhandled value of AMCommand: " + allocateResponse.getAMCommand();
LOG.error(msg);
throw new YarnRuntimeException(msg);
}
}
}

View File

@ -570,13 +570,22 @@ public class RMContainerAllocator extends RMContainerRequestor
// continue to attempt to contact the RM.
throw e;
}
if (response.getResync()) {
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.JOB_AM_REBOOT));
throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
this.getContext().getApplicationID());
if (response.getAMCommand() != null) {
switch(response.getAMCommand()) {
case AM_RESYNC:
case AM_SHUTDOWN:
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.JOB_AM_REBOOT));
throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
this.getContext().getApplicationID());
default:
String msg =
"Unhandled value of AMCommand: " + response.getAMCommand();
LOG.error(msg);
throw new YarnRuntimeException(msg);
}
}
int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
List<Container> newContainers = response.getAllocatedContainers();

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
@ -65,7 +66,7 @@ public abstract class AllocateResponse {
public static AllocateResponse newInstance(int responseId,
List<ContainerStatus> completedContainers,
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, boolean resync, int numClusterNodes,
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt) {
AllocateResponse response = Records.newRecord(AllocateResponse.class);
response.setNumClusterNodes(numClusterNodes);
@ -74,32 +75,27 @@ public abstract class AllocateResponse {
response.setAllocatedContainers(allocatedContainers);
response.setUpdatedNodes(updatedNodes);
response.setAvailableResources(availResources);
response.setResync(resync);
response.setAMCommand(command);
response.setPreemptionMessage(preempt);
return response;
}
/**
* Should the <code>ApplicationMaster</code> take action because of being
* out-of-sync with the <code>ResourceManager</code> as deigned by
* {@link #getResponseId()}
* This can be due to application errors or because the ResourceManager
* has restarted. The action to be taken by the <code>ApplicationMaster</code>
* is to shutdown without unregistering with the <code>ResourceManager</code>.
* The ResourceManager will start a new attempt. If the application is already
* done when it gets the resync command, then it may choose to shutdown after
* unregistering in which case the ResourceManager will not start a new attempt.
*
* @return <code>true</code> if the <code>ApplicationMaster</code> should
* take action, <code>false</code> otherwise
* If the <code>ResourceManager</code> needs the
* <code>ApplicationMaster</code> to take some action then it will send an
* AMCommand to the <code>ApplicationMaster</code>. See <code>AMCommand</code>
* for details on commands and actions for them.
* @return <code>AMCommand</code> if the <code>ApplicationMaster</code> should
* take action, <code>null</code> otherwise
* @see AMCommand
*/
@Public
@Stable
public abstract boolean getResync();
public abstract AMCommand getAMCommand();
@Private
@Unstable
public abstract void setResync(boolean value);
public abstract void setAMCommand(AMCommand command);
/**
* Get the <em>last response id</em>.

View File

@ -25,6 +25,7 @@ import java.util.List;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionMessageProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder;
import org.apache.hadoop.yarn.util.ProtoUtils;
public class AllocateResponsePBImpl extends AllocateResponse {
@ -145,15 +147,22 @@ public class AllocateResponsePBImpl extends AllocateResponse {
}
@Override
public synchronized boolean getResync() {
public synchronized AMCommand getAMCommand() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
return (p.getResync());
if (!p.hasAMCommand()) {
return null;
}
return ProtoUtils.convertFromProtoFormat(p.getAMCommand());
}
@Override
public synchronized void setResync(boolean resync) {
public synchronized void setAMCommand(AMCommand command) {
maybeInitBuilder();
builder.setResync((resync));
if (command == null) {
builder.clearAMCommand();
return;
}
builder.setAMCommand(ProtoUtils.convertToProtoFormat(command));
}
@Override

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
/**
* Command sent by the Resource Manager to the Application Master in the
* AllocateResponse
* @see AllocateResponse
*/
@Public
@Stable
public enum AMCommand {
/**
* Sent by Resource Manager when it is out of sync with the AM and wants the
* AM get back in sync.
*/
AM_RESYNC,
/**
* Sent by Resource Manager when it wants the AM to shutdown. Eg. when the
* node is going down for maintenance. The AM should save any state and
* prepare to be restarted at a later time.
*/
AM_SHUTDOWN
}

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
@ -138,6 +140,16 @@ public class ProtoUtils {
public static LocalResourceVisibility convertFromProtoFormat(LocalResourceVisibilityProto e) {
return LocalResourceVisibility.valueOf(e.name());
}
/*
* AMCommand
*/
public static AMCommandProto convertToProtoFormat(AMCommand e) {
return AMCommandProto.valueOf(e.name());
}
public static AMCommand convertFromProtoFormat(AMCommandProto e) {
return AMCommand.valueOf(e.name());
}
/*
* ByteBuffer

View File

@ -207,6 +207,11 @@ message ResourceRequestProto {
optional bool relax_locality = 5 [default = true];
}
enum AMCommandProto {
AM_RESYNC = 1;
AM_SHUTDOWN = 2;
}
message PreemptionMessageProto {
optional StrictPreemptionContractProto strictContract = 1;
optional PreemptionContractProto contract = 2;

View File

@ -60,7 +60,7 @@ message AllocateRequestProto {
}
message AllocateResponseProto {
optional bool resync = 1;
optional AMCommandProto a_m_command = 1;
optional int32 response_id = 2;
repeated ContainerProto allocated_containers = 3;
repeated ContainerStatusProto completed_container_statuses = 4;

View File

@ -631,7 +631,9 @@ public class ApplicationMaster {
}
@Override
public void onRebootRequest() {}
public void onShutdownRequest() {
done = true;
}
@Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {}

View File

@ -43,6 +43,7 @@ public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Servi
* Object to represent container request for resources.
* Resources may be localized to nodes and racks.
* Resources may be assigned priorities.
* All getters return unmodifiable collections.
* Can ask for multiple containers of a given type.
*/
public static class ContainerRequest {
@ -65,11 +66,11 @@ public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Servi
return capability;
}
public ImmutableList<String> getHosts() {
public List<String> getHosts() {
return hosts;
}
public ImmutableList<String> getRacks() {
public List<String> getRacks() {
return racks;
}

View File

@ -331,10 +331,25 @@ public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService
continue;
}
if (response.getResync()) {
handler.onRebootRequest();
LOG.info("Reboot requested. Stopping callback.");
break;
if (response.getAMCommand() != null) {
boolean stop = false;
switch(response.getAMCommand()) {
case AM_RESYNC:
case AM_SHUTDOWN:
handler.onShutdownRequest();
LOG.info("Shutdown requested. Stopping callback.");
stop = true;
break;
default:
String msg =
"Unhandled value of AMCommand: " + response.getAMCommand();
LOG.error(msg);
throw new YarnRuntimeException(msg);
}
if(stop) {
// should probably stop heartbeating also YARN-763
break;
}
}
List<NodeReport> updatedNodes = response.getUpdatedNodes();
if (!updatedNodes.isEmpty()) {
@ -374,11 +389,11 @@ public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService
public void onContainersAllocated(List<Container> containers);
/**
* Called when the ResourceManager wants the ApplicationMaster to reboot
* for being out of sync. The ApplicationMaster should not unregister with
* the RM unless the ApplicationMaster wants to be the last attempt.
* Called when the ResourceManager wants the ApplicationMaster to shutdown
* for being out of sync etc. The ApplicationMaster should not unregister
* with the RM unless the ApplicationMaster wants to be the last attempt.
*/
public void onRebootRequest();
public void onShutdownRequest();
/**
* Called when nodes tracked by the ResourceManager have changed in health,

View File

@ -36,6 +36,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@ -180,7 +181,7 @@ public class TestAMRMClientAsync {
Assert.assertTrue(callbackHandler.callbackCount == 0);
}
@Test(timeout=10000)
@Test//(timeout=10000)
public void testAMRMClientAsyncReboot() throws Exception {
Configuration conf = new Configuration();
TestCallbackHandler callbackHandler = new TestCallbackHandler();
@ -189,7 +190,7 @@ public class TestAMRMClientAsync {
final AllocateResponse rebootResponse = createAllocateResponse(
new ArrayList<ContainerStatus>(), new ArrayList<Container>());
rebootResponse.setResync(true);
rebootResponse.setAMCommand(AMCommand.AM_RESYNC);
when(client.allocate(anyFloat())).thenReturn(rebootResponse);
AMRMClientAsync<ContainerRequest> asyncClient =
@ -216,7 +217,7 @@ public class TestAMRMClientAsync {
private AllocateResponse createAllocateResponse(
List<ContainerStatus> completed, List<Container> allocated) {
AllocateResponse response = AllocateResponse.newInstance(0, completed, allocated,
new ArrayList<NodeReport>(), null, false, 1, null);
new ArrayList<NodeReport>(), null, null, 1, null);
return response;
}
@ -292,7 +293,7 @@ public class TestAMRMClientAsync {
}
@Override
public void onRebootRequest() {
public void onShutdownRequest() {
reboot = true;
synchronized (notifier) {
notifier.notifyAll();

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -401,7 +402,7 @@ public class BuilderUtils {
public static AllocateResponse newAllocateResponse(int responseId,
List<ContainerStatus> completedContainers,
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, boolean reboot, int numClusterNodes,
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt) {
AllocateResponse response = recordFactory
.newRecordInstance(AllocateResponse.class);
@ -411,7 +412,7 @@ public class BuilderUtils {
response.setAllocatedContainers(allocatedContainers);
response.setUpdatedNodes(updatedNodes);
response.setAvailableResources(availResources);
response.setResync(reboot);
response.setAMCommand(command);
response.setPreemptionMessage(preempt);
return response;

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -100,7 +101,7 @@ public class ApplicationMasterService extends AbstractService implements
super(ApplicationMasterService.class.getName());
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.rScheduler = scheduler;
this.resync.setResync(true);
this.resync.setAMCommand(AMCommand.AM_RESYNC);
// this.reboot.containers = new ArrayList<Container>();
this.rmContext = rmContext;
}

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -87,7 +88,7 @@ public class TestRMRestart {
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
}
@Test
@Test (timeout=60000)
public void testRMRestart() throws Exception {
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
@ -250,7 +251,7 @@ public class TestRMRestart {
AllocateResponse allocResponse = am1.allocate(
new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
Assert.assertTrue(allocResponse.getResync());
Assert.assertTrue(allocResponse.getAMCommand() == AMCommand.AM_RESYNC);
// NM should be rebooted on heartbeat, even first heartbeat for nm2
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);

View File

@ -22,6 +22,7 @@ import junit.framework.Assert;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
@ -82,7 +83,7 @@ public class TestAMRMRPCResponseId {
AllocateResponse response = amService.allocate(allocateRequest);
Assert.assertEquals(1, response.getResponseId());
Assert.assertFalse(response.getResync());
Assert.assertTrue(response.getAMCommand() == null);
allocateRequest = AllocateRequest.newInstance(attempt
.getAppAttemptId(), response.getResponseId(), 0F, null, null, null);
@ -96,6 +97,6 @@ public class TestAMRMRPCResponseId {
allocateRequest = AllocateRequest.newInstance(attempt
.getAppAttemptId(), 0, 0F, null, null, null);
response = amService.allocate(allocateRequest);
Assert.assertTrue(response.getResync());
Assert.assertTrue(response.getAMCommand() == AMCommand.AM_RESYNC);
}
}

View File

@ -208,7 +208,8 @@ public class TestApplicationTokens {
AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(applicationAttemptId);
Assert.assertFalse(rmClient.allocate(allocateRequest).getResync());
Assert.assertTrue(
rmClient.allocate(allocateRequest).getAMCommand() == null);
// Simulate a master-key-roll-over
ApplicationTokenSecretManager appTokenSecretManager =
@ -224,7 +225,8 @@ public class TestApplicationTokens {
rmClient = createRMClient(rm, conf, rpc, currentUser);
allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(applicationAttemptId);
Assert.assertFalse(rmClient.allocate(allocateRequest).getResync());
Assert.assertTrue(
rmClient.allocate(allocateRequest).getAMCommand() == null);
} finally {
rm.stop();
if (rmClient != null) {