From a231075964a46f007264051bbfa6b19c36c8b2c4 Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Wed, 7 Oct 2015 09:35:10 +0530 Subject: [PATCH] =?UTF-8?q?YARN-4209.=20RMStateStore=20FENCED=20state=20do?= =?UTF-8?q?esn=E2=80=99t=20work=20due=20to=20updateFencedState=20called=20?= =?UTF-8?q?by=20stateMachine.doTransition.=20(Zhihai=20Xu=20via=20rohithsh?= =?UTF-8?q?armaks)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hadoop-yarn-project/CHANGES.txt | 3 + .../recovery/RMStateStore.java | 216 ++++++++++++------ .../recovery/TestMemoryRMStateStore.java | 65 ++++++ 3 files changed, 213 insertions(+), 71 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestMemoryRMStateStore.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 901109f80cf..f08676b04f4 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -94,6 +94,9 @@ Release 2.7.2 - UNRELEASED YARN-3619. ContainerMetrics unregisters during getMetrics and leads to ConcurrentModificationException (Zhihai Xu via jlowe) + YARN-4209. RMStateStore FENCED state doesn’t work due to updateFencedState called + by stateMachine.doTransition. (Zhihai Xu via rohithsharmaks) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 071b5c66b88..109853bc568 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -65,7 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; -import org.apache.hadoop.yarn.state.SingleArcTransition; +import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; @@ -96,7 +96,10 @@ public abstract class RMStateStore extends AbstractService { public static final Log LOG = LogFactory.getLog(RMStateStore.class); - private enum RMStateStoreState { + /** + * The enum defines state of RMStateStore. + */ + public enum RMStateStoreState { ACTIVE, FENCED }; @@ -110,34 +113,47 @@ public abstract class RMStateStore extends AbstractService { RMStateStoreEventType, RMStateStoreEvent>( RMStateStoreState.ACTIVE) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.STORE_APP, new StoreAppTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, - RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, - RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), + RMStateStoreEventType.STORE_APP_ATTEMPT, + new StoreAppAttemptTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), + RMStateStoreEventType.UPDATE_APP_ATTEMPT, + new UpdateAppAttemptTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.STORE_MASTERKEY, - new StoreRMDTMasterKeyTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + new StoreRMDTMasterKeyTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.REMOVE_MASTERKEY, - new RemoveRMDTMasterKeyTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + new RemoveRMDTMasterKeyTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.STORE_DELEGATION_TOKEN, - new StoreRMDTTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + new StoreRMDTTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.REMOVE_DELEGATION_TOKEN, - new RemoveRMDTTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + new RemoveRMDTTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.UPDATE_DELEGATION_TOKEN, - new UpdateRMDTTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, - RMStateStoreEventType.UPDATE_AMRM_TOKEN, - new StoreOrUpdateAMRMTokenTransition()) + new UpdateRMDTTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), + RMStateStoreEventType.UPDATE_AMRM_TOKEN, + new StoreOrUpdateAMRMTokenTransition()) .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED, RMStateStoreEventType.FENCED) .addTransition(RMStateStoreState.FENCED, RMStateStoreState.FENCED, @@ -160,14 +176,17 @@ public abstract class RMStateStore extends AbstractService { RMStateStoreEvent> stateMachine; private static class StoreAppTransition - implements SingleArcTransition { + implements MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreAppEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; ApplicationStateData appState = ((RMStateStoreAppEvent) event).getAppState(); ApplicationId appId = @@ -179,20 +198,24 @@ public abstract class RMStateStore extends AbstractService { RMAppEventType.APP_NEW_SAVED)); } catch (Exception e) { LOG.error("Error storing app: " + appId, e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); }; } private static class UpdateAppTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateUpdateAppEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; ApplicationStateData appState = ((RMStateUpdateAppEvent) event).getAppState(); ApplicationId appId = @@ -201,23 +224,27 @@ public abstract class RMStateStore extends AbstractService { try { store.updateApplicationStateInternal(appId, appState); store.notifyApplication(new RMAppEvent(appId, - RMAppEventType.APP_UPDATE_SAVED)); + RMAppEventType.APP_UPDATE_SAVED)); } catch (Exception e) { LOG.error("Error updating app: " + appId, e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); }; } private static class RemoveAppTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreRemoveAppEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; ApplicationStateData appState = ((RMStateStoreRemoveAppEvent) event).getAppState(); ApplicationId appId = @@ -227,20 +254,24 @@ public abstract class RMStateStore extends AbstractService { store.removeApplicationStateInternal(appState); } catch (Exception e) { LOG.error("Error removing app: " + appId, e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); }; } private static class StoreAppAttemptTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreAppAttemptEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; ApplicationAttemptStateData attemptState = ((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); try { @@ -254,20 +285,24 @@ public abstract class RMStateStore extends AbstractService { RMAppAttemptEventType.ATTEMPT_NEW_SAVED)); } catch (Exception e) { LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); }; } private static class UpdateAppAttemptTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateUpdateAppAttemptEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; ApplicationAttemptStateData attemptState = ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState(); try { @@ -281,20 +316,24 @@ public abstract class RMStateStore extends AbstractService { RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); } catch (Exception e) { LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); }; } private static class StoreRMDTTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreRMDTEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; try { LOG.info("Storing RMDelegationToken and SequenceNumber"); @@ -303,20 +342,24 @@ public abstract class RMStateStore extends AbstractService { } catch (Exception e) { LOG.error("Error While Storing RMDelegationToken and SequenceNumber ", e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); } } private static class RemoveRMDTTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreRMDTEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; try { LOG.info("Removing RMDelegationToken and SequenceNumber"); @@ -324,21 +367,24 @@ public abstract class RMStateStore extends AbstractService { } catch (Exception e) { LOG.error("Error While Removing RMDelegationToken and SequenceNumber ", e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); } } private static class UpdateRMDTTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreRMDTEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } - + boolean isFenced = false; RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; try { LOG.info("Updating RMDelegationToken and SequenceNumber"); @@ -347,20 +393,24 @@ public abstract class RMStateStore extends AbstractService { } catch (Exception e) { LOG.error("Error While Updating RMDelegationToken and SequenceNumber ", e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); } } private static class StoreRMDTMasterKeyTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; RMStateStoreRMDTMasterKeyEvent dtEvent = (RMStateStoreRMDTMasterKeyEvent) event; try { @@ -368,20 +418,24 @@ public abstract class RMStateStore extends AbstractService { store.storeRMDTMasterKeyState(dtEvent.getDelegationKey()); } catch (Exception e) { LOG.error("Error While Storing RMDTMasterKey.", e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); } } private static class RemoveRMDTMasterKeyTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; RMStateStoreRMDTMasterKeyEvent dtEvent = (RMStateStoreRMDTMasterKeyEvent) event; try { @@ -389,33 +443,41 @@ public abstract class RMStateStore extends AbstractService { store.removeRMDTMasterKeyState(dtEvent.getDelegationKey()); } catch (Exception e) { LOG.error("Error While Removing RMDTMasterKey.", e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); } } private static class StoreOrUpdateAMRMTokenTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreAMRMTokenEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } RMStateStoreAMRMTokenEvent amrmEvent = (RMStateStoreAMRMTokenEvent) event; - + boolean isFenced = false; try { LOG.info("Updating AMRMToken"); store.storeOrUpdateAMRMTokenSecretManagerState( amrmEvent.getAmrmTokenSecretManagerState(), amrmEvent.isUpdate()); } catch (Exception e) { LOG.error("Error storing info for AMRMTokenSecretManager", e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); } } + private static RMStateStoreState finalState(boolean isFenced) { + return isFenced ? RMStateStoreState.FENCED : RMStateStoreState.ACTIVE; + } + public RMStateStore() { super(RMStateStore.class.getName()); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -850,17 +912,28 @@ public abstract class RMStateStore extends AbstractService { } } - @SuppressWarnings("unchecked") /** * This method is called to notify the ResourceManager that the store * operation has failed. * @param failureCause the exception due to which the operation failed */ protected void notifyStoreOperationFailed(Exception failureCause) { + if (isFencedState()) { + return; + } + if (notifyStoreOperationFailedInternal(failureCause)) { + updateFencedState(); + } + } + + @SuppressWarnings("unchecked") + private boolean notifyStoreOperationFailedInternal( + Exception failureCause) { + boolean isFenced = false; LOG.error("State store operation failed ", failureCause); if (HAUtil.isHAEnabled(getConfig())) { LOG.warn("State-store fenced ! Transitioning RM to standby"); - updateFencedState(); + isFenced = true; Thread standByTransitionThread = new Thread(new StandByTransitionThread()); standByTransitionThread.setName("StandByTransitionThread Handler"); @@ -873,6 +946,7 @@ public abstract class RMStateStore extends AbstractService { } else { LOG.warn("Skip the state-store error."); } + return isFenced; } @SuppressWarnings("unchecked") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestMemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestMemoryRMStateStore.java new file mode 100644 index 00000000000..89b9e2b46db --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestMemoryRMStateStore.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +public class TestMemoryRMStateStore { + + @Test + public void testNotifyStoreOperationFailed() throws Exception { + RMStateStore store = new MemoryRMStateStore() { + @Override + public synchronized void removeRMDelegationTokenState( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { + throw new Exception("testNotifyStoreOperationFailed"); + } + }; + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + store.init(conf); + ResourceManager mockRM = mock(ResourceManager.class); + store.setResourceManager(mockRM); + RMDelegationTokenIdentifier mockTokenId = + mock(RMDelegationTokenIdentifier.class); + store.removeRMDelegationToken(mockTokenId); + assertTrue("RMStateStore should have been in fenced state", + store.isFencedState()); + store = new MemoryRMStateStore() { + @Override + public synchronized void removeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier) { + notifyStoreOperationFailed(new Exception( + "testNotifyStoreOperationFailed")); + } + }; + store.init(conf); + store.setResourceManager(mockRM); + store.removeRMDelegationToken(mockTokenId); + assertTrue("RMStateStore should have been in fenced state", + store.isFencedState()); + } +}