diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 16c8cb338a1..14262b0d17d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -8,6 +8,9 @@ Release 2.0.3-alpha - Unreleased YARN-145. Add a Web UI to the fair share scheduler. (Sandy Ryza via tomwhite) + YARN-230. RM Restart phase 1 - includes support for saving/restarting all + applications on an RM bounce. (Bikas Saha via acmurthy) + IMPROVEMENTS YARN-78. Changed UnManagedAM application to use YarnClient. (Bikas Saha via diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptStateData.java new file mode 100644 index 00000000000..d1dbda0dc51 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptStateData.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +
+/**
+ * Contains the state data that needs to be persisted for an
+ * ApplicationAttempt.
+ */
+@Public
+@Unstable
+public interface ApplicationAttemptStateData {
+
+  /**
+   * The ApplicationAttemptId for the application attempt
+   * @return ApplicationAttemptId for the application attempt
+   */
+  @Public
+  @Unstable
+  public ApplicationAttemptId getAttemptId();
+
+  /**
+   * Sets the ApplicationAttemptId for the application attempt
+   * @param attemptId ApplicationAttemptId for the application attempt
+   */
+  @Public
+  @Unstable
+  public void setAttemptId(ApplicationAttemptId attemptId);
+
+  /**
+   * The master container running the application attempt
+   * @return Container that hosts the attempt
+   */
+  @Public
+  @Unstable
+  public Container getMasterContainer();
+
+  /**
+   * Sets the master container running the application attempt
+   * @param container Container that hosts the attempt
+   */
+  @Public
+  @Unstable
+  public void setMasterContainer(Container container);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStateData.java new file mode 100644 index 00000000000..9b1e14a3c00 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStateData.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +
+/**
+ * Contains all the state data that needs to be stored persistently
+ * for an Application: its submit time and its submission context.
+ */
+@Public
+@Unstable
+public interface ApplicationStateData {
+
+  /**
+   * The time at which the application was received by the Resource Manager
+   * @return submitTime
+   */
+  @Public
+  @Unstable
+  public long getSubmitTime();
+
+  /**
+   * Sets the time at which the application was received by the Resource
+   * Manager
+   * @param submitTime the submit time to record
+   */
+  @Public
+  @Unstable
+  public void setSubmitTime(long submitTime);
+
+  /**
+   * The {@link ApplicationSubmissionContext} for the application.
+   * The {@link ApplicationId} can be obtained from this context.
+   * @return ApplicationSubmissionContext
+   */
+  @Public
+  @Unstable
+  public ApplicationSubmissionContext getApplicationSubmissionContext();
+
+  /**
+   * Sets the {@link ApplicationSubmissionContext} for the application
+   * @param context the submission context to record
+   */
+  @Public
+  @Unstable
+  public void setApplicationSubmissionContext(
+      ApplicationSubmissionContext context);
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptStateDataPBImpl.java new file mode 100644 index 00000000000..fa0a596eb53 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptStateDataPBImpl.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ProtoBase; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptStateDataProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptStateDataProtoOrBuilder; +
+/**
+ * Protocol-buffer backed {@link ApplicationAttemptStateData}. Objects passed
+ * to the setters are cached in local fields and are only written into the
+ * builder when {@link #getProto()} is called.
+ */
+public class ApplicationAttemptStateDataPBImpl
+extends ProtoBase
+implements ApplicationAttemptStateData {
+
+  ApplicationAttemptStateDataProto proto =
+      ApplicationAttemptStateDataProto.getDefaultInstance();
+  ApplicationAttemptStateDataProto.Builder builder = null;
+  boolean viaProto = false;
+
+  private ApplicationAttemptId attemptId = null;
+  private Container masterContainer = null;
+
+  /** Creates an empty record backed by a fresh builder. */
+  public ApplicationAttemptStateDataPBImpl() {
+    builder = ApplicationAttemptStateDataProto.newBuilder();
+  }
+
+  /** Wraps an already built proto; no builder is created until needed. */
+  public ApplicationAttemptStateDataPBImpl(
+      ApplicationAttemptStateDataProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Merges the locally cached records into the proto and returns it.
+   * @return the proto reflecting the current state of this object
+   */
+  public ApplicationAttemptStateDataProto getProto() {
+    mergeLocalToProto();
+    if (!viaProto) {
+      proto = builder.build();
+    }
+    viaProto = true;
+    return proto;
+  }
+
+  /** Copies each non-null cached record into the builder. */
+  private void mergeLocalToBuilder() {
+    if (attemptId != null) {
+      ApplicationAttemptIdPBImpl attemptIdImpl =
+          (ApplicationAttemptIdPBImpl) attemptId;
+      builder.setAttemptId(attemptIdImpl.getProto());
+    }
+    if (masterContainer != null) {
+      ContainerPBImpl containerImpl = (ContainerPBImpl) masterContainer;
+      builder.setMasterContainer(containerImpl.getProto());
+    }
+  }
+
+  /** Rebuilds proto from the builder after merging the cached records. */
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  /** Makes the builder the live copy, seeding it from proto when required. */
+  private void maybeInitBuilder() {
+    boolean needsNewBuilder = viaProto || builder == null;
+    if (needsNewBuilder) {
+      builder = ApplicationAttemptStateDataProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /**
+   * @return the cached attempt id, else the one held by the proto/builder,
+   *         or null when neither has it
+   */
+  @Override
+  public ApplicationAttemptId getAttemptId() {
+    if (attemptId == null) {
+      ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+      if (p.hasAttemptId()) {
+        attemptId = new ApplicationAttemptIdPBImpl(p.getAttemptId());
+      }
+    }
+    return attemptId;
+  }
+
+  /** Caches the attempt id; a null value also clears it in the builder. */
+  @Override
+  public void setAttemptId(ApplicationAttemptId attemptId) {
+    maybeInitBuilder();
+    if (null == attemptId) {
+      builder.clearAttemptId();
+    }
+    this.attemptId = attemptId;
+  }
+
+  /**
+   * @return the cached master container, else the one held by the
+   *         proto/builder, or null when neither has it
+   */
+  @Override
+  public Container getMasterContainer() {
+    if (masterContainer == null) {
+      ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+      if (p.hasMasterContainer()) {
+        masterContainer = new ContainerPBImpl(p.getMasterContainer());
+      }
+    }
+    return masterContainer;
+  }
+
+  /** Caches the container; a null value also clears it in the builder. */
+  @Override
+  public void setMasterContainer(Container container) {
+    maybeInitBuilder();
+    if (null == container) {
+      builder.clearMasterContainer();
+    }
+    this.masterContainer = container;
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStateDataPBImpl.java new file mode 100644 index 00000000000..dced42397aa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStateDataPBImpl.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.ApplicationStateData; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ProtoBase; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateDataProtoOrBuilder; +
+/**
+ * Protocol-buffer backed {@link ApplicationStateData}. The submission context
+ * passed to the setter is cached locally and only written into the builder
+ * when {@link #getProto()} is called; the submit time goes straight to the
+ * builder.
+ */
+public class ApplicationStateDataPBImpl
+extends ProtoBase
+implements ApplicationStateData {
+
+  ApplicationStateDataProto proto =
+      ApplicationStateDataProto.getDefaultInstance();
+  ApplicationStateDataProto.Builder builder = null;
+  boolean viaProto = false;
+
+  private ApplicationSubmissionContext applicationSubmissionContext = null;
+
+  /** Creates an empty record backed by a fresh builder. */
+  public ApplicationStateDataPBImpl() {
+    builder = ApplicationStateDataProto.newBuilder();
+  }
+
+  /** Wraps an already built proto; no builder is created until needed. */
+  public ApplicationStateDataPBImpl(
+      ApplicationStateDataProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Merges the locally cached context into the proto and returns it.
+   * @return the proto reflecting the current state of this object
+   */
+  public ApplicationStateDataProto getProto() {
+    mergeLocalToProto();
+    if (!viaProto) {
+      proto = builder.build();
+    }
+    viaProto = true;
+    return proto;
+  }
+
+  /** Copies the cached submission context, if any, into the builder. */
+  private void mergeLocalToBuilder() {
+    if (applicationSubmissionContext != null) {
+      ApplicationSubmissionContextPBImpl contextImpl =
+          (ApplicationSubmissionContextPBImpl) applicationSubmissionContext;
+      builder.setApplicationSubmissionContext(contextImpl.getProto());
+    }
+  }
+
+  /** Rebuilds proto from the builder after merging the cached context. */
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  /** Makes the builder the live copy, seeding it from proto when required. */
+  private void maybeInitBuilder() {
+    boolean needsNewBuilder = viaProto || builder == null;
+    if (needsNewBuilder) {
+      builder = ApplicationStateDataProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /** @return the stored submit time, or -1 when none has been set */
+  @Override
+  public long getSubmitTime() {
+    ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    return p.hasSubmitTime() ? p.getSubmitTime() : -1;
+  }
+
+  /** Writes the submit time directly into the builder. */
+  @Override
+  public void setSubmitTime(long submitTime) {
+    maybeInitBuilder();
+    builder.setSubmitTime(submitTime);
+  }
+
+  /**
+   * @return the cached submission context, else the one held by the
+   *         proto/builder, or null when neither has it
+   */
+  @Override
+  public ApplicationSubmissionContext getApplicationSubmissionContext() {
+    if (applicationSubmissionContext == null) {
+      ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
+      if (p.hasApplicationSubmissionContext()) {
+        applicationSubmissionContext =
+            new ApplicationSubmissionContextPBImpl(
+                p.getApplicationSubmissionContext());
+      }
+    }
+    return applicationSubmissionContext;
+  }
+
+  /** Caches the context; a null value also clears it in the builder. */
+  @Override
+  public void setApplicationSubmissionContext(
+      ApplicationSubmissionContext context) {
+    maybeInitBuilder();
+    if (null == context) {
+      builder.clearApplicationSubmissionContext();
+    }
+    this.applicationSubmissionContext = context;
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index fa3763a7968..ef566834739 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -210,7 +210,6 @@ public void setAMContainerSpec(ContainerLaunchContext amContainer) { @Override public boolean getUnmanagedAM() { ApplicationSubmissionContextProtoOrBuilder p = viaProto ?
proto : builder; - //There is a default so cancelTokens should never be null return p.getUnmanagedAm(); } @@ -219,7 +218,7 @@ public void setUnmanagedAM(boolean value) { maybeInitBuilder(); builder.setUnmanagedAm(value); } - + @Override public boolean getCancelTokensWhenComplete() { ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 55c0d78e7dc..175e134c83a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -329,3 +329,17 @@ message StringBytesMapProto { optional bytes value = 2; }
+////////////////////////////////////////////////////////////////////////
+////// From recovery////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////
+// State persisted per application: submit time and submission context.
+message ApplicationStateDataProto {
+  optional int64 submit_time = 1;
+  optional ApplicationSubmissionContextProto application_submission_context = 2;
+}
+
+// State persisted per application attempt: attempt id and master container.
+message ApplicationAttemptStateDataProto {
+  optional ApplicationAttemptIdProto attempt_id = 1;
+  optional ContainerProto master_container = 2;
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestGetGroups.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestGetGroups.java index 2cb69730a63..990368e0b62 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestGetGroups.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestGetGroups.java @@ -28,8 +28,6 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.yarn.conf.YarnConfiguration; import
org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; import org.apache.hadoop.yarn.service.Service.STATE; import org.junit.AfterClass; import org.junit.Before; @@ -46,8 +44,7 @@ public class TestGetGroups extends GetGroupsTestBase { @BeforeClass public static void setUpResourceManager() throws IOException, InterruptedException { conf = new YarnConfiguration(); - RMStateStore store = StoreFactory.getStore(conf); - resourceManager = new ResourceManager(store) { + resourceManager = new ResourceManager() { @Override protected void doSecureLogin() throws IOException { }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java index eb57e4194ba..3d7f1201f5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java @@ -34,7 +34,7 @@ public void test() { @Test public void testClientStop() { Configuration conf = new Configuration(); - ResourceManager rm = new ResourceManager(null); + ResourceManager rm = new ResourceManager(); rm.init(conf); rm.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 920362671ae..948f154ae58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -225,10 +225,12 @@ public class YarnConfiguration extends Configuration { public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 7*24*60*60*1000; // 7 days + public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled"; + public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false; /** The class to use as the persistent store.*/ public static final String RM_STORE = RM_PREFIX + "store.class"; - + /** The maximum number of completed applications RM keeps. */ public static final String RM_MAX_COMPLETED_APPLICATIONS = RM_PREFIX + "max-completed-applications"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 2b494172a7e..d60aab8aef5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -204,6 +204,13 @@ 8192 + + Enable RM to recover state after starting. If true, then + yarn.resourcemanager.store.class must be specified + yarn.resourcemanager.recovery.enabled + false + + The class to use as the persistent store. 
yarn.resourcemanager.store.class diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 96ee551e205..d5f8ac201ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -263,6 +263,8 @@ public AllocateResponse allocate(AllocateRequest request) } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) { LOG.error("Invalid responseid from appAttemptId " + appAttemptId); // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO: + // Reboot is not useful since after AM reboots, it will send register and + // get an exception. Might as well throw an exception here. 
allocateResponse.setAMResponse(reboot); return allocateResponse; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index ec29a4792ba..e8bd5d03114 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -37,6 +37,7 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; @@ -75,6 +76,7 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -254,6 +256,20 @@ public SubmitApplicationResponse submitApplication( // So call handle directly and do not send an event. 
rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System .currentTimeMillis())); + + // If recovery is enabled then store the application information in a + // blocking call so make sure that RM has stored the information needed + // to restart the AM after RM restart without further client communication + RMStateStore stateStore = rmContext.getStateStore(); + LOG.info("Storing Application with id " + applicationId); + try { + stateStore.storeApplication(rmContext.getRMApps().get(applicationId)); + } catch (Exception e) { + // For HA this exception needs to be handled by giving up + // master status if we got fenced + LOG.error("Failed to store application:" + applicationId, e); + ExitUtil.terminate(1, e); + } LOG.info("Application with id " + applicationId.getId() + " submitted by user " + user); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 44753928fe1..8b5e55aa92b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.LinkedList; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,6 +37,10 @@ import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; +import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -48,7 +53,8 @@ /** * This class manages the list of applications for the resource manager. */ -public class RMAppManager implements EventHandler { +public class RMAppManager implements EventHandler, + Recoverable { private static final Log LOG = LogFactory.getLog(RMAppManager.class); @@ -173,6 +179,10 @@ protected synchronized void finishApplication(ApplicationId applicationId) { completedApps.add(applicationId); writeAuditLog(applicationId); + + // application completely done. 
Remove from state + RMStateStore store = rmContext.getStateStore(); + store.removeApplication(rmContext.getRMApps().get(applicationId)); } } @@ -306,6 +316,46 @@ private Credentials parseCredentials(ApplicationSubmissionContext application) } return credentials; }
+
+  /**
+   * Re-submits every application found in the recovered RM state.
+   * Applications with an unmanaged AM are not re-submitted; they are removed
+   * from the state store instead.
+   *
+   * @param state recovered RM state holding the saved applications
+   * @throws Exception if re-submitting or recovering an application fails
+   */
+  @Override
+  public void recover(RMState state) throws Exception {
+    RMStateStore store = rmContext.getStateStore();
+    assert store != null;
+    // recover applications; only the values are read, so the key type is
+    // left as a wildcard instead of using a raw Map
+    Map<?, ApplicationState> appStates = state.getApplicationState();
+    LOG.info("Recovering " + appStates.size() + " applications");
+    for (ApplicationState appState : appStates.values()) {
+      // re-submit the application
+      // this is going to send an app start event but since the async dispatcher
+      // has not started that event will be queued until we have completed re
+      // populating the state
+      if (appState.getApplicationSubmissionContext().getUnmanagedAM()) {
+        // do not recover unmanaged applications since current recovery
+        // mechanism of restarting attempts does not work for them.
+        // This will need to be changed in work preserving recovery in which
+        // RM will re-connect with the running AM's instead of restarting them
+        LOG.info("Not recovering unmanaged application " + appState.getAppId());
+        store.removeApplication(appState);
+      } else {
+        LOG.info("Recovering application " + appState.getAppId());
+        submitApplication(appState.getApplicationSubmissionContext(),
+            appState.getSubmitTime());
+        // re-populate attempt information in application
+        RMAppImpl appImpl = (RMAppImpl) rmContext.getRMApps().get(
+            appState.getAppId());
+        appImpl.recover(state);
+      }
+    }
+  }
 @Override public void handle(RMAppManagerEvent event) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index ccb3deb382f..b48767001bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; @@ -38,6 +39,8 @@ public interface RMContext { Dispatcher getDispatcher(); + + RMStateStore getStateStore(); ConcurrentMap getRMApps(); diff
--git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 632b6c3453d..840d129dc4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -23,7 +23,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; @@ -33,6 +36,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import com.google.common.annotations.VisibleForTesting; + public class RMContextImpl implements RMContext { private final Dispatcher rmDispatcher; @@ -48,6 +53,7 @@ public class RMContextImpl implements RMContext { private AMLivelinessMonitor amLivelinessMonitor; private AMLivelinessMonitor amFinishingMonitor; + private RMStateStore stateStore = null; private ContainerAllocationExpirer containerAllocationExpirer; private final 
DelegationTokenRenewer tokenRenewer; private final ApplicationTokenSecretManager appTokenSecretManager; @@ -55,6 +61,7 @@ public class RMContextImpl implements RMContext { private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager; public RMContextImpl(Dispatcher rmDispatcher, + RMStateStore store, ContainerAllocationExpirer containerAllocationExpirer, AMLivelinessMonitor amLivelinessMonitor, AMLivelinessMonitor amFinishingMonitor, @@ -63,6 +70,7 @@ public RMContextImpl(Dispatcher rmDispatcher, RMContainerTokenSecretManager containerTokenSecretManager, ClientToAMTokenSecretManagerInRM clientTokenSecretManager) { this.rmDispatcher = rmDispatcher; + this.stateStore = store; this.containerAllocationExpirer = containerAllocationExpirer; this.amLivelinessMonitor = amLivelinessMonitor; this.amFinishingMonitor = amFinishingMonitor; @@ -71,11 +79,47 @@ public RMContextImpl(Dispatcher rmDispatcher, this.containerTokenSecretManager = containerTokenSecretManager; this.clientToAMTokenSecretManager = clientTokenSecretManager; }
+
+  /**
+   * Helper constructor for tests: builds the context without a caller-supplied
+   * state store and installs an initialized {@link NullRMStateStore} instead.
+   *
+   * @throws IllegalStateException if the NullRMStateStore fails to initialize
+   */
+  @VisibleForTesting
+  public RMContextImpl(Dispatcher rmDispatcher,
+      ContainerAllocationExpirer containerAllocationExpirer,
+      AMLivelinessMonitor amLivelinessMonitor,
+      AMLivelinessMonitor amFinishingMonitor,
+      DelegationTokenRenewer tokenRenewer,
+      ApplicationTokenSecretManager appTokenSecretManager,
+      RMContainerTokenSecretManager containerTokenSecretManager,
+      ClientToAMTokenSecretManagerInRM clientTokenSecretManager) {
+    this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor,
+        amFinishingMonitor, tokenRenewer, appTokenSecretManager,
+        containerTokenSecretManager, clientTokenSecretManager);
+    RMStateStore nullStore = new NullRMStateStore();
+    nullStore.setDispatcher(rmDispatcher);
+    try {
+      nullStore.init(new YarnConfiguration());
+      setStateStore(nullStore);
+    } catch (Exception e) {
+      // Previously "assert false", which dropped the cause and was a no-op
+      // without -ea, leaving the context with a null state store.
+      throw new IllegalStateException(
+          "Failed to initialize NullRMStateStore for test RMContext", e);
+    }
+  }
 @Override public Dispatcher getDispatcher() { return this.rmDispatcher;
} + + @Override + public RMStateStore getStateStore() { + return stateStore; + } @Override public ConcurrentMap getRMApps() { @@ -126,4 +162,9 @@ public RMContainerTokenSecretManager getContainerTokenSecretManager() { public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() { return this.clientToAMTokenSecretManager; } + + @VisibleForTesting + public void setStateStore(RMStateStore store) { + stateStore = store; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index c7d2b26b699..e196770837e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -30,6 +30,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; @@ -45,10 +46,11 @@ import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -80,6 +82,8 @@ import org.apache.hadoop.yarn.webapp.WebApps; import org.apache.hadoop.yarn.webapp.WebApps.Builder; +import com.google.common.annotations.VisibleForTesting; + /** * The ResourceManager is the main class that is a set of components. * "I am the ResourceManager. All your resources are belong to us..." @@ -119,14 +123,13 @@ public class ResourceManager extends CompositeService implements Recoverable { protected RMDelegationTokenSecretManager rmDTSecretManager; private WebApp webApp; protected RMContext rmContext; - private final RMStateStore store; protected ResourceTrackerService resourceTracker; + private boolean recoveryEnabled; private Configuration conf; - - public ResourceManager(RMStateStore store) { + + public ResourceManager() { super("ResourceManager"); - this.store = store; } public RMContext getRMContext() { @@ -160,12 +163,34 @@ public synchronized void init(Configuration conf) { this.containerTokenSecretManager = createContainerTokenSecretManager(conf); + boolean isRecoveryEnabled = conf.getBoolean( + YarnConfiguration.RECOVERY_ENABLED, + YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); + + RMStateStore rmStore = null; + if(isRecoveryEnabled) { + recoveryEnabled = true; + rmStore = RMStateStoreFactory.getStore(conf); + } else { + recoveryEnabled = false; + rmStore = new NullRMStateStore(); + } + try { + rmStore.init(conf); + rmStore.setDispatcher(rmDispatcher); + } catch (Exception e) { + // the Exception from 
stateStore.init() needs to be handled for + // HA and we need to give up master status if we got fenced + LOG.error("Failed to init state store", e); + ExitUtil.terminate(1, e); + } + this.rmContext = - new RMContextImpl(this.rmDispatcher, + new RMContextImpl(this.rmDispatcher, rmStore, this.containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, tokenRenewer, this.appTokenSecretManager, this.containerTokenSecretManager, this.clientToAMSecretManager); - + // Register event handler for NodesListManager this.nodesListManager = new NodesListManager(this.rmContext); this.rmDispatcher.register(NodesListManagerEventType.class, @@ -226,9 +251,15 @@ public synchronized void init(Configuration conf) { addService(applicationMasterLauncher); new RMNMInfo(this.rmContext, this.scheduler); - + super.init(conf); } + + @VisibleForTesting + protected void setRMStateStore(RMStateStore rmStore) { + rmStore.setDispatcher(rmDispatcher); + ((RMContextImpl) rmContext).setStateStore(rmStore); + } protected RMContainerTokenSecretManager createContainerTokenSecretManager( Configuration conf) { @@ -502,6 +533,19 @@ public void start() { this.appTokenSecretManager.start(); this.containerTokenSecretManager.start(); + if(recoveryEnabled) { + try { + RMStateStore rmStore = rmContext.getStateStore(); + RMState state = rmStore.loadState(); + recover(state); + } catch (Exception e) { + // the Exception from loadState() needs to be handled for + // HA and we need to give up master status if we got fenced + LOG.error("Failed to load/recover state", e); + ExitUtil.terminate(1, e); + } + } + startWepApp(); DefaultMetricsSystem.initialize("ResourceManager"); JvmMetrics.initSingleton("ResourceManager", null); @@ -555,6 +599,13 @@ public void stop() { DefaultMetricsSystem.shutdown(); + RMStateStore store = rmContext.getStateStore(); + try { + store.close(); + } catch (Exception e) { + LOG.error("Error closing store.", e); + } + super.stop(); } @@ -643,6 +694,8 @@ public 
ApplicationTokenSecretManager getApplicationTokenSecretManager(){ @Override public void recover(RMState state) throws Exception { + // recover applications + rmAppManager.recover(state); } public static void main(String argv[]) { @@ -650,13 +703,11 @@ public static void main(String argv[]) { StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG); try { Configuration conf = new YarnConfiguration(); - RMStateStore store = StoreFactory.getStore(conf); - ResourceManager resourceManager = new ResourceManager(store); + ResourceManager resourceManager = new ResourceManager(); ShutdownHookManager.get().addShutdownHook( new CompositeServiceShutdownHook(resourceManager), SHUTDOWN_HOOK_PRIORITY); resourceManager.init(conf); - //resourceManager.recover(store.restore()); resourceManager.start(); } catch (Throwable t) { LOG.fatal("Error starting ResourceManager", t); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileRMStateStore.java deleted file mode 100644 index d1d0ee6d33b..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileRMStateStore.java +++ /dev/null @@ -1,22 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. 
You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.hadoop.yarn.server.resourcemanager.recovery; - -public class FileRMStateStore implements RMStateStore { - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java new file mode 100644 index 00000000000..c5d59378401 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.util.ConverterUtils; + +import com.google.common.annotations.VisibleForTesting; + +@Private +@Unstable +public class MemoryRMStateStore extends RMStateStore { + + RMState state = new RMState(); + + @VisibleForTesting + public RMState getState() { + return state; + } + + @Override + public synchronized RMState loadState() throws Exception { + // return a copy of the state to allow for modification of the real state + RMState returnState = new RMState(); + returnState.appState.putAll(state.appState); + return returnState; + } + + @Override + public synchronized void initInternal(Configuration conf) { + } + + @Override + protected synchronized void closeInternal() throws Exception { + } + + @Override + public void storeApplicationState(String appId, + ApplicationStateDataPBImpl appStateData) + throws Exception { + ApplicationState appState = new ApplicationState( + appStateData.getSubmitTime(), + appStateData.getApplicationSubmissionContext()); + state.appState.put(appState.getAppId(), appState); + } + + @Override + public synchronized void storeApplicationAttemptState(String attemptIdStr, + ApplicationAttemptStateDataPBImpl attemptStateData) + throws Exception { + ApplicationAttemptId attemptId = ConverterUtils + .toApplicationAttemptId(attemptIdStr); + ApplicationAttemptState attemptState = new ApplicationAttemptState( + attemptId, attemptStateData.getMasterContainer()); + + ApplicationState appState 
= state.getApplicationState().get( + attemptState.getAttemptId().getApplicationId()); + assert appState != null; + + appState.attempts.put(attemptState.getAttemptId(), attemptState); + } + + @Override + public synchronized void removeApplicationState(ApplicationState appState) + throws Exception { + ApplicationId appId = appState.getAppId(); + ApplicationState removed = state.appState.remove(appId); + assert removed != null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java new file mode 100644 index 00000000000..6b614606c27 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl; + +public class NullRMStateStore extends RMStateStore { + + @Override + protected void initInternal(Configuration conf) throws Exception { + // Do nothing + } + + @Override + protected void closeInternal() throws Exception { + // Do nothing + } + + @Override + public RMState loadState() throws Exception { + return null; + } + + @Override + protected void storeApplicationState(String appId, + ApplicationStateDataPBImpl appStateData) throws Exception { + // Do nothing + } + + @Override + protected void storeApplicationAttemptState(String attemptId, + ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { + // Do nothing + } + + @Override + protected void removeApplicationState(ApplicationState appState) + throws Exception { + // Do nothing + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 4e1e41e7813..674a779cc2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -15,10 +15,313 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.hadoop.yarn.server.resourcemanager.recovery; -public interface RMStateStore { - public interface RMState { +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; + +@Private +@Unstable +/** + * Base class to implement storage of ResourceManager state. + * Takes care of asynchronous notifications and interfacing with YARN objects. + * Real store implementations need to derive from it and implement blocking + * store and load methods to actually store and load the state. 
+ */ +public abstract class RMStateStore { + public static final Log LOG = LogFactory.getLog(RMStateStore.class); + + /** + * State of an application attempt + */ + public static class ApplicationAttemptState { + final ApplicationAttemptId attemptId; + final Container masterContainer; + + public ApplicationAttemptState(ApplicationAttemptId attemptId, + Container masterContainer) { + this.attemptId = attemptId; + this.masterContainer = masterContainer; + } + + public Container getMasterContainer() { + return masterContainer; + } + public ApplicationAttemptId getAttemptId() { + return attemptId; + } + } + + /** + * State of an application + */ + public static class ApplicationState { + final ApplicationSubmissionContext context; + final long submitTime; + Map attempts = + new HashMap(); + + ApplicationState(long submitTime, ApplicationSubmissionContext context) { + this.submitTime = submitTime; + this.context = context; + } + + public ApplicationId getAppId() { + return context.getApplicationId(); + } + public long getSubmitTime() { + return submitTime; + } + public int getAttemptCount() { + return attempts.size(); + } + public ApplicationSubmissionContext getApplicationSubmissionContext() { + return context; + } + public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) { + return attempts.get(attemptId); + } + } + + /** + * State of the ResourceManager + */ + public static class RMState { + Map appState = + new HashMap(); + + public Map getApplicationState() { + return appState; + } + } + + private Dispatcher rmDispatcher; + + /** + * Dispatcher used to send state operation completion events to + * ResourceManager services + */ + public void setDispatcher(Dispatcher dispatcher) { + this.rmDispatcher = dispatcher; + } + + AsyncDispatcher dispatcher; + + public synchronized void init(Configuration conf) throws Exception{ + // create async handler + dispatcher = new AsyncDispatcher(); + dispatcher.init(conf);
dispatcher.register(RMStateStoreEventType.class, + new ForwardingEventHandler()); + dispatcher.start(); + + initInternal(conf); + } + + /** + * Derived classes initialize themselves using this method. + * The base class is initialized and the event dispatcher is ready to use at + * this point + */ + protected abstract void initInternal(Configuration conf) throws Exception; + + public synchronized void close() throws Exception { + closeInternal(); + dispatcher.stop(); + } + + /** + * Derived classes close themselves using this method. + * The base class will be closed and the event dispatcher will be shutdown + * after this + */ + protected abstract void closeInternal() throws Exception; + + /** + * Blocking API + * The derived class must recover state from the store and return a new + * RMState object populated with that state + * This must not be called on the dispatcher thread + */ + public abstract RMState loadState() throws Exception; + + /** + * Blocking API + * ResourceManager services use this to store the application's state + * This must not be called on the dispatcher thread + */ + public synchronized void storeApplication(RMApp app) throws Exception { + ApplicationSubmissionContext context = app + .getApplicationSubmissionContext(); + assert context instanceof ApplicationSubmissionContextPBImpl; + + ApplicationStateDataPBImpl appStateData = new ApplicationStateDataPBImpl(); + appStateData.setSubmitTime(app.getSubmitTime()); + appStateData.setApplicationSubmissionContext(context); + + LOG.info("Storing info for app: " + context.getApplicationId()); + storeApplicationState(app.getApplicationId().toString(), appStateData); + } + + /** + * Blocking API + * Derived classes must implement this method to store the state of an + * application. 
+ */ + protected abstract void storeApplicationState(String appId, + ApplicationStateDataPBImpl appStateData) + throws Exception; + + @SuppressWarnings("unchecked") + /** + * Non-blocking API + * ResourceManager services call this to store state on an application attempt + * This does not block the dispatcher threads + * RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt + */ + public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) { + ApplicationAttemptState attemptState = new ApplicationAttemptState( + appAttempt.getAppAttemptId(), appAttempt.getMasterContainer()); + dispatcher.getEventHandler().handle( + new RMStateStoreAppAttemptEvent(attemptState)); + } + + /** + * Blocking API + * Derived classes must implement this method to store the state of an + * application attempt + */ + protected abstract void storeApplicationAttemptState(String attemptId, + ApplicationAttemptStateDataPBImpl attemptStateData) + throws Exception; + + + /** + * Non-blocking API + * ResourceManager services call this to remove an application from the state + * store + * This does not block the dispatcher threads + * There is no notification of completion for this operation. 
+ */ + public synchronized void removeApplication(RMApp app) { + ApplicationState appState = new ApplicationState( + app.getSubmitTime(), app.getApplicationSubmissionContext()); + for(RMAppAttempt appAttempt : app.getAppAttempts().values()) { + ApplicationAttemptState attemptState = new ApplicationAttemptState( + appAttempt.getAppAttemptId(), appAttempt.getMasterContainer()); + appState.attempts.put(attemptState.getAttemptId(), attemptState); + } + + removeApplication(appState); + } + + @SuppressWarnings("unchecked") + /** + * Non-Blocking API + */ + public synchronized void removeApplication(ApplicationState appState) { + dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState)); + } + + /** + * Blocking API + * Derived classes must implement this method to remove the state of an + * application and its attempts + */ + protected abstract void removeApplicationState(ApplicationState appState) + throws Exception; + + // Dispatcher related code + + private synchronized void handleStoreEvent(RMStateStoreEvent event) { + switch(event.getType()) { + case STORE_APP_ATTEMPT: + { + ApplicationAttemptState attemptState = + ((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); + Exception storedException = null; + ApplicationAttemptStateDataPBImpl attemptStateData = + new ApplicationAttemptStateDataPBImpl(); + attemptStateData.setAttemptId(attemptState.getAttemptId()); + attemptStateData.setMasterContainer(attemptState.getMasterContainer()); + + LOG.info("Storing info for attempt: " + attemptState.getAttemptId()); + try { + storeApplicationAttemptState(attemptState.getAttemptId().toString(), + attemptStateData); + } catch (Exception e) { + LOG.error("Error storing appAttempt: " + + attemptState.getAttemptId(), e); + storedException = e; + } finally { + notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), + storedException); + } + } + break; + case REMOVE_APP: + { + ApplicationState appState = + ((RMStateStoreRemoveAppEvent) 
event).getAppState(); + ApplicationId appId = appState.getAppId(); + + LOG.info("Removing info for app: " + appId); + try { + removeApplicationState(appState); + } catch (Exception e) { + LOG.error("Error removing app: " + appId, e); + } + } + break; + default: + LOG.error("Unknown RMStateStoreEvent type: " + event.getType()); + } + } + + @SuppressWarnings("unchecked") + /** + * In {@link #storeApplicationAttempt}, derived class can call this method to + * notify the application attempt about operation completion + * @param attemptId id of the attempt that has been saved + */ + private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId, + Exception storedException) { + rmDispatcher.getEventHandler().handle( + new RMAppAttemptStoredEvent(attemptId, storedException)); + } + + /** + * EventHandler implementation which forwards events to the RMStateStore + * This hides the EventHandler methods of the store from its public interface + */ + private final class ForwardingEventHandler + implements EventHandler { + + @Override + public void handle(RMStateStoreEvent event) { + handleStoreEvent(event); + } } + } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java new file mode 100644 index 00000000000..c4a04bc5771 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; + +public class RMStateStoreAppAttemptEvent extends RMStateStoreEvent { + ApplicationAttemptState attemptState; + + public RMStateStoreAppAttemptEvent(ApplicationAttemptState attemptState) { + super(RMStateStoreEventType.STORE_APP_ATTEMPT); + this.attemptState = attemptState; + } + + public ApplicationAttemptState getAppAttemptState() { + return attemptState; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java new file mode 100644 index 00000000000..8e49a826e04 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEvent.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class RMStateStoreEvent extends AbstractEvent { + public RMStateStoreEvent(RMStateStoreEventType type) { + super(type); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java new file mode 100644 index 00000000000..22f155cbf26 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +public enum RMStateStoreEventType { + STORE_APP_ATTEMPT, + REMOVE_APP +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreFactory.java similarity index 92% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFactory.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreFactory.java index b314989968f..f9e2869d997 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreFactory.java @@ -21,12 +21,12 @@ import org.apache.hadoop.util.ReflectionUtils; import 
org.apache.hadoop.yarn.conf.YarnConfiguration; -public class StoreFactory { +public class RMStateStoreFactory { public static RMStateStore getStore(Configuration conf) { RMStateStore store = ReflectionUtils.newInstance( conf.getClass(YarnConfiguration.RM_STORE, - FileRMStateStore.class, RMStateStore.class), + MemoryRMStateStore.class, RMStateStore.class), conf); return store; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java new file mode 100644 index 00000000000..402feb96ec9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; + +public class RMStateStoreRemoveAppEvent extends RMStateStoreEvent { + ApplicationState appState; + + RMStateStoreRemoveAppEvent(ApplicationState appState) { + super(RMStateStoreEventType.REMOVE_APP); + this.appState = appState; + } + + public ApplicationState getAppState() { + return appState; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java index 5e5a07efb2e..cbbeb3d01d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + @InterfaceAudience.Private @InterfaceStability.Unstable package org.apache.hadoop.yarn.server.resourcemanager.recovery; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 2533d465dee..92bc2b600f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -44,6 +44,12 @@ public interface RMApp extends EventHandler { * @return the {@link ApplicationId} for this {@link RMApp}. */ ApplicationId getApplicationId(); + + /** + * The application submission context for this {@link RMApp} + * @return the {@link ApplicationSubmissionContext} for this {@link RMApp} + */ + ApplicationSubmissionContext getApplicationSubmissionContext(); /** * The current state of the {@link RMApp}. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 0317a3c6022..70abe2fe319 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -50,6 +50,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -66,7 +69,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.Records; -public class RMAppImpl implements RMApp { +public class RMAppImpl implements RMApp, Recoverable { private static final Log LOG = LogFactory.getLog(RMAppImpl.class); private static final String UNAVAILABLE = "N/A"; @@ -243,6 +246,11 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, public ApplicationId getApplicationId() { return this.applicationId; } + + @Override + public 
ApplicationSubmissionContext getApplicationSubmissionContext() { + return this.submissionContext; + } @Override public FinalApplicationStatus getFinalApplicationStatus() { @@ -512,9 +520,22 @@ public void handle(RMAppEvent event) { this.writeLock.unlock(); } } + + @Override + public void recover(RMState state) { + ApplicationState appState = state.getApplicationState().get(getApplicationId()); + LOG.info("Recovering app: " + getApplicationId() + " with " + + + appState.getAttemptCount() + " attempts"); + for(int i=0; i= app.maxRetries) { retryApp = false; msg = "Application " + app.getApplicationId() + " failed " + app.maxRetries + " times due to " + failedEvent.getDiagnostics() @@ -655,7 +678,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { } if (retryApp) { - app.createNewAttempt(); + app.createNewAttempt(true); return initialState; } else { LOG.info(msg); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java index 99287a3a55f..bd96e2b9f5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java @@ -39,9 +39,15 @@ public enum RMAppAttemptEventType { CONTAINER_ACQUIRED, CONTAINER_ALLOCATED, CONTAINER_FINISHED, + + // Source: RMStateStore + ATTEMPT_SAVED, // Source: Scheduler APP_REJECTED, APP_ACCEPTED, + + // Source: RMAttemptImpl.recover + RECOVER } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index da8a6ed6719..95a19541311 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -38,6 +38,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -57,6 +58,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -69,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -85,7 +92,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils; @SuppressWarnings({"unchecked", "rawtypes"}) -public class RMAppAttemptImpl implements RMAppAttempt { +public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { private static final Log LOG = LogFactory.getLog(RMAppAttemptImpl.class); @@ -153,12 +160,15 @@ RMAppAttemptEventType.START, new AttemptStartedTransition()) .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FAILED, RMAppAttemptEventType.REGISTERED, new UnexpectedAMRegisteredTransition()) + .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.RECOVERED, + RMAppAttemptEventType.RECOVER) // Transitions from SUBMITTED state .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED, RMAppAttemptEventType.APP_REJECTED, new AppRejectedTransition()) .addTransition(RMAppAttemptState.SUBMITTED, - EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.SCHEDULED), + EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, + RMAppAttemptState.SCHEDULED), RMAppAttemptEventType.APP_ACCEPTED, new ScheduleTransition()) .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.KILLED, @@ -170,12 +180,42 @@ 
RMAppAttemptEventType.APP_REJECTED, new AppRejectedTransition()) // Transitions from SCHEDULED State .addTransition(RMAppAttemptState.SCHEDULED, - RMAppAttemptState.ALLOCATED, + RMAppAttemptState.ALLOCATED_SAVING, RMAppAttemptEventType.CONTAINER_ALLOCATED, new AMContainerAllocatedTransition()) .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.KILLED, RMAppAttemptEventType.KILL, new BaseFinalTransition(RMAppAttemptState.KILLED)) + + // Transitions from ALLOCATED_SAVING State + .addTransition(RMAppAttemptState.ALLOCATED_SAVING, + RMAppAttemptState.ALLOCATED, + RMAppAttemptEventType.ATTEMPT_SAVED, new AttemptStoredTransition()) + .addTransition(RMAppAttemptState.ALLOCATED_SAVING, + RMAppAttemptState.ALLOCATED_SAVING, + RMAppAttemptEventType.CONTAINER_ACQUIRED, + new ContainerAcquiredTransition()) + // App could be killed by the client. So need to handle this. + .addTransition(RMAppAttemptState.ALLOCATED_SAVING, + RMAppAttemptState.KILLED, + RMAppAttemptEventType.KILL, + new BaseFinalTransition(RMAppAttemptState.KILLED)) + + // Transitions from LAUNCHED_UNMANAGED_SAVING State + .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, + RMAppAttemptState.LAUNCHED, + RMAppAttemptEventType.ATTEMPT_SAVED, + new UnmanagedAMAttemptSavedTransition()) + // attempt should not try to register in this state + .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, + RMAppAttemptState.FAILED, + RMAppAttemptEventType.REGISTERED, + new UnexpectedAMRegisteredTransition()) + // App could be killed by the client. So need to handle this. 
+ .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, + RMAppAttemptState.KILLED, + RMAppAttemptEventType.KILL, + new BaseFinalTransition(RMAppAttemptState.KILLED)) // Transitions from ALLOCATED State .addTransition(RMAppAttemptState.ALLOCATED, @@ -279,11 +319,30 @@ RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition()) RMAppAttemptEventType.EXPIRE, RMAppAttemptEventType.REGISTERED, RMAppAttemptEventType.CONTAINER_ALLOCATED, + RMAppAttemptEventType.ATTEMPT_SAVED, + RMAppAttemptEventType.CONTAINER_FINISHED, + RMAppAttemptEventType.UNREGISTERED, + RMAppAttemptEventType.KILL, + RMAppAttemptEventType.STATUS_UPDATE)) + + // Transitions from RECOVERED State + .addTransition( + RMAppAttemptState.RECOVERED, + RMAppAttemptState.RECOVERED, + EnumSet.of(RMAppAttemptEventType.START, + RMAppAttemptEventType.APP_ACCEPTED, + RMAppAttemptEventType.APP_REJECTED, + RMAppAttemptEventType.EXPIRE, + RMAppAttemptEventType.LAUNCHED, + RMAppAttemptEventType.LAUNCH_FAILED, + RMAppAttemptEventType.REGISTERED, + RMAppAttemptEventType.CONTAINER_ALLOCATED, + RMAppAttemptEventType.CONTAINER_ACQUIRED, + RMAppAttemptEventType.ATTEMPT_SAVED, RMAppAttemptEventType.CONTAINER_FINISHED, RMAppAttemptEventType.UNREGISTERED, RMAppAttemptEventType.KILL, RMAppAttemptEventType.STATUS_UPDATE)) - .installTopology(); public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, @@ -318,7 +377,7 @@ public ApplicationAttemptId getAppAttemptId() { @Override public ApplicationSubmissionContext getSubmissionContext() { return this.submissionContext; - } + } @Override public FinalApplicationStatus getFinalApplicationStatus() { @@ -494,6 +553,10 @@ public Container getMasterContainer() { } } + private void setMasterContainer(Container container) { + masterContainer = container; + } + @Override public void handle(RMAppAttemptEvent event) { @@ -561,6 +624,21 @@ public ApplicationResourceUsageReport getApplicationResourceUsageReport() { } } + @Override + public void recover(RMState state) { + 
ApplicationState appState = + state.getApplicationState().get(getAppAttemptId().getApplicationId()); + ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId()); + assert attemptState != null; + setMasterContainer(attemptState.getMasterContainer()); + LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId() + + " AttemptId: " + getAppAttemptId() + + " MasterContainer: " + masterContainer); + setDiagnostics("Attempt recovered after RM restart"); + handle(new RMAppAttemptEvent(getAppAttemptId(), + RMAppAttemptEventType.RECOVER)); + } + private static class BaseTransition implements SingleArcTransition { @@ -625,13 +703,12 @@ private static final class ScheduleTransition @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - - // Send the acceptance to the app - appAttempt.eventHandler.handle(new RMAppEvent(event - .getApplicationAttemptId().getApplicationId(), - RMAppEventType.APP_ACCEPTED)); - if (!appAttempt.submissionContext.getUnmanagedAM()) { + // Send the acceptance to the app + appAttempt.eventHandler.handle(new RMAppEvent(event + .getApplicationAttemptId().getApplicationId(), + RMAppEventType.APP_ACCEPTED)); + // Request a container for the AM. ResourceRequest request = BuilderUtils.newResourceRequest( AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext @@ -647,35 +724,42 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, return RMAppAttemptState.SCHEDULED; } else { // RM not allocating container. AM is self launched. 
- // Directly go to LAUNCHED state - // Register with AMLivelinessMonitor - appAttempt.rmContext.getAMLivelinessMonitor().register( - appAttempt.applicationAttemptId); - return RMAppAttemptState.LAUNCHED; + RMStateStore store = appAttempt.rmContext.getStateStore(); + // save state and then go to LAUNCHED state + appAttempt.storeAttempt(store); + return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING; } } } - private static final class AMContainerAllocatedTransition extends BaseTransition { + private static final class AMContainerAllocatedTransition + extends BaseTransition { @Override public void transition(RMAppAttemptImpl appAttempt, - RMAppAttemptEvent event) { - + RMAppAttemptEvent event) { // Acquire the AM container from the scheduler. Allocation amContainerAllocation = appAttempt.scheduler.allocate( appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST); // Set the masterContainer - appAttempt.masterContainer = amContainerAllocation.getContainers().get( - 0); + appAttempt.setMasterContainer(amContainerAllocation.getContainers().get( + 0)); - // Send event to launch the AM Container - appAttempt.eventHandler.handle(new AMLauncherEvent( - AMLauncherEventType.LAUNCH, appAttempt)); + RMStateStore store = appAttempt.rmContext.getStateStore(); + appAttempt.storeAttempt(store); } } - + + private static final class AttemptStoredTransition extends BaseTransition { + @Override + public void transition(RMAppAttemptImpl appAttempt, + RMAppAttemptEvent event) { + appAttempt.checkAttemptStoreError(event); + appAttempt.launchAttempt(); + } + } + private static class BaseFinalTransition extends BaseTransition { private final RMAppAttemptState finalAttemptState; @@ -736,17 +820,34 @@ public void transition(RMAppAttemptImpl appAttempt, } } - private static final class AMLaunchedTransition extends BaseTransition { + private static class AMLaunchedTransition extends BaseTransition { @Override public void transition(RMAppAttemptImpl appAttempt, 
- RMAppAttemptEvent event) { - + RMAppAttemptEvent event) { // Register with AMLivelinessMonitor - appAttempt.rmContext.getAMLivelinessMonitor().register( - appAttempt.applicationAttemptId); - + appAttempt.attemptLaunched(); } } + + private static final class UnmanagedAMAttemptSavedTransition + extends AMLaunchedTransition { + @Override + public void transition(RMAppAttemptImpl appAttempt, + RMAppAttemptEvent event) { + appAttempt.checkAttemptStoreError(event); + // Send the acceptance to the app + // Ideally this should have been done when the scheduler accepted the app. + // But its here because until the attempt is saved the client should not + // launch the unmanaged AM. Client waits for the app status to be accepted + // before doing so. So we have to delay the accepted state until we have + // completed storing the attempt + appAttempt.eventHandler.handle(new RMAppEvent(event + .getApplicationAttemptId().getApplicationId(), + RMAppEventType.APP_ACCEPTED)); + + super.transition(appAttempt, event); + } + } private static final class LaunchFailedTransition extends BaseFinalTransition { @@ -1040,4 +1141,37 @@ public long getStartTime() { this.readLock.unlock(); } } + + private void launchAttempt(){ + // Send event to launch the AM Container + eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this)); + } + + private void attemptLaunched() { + // Register with AMLivelinessMonitor + rmContext.getAMLivelinessMonitor().register(getAppAttemptId()); + } + + private void checkAttemptStoreError(RMAppAttemptEvent event) { + RMAppAttemptStoredEvent storeEvent = (RMAppAttemptStoredEvent) event; + if(storeEvent.getStoredException() != null) + { + // This needs to be handled for HA and give up master status if we got + // fenced + LOG.error("Failed to store attempt: " + getAppAttemptId(), + storeEvent.getStoredException()); + ExitUtil.terminate(1, storeEvent.getStoredException()); + } + } + + private void storeAttempt(RMStateStore store) { + // store attempt 
data in a non-blocking manner to prevent dispatcher + // thread starvation and wait for state to be saved + LOG.info("Storing attempt: AppId: " + + getAppAttemptId().getApplicationId() + + " AttemptId: " + + getAppAttemptId() + + " MasterContainer: " + masterContainer); + store.storeApplicationAttempt(this); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java index b3eaa02cde2..3eb13edbeef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java @@ -19,6 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; public enum RMAppAttemptState { - NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, - FINISHING, FINISHED, KILLED, + NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING, + FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStoredEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStoredEvent.java new file mode 100644 index 00000000000..8d9ba359247 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStoredEvent.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; + +public class RMAppAttemptStoredEvent extends RMAppAttemptEvent { + + final Exception storedException; + + public RMAppAttemptStoredEvent(ApplicationAttemptId appAttemptId, + Exception storedException) { + super(appAttemptId, RMAppAttemptEventType.ATTEMPT_SAVED); + this.storedException = storedException; + } + + public Exception getStoredException() { + return storedException; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index b097ea37140..ace5efb1fa2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -47,7 +47,7 @@ public class MockAM { private volatile int responseId = 0; private final ApplicationAttemptId attemptId; private final RMContext context; - private final AMRMProtocol amRMProtocol; + private AMRMProtocol amRMProtocol; private final List requests = new ArrayList(); private final List releases = new ArrayList(); @@ -58,6 +58,10 @@ public class MockAM { this.amRMProtocol = amRMProtocol; this.attemptId = attemptId; } + + void setAMRMProtocol(AMRMProtocol amRMProtocol) { + this.amRMProtocol = amRMProtocol; + } public void waitForState(RMAppAttemptState finalState) 
throws Exception { RMApp app = context.getRMApps().get(attemptId.getApplicationId()); @@ -66,7 +70,8 @@ public void waitForState(RMAppAttemptState finalState) throws Exception { while (!finalState.equals(attempt.getAppAttemptState()) && timeoutSecs++ < 20) { System.out - .println("AppAttempt State is : " + attempt.getAppAttemptState() + .println("AppAttempt : " + attemptId + " State is : " + + attempt.getAppAttemptState() + " Waiting for state : " + finalState); Thread.sleep(500); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index ba999bfb2e0..8f66bdba485 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -46,7 +46,7 @@ public class MockNM { private int responseId; private NodeId nodeId; private final int memory; - private final ResourceTrackerService resourceTracker; + private ResourceTrackerService resourceTracker; private final int httpPort = 2; private MasterKey currentMasterKey; @@ -66,6 +66,10 @@ public NodeId getNodeId() { public int getHttpPort() { return httpPort; } + + void setResourceTrackerService(ResourceTrackerService resourceTracker) { + this.resourceTracker = resourceTracker; + } public void containerStatus(Container container) throws Exception { Map> conts = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 11bf85d3f96..0bc3211a819 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -39,9 +39,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -63,10 +64,17 @@ public MockRM() { } public MockRM(Configuration conf) { - super(StoreFactory.getStore(conf)); + this(conf, null); + } + + public MockRM(Configuration conf, RMStateStore store) { + super(); init(conf instanceof YarnConfiguration ? 
conf : new YarnConfiguration(conf)); + if(store != null) { + setRMStateStore(store); + } Logger rootLogger = LogManager.getRootLogger(); - rootLogger.setLevel(Level.DEBUG); + rootLogger.setLevel(Level.DEBUG); } public void waitForState(ApplicationId appId, RMAppState finalState) @@ -75,7 +83,7 @@ public void waitForState(ApplicationId appId, RMAppState finalState) Assert.assertNotNull("app shouldn't be null", app); int timeoutSecs = 0; while (!finalState.equals(app.getState()) && timeoutSecs++ < 20) { - System.out.println("App State is : " + app.getState() + System.out.println("App : " + appId + " State is : " + app.getState() + " Waiting for state : " + finalState); Thread.sleep(500); } @@ -83,6 +91,24 @@ public void waitForState(ApplicationId appId, RMAppState finalState) Assert.assertEquals("App state is not correct (timedout)", finalState, app.getState()); } + + public void waitForState(ApplicationAttemptId attemptId, + RMAppAttemptState finalState) + throws Exception { + RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId()); + Assert.assertNotNull("app shouldn't be null", app); + RMAppAttempt attempt = app.getCurrentAppAttempt(); + int timeoutSecs = 0; + while (!finalState.equals(attempt.getAppAttemptState()) && timeoutSecs++ < 20) { + System.out.println("AppAttempt : " + attemptId + + " State is : " + attempt.getAppAttemptState() + + " Waiting for state : " + finalState); + Thread.sleep(500); + } + System.out.println("Attempt State is : " + attempt.getAppAttemptState()); + Assert.assertEquals("Attempt state is not correct (timedout)", finalState, + attempt.getAppAttemptState()); + } // get new application id public GetNewApplicationResponse getNewAppId() throws Exception { @@ -97,11 +123,16 @@ public RMApp submitApp(int masterMemory) throws Exception { // client public RMApp submitApp(int masterMemory, String name, String user) throws Exception { - return submitApp(masterMemory, name, user, null); + return submitApp(masterMemory, name, 
user, null, false); } - + public RMApp submitApp(int masterMemory, String name, String user, Map acls) throws Exception { + return submitApp(masterMemory, name, user, acls, false); + } + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, boolean unmanaged) throws Exception { ClientRMProtocol client = getClientRMService(); GetNewApplicationResponse resp = client.getNewApplication(Records .newRecord(GetNewApplicationRequest.class)); @@ -114,6 +145,9 @@ public RMApp submitApp(int masterMemory, String name, String user, sub.setApplicationId(appId); sub.setApplicationName(name); sub.setUser(user); + if(unmanaged) { + sub.setUnmanagedAM(true); + } ContainerLaunchContext clc = Records .newRecord(ContainerLaunchContext.class); Resource capability = Records.newRecord(Resource.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java index e0852c3ddd4..9ec2f13e2d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java @@ -51,7 +51,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.service.Service.STATE; import org.apache.hadoop.yarn.util.BuilderUtils; @@ -85,7 +85,7 @@ public class TestApplicationACLs { @BeforeClass public static void setup() throws InterruptedException, IOException { - RMStateStore store = StoreFactory.getStore(conf); + RMStateStore store = RMStateStoreFactory.getStore(conf); conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); AccessControlList adminACL = new AccessControlList(""); adminACL.addGroup(SUPER_GROUP); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java new file mode 100644 index 00000000000..d4f97380c3d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.yarn.api.records.AMResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Test; + +public class TestRMRestart { + + @Test + public void testRMRestart() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + ExitUtil.disableSystemExit(); + + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + 
conf.set(YarnConfiguration.RM_STORE, + "org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore"); + conf.set(YarnConfiguration.RM_SCHEDULER, + "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + Map rmAppState = + rmState.getApplicationState(); + + + // PHASE 1: create state in an RM + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + + // start like normal because state is empty + rm1.start(); + + MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm2 = new MockNM("h2:5678", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + nm2.registerNode(); // nm2 will not heartbeat with RM1 + + // create app that will not be saved because it will finish + RMApp app0 = rm1.submitApp(200); + RMAppAttempt attempt0 = app0.getCurrentAppAttempt(); + // spot check that app is saved + Assert.assertEquals(1, rmAppState.size()); + nm1.nodeHeartbeat(true); + MockAM am0 = rm1.sendAMLaunched(attempt0.getAppAttemptId()); + am0.registerAppAttempt(); + am0.unregisterAppAttempt(); + nm1.nodeHeartbeat(attempt0.getAppAttemptId(), 1, ContainerState.COMPLETE); + am0.waitForState(RMAppAttemptState.FINISHED); + rm1.waitForState(app0.getApplicationId(), RMAppState.FINISHED); + + // spot check that app is not saved anymore + Assert.assertEquals(0, rmAppState.size()); + + // create app that gets launched and does allocate before RM restart + RMApp app1 = rm1.submitApp(200); + // assert app1 info is saved + ApplicationState appState = rmAppState.get(app1.getApplicationId()); + Assert.assertNotNull(appState); + Assert.assertEquals(0, appState.getAttemptCount()); + Assert.assertEquals(appState.getApplicationSubmissionContext() + .getApplicationId(), app1.getApplicationSubmissionContext() + .getApplicationId()); + + //kick the scheduling to allocate AM container + 
nm1.nodeHeartbeat(true); + + // assert app1 attempt is saved + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId(); + rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); + Assert.assertEquals(1, appState.getAttemptCount()); + ApplicationAttemptState attemptState = + appState.getAttempt(attemptId1); + Assert.assertNotNull(attemptState); + Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), + attemptState.getMasterContainer().getId()); + + // launch the AM + MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + // AM request for containers + am1.allocate("h1" , 1000, 1, new ArrayList()); + // kick the scheduler + nm1.nodeHeartbeat(true); + List conts = am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + while (conts.size() == 0) { + nm1.nodeHeartbeat(true); + conts.addAll(am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(500); + } + + // create app that does not get launched by RM before RM restart + RMApp app2 = rm1.submitApp(200); + + // assert app2 info is saved + appState = rmAppState.get(app2.getApplicationId()); + Assert.assertNotNull(appState); + Assert.assertEquals(0, appState.getAttemptCount()); + Assert.assertEquals(appState.getApplicationSubmissionContext() + .getApplicationId(), app2.getApplicationSubmissionContext() + .getApplicationId()); + + // create unmanaged app + RMApp appUnmanaged = rm1.submitApp(200, "", "", null, true); + ApplicationAttemptId unmanagedAttemptId = + appUnmanaged.getCurrentAppAttempt().getAppAttemptId(); + // assert appUnmanaged info is saved + ApplicationId unmanagedAppId = appUnmanaged.getApplicationId(); + appState = rmAppState.get(unmanagedAppId); + Assert.assertNotNull(appState); + // wait for attempt to reach LAUNCHED state + rm1.waitForState(unmanagedAttemptId, RMAppAttemptState.LAUNCHED); + rm1.waitForState(unmanagedAppId, 
RMAppState.ACCEPTED); + // assert unmanaged attempt info is saved + Assert.assertEquals(1, appState.getAttemptCount()); + Assert.assertEquals(appState.getApplicationSubmissionContext() + .getApplicationId(), appUnmanaged.getApplicationSubmissionContext() + .getApplicationId()); + + + // PHASE 2: create new RM and start from old state + + // create new RM to represent restart and recover state + MockRM rm2 = new MockRM(conf, memStore); + + // start new RM + rm2.start(); + + // change NM to point to new RM + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm2.setResourceTrackerService(rm2.getResourceTrackerService()); + + // verify load of old state + // only 2 apps are loaded since unmanaged app is not loaded back since it + // cannot be restarted by the RM this will change with work preserving RM + // restart in which AMs/NMs are not rebooted + Assert.assertEquals(2, rm2.getRMContext().getRMApps().size()); + + // verify correct number of attempts and other data + RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + Assert.assertNotNull(loadedApp1); + //Assert.assertEquals(1, loadedApp1.getAppAttempts().size()); + Assert.assertEquals(app1.getApplicationSubmissionContext() + .getApplicationId(), loadedApp1.getApplicationSubmissionContext() + .getApplicationId()); + + RMApp loadedApp2 = rm2.getRMContext().getRMApps().get(app2.getApplicationId()); + Assert.assertNotNull(loadedApp2); + //Assert.assertEquals(0, loadedApp2.getAppAttempts().size()); + Assert.assertEquals(app2.getApplicationSubmissionContext() + .getApplicationId(), loadedApp2.getApplicationSubmissionContext() + .getApplicationId()); + + // verify state machine kicked into expected states + rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED); + rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.ACCEPTED); + + // verify new attempts created + Assert.assertEquals(2, loadedApp1.getAppAttempts().size()); + Assert.assertEquals(1, 
loadedApp2.getAppAttempts().size()); + + // verify old AM is not accepted + // change running AM to talk to new RM + am1.setAMRMProtocol(rm2.getApplicationMasterService()); + AMResponse amResponse = am1.allocate(new ArrayList(), + new ArrayList()); + Assert.assertTrue(amResponse.getReboot()); + + // NM should be rebooted on heartbeat, even first heartbeat for nm2 + HeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction()); + hbResponse = nm2.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction()); + + // new NM to represent NM re-register + nm1 = rm2.registerNode("h1:1234", 15120); + nm2 = rm2.registerNode("h2:5678", 15120); + + // verify no more reboot response sent + hbResponse = nm1.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction()); + hbResponse = nm2.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction()); + + // assert app1 attempt is saved + attempt1 = loadedApp1.getCurrentAppAttempt(); + attemptId1 = attempt1.getAppAttemptId(); + rm2.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); + appState = rmAppState.get(loadedApp1.getApplicationId()); + attemptState = appState.getAttempt(attemptId1); + Assert.assertNotNull(attemptState); + Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), + attemptState.getMasterContainer().getId()); + + // Nodes on which the AM's run + MockNM am1Node = nm1; + if(attemptState.getMasterContainer().getNodeId().toString().contains("h2")){ + am1Node = nm2; + } + + // assert app2 attempt is saved + RMAppAttempt attempt2 = loadedApp2.getCurrentAppAttempt(); + ApplicationAttemptId attemptId2 = attempt2.getAppAttemptId(); + rm2.waitForState(attemptId2, RMAppAttemptState.ALLOCATED); + appState = rmAppState.get(loadedApp2.getApplicationId()); + attemptState = appState.getAttempt(attemptId2); + Assert.assertNotNull(attemptState); + 
Assert.assertEquals(BuilderUtils.newContainerId(attemptId2, 1), + attemptState.getMasterContainer().getId()); + + MockNM am2Node = nm1; + if(attemptState.getMasterContainer().getNodeId().toString().contains("h2")){ + am2Node = nm2; + } + + // start the AM's + am1 = rm2.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + MockAM am2 = rm2.sendAMLaunched(attempt2.getAppAttemptId()); + am2.registerAppAttempt(); + + //request for containers + am1.allocate("h1" , 1000, 3, new ArrayList()); + am2.allocate("h2" , 1000, 1, new ArrayList()); + + // verify container allocate continues to work + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + conts = am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + while (conts.size() == 0) { + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + conts.addAll(am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(500); + } + + // finish the AM's + am1.unregisterAppAttempt(); + am1Node.nodeHeartbeat(attempt1.getAppAttemptId(), 1, ContainerState.COMPLETE); + am1.waitForState(RMAppAttemptState.FINISHED); + + am2.unregisterAppAttempt(); + am2Node.nodeHeartbeat(attempt2.getAppAttemptId(), 1, ContainerState.COMPLETE); + am2.waitForState(RMAppAttemptState.FINISHED); + + // stop RM's + rm2.stop(); + rm1.stop(); + + // completed apps should be removed + Assert.assertEquals(0, rmAppState.size()); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java index 9ae5cbae0f7..65743c36eff 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java @@ -31,8 +31,6 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.junit.After; @@ -47,8 +45,7 @@ public class TestResourceManager { @Before public void setUp() throws Exception { Configuration conf = new YarnConfiguration(); - RMStateStore store = StoreFactory.getStore(conf); - resourceManager = new ResourceManager(store); + resourceManager = new ResourceManager(); resourceManager.init(conf); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 2b9c8485c56..088eca9a2b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -28,6 +28,7 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationMaster; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationStatus; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -152,6 +153,11 @@ public String getUser() { throw new UnsupportedOperationException("Not supported yet."); } + @Override + public ApplicationSubmissionContext getApplicationSubmissionContext() { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public String getName() { throw new UnsupportedOperationException("Not supported yet."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java index f866694c1b5..d1d45d09f3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java @@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; +import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.junit.After; import org.junit.Before; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java index 296ca7388d0..add00db378e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java @@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 2b1b892e189..142cc7e9e1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -24,9 +24,11 @@ import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -67,6 +69,11 @@ public MockRMApp(int newid, long time, RMAppState newState, String userName, Str public ApplicationId getApplicationId() { return id; } + + @Override + public ApplicationSubmissionContext getApplicationSubmissionContext() { + return new ApplicationSubmissionContextPBImpl(); + } @Override public RMAppState getState() { @@ -118,7 +125,9 @@ public void setName(String name) { public Map getAppAttempts() { Map attempts = new LinkedHashMap(); - attempts.put(attempt.getAppAttemptId(), attempt); + if(attempt != null) { + attempts.put(attempt.getAppAttemptId(), attempt); + } return attempts; } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 5c766c55cd4..f944744f2d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -167,6 +169,9 @@ null, new ApplicationTokenSecretManager(conf), new RMContainerTokenSecretManager(conf), new ClientToAMTokenSecretManagerInRM()); + RMStateStore store = mock(RMStateStore.class); + ((RMContextImpl) rmContext).setStateStore(store); + scheduler = mock(YarnScheduler.class); masterService = mock(ApplicationMasterService.class); applicationMasterLauncher = mock(ApplicationMasterLauncher.class); @@ -295,6 +300,14 @@ private void testAppAttemptKilledState(Container amContainer, assertEquals(0, applicationAttempt.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); } + + /** + * {@link RMAppAttemptState#RECOVERED} + */ + private void testAppAttemptRecoveredState() { + assertEquals(RMAppAttemptState.RECOVERED, + applicationAttempt.getAppAttemptState()); + } /** * {@link RMAppAttemptState#SCHEDULED} @@ -438,6 +451,15 @@ private void scheduleApplicationAttempt() { new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.APP_ACCEPTED)); + + if(unmanagedAM){ + assertEquals(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, + applicationAttempt.getAppAttemptState()); + applicationAttempt.handle( + new RMAppAttemptStoredEvent( + applicationAttempt.getAppAttemptId(), null)); + } + testAppAttemptScheduledState(); } @@ -463,6 +485,12 @@ private Container allocateApplicationAttempt() { applicationAttempt.getAppAttemptId(), container)); + assertEquals(RMAppAttemptState.ALLOCATED_SAVING, + applicationAttempt.getAppAttemptState()); + applicationAttempt.handle( + new RMAppAttemptStoredEvent( + applicationAttempt.getAppAttemptId(), null)); + testAppAttemptAllocatedState(container); return container; @@ -554,6 +582,15 @@ public void testNewToKilled() { testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS); } + @Test + public void testNewToRecovered() { + 
applicationAttempt.handle( + new RMAppAttemptEvent( + applicationAttempt.getAppAttemptId(), + RMAppAttemptEventType.RECOVER)); + testAppAttemptRecoveredState(); + } + @Test public void testSubmittedToFailed() { submitApplicationAttempt(); @@ -604,7 +641,7 @@ public void testAllocatedToFailed() { diagnostics)); testAppAttemptFailedState(amContainer, diagnostics); } - + @Test public void testRunningToFailed() { Container amContainer = allocateApplicationAttempt(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index b4cc6b38377..04b365111ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -27,7 +27,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -40,8 +39,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; import 
org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -76,8 +73,7 @@ public class TestCapacityScheduler { @Before public void setUp() throws Exception { - RMStateStore store = StoreFactory.getStore(new Configuration()); - resourceManager = new ResourceManager(store); + resourceManager = new ResourceManager(); CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(csConf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index cbad1564ffb..38ca88f23ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -29,8 +29,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.junit.Before; @@ -47,8 +45,7 @@ public void setup() throws IOException { Configuration conf = 
createConfiguration(); // All tests assume only one assignment per node update conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); - RMStateStore store = StoreFactory.getStore(conf); - ResourceManager resourceManager = new ResourceManager(store); + ResourceManager resourceManager = new ResourceManager(); resourceManager.init(conf); ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 56d247ff3bb..468422f3ab0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -51,8 +51,6 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -102,8 +100,7 @@ public void setUp() throws IOException { Configuration conf = createConfiguration(); // All 
tests assume only one assignment per node update conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); - RMStateStore store = StoreFactory.getStore(conf); - resourceManager = new ResourceManager(store); + resourceManager = new ResourceManager(); resourceManager.init(conf); ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java index 78f0145edf1..db777954ca7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java @@ -27,8 +27,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.junit.After; import org.junit.Before; @@ -50,8 +48,7 @@ public void setUp() throws IOException { // All tests assume only one assignment per node update conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); - RMStateStore store = StoreFactory.getStore(conf); - resourceManager = new 
ResourceManager(store); + resourceManager = new ResourceManager(); resourceManager.init(conf); ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 53251191190..b18a28957e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -38,8 +38,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -59,8 +57,7 @@ public class TestFifoScheduler { @Before public void setUp() throws Exception { - RMStateStore store = StoreFactory.getStore(new Configuration()); - resourceManager = new ResourceManager(store); + resourceManager = new ResourceManager(); Configuration conf = new Configuration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, 
ResourceScheduler.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 1c4f2cfb72b..1bb4dea0dde 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -48,8 +48,6 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.CompositeService; @@ -154,8 +152,7 @@ public synchronized void start() { getConfig().set(YarnConfiguration.RM_WEBAPP_ADDRESS, MiniYARNCluster.getHostname() + ":0"); } - RMStateStore store = StoreFactory.getStore(getConfig()); - resourceManager = new ResourceManager(store) { + resourceManager = new ResourceManager() { @Override protected void doSecureLogin() throws IOException { // Don't try to login using keytab in the testcase. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java index 9b6024ce3c0..7bd1ff2032d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java @@ -47,7 +47,7 @@ public void testNMUpdation() throws Exception { // intervene final DrainDispatcher dispatcher = new DrainDispatcher(); - ResourceManager rm = new ResourceManager(null) { + ResourceManager rm = new ResourceManager() { @Override protected void doSecureLogin() throws IOException { // Do nothing.