YARN-4209. RMStateStore FENCED state doesn’t work due to updateFencedState called by stateMachine.doTransition. (Zhihai Xu via rohithsharmaks)
This commit is contained in:
parent
9f6ed41b95
commit
a231075964
|
@ -94,6 +94,9 @@ Release 2.7.2 - UNRELEASED
|
||||||
YARN-3619. ContainerMetrics unregisters during getMetrics and leads to
|
YARN-3619. ContainerMetrics unregisters during getMetrics and leads to
|
||||||
ConcurrentModificationException (Zhihai Xu via jlowe)
|
ConcurrentModificationException (Zhihai Xu via jlowe)
|
||||||
|
|
||||||
|
YARN-4209. RMStateStore FENCED state doesn’t work due to updateFencedState called
|
||||||
|
by stateMachine.doTransition. (Zhihai Xu via rohithsharmaks)
|
||||||
|
|
||||||
Release 2.7.1 - 2015-07-06
|
Release 2.7.1 - 2015-07-06
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -65,7 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||||
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
|
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
|
||||||
import org.apache.hadoop.yarn.state.SingleArcTransition;
|
import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
||||||
import org.apache.hadoop.yarn.state.StateMachine;
|
import org.apache.hadoop.yarn.state.StateMachine;
|
||||||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||||
|
|
||||||
|
@ -96,7 +96,10 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
|
|
||||||
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
|
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
|
||||||
|
|
||||||
private enum RMStateStoreState {
|
/**
|
||||||
|
* The enum defines state of RMStateStore.
|
||||||
|
*/
|
||||||
|
public enum RMStateStoreState {
|
||||||
ACTIVE,
|
ACTIVE,
|
||||||
FENCED
|
FENCED
|
||||||
};
|
};
|
||||||
|
@ -110,34 +113,47 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
RMStateStoreEventType,
|
RMStateStoreEventType,
|
||||||
RMStateStoreEvent>(
|
RMStateStoreEvent>(
|
||||||
RMStateStoreState.ACTIVE)
|
RMStateStoreState.ACTIVE)
|
||||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
.addTransition(RMStateStoreState.ACTIVE,
|
||||||
|
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
|
||||||
RMStateStoreEventType.STORE_APP, new StoreAppTransition())
|
RMStateStoreEventType.STORE_APP, new StoreAppTransition())
|
||||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
.addTransition(RMStateStoreState.ACTIVE,
|
||||||
|
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
|
||||||
RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
|
RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
|
||||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
.addTransition(RMStateStoreState.ACTIVE,
|
||||||
|
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
|
||||||
RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())
|
RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())
|
||||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
.addTransition(RMStateStoreState.ACTIVE,
|
||||||
RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition())
|
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
|
||||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
RMStateStoreEventType.STORE_APP_ATTEMPT,
|
||||||
RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition())
|
new StoreAppAttemptTransition())
|
||||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
.addTransition(RMStateStoreState.ACTIVE,
|
||||||
|
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
|
||||||
|
RMStateStoreEventType.UPDATE_APP_ATTEMPT,
|
||||||
|
new UpdateAppAttemptTransition())
|
||||||
|
.addTransition(RMStateStoreState.ACTIVE,
|
||||||
|
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
|
||||||
RMStateStoreEventType.STORE_MASTERKEY,
|
RMStateStoreEventType.STORE_MASTERKEY,
|
||||||
new StoreRMDTMasterKeyTransition())
|
new StoreRMDTMasterKeyTransition())
|
||||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
.addTransition(RMStateStoreState.ACTIVE,
|
||||||
|
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
|
||||||
RMStateStoreEventType.REMOVE_MASTERKEY,
|
RMStateStoreEventType.REMOVE_MASTERKEY,
|
||||||
new RemoveRMDTMasterKeyTransition())
|
new RemoveRMDTMasterKeyTransition())
|
||||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
.addTransition(RMStateStoreState.ACTIVE,
|
||||||
|
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
|
||||||
RMStateStoreEventType.STORE_DELEGATION_TOKEN,
|
RMStateStoreEventType.STORE_DELEGATION_TOKEN,
|
||||||
new StoreRMDTTransition())
|
new StoreRMDTTransition())
|
||||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
.addTransition(RMStateStoreState.ACTIVE,
|
||||||
|
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
|
||||||
RMStateStoreEventType.REMOVE_DELEGATION_TOKEN,
|
RMStateStoreEventType.REMOVE_DELEGATION_TOKEN,
|
||||||
new RemoveRMDTTransition())
|
new RemoveRMDTTransition())
|
||||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
.addTransition(RMStateStoreState.ACTIVE,
|
||||||
|
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
|
||||||
RMStateStoreEventType.UPDATE_DELEGATION_TOKEN,
|
RMStateStoreEventType.UPDATE_DELEGATION_TOKEN,
|
||||||
new UpdateRMDTTransition())
|
new UpdateRMDTTransition())
|
||||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
.addTransition(RMStateStoreState.ACTIVE,
|
||||||
RMStateStoreEventType.UPDATE_AMRM_TOKEN,
|
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
|
||||||
new StoreOrUpdateAMRMTokenTransition())
|
RMStateStoreEventType.UPDATE_AMRM_TOKEN,
|
||||||
|
new StoreOrUpdateAMRMTokenTransition())
|
||||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED,
|
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED,
|
||||||
RMStateStoreEventType.FENCED)
|
RMStateStoreEventType.FENCED)
|
||||||
.addTransition(RMStateStoreState.FENCED, RMStateStoreState.FENCED,
|
.addTransition(RMStateStoreState.FENCED, RMStateStoreState.FENCED,
|
||||||
|
@ -160,14 +176,17 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
RMStateStoreEvent> stateMachine;
|
RMStateStoreEvent> stateMachine;
|
||||||
|
|
||||||
private static class StoreAppTransition
|
private static class StoreAppTransition
|
||||||
implements SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
implements MultipleArcTransition<RMStateStore, RMStateStoreEvent,
|
||||||
|
RMStateStoreState> {
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
public RMStateStoreState transition(RMStateStore store,
|
||||||
|
RMStateStoreEvent event) {
|
||||||
if (!(event instanceof RMStateStoreAppEvent)) {
|
if (!(event instanceof RMStateStoreAppEvent)) {
|
||||||
// should never happen
|
// should never happen
|
||||||
LOG.error("Illegal event type: " + event.getClass());
|
LOG.error("Illegal event type: " + event.getClass());
|
||||||
return;
|
return RMStateStoreState.ACTIVE;
|
||||||
}
|
}
|
||||||
|
boolean isFenced = false;
|
||||||
ApplicationStateData appState =
|
ApplicationStateData appState =
|
||||||
((RMStateStoreAppEvent) event).getAppState();
|
((RMStateStoreAppEvent) event).getAppState();
|
||||||
ApplicationId appId =
|
ApplicationId appId =
|
||||||
|
@ -179,20 +198,24 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
RMAppEventType.APP_NEW_SAVED));
|
RMAppEventType.APP_NEW_SAVED));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error storing app: " + appId, e);
|
LOG.error("Error storing app: " + appId, e);
|
||||||
store.notifyStoreOperationFailed(e);
|
isFenced = store.notifyStoreOperationFailedInternal(e);
|
||||||
}
|
}
|
||||||
|
return finalState(isFenced);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class UpdateAppTransition implements
|
private static class UpdateAppTransition implements
|
||||||
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
|
||||||
|
RMStateStoreState> {
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
public RMStateStoreState transition(RMStateStore store,
|
||||||
|
RMStateStoreEvent event) {
|
||||||
if (!(event instanceof RMStateUpdateAppEvent)) {
|
if (!(event instanceof RMStateUpdateAppEvent)) {
|
||||||
// should never happen
|
// should never happen
|
||||||
LOG.error("Illegal event type: " + event.getClass());
|
LOG.error("Illegal event type: " + event.getClass());
|
||||||
return;
|
return RMStateStoreState.ACTIVE;
|
||||||
}
|
}
|
||||||
|
boolean isFenced = false;
|
||||||
ApplicationStateData appState =
|
ApplicationStateData appState =
|
||||||
((RMStateUpdateAppEvent) event).getAppState();
|
((RMStateUpdateAppEvent) event).getAppState();
|
||||||
ApplicationId appId =
|
ApplicationId appId =
|
||||||
|
@ -201,23 +224,27 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
try {
|
try {
|
||||||
store.updateApplicationStateInternal(appId, appState);
|
store.updateApplicationStateInternal(appId, appState);
|
||||||
store.notifyApplication(new RMAppEvent(appId,
|
store.notifyApplication(new RMAppEvent(appId,
|
||||||
RMAppEventType.APP_UPDATE_SAVED));
|
RMAppEventType.APP_UPDATE_SAVED));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error updating app: " + appId, e);
|
LOG.error("Error updating app: " + appId, e);
|
||||||
store.notifyStoreOperationFailed(e);
|
isFenced = store.notifyStoreOperationFailedInternal(e);
|
||||||
}
|
}
|
||||||
|
return finalState(isFenced);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class RemoveAppTransition implements
|
private static class RemoveAppTransition implements
|
||||||
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
|
||||||
|
RMStateStoreState> {
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
public RMStateStoreState transition(RMStateStore store,
|
||||||
|
RMStateStoreEvent event) {
|
||||||
if (!(event instanceof RMStateStoreRemoveAppEvent)) {
|
if (!(event instanceof RMStateStoreRemoveAppEvent)) {
|
||||||
// should never happen
|
// should never happen
|
||||||
LOG.error("Illegal event type: " + event.getClass());
|
LOG.error("Illegal event type: " + event.getClass());
|
||||||
return;
|
return RMStateStoreState.ACTIVE;
|
||||||
}
|
}
|
||||||
|
boolean isFenced = false;
|
||||||
ApplicationStateData appState =
|
ApplicationStateData appState =
|
||||||
((RMStateStoreRemoveAppEvent) event).getAppState();
|
((RMStateStoreRemoveAppEvent) event).getAppState();
|
||||||
ApplicationId appId =
|
ApplicationId appId =
|
||||||
|
@ -227,20 +254,24 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
store.removeApplicationStateInternal(appState);
|
store.removeApplicationStateInternal(appState);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error removing app: " + appId, e);
|
LOG.error("Error removing app: " + appId, e);
|
||||||
store.notifyStoreOperationFailed(e);
|
isFenced = store.notifyStoreOperationFailedInternal(e);
|
||||||
}
|
}
|
||||||
|
return finalState(isFenced);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class StoreAppAttemptTransition implements
|
private static class StoreAppAttemptTransition implements
|
||||||
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
|
||||||
|
RMStateStoreState> {
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
public RMStateStoreState transition(RMStateStore store,
|
||||||
|
RMStateStoreEvent event) {
|
||||||
if (!(event instanceof RMStateStoreAppAttemptEvent)) {
|
if (!(event instanceof RMStateStoreAppAttemptEvent)) {
|
||||||
// should never happen
|
// should never happen
|
||||||
LOG.error("Illegal event type: " + event.getClass());
|
LOG.error("Illegal event type: " + event.getClass());
|
||||||
return;
|
return RMStateStoreState.ACTIVE;
|
||||||
}
|
}
|
||||||
|
boolean isFenced = false;
|
||||||
ApplicationAttemptStateData attemptState =
|
ApplicationAttemptStateData attemptState =
|
||||||
((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
|
((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
|
||||||
try {
|
try {
|
||||||
|
@ -254,20 +285,24 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
|
RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
|
LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
|
||||||
store.notifyStoreOperationFailed(e);
|
isFenced = store.notifyStoreOperationFailedInternal(e);
|
||||||
}
|
}
|
||||||
|
return finalState(isFenced);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class UpdateAppAttemptTransition implements
|
private static class UpdateAppAttemptTransition implements
|
||||||
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
|
||||||
|
RMStateStoreState> {
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
public RMStateStoreState transition(RMStateStore store,
|
||||||
|
RMStateStoreEvent event) {
|
||||||
if (!(event instanceof RMStateUpdateAppAttemptEvent)) {
|
if (!(event instanceof RMStateUpdateAppAttemptEvent)) {
|
||||||
// should never happen
|
// should never happen
|
||||||
LOG.error("Illegal event type: " + event.getClass());
|
LOG.error("Illegal event type: " + event.getClass());
|
||||||
return;
|
return RMStateStoreState.ACTIVE;
|
||||||
}
|
}
|
||||||
|
boolean isFenced = false;
|
||||||
ApplicationAttemptStateData attemptState =
|
ApplicationAttemptStateData attemptState =
|
||||||
((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
|
((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
|
||||||
try {
|
try {
|
||||||
|
@ -281,20 +316,24 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
|
RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
|
LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
|
||||||
store.notifyStoreOperationFailed(e);
|
isFenced = store.notifyStoreOperationFailedInternal(e);
|
||||||
}
|
}
|
||||||
|
return finalState(isFenced);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class StoreRMDTTransition implements
|
private static class StoreRMDTTransition implements
|
||||||
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
|
||||||
|
RMStateStoreState> {
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
public RMStateStoreState transition(RMStateStore store,
|
||||||
|
RMStateStoreEvent event) {
|
||||||
if (!(event instanceof RMStateStoreRMDTEvent)) {
|
if (!(event instanceof RMStateStoreRMDTEvent)) {
|
||||||
// should never happen
|
// should never happen
|
||||||
LOG.error("Illegal event type: " + event.getClass());
|
LOG.error("Illegal event type: " + event.getClass());
|
||||||
return;
|
return RMStateStoreState.ACTIVE;
|
||||||
}
|
}
|
||||||
|
boolean isFenced = false;
|
||||||
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
|
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
|
||||||
try {
|
try {
|
||||||
LOG.info("Storing RMDelegationToken and SequenceNumber");
|
LOG.info("Storing RMDelegationToken and SequenceNumber");
|
||||||
|
@ -303,20 +342,24 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error While Storing RMDelegationToken and SequenceNumber ",
|
LOG.error("Error While Storing RMDelegationToken and SequenceNumber ",
|
||||||
e);
|
e);
|
||||||
store.notifyStoreOperationFailed(e);
|
isFenced = store.notifyStoreOperationFailedInternal(e);
|
||||||
}
|
}
|
||||||
|
return finalState(isFenced);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class RemoveRMDTTransition implements
|
private static class RemoveRMDTTransition implements
|
||||||
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
|
||||||
|
RMStateStoreState> {
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
public RMStateStoreState transition(RMStateStore store,
|
||||||
|
RMStateStoreEvent event) {
|
||||||
if (!(event instanceof RMStateStoreRMDTEvent)) {
|
if (!(event instanceof RMStateStoreRMDTEvent)) {
|
||||||
// should never happen
|
// should never happen
|
||||||
LOG.error("Illegal event type: " + event.getClass());
|
LOG.error("Illegal event type: " + event.getClass());
|
||||||
return;
|
return RMStateStoreState.ACTIVE;
|
||||||
}
|
}
|
||||||
|
boolean isFenced = false;
|
||||||
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
|
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
|
||||||
try {
|
try {
|
||||||
LOG.info("Removing RMDelegationToken and SequenceNumber");
|
LOG.info("Removing RMDelegationToken and SequenceNumber");
|
||||||
|
@ -324,21 +367,24 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error While Removing RMDelegationToken and SequenceNumber ",
|
LOG.error("Error While Removing RMDelegationToken and SequenceNumber ",
|
||||||
e);
|
e);
|
||||||
store.notifyStoreOperationFailed(e);
|
isFenced = store.notifyStoreOperationFailedInternal(e);
|
||||||
}
|
}
|
||||||
|
return finalState(isFenced);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class UpdateRMDTTransition implements
|
private static class UpdateRMDTTransition implements
|
||||||
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
|
||||||
|
RMStateStoreState> {
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
public RMStateStoreState transition(RMStateStore store,
|
||||||
|
RMStateStoreEvent event) {
|
||||||
if (!(event instanceof RMStateStoreRMDTEvent)) {
|
if (!(event instanceof RMStateStoreRMDTEvent)) {
|
||||||
// should never happen
|
// should never happen
|
||||||
LOG.error("Illegal event type: " + event.getClass());
|
LOG.error("Illegal event type: " + event.getClass());
|
||||||
return;
|
return RMStateStoreState.ACTIVE;
|
||||||
}
|
}
|
||||||
|
boolean isFenced = false;
|
||||||
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
|
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
|
||||||
try {
|
try {
|
||||||
LOG.info("Updating RMDelegationToken and SequenceNumber");
|
LOG.info("Updating RMDelegationToken and SequenceNumber");
|
||||||
|
@ -347,20 +393,24 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error While Updating RMDelegationToken and SequenceNumber ",
|
LOG.error("Error While Updating RMDelegationToken and SequenceNumber ",
|
||||||
e);
|
e);
|
||||||
store.notifyStoreOperationFailed(e);
|
isFenced = store.notifyStoreOperationFailedInternal(e);
|
||||||
}
|
}
|
||||||
|
return finalState(isFenced);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class StoreRMDTMasterKeyTransition implements
|
private static class StoreRMDTMasterKeyTransition implements
|
||||||
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
|
||||||
|
RMStateStoreState> {
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
public RMStateStoreState transition(RMStateStore store,
|
||||||
|
RMStateStoreEvent event) {
|
||||||
if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) {
|
if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) {
|
||||||
// should never happen
|
// should never happen
|
||||||
LOG.error("Illegal event type: " + event.getClass());
|
LOG.error("Illegal event type: " + event.getClass());
|
||||||
return;
|
return RMStateStoreState.ACTIVE;
|
||||||
}
|
}
|
||||||
|
boolean isFenced = false;
|
||||||
RMStateStoreRMDTMasterKeyEvent dtEvent =
|
RMStateStoreRMDTMasterKeyEvent dtEvent =
|
||||||
(RMStateStoreRMDTMasterKeyEvent) event;
|
(RMStateStoreRMDTMasterKeyEvent) event;
|
||||||
try {
|
try {
|
||||||
|
@ -368,20 +418,24 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
store.storeRMDTMasterKeyState(dtEvent.getDelegationKey());
|
store.storeRMDTMasterKeyState(dtEvent.getDelegationKey());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error While Storing RMDTMasterKey.", e);
|
LOG.error("Error While Storing RMDTMasterKey.", e);
|
||||||
store.notifyStoreOperationFailed(e);
|
isFenced = store.notifyStoreOperationFailedInternal(e);
|
||||||
}
|
}
|
||||||
|
return finalState(isFenced);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class RemoveRMDTMasterKeyTransition implements
|
private static class RemoveRMDTMasterKeyTransition implements
|
||||||
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
|
||||||
|
RMStateStoreState> {
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
public RMStateStoreState transition(RMStateStore store,
|
||||||
|
RMStateStoreEvent event) {
|
||||||
if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) {
|
if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) {
|
||||||
// should never happen
|
// should never happen
|
||||||
LOG.error("Illegal event type: " + event.getClass());
|
LOG.error("Illegal event type: " + event.getClass());
|
||||||
return;
|
return RMStateStoreState.ACTIVE;
|
||||||
}
|
}
|
||||||
|
boolean isFenced = false;
|
||||||
RMStateStoreRMDTMasterKeyEvent dtEvent =
|
RMStateStoreRMDTMasterKeyEvent dtEvent =
|
||||||
(RMStateStoreRMDTMasterKeyEvent) event;
|
(RMStateStoreRMDTMasterKeyEvent) event;
|
||||||
try {
|
try {
|
||||||
|
@ -389,33 +443,41 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
store.removeRMDTMasterKeyState(dtEvent.getDelegationKey());
|
store.removeRMDTMasterKeyState(dtEvent.getDelegationKey());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error While Removing RMDTMasterKey.", e);
|
LOG.error("Error While Removing RMDTMasterKey.", e);
|
||||||
store.notifyStoreOperationFailed(e);
|
isFenced = store.notifyStoreOperationFailedInternal(e);
|
||||||
}
|
}
|
||||||
|
return finalState(isFenced);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class StoreOrUpdateAMRMTokenTransition implements
|
private static class StoreOrUpdateAMRMTokenTransition implements
|
||||||
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
MultipleArcTransition<RMStateStore, RMStateStoreEvent,
|
||||||
|
RMStateStoreState> {
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
public RMStateStoreState transition(RMStateStore store,
|
||||||
|
RMStateStoreEvent event) {
|
||||||
if (!(event instanceof RMStateStoreAMRMTokenEvent)) {
|
if (!(event instanceof RMStateStoreAMRMTokenEvent)) {
|
||||||
// should never happen
|
// should never happen
|
||||||
LOG.error("Illegal event type: " + event.getClass());
|
LOG.error("Illegal event type: " + event.getClass());
|
||||||
return;
|
return RMStateStoreState.ACTIVE;
|
||||||
}
|
}
|
||||||
RMStateStoreAMRMTokenEvent amrmEvent = (RMStateStoreAMRMTokenEvent) event;
|
RMStateStoreAMRMTokenEvent amrmEvent = (RMStateStoreAMRMTokenEvent) event;
|
||||||
|
boolean isFenced = false;
|
||||||
try {
|
try {
|
||||||
LOG.info("Updating AMRMToken");
|
LOG.info("Updating AMRMToken");
|
||||||
store.storeOrUpdateAMRMTokenSecretManagerState(
|
store.storeOrUpdateAMRMTokenSecretManagerState(
|
||||||
amrmEvent.getAmrmTokenSecretManagerState(), amrmEvent.isUpdate());
|
amrmEvent.getAmrmTokenSecretManagerState(), amrmEvent.isUpdate());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error storing info for AMRMTokenSecretManager", e);
|
LOG.error("Error storing info for AMRMTokenSecretManager", e);
|
||||||
store.notifyStoreOperationFailed(e);
|
isFenced = store.notifyStoreOperationFailedInternal(e);
|
||||||
}
|
}
|
||||||
|
return finalState(isFenced);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static RMStateStoreState finalState(boolean isFenced) {
|
||||||
|
return isFenced ? RMStateStoreState.FENCED : RMStateStoreState.ACTIVE;
|
||||||
|
}
|
||||||
|
|
||||||
public RMStateStore() {
|
public RMStateStore() {
|
||||||
super(RMStateStore.class.getName());
|
super(RMStateStore.class.getName());
|
||||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||||
|
@ -850,17 +912,28 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
/**
|
/**
|
||||||
* This method is called to notify the ResourceManager that the store
|
* This method is called to notify the ResourceManager that the store
|
||||||
* operation has failed.
|
* operation has failed.
|
||||||
* @param failureCause the exception due to which the operation failed
|
* @param failureCause the exception due to which the operation failed
|
||||||
*/
|
*/
|
||||||
protected void notifyStoreOperationFailed(Exception failureCause) {
|
protected void notifyStoreOperationFailed(Exception failureCause) {
|
||||||
|
if (isFencedState()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (notifyStoreOperationFailedInternal(failureCause)) {
|
||||||
|
updateFencedState();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private boolean notifyStoreOperationFailedInternal(
|
||||||
|
Exception failureCause) {
|
||||||
|
boolean isFenced = false;
|
||||||
LOG.error("State store operation failed ", failureCause);
|
LOG.error("State store operation failed ", failureCause);
|
||||||
if (HAUtil.isHAEnabled(getConfig())) {
|
if (HAUtil.isHAEnabled(getConfig())) {
|
||||||
LOG.warn("State-store fenced ! Transitioning RM to standby");
|
LOG.warn("State-store fenced ! Transitioning RM to standby");
|
||||||
updateFencedState();
|
isFenced = true;
|
||||||
Thread standByTransitionThread =
|
Thread standByTransitionThread =
|
||||||
new Thread(new StandByTransitionThread());
|
new Thread(new StandByTransitionThread());
|
||||||
standByTransitionThread.setName("StandByTransitionThread Handler");
|
standByTransitionThread.setName("StandByTransitionThread Handler");
|
||||||
|
@ -873,6 +946,7 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Skip the state-store error.");
|
LOG.warn("Skip the state-store error.");
|
||||||
}
|
}
|
||||||
|
return isFenced;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
|
|
@ -0,0 +1,65 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
public class TestMemoryRMStateStore {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNotifyStoreOperationFailed() throws Exception {
|
||||||
|
RMStateStore store = new MemoryRMStateStore() {
|
||||||
|
@Override
|
||||||
|
public synchronized void removeRMDelegationTokenState(
|
||||||
|
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
|
||||||
|
throw new Exception("testNotifyStoreOperationFailed");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||||
|
store.init(conf);
|
||||||
|
ResourceManager mockRM = mock(ResourceManager.class);
|
||||||
|
store.setResourceManager(mockRM);
|
||||||
|
RMDelegationTokenIdentifier mockTokenId =
|
||||||
|
mock(RMDelegationTokenIdentifier.class);
|
||||||
|
store.removeRMDelegationToken(mockTokenId);
|
||||||
|
assertTrue("RMStateStore should have been in fenced state",
|
||||||
|
store.isFencedState());
|
||||||
|
store = new MemoryRMStateStore() {
|
||||||
|
@Override
|
||||||
|
public synchronized void removeRMDelegationToken(
|
||||||
|
RMDelegationTokenIdentifier rmDTIdentifier) {
|
||||||
|
notifyStoreOperationFailed(new Exception(
|
||||||
|
"testNotifyStoreOperationFailed"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
store.init(conf);
|
||||||
|
store.setResourceManager(mockRM);
|
||||||
|
store.removeRMDelegationToken(mockTokenId);
|
||||||
|
assertTrue("RMStateStore should have been in fenced state",
|
||||||
|
store.isFencedState());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue