From 485c04d0a49cdea154dbe7dbbb26fa5345b5e0f1 Mon Sep 17 00:00:00 2001 From: Bikas Saha Date: Thu, 6 Jun 2013 23:43:38 +0000 Subject: [PATCH] Merge r1490470 from trunk to branch-2 for YARN-759. Create Command enum in AllocateResponse (bikas) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1490473 13f79535-47bb-0310-9956-ffa450edef68 --- .../v2/app/local/LocalContainerAllocator.java | 25 +++++++---- .../v2/app/rm/RMContainerAllocator.java | 23 +++++++--- .../api/protocolrecords/AllocateResponse.java | 28 +++++------- .../impl/pb/AllocateResponsePBImpl.java | 17 +++++-- .../hadoop/yarn/api/records/AMCommand.java | 45 +++++++++++++++++++ .../apache/hadoop/yarn/util/ProtoUtils.java | 12 +++++ .../src/main/proto/yarn_protos.proto | 5 +++ .../src/main/proto/yarn_service_protos.proto | 2 +- .../distributedshell/ApplicationMaster.java | 4 +- .../apache/hadoop/yarn/client/AMRMClient.java | 5 ++- .../hadoop/yarn/client/AMRMClientAsync.java | 31 +++++++++---- .../yarn/client/TestAMRMClientAsync.java | 9 ++-- .../yarn/server/utils/BuilderUtils.java | 5 ++- .../ApplicationMasterService.java | 3 +- .../server/resourcemanager/TestRMRestart.java | 5 ++- .../TestAMRMRPCResponseId.java | 5 ++- .../security/TestApplicationTokens.java | 6 ++- 17 files changed, 170 insertions(+), 60 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMCommand.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java index d2fa70d9137..aed2ad9470f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java @@ -116,14 +116,23 @@ public class LocalContainerAllocator extends RMCommunicator // continue to attempt to contact the RM. throw e; } - if (allocateResponse.getResync()) { - LOG.info("Event from RM: shutting down Application Master"); - // This can happen if the RM has been restarted. If it is in that state, - // this application must clean itself up. - eventHandler.handle(new JobEvent(this.getJob().getID(), - JobEventType.JOB_AM_REBOOT)); - throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " + - this.getContext().getApplicationID()); + if (allocateResponse.getAMCommand() != null) { + switch(allocateResponse.getAMCommand()) { + case AM_RESYNC: + case AM_SHUTDOWN: + LOG.info("Event from RM: shutting down Application Master"); + // This can happen if the RM has been restarted. If it is in that state, + // this application must clean itself up. + eventHandler.handle(new JobEvent(this.getJob().getID(), + JobEventType.JOB_AM_REBOOT)); + throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " + + this.getContext().getApplicationID()); + default: + String msg = + "Unhandled value of AMCommand: " + allocateResponse.getAMCommand(); + LOG.error(msg); + throw new YarnRuntimeException(msg); + } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index f951b900f2d..106f2f59f88 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -570,13 +570,22 @@ public class RMContainerAllocator extends RMContainerRequestor // continue to attempt to contact the RM. throw e; } - if (response.getResync()) { - // This can happen if the RM has been restarted. If it is in that state, - // this application must clean itself up. - eventHandler.handle(new JobEvent(this.getJob().getID(), - JobEventType.JOB_AM_REBOOT)); - throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " + - this.getContext().getApplicationID()); + if (response.getAMCommand() != null) { + switch(response.getAMCommand()) { + case AM_RESYNC: + case AM_SHUTDOWN: + // This can happen if the RM has been restarted. If it is in that state, + // this application must clean itself up. + eventHandler.handle(new JobEvent(this.getJob().getID(), + JobEventType.JOB_AM_REBOOT)); + throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " + + this.getContext().getApplicationID()); + default: + String msg = + "Unhandled value of AMCommand: " + response.getAMCommand(); + LOG.error(msg); + throw new YarnRuntimeException(msg); + } } int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0; List newContainers = response.getAllocatedContainers(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 4a46dc9629b..18ce01aea3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.AMRMProtocol; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeReport; @@ -65,7 +66,7 @@ public abstract class AllocateResponse { public static AllocateResponse newInstance(int responseId, List completedContainers, List allocatedContainers, List updatedNodes, - Resource availResources, boolean resync, int numClusterNodes, + Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt) { AllocateResponse response = Records.newRecord(AllocateResponse.class); response.setNumClusterNodes(numClusterNodes); @@ -74,32 +75,27 @@ public abstract class AllocateResponse { response.setAllocatedContainers(allocatedContainers); response.setUpdatedNodes(updatedNodes); response.setAvailableResources(availResources); - response.setResync(resync); + response.setAMCommand(command); response.setPreemptionMessage(preempt); return response; } /** - * Should the ApplicationMaster take action because of being - * out-of-sync with the ResourceManager as deigned by - * {@link #getResponseId()} - * This can be due to application errors or because the ResourceManager - * has restarted. The action to be taken by the ApplicationMaster - * is to shutdown without unregistering with the ResourceManager. - * The ResourceManager will start a new attempt. If the application is already - * done when it gets the resync command, then it may choose to shutdown after - * unregistering in which case the ResourceManager will not start a new attempt. - * - * @return true if the ApplicationMaster should - * take action, false otherwise + * If the ResourceManager needs the + * ApplicationMaster to take some action then it will send an + * AMCommand to the ApplicationMaster. See AMCommand + * for details on commands and actions for them. + * @return AMCommand if the ApplicationMaster should + * take action, null otherwise + * @see AMCommand */ @Public @Stable - public abstract boolean getResync(); + public abstract AMCommand getAMCommand(); @Private @Unstable - public abstract void setResync(boolean value); + public abstract void setAMCommand(AMCommand command); /** * Get the last response id. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index 62379d665a2..311bba5286a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeReport; @@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionMessageProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder; +import org.apache.hadoop.yarn.util.ProtoUtils; public class AllocateResponsePBImpl extends AllocateResponse { @@ -145,15 +147,22 @@ public class AllocateResponsePBImpl extends AllocateResponse { } @Override - public synchronized boolean getResync() { + public synchronized AMCommand getAMCommand() { AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; - return (p.getResync()); + if (!p.hasAMCommand()) { + return null; + } + return ProtoUtils.convertFromProtoFormat(p.getAMCommand()); } @Override - public synchronized void setResync(boolean resync) { + public synchronized void setAMCommand(AMCommand command) { maybeInitBuilder(); - builder.setResync((resync)); + if (command == null) { + builder.clearAMCommand(); + return; + } + builder.setAMCommand(ProtoUtils.convertToProtoFormat(command)); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMCommand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMCommand.java new file mode 100644 index 00000000000..011cbe5275f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMCommand.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; + +/** + * Command sent by the Resource Manager to the Application Master in the + * AllocateResponse + * @see AllocateResponse + */ +@Public +@Stable +public enum AMCommand { + /** + * Sent by Resource Manager when it is out of sync with the AM and wants the + * AM get back in sync. + */ + AM_RESYNC, + + /** + * Sent by Resource Manager when it wants the AM to shutdown. Eg. when the + * node is going down for maintenance. The AM should save any state and + * prepare to be restarted at a later time. + */ + AM_SHUTDOWN +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java index 0fea4aa33c1..bc8b04c66c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java @@ -25,6 +25,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto; @@ -138,6 +140,16 @@ public class ProtoUtils { public static LocalResourceVisibility convertFromProtoFormat(LocalResourceVisibilityProto e) { return LocalResourceVisibility.valueOf(e.name()); } + + /* + * AMCommand + */ + public static AMCommandProto convertToProtoFormat(AMCommand e) { + return AMCommandProto.valueOf(e.name()); + } + public static AMCommand convertFromProtoFormat(AMCommandProto e) { + return AMCommand.valueOf(e.name()); + } /* * ByteBuffer diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 68575854c8b..fa6057d3365 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -207,6 +207,11 @@ message ResourceRequestProto { optional bool relax_locality = 5 [default = true]; } +enum AMCommandProto { + AM_RESYNC = 1; + AM_SHUTDOWN = 2; +} + message PreemptionMessageProto { optional StrictPreemptionContractProto strictContract = 1; optional PreemptionContractProto contract = 2; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index a06b0ace9d8..8e81f21983a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -60,7 +60,7 @@ message AllocateRequestProto { } message AllocateResponseProto { - optional bool resync = 1; + optional AMCommandProto a_m_command = 1; optional int32 response_id = 2; repeated ContainerProto allocated_containers = 3; repeated ContainerStatusProto completed_container_statuses = 4; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index f645837c892..1b517a74a6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -631,7 +631,9 @@ public class ApplicationMaster { } @Override - public void onRebootRequest() {} + public void onShutdownRequest() { + done = true; + } @Override public void onNodesUpdated(List updatedNodes) {} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java index 5b4ba5c5e8b..71c59f9946b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java @@ -43,6 +43,7 @@ public interface AMRMClient extends Servi * Object to represent container request for resources. * Resources may be localized to nodes and racks. * Resources may be assigned priorities. + * All getters return unmodifiable collections. * Can ask for multiple containers of a given type. */ public static class ContainerRequest { @@ -65,11 +66,11 @@ public interface AMRMClient extends Servi return capability; } - public ImmutableList getHosts() { + public List getHosts() { return hosts; } - public ImmutableList getRacks() { + public List getRacks() { return racks; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java index ad8ee2cb03e..f58e3662474 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java @@ -331,10 +331,25 @@ public class AMRMClientAsync extends AbstractService continue; } - if (response.getResync()) { - handler.onRebootRequest(); - LOG.info("Reboot requested. Stopping callback."); - break; + if (response.getAMCommand() != null) { + boolean stop = false; + switch(response.getAMCommand()) { + case AM_RESYNC: + case AM_SHUTDOWN: + handler.onShutdownRequest(); + LOG.info("Shutdown requested. Stopping callback."); + stop = true; + break; + default: + String msg = + "Unhandled value of AMCommand: " + response.getAMCommand(); + LOG.error(msg); + throw new YarnRuntimeException(msg); + } + if(stop) { + // should probably stop heartbeating also YARN-763 + break; + } } List updatedNodes = response.getUpdatedNodes(); if (!updatedNodes.isEmpty()) { @@ -374,11 +389,11 @@ public class AMRMClientAsync extends AbstractService public void onContainersAllocated(List containers); /** - * Called when the ResourceManager wants the ApplicationMaster to reboot - * for being out of sync. The ApplicationMaster should not unregister with - * the RM unless the ApplicationMaster wants to be the last attempt. + * Called when the ResourceManager wants the ApplicationMaster to shutdown + * for being out of sync etc. The ApplicationMaster should not unregister + * with the RM unless the ApplicationMaster wants to be the last attempt. */ - public void onRebootRequest(); + public void onShutdownRequest(); /** * Called when nodes tracked by the ResourceManager have changed in health, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java index 77c929b728a..c5c687cb8dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java @@ -36,6 +36,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -180,7 +181,7 @@ public class TestAMRMClientAsync { Assert.assertTrue(callbackHandler.callbackCount == 0); } - @Test(timeout=10000) + @Test//(timeout=10000) public void testAMRMClientAsyncReboot() throws Exception { Configuration conf = new Configuration(); TestCallbackHandler callbackHandler = new TestCallbackHandler(); @@ -189,7 +190,7 @@ public class TestAMRMClientAsync { final AllocateResponse rebootResponse = createAllocateResponse( new ArrayList(), new ArrayList()); - rebootResponse.setResync(true); + rebootResponse.setAMCommand(AMCommand.AM_RESYNC); when(client.allocate(anyFloat())).thenReturn(rebootResponse); AMRMClientAsync asyncClient = @@ -216,7 +217,7 @@ public class TestAMRMClientAsync { private AllocateResponse createAllocateResponse( List completed, List allocated) { AllocateResponse response = AllocateResponse.newInstance(0, completed, allocated, - new ArrayList(), null, false, 1, null); + new ArrayList(), null, null, 1, null); return response; } @@ -292,7 +293,7 @@ public class TestAMRMClientAsync { } @Override - public void onRebootRequest() { + public void onShutdownRequest() { reboot = true; synchronized (notifier) { notifier.notifyAll(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 9037bd24311..438dd70a719 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -32,6 +32,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -401,7 +402,7 @@ public class BuilderUtils { public static AllocateResponse newAllocateResponse(int responseId, List completedContainers, List allocatedContainers, List updatedNodes, - Resource availResources, boolean reboot, int numClusterNodes, + Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt) { AllocateResponse response = recordFactory .newRecordInstance(AllocateResponse.class); @@ -411,7 +412,7 @@ public class BuilderUtils { response.setAllocatedContainers(allocatedContainers); response.setUpdatedNodes(updatedNodes); response.setAvailableResources(availResources); - response.setResync(reboot); + response.setAMCommand(command); response.setPreemptionMessage(preempt); return response; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index b72a7f6df04..c279a9c80c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -100,7 +101,7 @@ public class ApplicationMasterService extends AbstractService implements super(ApplicationMasterService.class.getName()); this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.rScheduler = scheduler; - this.resync.setResync(true); + this.resync.setAMCommand(AMCommand.AM_RESYNC); // this.reboot.containers = new ArrayList(); this.rmContext = rmContext; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 1bebedd06f2..0b43d31c016 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -42,6 +42,7 @@ import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -87,7 +88,7 @@ public class TestRMRestart { conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); } - @Test + @Test (timeout=60000) public void testRMRestart() throws Exception { Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, @@ -250,7 +251,7 @@ public class TestRMRestart { AllocateResponse allocResponse = am1.allocate( new ArrayList(), new ArrayList()); - Assert.assertTrue(allocResponse.getResync()); + Assert.assertTrue(allocResponse.getAMCommand() == AMCommand.AM_RESYNC); // NM should be rebooted on heartbeat, even first heartbeat for nm2 NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java index 6e4019d832d..4e309fdaba3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java @@ -22,6 +22,7 @@ import junit.framework.Assert; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; @@ -82,7 +83,7 @@ public class TestAMRMRPCResponseId { AllocateResponse response = amService.allocate(allocateRequest); Assert.assertEquals(1, response.getResponseId()); - Assert.assertFalse(response.getResync()); + Assert.assertTrue(response.getAMCommand() == null); allocateRequest = AllocateRequest.newInstance(attempt .getAppAttemptId(), response.getResponseId(), 0F, null, null, null); @@ -96,6 +97,6 @@ public class TestAMRMRPCResponseId { allocateRequest = AllocateRequest.newInstance(attempt .getAppAttemptId(), 0, 0F, null, null, null); response = amService.allocate(allocateRequest); - Assert.assertTrue(response.getResync()); + Assert.assertTrue(response.getAMCommand() == AMCommand.AM_RESYNC); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestApplicationTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestApplicationTokens.java index 8eb4e95497e..4fb81c668a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestApplicationTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestApplicationTokens.java @@ -208,7 +208,8 @@ public class TestApplicationTokens { AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); allocateRequest.setApplicationAttemptId(applicationAttemptId); - Assert.assertFalse(rmClient.allocate(allocateRequest).getResync()); + Assert.assertTrue( + rmClient.allocate(allocateRequest).getAMCommand() == null); // Simulate a master-key-roll-over ApplicationTokenSecretManager appTokenSecretManager = @@ -224,7 +225,8 @@ public class TestApplicationTokens { rmClient = createRMClient(rm, conf, rpc, currentUser); allocateRequest = Records.newRecord(AllocateRequest.class); allocateRequest.setApplicationAttemptId(applicationAttemptId); - Assert.assertFalse(rmClient.allocate(allocateRequest).getResync()); + Assert.assertTrue( + rmClient.allocate(allocateRequest).getAMCommand() == null); } finally { rm.stop(); if (rmClient != null) {