diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 9d12b827f39..de8187986f7 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -1020,6 +1020,9 @@ Release 2.7.2 - UNRELEASED YARN-3619. ContainerMetrics unregisters during getMetrics and leads to ConcurrentModificationException (Zhihai Xu via jlowe) + YARN-4209. RMStateStore FENCED state doesn’t work due to updateFencedState called + by stateMachine.doTransition. (Zhihai Xu via rohithsharmaks) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index aa5caf96e9a..a2cf517d877 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -67,7 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; -import org.apache.hadoop.yarn.state.SingleArcTransition; +import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; @@ -100,7 +100,10 @@ public abstract class RMStateStore extends AbstractService { public static final Log LOG = LogFactory.getLog(RMStateStore.class); - private enum RMStateStoreState { + /** + * The enum defines state of RMStateStore. + */ + public enum RMStateStoreState { ACTIVE, FENCED }; @@ -114,41 +117,57 @@ public abstract class RMStateStore extends AbstractService { RMStateStoreEventType, RMStateStoreEvent>( RMStateStoreState.ACTIVE) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.STORE_APP, new StoreAppTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, - RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), + RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, - RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, - RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), + RMStateStoreEventType.STORE_APP_ATTEMPT, + new StoreAppAttemptTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), + RMStateStoreEventType.UPDATE_APP_ATTEMPT, + new UpdateAppAttemptTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.STORE_MASTERKEY, - new StoreRMDTMasterKeyTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + new StoreRMDTMasterKeyTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.REMOVE_MASTERKEY, - new RemoveRMDTMasterKeyTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + new RemoveRMDTMasterKeyTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.STORE_DELEGATION_TOKEN, - new StoreRMDTTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + new StoreRMDTTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.REMOVE_DELEGATION_TOKEN, - new RemoveRMDTTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + new RemoveRMDTTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.UPDATE_DELEGATION_TOKEN, - new UpdateRMDTTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, - RMStateStoreEventType.UPDATE_AMRM_TOKEN, - new StoreOrUpdateAMRMTokenTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + new UpdateRMDTTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), + RMStateStoreEventType.UPDATE_AMRM_TOKEN, + new StoreOrUpdateAMRMTokenTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.STORE_RESERVATION, new StoreReservationAllocationTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.UPDATE_RESERVATION, new UpdateReservationAllocationTransition()) - .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.REMOVE_RESERVATION, new RemoveReservationAllocationTransition()) .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED, @@ -176,14 +195,17 @@ public abstract class RMStateStore extends AbstractService { RMStateStoreEvent> stateMachine; private static class StoreAppTransition - implements SingleArcTransition { + implements MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreAppEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; ApplicationStateData appState = ((RMStateStoreAppEvent) event).getAppState(); ApplicationId appId = @@ -195,20 +217,24 @@ public abstract class RMStateStore extends AbstractService { RMAppEventType.APP_NEW_SAVED)); } catch (Exception e) { LOG.error("Error storing app: " + appId, e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); }; } private static class UpdateAppTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateUpdateAppEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; ApplicationStateData appState = ((RMStateUpdateAppEvent) event).getAppState(); ApplicationId appId = @@ -222,20 +248,24 @@ public abstract class RMStateStore extends AbstractService { } } catch (Exception e) { LOG.error("Error updating app: " + appId, e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); }; } private static class RemoveAppTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreRemoveAppEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; ApplicationStateData appState = ((RMStateStoreRemoveAppEvent) event).getAppState(); ApplicationId appId = @@ -245,20 +275,24 @@ public abstract class RMStateStore extends AbstractService { store.removeApplicationStateInternal(appState); } catch (Exception e) { LOG.error("Error removing app: " + appId, e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); }; } private static class StoreAppAttemptTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreAppAttemptEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; ApplicationAttemptStateData attemptState = ((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); try { @@ -272,20 +306,24 @@ public abstract class RMStateStore extends AbstractService { RMAppAttemptEventType.ATTEMPT_NEW_SAVED)); } catch (Exception e) { LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); }; } private static class UpdateAppAttemptTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateUpdateAppAttemptEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; ApplicationAttemptStateData attemptState = ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState(); try { @@ -299,20 +337,24 @@ public abstract class RMStateStore extends AbstractService { RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); } catch (Exception e) { LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); }; } private static class StoreRMDTTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreRMDTEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; try { LOG.info("Storing RMDelegationToken and SequenceNumber"); @@ -321,20 +363,24 @@ public abstract class RMStateStore extends AbstractService { } catch (Exception e) { LOG.error("Error While Storing RMDelegationToken and SequenceNumber ", e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); } } private static class RemoveRMDTTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreRMDTEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; try { LOG.info("Removing RMDelegationToken and SequenceNumber"); @@ -342,21 +388,24 @@ public abstract class RMStateStore extends AbstractService { } catch (Exception e) { LOG.error("Error While Removing RMDelegationToken and SequenceNumber ", e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); } } private static class UpdateRMDTTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreRMDTEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } - + boolean isFenced = false; RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; try { LOG.info("Updating RMDelegationToken and SequenceNumber"); @@ -365,20 +414,24 @@ public abstract class RMStateStore extends AbstractService { } catch (Exception e) { LOG.error("Error While Updating RMDelegationToken and SequenceNumber ", e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); } } private static class StoreRMDTMasterKeyTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; RMStateStoreRMDTMasterKeyEvent dtEvent = (RMStateStoreRMDTMasterKeyEvent) event; try { @@ -386,20 +439,24 @@ public abstract class RMStateStore extends AbstractService { store.storeRMDTMasterKeyState(dtEvent.getDelegationKey()); } catch (Exception e) { LOG.error("Error While Storing RMDTMasterKey.", e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); } } private static class RemoveRMDTMasterKeyTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; RMStateStoreRMDTMasterKeyEvent dtEvent = (RMStateStoreRMDTMasterKeyEvent) event; try { @@ -407,42 +464,49 @@ public abstract class RMStateStore extends AbstractService { store.removeRMDTMasterKeyState(dtEvent.getDelegationKey()); } catch (Exception e) { LOG.error("Error While Removing RMDTMasterKey.", e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); } } private static class StoreOrUpdateAMRMTokenTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreAMRMTokenEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } RMStateStoreAMRMTokenEvent amrmEvent = (RMStateStoreAMRMTokenEvent) event; - + boolean isFenced = false; try { LOG.info("Updating AMRMToken"); store.storeOrUpdateAMRMTokenSecretManagerState( amrmEvent.getAmrmTokenSecretManagerState(), amrmEvent.isUpdate()); } catch (Exception e) { LOG.error("Error storing info for AMRMTokenSecretManager", e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); } } private static class StoreReservationAllocationTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreStoreReservationEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; RMStateStoreStoreReservationEvent reservationEvent = (RMStateStoreStoreReservationEvent) event; try { @@ -454,20 +518,24 @@ public abstract class RMStateStore extends AbstractService { reservationEvent.getReservationIdName()); } catch (Exception e) { LOG.error("Error while storing reservation allocation.", e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); } } private static class UpdateReservationAllocationTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreStoreReservationEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; RMStateStoreStoreReservationEvent reservationEvent = (RMStateStoreStoreReservationEvent) event; try { @@ -479,20 +547,24 @@ public abstract class RMStateStore extends AbstractService { reservationEvent.getReservationIdName()); } catch (Exception e) { LOG.error("Error while updating reservation allocation.", e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); } } private static class RemoveReservationAllocationTransition implements - SingleArcTransition { + MultipleArcTransition { @Override - public void transition(RMStateStore store, RMStateStoreEvent event) { + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { if (!(event instanceof RMStateStoreStoreReservationEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); - return; + return RMStateStoreState.ACTIVE; } + boolean isFenced = false; RMStateStoreStoreReservationEvent reservationEvent = (RMStateStoreStoreReservationEvent) event; try { @@ -503,11 +575,16 @@ public abstract class RMStateStore extends AbstractService { reservationEvent.getReservationIdName()); } catch (Exception e) { LOG.error("Error while removing reservation allocation.", e); - store.notifyStoreOperationFailed(e); + isFenced = store.notifyStoreOperationFailedInternal(e); } + return finalState(isFenced); } } + private static RMStateStoreState finalState(boolean isFenced) { + return isFenced ? RMStateStoreState.FENCED : RMStateStoreState.ACTIVE; + } + public RMStateStore() { super(RMStateStore.class.getName()); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -1006,17 +1083,28 @@ public abstract class RMStateStore extends AbstractService { } } - @SuppressWarnings("unchecked") /** * This method is called to notify the ResourceManager that the store * operation has failed. * @param failureCause the exception due to which the operation failed */ protected void notifyStoreOperationFailed(Exception failureCause) { + if (isFencedState()) { + return; + } + if (notifyStoreOperationFailedInternal(failureCause)) { + updateFencedState(); + } + } + + @SuppressWarnings("unchecked") + private boolean notifyStoreOperationFailedInternal( + Exception failureCause) { + boolean isFenced = false; LOG.error("State store operation failed ", failureCause); if (HAUtil.isHAEnabled(getConfig())) { LOG.warn("State-store fenced ! Transitioning RM to standby"); - updateFencedState(); + isFenced = true; Thread standByTransitionThread = new Thread(new StandByTransitionThread()); standByTransitionThread.setName("StandByTransitionThread Handler"); @@ -1029,6 +1117,7 @@ public abstract class RMStateStore extends AbstractService { } else { LOG.warn("Skip the state-store error."); } + return isFenced; } @SuppressWarnings("unchecked") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestMemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestMemoryRMStateStore.java new file mode 100644 index 00000000000..89b9e2b46db --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestMemoryRMStateStore.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +public class TestMemoryRMStateStore { + + @Test + public void testNotifyStoreOperationFailed() throws Exception { + RMStateStore store = new MemoryRMStateStore() { + @Override + public synchronized void removeRMDelegationTokenState( + RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { + throw new Exception("testNotifyStoreOperationFailed"); + } + }; + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + store.init(conf); + ResourceManager mockRM = mock(ResourceManager.class); + store.setResourceManager(mockRM); + RMDelegationTokenIdentifier mockTokenId = + mock(RMDelegationTokenIdentifier.class); + store.removeRMDelegationToken(mockTokenId); + assertTrue("RMStateStore should have been in fenced state", + store.isFencedState()); + store = new MemoryRMStateStore() { + @Override + public synchronized void removeRMDelegationToken( + RMDelegationTokenIdentifier rmDTIdentifier) { + notifyStoreOperationFailed(new Exception( + "testNotifyStoreOperationFailed")); + } + }; + store.init(conf); + store.setResourceManager(mockRM); + store.removeRMDelegationToken(mockTokenId); + assertTrue("RMStateStore should have been in fenced state", + store.isFencedState()); + } +}