YARN-759. Create Command enum in AllocateResponse (bikas)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1490470 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
91a4da5ebb
commit
9fcfbf5f51
|
@ -116,14 +116,23 @@ public class LocalContainerAllocator extends RMCommunicator
|
|||
// continue to attempt to contact the RM.
|
||||
throw e;
|
||||
}
|
||||
if (allocateResponse.getResync()) {
|
||||
LOG.info("Event from RM: shutting down Application Master");
|
||||
// This can happen if the RM has been restarted. If it is in that state,
|
||||
// this application must clean itself up.
|
||||
eventHandler.handle(new JobEvent(this.getJob().getID(),
|
||||
JobEventType.JOB_AM_REBOOT));
|
||||
throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
|
||||
this.getContext().getApplicationID());
|
||||
if (allocateResponse.getAMCommand() != null) {
|
||||
switch(allocateResponse.getAMCommand()) {
|
||||
case AM_RESYNC:
|
||||
case AM_SHUTDOWN:
|
||||
LOG.info("Event from RM: shutting down Application Master");
|
||||
// This can happen if the RM has been restarted. If it is in that state,
|
||||
// this application must clean itself up.
|
||||
eventHandler.handle(new JobEvent(this.getJob().getID(),
|
||||
JobEventType.JOB_AM_REBOOT));
|
||||
throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
|
||||
this.getContext().getApplicationID());
|
||||
default:
|
||||
String msg =
|
||||
"Unhandled value of AMCommand: " + allocateResponse.getAMCommand();
|
||||
LOG.error(msg);
|
||||
throw new YarnRuntimeException(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -570,13 +570,22 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
// continue to attempt to contact the RM.
|
||||
throw e;
|
||||
}
|
||||
if (response.getResync()) {
|
||||
// This can happen if the RM has been restarted. If it is in that state,
|
||||
// this application must clean itself up.
|
||||
eventHandler.handle(new JobEvent(this.getJob().getID(),
|
||||
JobEventType.JOB_AM_REBOOT));
|
||||
throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
|
||||
this.getContext().getApplicationID());
|
||||
if (response.getAMCommand() != null) {
|
||||
switch(response.getAMCommand()) {
|
||||
case AM_RESYNC:
|
||||
case AM_SHUTDOWN:
|
||||
// This can happen if the RM has been restarted. If it is in that state,
|
||||
// this application must clean itself up.
|
||||
eventHandler.handle(new JobEvent(this.getJob().getID(),
|
||||
JobEventType.JOB_AM_REBOOT));
|
||||
throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
|
||||
this.getContext().getApplicationID());
|
||||
default:
|
||||
String msg =
|
||||
"Unhandled value of AMCommand: " + response.getAMCommand();
|
||||
LOG.error(msg);
|
||||
throw new YarnRuntimeException(msg);
|
||||
}
|
||||
}
|
||||
int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
|
||||
List<Container> newContainers = response.getAllocatedContainers();
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
|||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.AMRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
|
@ -65,7 +66,7 @@ public abstract class AllocateResponse {
|
|||
public static AllocateResponse newInstance(int responseId,
|
||||
List<ContainerStatus> completedContainers,
|
||||
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
|
||||
Resource availResources, boolean resync, int numClusterNodes,
|
||||
Resource availResources, AMCommand command, int numClusterNodes,
|
||||
PreemptionMessage preempt) {
|
||||
AllocateResponse response = Records.newRecord(AllocateResponse.class);
|
||||
response.setNumClusterNodes(numClusterNodes);
|
||||
|
@ -74,32 +75,27 @@ public abstract class AllocateResponse {
|
|||
response.setAllocatedContainers(allocatedContainers);
|
||||
response.setUpdatedNodes(updatedNodes);
|
||||
response.setAvailableResources(availResources);
|
||||
response.setResync(resync);
|
||||
response.setAMCommand(command);
|
||||
response.setPreemptionMessage(preempt);
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Should the <code>ApplicationMaster</code> take action because of being
|
||||
* out-of-sync with the <code>ResourceManager</code> as deigned by
|
||||
* {@link #getResponseId()}
|
||||
* This can be due to application errors or because the ResourceManager
|
||||
* has restarted. The action to be taken by the <code>ApplicationMaster</code>
|
||||
* is to shutdown without unregistering with the <code>ResourceManager</code>.
|
||||
* The ResourceManager will start a new attempt. If the application is already
|
||||
* done when it gets the resync command, then it may choose to shutdown after
|
||||
* unregistering in which case the ResourceManager will not start a new attempt.
|
||||
*
|
||||
* @return <code>true</code> if the <code>ApplicationMaster</code> should
|
||||
* take action, <code>false</code> otherwise
|
||||
* If the <code>ResourceManager</code> needs the
|
||||
* <code>ApplicationMaster</code> to take some action then it will send an
|
||||
* AMCommand to the <code>ApplicationMaster</code>. See <code>AMCommand</code>
|
||||
* for details on commands and actions for them.
|
||||
* @return <code>AMCommand</code> if the <code>ApplicationMaster</code> should
|
||||
* take action, <code>null</code> otherwise
|
||||
* @see AMCommand
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract boolean getResync();
|
||||
public abstract AMCommand getAMCommand();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setResync(boolean value);
|
||||
public abstract void setAMCommand(AMCommand command);
|
||||
|
||||
/**
|
||||
* Get the <em>last response id</em>.
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.List;
|
|||
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
|
@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionMessageProto;
|
|||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||
|
||||
|
||||
public class AllocateResponsePBImpl extends AllocateResponse {
|
||||
|
@ -145,15 +147,22 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean getResync() {
|
||||
public synchronized AMCommand getAMCommand() {
|
||||
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return (p.getResync());
|
||||
if (!p.hasAMCommand()) {
|
||||
return null;
|
||||
}
|
||||
return ProtoUtils.convertFromProtoFormat(p.getAMCommand());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setResync(boolean resync) {
|
||||
public synchronized void setAMCommand(AMCommand command) {
|
||||
maybeInitBuilder();
|
||||
builder.setResync((resync));
|
||||
if (command == null) {
|
||||
builder.clearAMCommand();
|
||||
return;
|
||||
}
|
||||
builder.setAMCommand(ProtoUtils.convertToProtoFormat(command));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.records;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
|
||||
/**
|
||||
* Command sent by the Resource Manager to the Application Master in the
|
||||
* AllocateResponse
|
||||
* @see AllocateResponse
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public enum AMCommand {
|
||||
/**
|
||||
* Sent by Resource Manager when it is out of sync with the AM and wants the
|
||||
* AM get back in sync.
|
||||
*/
|
||||
AM_RESYNC,
|
||||
|
||||
/**
|
||||
* Sent by Resource Manager when it wants the AM to shutdown. Eg. when the
|
||||
* node is going down for maintenance. The AM should save any state and
|
||||
* prepare to be restarted at a later time.
|
||||
*/
|
||||
AM_SHUTDOWN
|
||||
}
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.io.Text;
|
|||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
|
@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.QueueState;
|
|||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
|
||||
|
@ -139,6 +141,16 @@ public class ProtoUtils {
|
|||
return LocalResourceVisibility.valueOf(e.name());
|
||||
}
|
||||
|
||||
/*
|
||||
* AMCommand
|
||||
*/
|
||||
public static AMCommandProto convertToProtoFormat(AMCommand e) {
|
||||
return AMCommandProto.valueOf(e.name());
|
||||
}
|
||||
public static AMCommand convertFromProtoFormat(AMCommandProto e) {
|
||||
return AMCommand.valueOf(e.name());
|
||||
}
|
||||
|
||||
/*
|
||||
* ByteBuffer
|
||||
*/
|
||||
|
|
|
@ -207,6 +207,11 @@ message ResourceRequestProto {
|
|||
optional bool relax_locality = 5 [default = true];
|
||||
}
|
||||
|
||||
enum AMCommandProto {
|
||||
AM_RESYNC = 1;
|
||||
AM_SHUTDOWN = 2;
|
||||
}
|
||||
|
||||
message PreemptionMessageProto {
|
||||
optional StrictPreemptionContractProto strictContract = 1;
|
||||
optional PreemptionContractProto contract = 2;
|
||||
|
|
|
@ -60,7 +60,7 @@ message AllocateRequestProto {
|
|||
}
|
||||
|
||||
message AllocateResponseProto {
|
||||
optional bool resync = 1;
|
||||
optional AMCommandProto a_m_command = 1;
|
||||
optional int32 response_id = 2;
|
||||
repeated ContainerProto allocated_containers = 3;
|
||||
repeated ContainerStatusProto completed_container_statuses = 4;
|
||||
|
|
|
@ -631,7 +631,9 @@ public class ApplicationMaster {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onRebootRequest() {}
|
||||
public void onShutdownRequest() {
|
||||
done = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNodesUpdated(List<NodeReport> updatedNodes) {}
|
||||
|
|
|
@ -43,6 +43,7 @@ public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Servi
|
|||
* Object to represent container request for resources.
|
||||
* Resources may be localized to nodes and racks.
|
||||
* Resources may be assigned priorities.
|
||||
* All getters return unmodifiable collections.
|
||||
* Can ask for multiple containers of a given type.
|
||||
*/
|
||||
public static class ContainerRequest {
|
||||
|
@ -65,11 +66,11 @@ public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Servi
|
|||
return capability;
|
||||
}
|
||||
|
||||
public ImmutableList<String> getHosts() {
|
||||
public List<String> getHosts() {
|
||||
return hosts;
|
||||
}
|
||||
|
||||
public ImmutableList<String> getRacks() {
|
||||
public List<String> getRacks() {
|
||||
return racks;
|
||||
}
|
||||
|
||||
|
|
|
@ -331,10 +331,25 @@ public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService
|
|||
continue;
|
||||
}
|
||||
|
||||
if (response.getResync()) {
|
||||
handler.onRebootRequest();
|
||||
LOG.info("Reboot requested. Stopping callback.");
|
||||
break;
|
||||
if (response.getAMCommand() != null) {
|
||||
boolean stop = false;
|
||||
switch(response.getAMCommand()) {
|
||||
case AM_RESYNC:
|
||||
case AM_SHUTDOWN:
|
||||
handler.onShutdownRequest();
|
||||
LOG.info("Shutdown requested. Stopping callback.");
|
||||
stop = true;
|
||||
break;
|
||||
default:
|
||||
String msg =
|
||||
"Unhandled value of AMCommand: " + response.getAMCommand();
|
||||
LOG.error(msg);
|
||||
throw new YarnRuntimeException(msg);
|
||||
}
|
||||
if(stop) {
|
||||
// should probably stop heartbeating also YARN-763
|
||||
break;
|
||||
}
|
||||
}
|
||||
List<NodeReport> updatedNodes = response.getUpdatedNodes();
|
||||
if (!updatedNodes.isEmpty()) {
|
||||
|
@ -374,11 +389,11 @@ public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService
|
|||
public void onContainersAllocated(List<Container> containers);
|
||||
|
||||
/**
|
||||
* Called when the ResourceManager wants the ApplicationMaster to reboot
|
||||
* for being out of sync. The ApplicationMaster should not unregister with
|
||||
* the RM unless the ApplicationMaster wants to be the last attempt.
|
||||
* Called when the ResourceManager wants the ApplicationMaster to shutdown
|
||||
* for being out of sync etc. The ApplicationMaster should not unregister
|
||||
* with the RM unless the ApplicationMaster wants to be the last attempt.
|
||||
*/
|
||||
public void onRebootRequest();
|
||||
public void onShutdownRequest();
|
||||
|
||||
/**
|
||||
* Called when nodes tracked by the ResourceManager have changed in health,
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
|
@ -180,7 +181,7 @@ public class TestAMRMClientAsync {
|
|||
Assert.assertTrue(callbackHandler.callbackCount == 0);
|
||||
}
|
||||
|
||||
@Test(timeout=10000)
|
||||
@Test//(timeout=10000)
|
||||
public void testAMRMClientAsyncReboot() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
TestCallbackHandler callbackHandler = new TestCallbackHandler();
|
||||
|
@ -189,7 +190,7 @@ public class TestAMRMClientAsync {
|
|||
|
||||
final AllocateResponse rebootResponse = createAllocateResponse(
|
||||
new ArrayList<ContainerStatus>(), new ArrayList<Container>());
|
||||
rebootResponse.setResync(true);
|
||||
rebootResponse.setAMCommand(AMCommand.AM_RESYNC);
|
||||
when(client.allocate(anyFloat())).thenReturn(rebootResponse);
|
||||
|
||||
AMRMClientAsync<ContainerRequest> asyncClient =
|
||||
|
@ -216,7 +217,7 @@ public class TestAMRMClientAsync {
|
|||
private AllocateResponse createAllocateResponse(
|
||||
List<ContainerStatus> completed, List<Container> allocated) {
|
||||
AllocateResponse response = AllocateResponse.newInstance(0, completed, allocated,
|
||||
new ArrayList<NodeReport>(), null, false, 1, null);
|
||||
new ArrayList<NodeReport>(), null, null, 1, null);
|
||||
return response;
|
||||
}
|
||||
|
||||
|
@ -292,7 +293,7 @@ public class TestAMRMClientAsync {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onRebootRequest() {
|
||||
public void onShutdownRequest() {
|
||||
reboot = true;
|
||||
synchronized (notifier) {
|
||||
notifier.notifyAll();
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.net.NetUtils;
|
|||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -401,7 +402,7 @@ public class BuilderUtils {
|
|||
public static AllocateResponse newAllocateResponse(int responseId,
|
||||
List<ContainerStatus> completedContainers,
|
||||
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
|
||||
Resource availResources, boolean reboot, int numClusterNodes,
|
||||
Resource availResources, AMCommand command, int numClusterNodes,
|
||||
PreemptionMessage preempt) {
|
||||
AllocateResponse response = recordFactory
|
||||
.newRecordInstance(AllocateResponse.class);
|
||||
|
@ -411,7 +412,7 @@ public class BuilderUtils {
|
|||
response.setAllocatedContainers(allocatedContainers);
|
||||
response.setUpdatedNodes(updatedNodes);
|
||||
response.setAvailableResources(availResources);
|
||||
response.setResync(reboot);
|
||||
response.setAMCommand(command);
|
||||
response.setPreemptionMessage(preempt);
|
||||
|
||||
return response;
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -100,7 +101,7 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
super(ApplicationMasterService.class.getName());
|
||||
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
|
||||
this.rScheduler = scheduler;
|
||||
this.resync.setResync(true);
|
||||
this.resync.setAMCommand(AMCommand.AM_RESYNC);
|
||||
// this.reboot.containers = new ArrayList<Container>();
|
||||
this.rmContext = rmContext;
|
||||
}
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.util.ExitUtil;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -87,7 +88,7 @@ public class TestRMRestart {
|
|||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test (timeout=60000)
|
||||
public void testRMRestart() throws Exception {
|
||||
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
|
@ -250,7 +251,7 @@ public class TestRMRestart {
|
|||
AllocateResponse allocResponse = am1.allocate(
|
||||
new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>());
|
||||
Assert.assertTrue(allocResponse.getResync());
|
||||
Assert.assertTrue(allocResponse.getAMCommand() == AMCommand.AM_RESYNC);
|
||||
|
||||
// NM should be rebooted on heartbeat, even first heartbeat for nm2
|
||||
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
|
||||
|
|
|
@ -22,6 +22,7 @@ import junit.framework.Assert;
|
|||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||
|
@ -82,7 +83,7 @@ public class TestAMRMRPCResponseId {
|
|||
|
||||
AllocateResponse response = amService.allocate(allocateRequest);
|
||||
Assert.assertEquals(1, response.getResponseId());
|
||||
Assert.assertFalse(response.getResync());
|
||||
Assert.assertTrue(response.getAMCommand() == null);
|
||||
allocateRequest = AllocateRequest.newInstance(attempt
|
||||
.getAppAttemptId(), response.getResponseId(), 0F, null, null, null);
|
||||
|
||||
|
@ -96,6 +97,6 @@ public class TestAMRMRPCResponseId {
|
|||
allocateRequest = AllocateRequest.newInstance(attempt
|
||||
.getAppAttemptId(), 0, 0F, null, null, null);
|
||||
response = amService.allocate(allocateRequest);
|
||||
Assert.assertTrue(response.getResync());
|
||||
Assert.assertTrue(response.getAMCommand() == AMCommand.AM_RESYNC);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -208,7 +208,8 @@ public class TestApplicationTokens {
|
|||
AllocateRequest allocateRequest =
|
||||
Records.newRecord(AllocateRequest.class);
|
||||
allocateRequest.setApplicationAttemptId(applicationAttemptId);
|
||||
Assert.assertFalse(rmClient.allocate(allocateRequest).getResync());
|
||||
Assert.assertTrue(
|
||||
rmClient.allocate(allocateRequest).getAMCommand() == null);
|
||||
|
||||
// Simulate a master-key-roll-over
|
||||
ApplicationTokenSecretManager appTokenSecretManager =
|
||||
|
@ -224,7 +225,8 @@ public class TestApplicationTokens {
|
|||
rmClient = createRMClient(rm, conf, rpc, currentUser);
|
||||
allocateRequest = Records.newRecord(AllocateRequest.class);
|
||||
allocateRequest.setApplicationAttemptId(applicationAttemptId);
|
||||
Assert.assertFalse(rmClient.allocate(allocateRequest).getResync());
|
||||
Assert.assertTrue(
|
||||
rmClient.allocate(allocateRequest).getAMCommand() == null);
|
||||
} finally {
|
||||
rm.stop();
|
||||
if (rmClient != null) {
|
||||
|
|
Loading…
Reference in New Issue