YARN-2946. Fixed potential deadlock in RMStateStore. Contributed by Rohith Sharmaks
(cherry picked from commit 4f18018b7a
)
This commit is contained in:
parent
f02bd6683a
commit
60530a6c4f
|
@ -249,6 +249,9 @@ Release 2.7.0 - UNRELEASED
|
|||
YARN-2937. Fixed new findbugs warnings in hadoop-yarn-nodemanager. (Varun Saxena
|
||||
via zjshen)
|
||||
|
||||
YARN-2946. Fixed potential deadlock in RMStateStore. (Rohith Sharmaks via
|
||||
jianhe)
|
||||
|
||||
Release 2.6.0 - 2014-11-18
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -624,22 +624,17 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|||
|
||||
@Override
|
||||
public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
|
||||
AMRMTokenSecretManagerState amrmTokenSecretManagerState,
|
||||
boolean isUpdate){
|
||||
AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate)
|
||||
throws Exception {
|
||||
Path nodeCreatePath =
|
||||
getNodePath(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
|
||||
AMRMTokenSecretManagerState data =
|
||||
AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
|
||||
byte[] stateData = data.getProto().toByteArray();
|
||||
try {
|
||||
if (isUpdate) {
|
||||
updateFile(nodeCreatePath, stateData);
|
||||
} else {
|
||||
writeFile(nodeCreatePath, stateData);
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.info("Error storing info for AMRMTokenSecretManager", ex);
|
||||
notifyStoreOperationFailed(ex);
|
||||
if (isUpdate) {
|
||||
updateFile(nodeCreatePath, stateData);
|
||||
} else {
|
||||
writeFile(nodeCreatePath, stateData);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -641,11 +641,7 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|||
AMRMTokenSecretManagerState data =
|
||||
AMRMTokenSecretManagerState.newInstance(state);
|
||||
byte[] stateData = data.getProto().toByteArray();
|
||||
try {
|
||||
db.put(bytes(AMRMTOKEN_SECRET_MANAGER_ROOT), stateData);
|
||||
} catch (DBException e) {
|
||||
notifyStoreOperationFailed(e);
|
||||
}
|
||||
db.put(bytes(AMRMTOKEN_SECRET_MANAGER_ROOT), stateData);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,6 +24,9 @@ import java.util.HashSet;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
|
||||
|
@ -86,6 +89,8 @@ public abstract class RMStateStore extends AbstractService {
|
|||
protected static final String VERSION_NODE = "RMVersionNode";
|
||||
protected static final String EPOCH_NODE = "EpochNode";
|
||||
private ResourceManager resourceManager;
|
||||
private final ReadLock readLock;
|
||||
private final WriteLock writeLock;
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
|
||||
|
||||
|
@ -113,6 +118,24 @@ public abstract class RMStateStore extends AbstractService {
|
|||
RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition())
|
||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
||||
RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition())
|
||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
||||
RMStateStoreEventType.STORE_MASTERKEY,
|
||||
new StoreRMDTMasterKeyTransition())
|
||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
||||
RMStateStoreEventType.REMOVE_MASTERKEY,
|
||||
new RemoveRMDTMasterKeyTransition())
|
||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
||||
RMStateStoreEventType.STORE_DELEGATION_TOKEN,
|
||||
new StoreRMDTTransition())
|
||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
||||
RMStateStoreEventType.REMOVE_DELEGATION_TOKEN,
|
||||
new RemoveRMDTTransition())
|
||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
||||
RMStateStoreEventType.UPDATE_DELEGATION_TOKEN,
|
||||
new UpdateRMDTTransition())
|
||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
||||
RMStateStoreEventType.UPDATE_AMRM_TOKEN,
|
||||
new StoreOrUpdateAMRMTokenTransition())
|
||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED,
|
||||
RMStateStoreEventType.FENCED)
|
||||
.addTransition(RMStateStoreState.FENCED, RMStateStoreState.FENCED,
|
||||
|
@ -122,7 +145,13 @@ public abstract class RMStateStore extends AbstractService {
|
|||
RMStateStoreEventType.REMOVE_APP,
|
||||
RMStateStoreEventType.STORE_APP_ATTEMPT,
|
||||
RMStateStoreEventType.UPDATE_APP_ATTEMPT,
|
||||
RMStateStoreEventType.FENCED));
|
||||
RMStateStoreEventType.FENCED,
|
||||
RMStateStoreEventType.STORE_MASTERKEY,
|
||||
RMStateStoreEventType.REMOVE_MASTERKEY,
|
||||
RMStateStoreEventType.STORE_DELEGATION_TOKEN,
|
||||
RMStateStoreEventType.REMOVE_DELEGATION_TOKEN,
|
||||
RMStateStoreEventType.UPDATE_DELEGATION_TOKEN,
|
||||
RMStateStoreEventType.UPDATE_AMRM_TOKEN));
|
||||
|
||||
private final StateMachine<RMStateStoreState,
|
||||
RMStateStoreEventType,
|
||||
|
@ -255,8 +284,143 @@ public abstract class RMStateStore extends AbstractService {
|
|||
};
|
||||
}
|
||||
|
||||
private static class StoreRMDTTransition implements
|
||||
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
||||
@Override
|
||||
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
||||
if (!(event instanceof RMStateStoreRMDTEvent)) {
|
||||
// should never happen
|
||||
LOG.error("Illegal event type: " + event.getClass());
|
||||
return;
|
||||
}
|
||||
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
|
||||
try {
|
||||
LOG.info("Storing RMDelegationToken and SequenceNumber");
|
||||
store.storeRMDelegationTokenAndSequenceNumberState(
|
||||
dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate(),
|
||||
dtEvent.getLatestSequenceNumber());
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error While Storing RMDelegationToken and SequenceNumber ",
|
||||
e);
|
||||
store.notifyStoreOperationFailed(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class RemoveRMDTTransition implements
|
||||
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
||||
@Override
|
||||
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
||||
if (!(event instanceof RMStateStoreRMDTEvent)) {
|
||||
// should never happen
|
||||
LOG.error("Illegal event type: " + event.getClass());
|
||||
return;
|
||||
}
|
||||
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
|
||||
try {
|
||||
LOG.info("Removing RMDelegationToken and SequenceNumber");
|
||||
store.removeRMDelegationTokenState(dtEvent.getRmDTIdentifier());
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error While Removing RMDelegationToken and SequenceNumber ",
|
||||
e);
|
||||
store.notifyStoreOperationFailed(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class UpdateRMDTTransition implements
|
||||
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
||||
@Override
|
||||
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
||||
if (!(event instanceof RMStateStoreRMDTEvent)) {
|
||||
// should never happen
|
||||
LOG.error("Illegal event type: " + event.getClass());
|
||||
return;
|
||||
}
|
||||
|
||||
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
|
||||
try {
|
||||
LOG.info("Updating RMDelegationToken and SequenceNumber");
|
||||
store.updateRMDelegationTokenAndSequenceNumberInternal(
|
||||
dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate(),
|
||||
dtEvent.getLatestSequenceNumber());
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error While Updating RMDelegationToken and SequenceNumber ",
|
||||
e);
|
||||
store.notifyStoreOperationFailed(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class StoreRMDTMasterKeyTransition implements
|
||||
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
||||
@Override
|
||||
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
||||
if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) {
|
||||
// should never happen
|
||||
LOG.error("Illegal event type: " + event.getClass());
|
||||
return;
|
||||
}
|
||||
RMStateStoreRMDTMasterKeyEvent dtEvent =
|
||||
(RMStateStoreRMDTMasterKeyEvent) event;
|
||||
try {
|
||||
LOG.info("Storing RMDTMasterKey.");
|
||||
store.storeRMDTMasterKeyState(dtEvent.getDelegationKey());
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error While Storing RMDTMasterKey.", e);
|
||||
store.notifyStoreOperationFailed(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class RemoveRMDTMasterKeyTransition implements
|
||||
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
||||
@Override
|
||||
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
||||
if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) {
|
||||
// should never happen
|
||||
LOG.error("Illegal event type: " + event.getClass());
|
||||
return;
|
||||
}
|
||||
RMStateStoreRMDTMasterKeyEvent dtEvent =
|
||||
(RMStateStoreRMDTMasterKeyEvent) event;
|
||||
try {
|
||||
LOG.info("Removing RMDTMasterKey.");
|
||||
store.removeRMDTMasterKeyState(dtEvent.getDelegationKey());
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error While Removing RMDTMasterKey.", e);
|
||||
store.notifyStoreOperationFailed(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class StoreOrUpdateAMRMTokenTransition implements
|
||||
SingleArcTransition<RMStateStore, RMStateStoreEvent> {
|
||||
@Override
|
||||
public void transition(RMStateStore store, RMStateStoreEvent event) {
|
||||
if (!(event instanceof RMStateStoreAMRMTokenEvent)) {
|
||||
// should never happen
|
||||
LOG.error("Illegal event type: " + event.getClass());
|
||||
return;
|
||||
}
|
||||
RMStateStoreAMRMTokenEvent amrmEvent = (RMStateStoreAMRMTokenEvent) event;
|
||||
|
||||
try {
|
||||
LOG.info("Updating AMRMToken");
|
||||
store.storeOrUpdateAMRMTokenSecretManagerState(
|
||||
amrmEvent.getAmrmTokenSecretManagerState(), amrmEvent.isUpdate());
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error storing info for AMRMTokenSecretManager", e);
|
||||
store.notifyStoreOperationFailed(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public RMStateStore() {
|
||||
super(RMStateStore.class.getName());
|
||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
this.readLock = lock.readLock();
|
||||
this.writeLock = lock.writeLock();
|
||||
stateMachine = stateMachineFactory.make(this);
|
||||
}
|
||||
|
||||
|
@ -445,9 +609,8 @@ public abstract class RMStateStore extends AbstractService {
|
|||
dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState));
|
||||
}
|
||||
|
||||
public synchronized void updateFencedState() {
|
||||
this.stateMachine.doTransition(RMStateStoreEventType.FENCED,
|
||||
new RMStateStoreEvent(RMStateStoreEventType.FENCED));
|
||||
public void updateFencedState() {
|
||||
handleStoreEvent(new RMStateStoreEvent(RMStateStoreEventType.FENCED));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -509,19 +672,11 @@ public abstract class RMStateStore extends AbstractService {
|
|||
* RMDTSecretManager call this to store the state of a delegation token
|
||||
* and sequence number
|
||||
*/
|
||||
public synchronized void storeRMDelegationTokenAndSequenceNumber(
|
||||
public void storeRMDelegationTokenAndSequenceNumber(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||
int latestSequenceNumber) {
|
||||
if(isFencedState()) {
|
||||
LOG.info("State store is in Fenced state. Can't store RM Delegation Token.");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate,
|
||||
latestSequenceNumber);
|
||||
} catch (Exception e) {
|
||||
notifyStoreOperationFailed(e);
|
||||
}
|
||||
handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, renewDate,
|
||||
latestSequenceNumber, RMStateStoreEventType.STORE_DELEGATION_TOKEN));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -536,17 +691,10 @@ public abstract class RMStateStore extends AbstractService {
|
|||
/**
|
||||
* RMDTSecretManager call this to remove the state of a delegation token
|
||||
*/
|
||||
public synchronized void removeRMDelegationToken(
|
||||
public void removeRMDelegationToken(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) {
|
||||
if(isFencedState()) {
|
||||
LOG.info("State store is in Fenced state. Can't remove RM Delegation Token.");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
removeRMDelegationTokenState(rmDTIdentifier);
|
||||
} catch (Exception e) {
|
||||
notifyStoreOperationFailed(e);
|
||||
}
|
||||
handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, null,
|
||||
sequenceNumber, RMStateStoreEventType.REMOVE_DELEGATION_TOKEN));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -560,19 +708,11 @@ public abstract class RMStateStore extends AbstractService {
|
|||
* RMDTSecretManager call this to update the state of a delegation token
|
||||
* and sequence number
|
||||
*/
|
||||
public synchronized void updateRMDelegationTokenAndSequenceNumber(
|
||||
public void updateRMDelegationTokenAndSequenceNumber(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||
int latestSequenceNumber) {
|
||||
if(isFencedState()) {
|
||||
LOG.info("State store is in Fenced state. Can't update RM Delegation Token.");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
updateRMDelegationTokenAndSequenceNumberInternal(rmDTIdentifier, renewDate,
|
||||
latestSequenceNumber);
|
||||
} catch (Exception e) {
|
||||
notifyStoreOperationFailed(e);
|
||||
}
|
||||
handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, renewDate,
|
||||
latestSequenceNumber, RMStateStoreEventType.UPDATE_DELEGATION_TOKEN));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -587,17 +727,9 @@ public abstract class RMStateStore extends AbstractService {
|
|||
/**
|
||||
* RMDTSecretManager call this to store the state of a master key
|
||||
*/
|
||||
public synchronized void storeRMDTMasterKey(DelegationKey delegationKey) {
|
||||
if(isFencedState()) {
|
||||
LOG.info("State store is in Fenced state. Can't store RM Delegation " +
|
||||
"Token Master key.");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
storeRMDTMasterKeyState(delegationKey);
|
||||
} catch (Exception e) {
|
||||
notifyStoreOperationFailed(e);
|
||||
}
|
||||
public void storeRMDTMasterKey(DelegationKey delegationKey) {
|
||||
handleStoreEvent(new RMStateStoreRMDTMasterKeyEvent(delegationKey,
|
||||
RMStateStoreEventType.STORE_MASTERKEY));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -611,17 +743,9 @@ public abstract class RMStateStore extends AbstractService {
|
|||
/**
|
||||
* RMDTSecretManager call this to remove the state of a master key
|
||||
*/
|
||||
public synchronized void removeRMDTMasterKey(DelegationKey delegationKey) {
|
||||
if(isFencedState()) {
|
||||
LOG.info("State store is in Fenced state. Can't remove RM Delegation " +
|
||||
"Token Master key.");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
removeRMDTMasterKeyState(delegationKey);
|
||||
} catch (Exception e) {
|
||||
notifyStoreOperationFailed(e);
|
||||
}
|
||||
public void removeRMDTMasterKey(DelegationKey delegationKey) {
|
||||
handleStoreEvent(new RMStateStoreRMDTMasterKeyEvent(delegationKey,
|
||||
RMStateStoreEventType.REMOVE_MASTERKEY));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -636,9 +760,19 @@ public abstract class RMStateStore extends AbstractService {
|
|||
* Blocking API Derived classes must implement this method to store or update
|
||||
* the state of AMRMToken Master Key
|
||||
*/
|
||||
public abstract void storeOrUpdateAMRMTokenSecretManagerState(
|
||||
AMRMTokenSecretManagerState amrmTokenSecretManagerState,
|
||||
boolean isUpdate);
|
||||
protected abstract void storeOrUpdateAMRMTokenSecretManagerState(
|
||||
AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate)
|
||||
throws Exception;
|
||||
|
||||
/**
|
||||
* Store or Update state of AMRMToken Master Key
|
||||
*/
|
||||
public void storeOrUpdateAMRMTokenSecretManager(
|
||||
AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate) {
|
||||
handleStoreEvent(new RMStateStoreAMRMTokenEvent(
|
||||
amrmTokenSecretManagerState, isUpdate,
|
||||
RMStateStoreEventType.UPDATE_AMRM_TOKEN));
|
||||
}
|
||||
|
||||
/**
|
||||
* Non-blocking API
|
||||
|
@ -689,16 +823,32 @@ public abstract class RMStateStore extends AbstractService {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
synchronized boolean isFencedState() {
|
||||
return (RMStateStoreState.FENCED == this.stateMachine.getCurrentState());
|
||||
protected boolean isFencedState() {
|
||||
return (RMStateStoreState.FENCED == getRMStateStoreState());
|
||||
}
|
||||
|
||||
// Dispatcher related code
|
||||
protected void handleStoreEvent(RMStateStoreEvent event) {
|
||||
this.writeLock.lock();
|
||||
try {
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Processing event of type " + event.getType());
|
||||
}
|
||||
|
||||
final RMStateStoreState oldState = getRMStateStoreState();
|
||||
|
||||
this.stateMachine.doTransition(event.getType(), event);
|
||||
|
||||
if (oldState != getRMStateStoreState()) {
|
||||
LOG.info("RMStateStore state change from " + oldState + " to "
|
||||
+ getRMStateStoreState());
|
||||
}
|
||||
|
||||
} catch (InvalidStateTransitonException e) {
|
||||
LOG.error("Can't handle this event at current state", e);
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -772,4 +922,13 @@ public abstract class RMStateStore extends AbstractService {
|
|||
resourceManager.handleTransitionToStandBy();
|
||||
}
|
||||
}
|
||||
|
||||
public RMStateStoreState getRMStateStoreState() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return this.stateMachine.getCurrentState();
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
|
||||
|
||||
public class RMStateStoreAMRMTokenEvent extends RMStateStoreEvent {
|
||||
private AMRMTokenSecretManagerState amrmTokenSecretManagerState;
|
||||
private boolean isUpdate;
|
||||
|
||||
public RMStateStoreAMRMTokenEvent(RMStateStoreEventType type) {
|
||||
super(type);
|
||||
}
|
||||
|
||||
public RMStateStoreAMRMTokenEvent(
|
||||
AMRMTokenSecretManagerState amrmTokenSecretManagerState,
|
||||
boolean isUpdate, RMStateStoreEventType type) {
|
||||
this(type);
|
||||
this.amrmTokenSecretManagerState = amrmTokenSecretManagerState;
|
||||
this.isUpdate = isUpdate;
|
||||
}
|
||||
|
||||
public AMRMTokenSecretManagerState getAmrmTokenSecretManagerState() {
|
||||
return amrmTokenSecretManagerState;
|
||||
}
|
||||
|
||||
public boolean isUpdate() {
|
||||
return isUpdate;
|
||||
}
|
||||
}
|
|
@ -24,5 +24,13 @@ public enum RMStateStoreEventType {
|
|||
UPDATE_APP,
|
||||
UPDATE_APP_ATTEMPT,
|
||||
REMOVE_APP,
|
||||
FENCED
|
||||
FENCED,
|
||||
|
||||
// Below events should be called synchronously
|
||||
STORE_MASTERKEY,
|
||||
REMOVE_MASTERKEY,
|
||||
STORE_DELEGATION_TOKEN,
|
||||
REMOVE_DELEGATION_TOKEN,
|
||||
UPDATE_DELEGATION_TOKEN,
|
||||
UPDATE_AMRM_TOKEN
|
||||
}
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
|
||||
public class RMStateStoreRMDTEvent extends RMStateStoreEvent {
|
||||
private RMDelegationTokenIdentifier rmDTIdentifier;
|
||||
private Long renewDate;
|
||||
private int latestSequenceNumber;
|
||||
|
||||
public RMStateStoreRMDTEvent(RMStateStoreEventType type) {
|
||||
super(type);
|
||||
}
|
||||
|
||||
public RMStateStoreRMDTEvent(RMDelegationTokenIdentifier rmDTIdentifier,
|
||||
Long renewDate, int latestSequenceNumber, RMStateStoreEventType type) {
|
||||
this(type);
|
||||
this.rmDTIdentifier = rmDTIdentifier;
|
||||
this.renewDate = renewDate;
|
||||
this.latestSequenceNumber = latestSequenceNumber;
|
||||
}
|
||||
|
||||
public RMDelegationTokenIdentifier getRmDTIdentifier() {
|
||||
return rmDTIdentifier;
|
||||
}
|
||||
|
||||
public Long getRenewDate() {
|
||||
return renewDate;
|
||||
}
|
||||
|
||||
public int getLatestSequenceNumber() {
|
||||
return latestSequenceNumber;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||
|
||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||
|
||||
public class RMStateStoreRMDTMasterKeyEvent extends RMStateStoreEvent {
|
||||
private DelegationKey delegationKey;
|
||||
|
||||
public RMStateStoreRMDTMasterKeyEvent(RMStateStoreEventType type) {
|
||||
super(type);
|
||||
}
|
||||
|
||||
public RMStateStoreRMDTMasterKeyEvent(DelegationKey delegationKey,
|
||||
RMStateStoreEventType type) {
|
||||
this(type);
|
||||
this.delegationKey = delegationKey;
|
||||
}
|
||||
|
||||
public DelegationKey getDelegationKey() {
|
||||
return delegationKey;
|
||||
}
|
||||
}
|
|
@ -726,7 +726,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void updateRMDelegationTokenAndSequenceNumberInternal(
|
||||
protected synchronized void updateRMDelegationTokenAndSequenceNumberInternal(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||
int latestSequenceNumber) throws Exception {
|
||||
ArrayList<Op> opList = new ArrayList<Op>();
|
||||
|
@ -1135,22 +1135,12 @@ public class ZKRMStateStore extends RMStateStore {
|
|||
|
||||
@Override
|
||||
public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
|
||||
AMRMTokenSecretManagerState amrmTokenSecretManagerState,
|
||||
boolean isUpdate) {
|
||||
if(isFencedState()) {
|
||||
LOG.info("State store is in Fenced state. Can't store/update " +
|
||||
"AMRMToken Secret Manager state.");
|
||||
return;
|
||||
}
|
||||
AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate)
|
||||
throws Exception {
|
||||
AMRMTokenSecretManagerState data =
|
||||
AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
|
||||
byte[] stateData = data.getProto().toByteArray();
|
||||
try {
|
||||
setDataWithRetries(amrmTokenSecretManagerRoot, stateData, -1);
|
||||
} catch (Exception ex) {
|
||||
LOG.info("Error storing info for AMRMTokenSecretManager", ex);
|
||||
notifyStoreOperationFailed(ex);
|
||||
}
|
||||
setDataWithRetries(amrmTokenSecretManagerRoot, stateData, -1);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -107,8 +107,8 @@ public class AMRMTokenSecretManager extends
|
|||
AMRMTokenSecretManagerState state =
|
||||
AMRMTokenSecretManagerState.newInstance(
|
||||
this.currentMasterKey.getMasterKey(), null);
|
||||
rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state,
|
||||
false);
|
||||
rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManager(state,
|
||||
false);
|
||||
}
|
||||
this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
|
||||
rollingInterval);
|
||||
|
@ -145,8 +145,8 @@ public class AMRMTokenSecretManager extends
|
|||
AMRMTokenSecretManagerState.newInstance(
|
||||
this.currentMasterKey.getMasterKey(),
|
||||
this.nextMasterKey.getMasterKey());
|
||||
rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state,
|
||||
true);
|
||||
rmContext.getStateStore()
|
||||
.storeOrUpdateAMRMTokenSecretManager(state, true);
|
||||
this.timer.schedule(new NextKeyActivator(), this.activationDelay);
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
|
@ -170,8 +170,8 @@ public class AMRMTokenSecretManager extends
|
|||
AMRMTokenSecretManagerState state =
|
||||
AMRMTokenSecretManagerState.newInstance(
|
||||
this.currentMasterKey.getMasterKey(), null);
|
||||
rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state,
|
||||
true);
|
||||
rmContext.getStateStore()
|
||||
.storeOrUpdateAMRMTokenSecretManager(state, true);
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
|
|
|
@ -88,7 +88,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreAMRMTokenEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreRMDTEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreRMDTMasterKeyEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
|
@ -1458,7 +1461,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|||
@Override
|
||||
protected void handleStoreEvent(RMStateStoreEvent event) {
|
||||
// Block app saving request.
|
||||
while (wait);
|
||||
// Skip if synchronous updation of DTToken
|
||||
if (!(event instanceof RMStateStoreAMRMTokenEvent)
|
||||
&& !(event instanceof RMStateStoreRMDTEvent)
|
||||
&& !(event instanceof RMStateStoreRMDTMasterKeyEvent)) {
|
||||
while (wait);
|
||||
}
|
||||
super.handleStoreEvent(event);
|
||||
}
|
||||
};
|
||||
|
|
|
@ -616,7 +616,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
|||
AMRMTokenSecretManagerState state1 =
|
||||
AMRMTokenSecretManagerState.newInstance(
|
||||
firstMasterKeyData.getMasterKey(), null);
|
||||
rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state1,
|
||||
rmContext.getStateStore()
|
||||
.storeOrUpdateAMRMTokenSecretManager(state1,
|
||||
false);
|
||||
|
||||
// load state
|
||||
|
@ -636,7 +637,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
|||
AMRMTokenSecretManagerState
|
||||
.newInstance(firstMasterKeyData.getMasterKey(),
|
||||
secondMasterKeyData.getMasterKey());
|
||||
rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state2,
|
||||
rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManager(state2,
|
||||
true);
|
||||
|
||||
// load state
|
||||
|
|
|
@ -33,6 +33,8 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
|
@ -42,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPB
|
|||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.records.Version;
|
||||
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
|
@ -329,7 +332,44 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
|||
store.removeApplication(mockApp);
|
||||
assertEquals("RMStateStore should have been in fenced state",
|
||||
true, store.isFencedState());
|
||||
|
||||
|
||||
// store RM delegation token;
|
||||
RMDelegationTokenIdentifier dtId1 =
|
||||
new RMDelegationTokenIdentifier(new Text("owner1"),
|
||||
new Text("renewer1"), new Text("realuser1"));
|
||||
Long renewDate1 = new Long(System.currentTimeMillis());
|
||||
int sequenceNumber = 1111;
|
||||
store.storeRMDelegationTokenAndSequenceNumber(dtId1, renewDate1,
|
||||
sequenceNumber);
|
||||
assertEquals("RMStateStore should have been in fenced state", true,
|
||||
store.isFencedState());
|
||||
|
||||
store.updateRMDelegationTokenAndSequenceNumber(dtId1, renewDate1,
|
||||
sequenceNumber);
|
||||
assertEquals("RMStateStore should have been in fenced state", true,
|
||||
store.isFencedState());
|
||||
|
||||
// remove delegation key;
|
||||
store.removeRMDelegationToken(dtId1, sequenceNumber);
|
||||
assertEquals("RMStateStore should have been in fenced state", true,
|
||||
store.isFencedState());
|
||||
|
||||
// store delegation master key;
|
||||
DelegationKey key = new DelegationKey(1234, 4321, "keyBytes".getBytes());
|
||||
store.storeRMDTMasterKey(key);
|
||||
assertEquals("RMStateStore should have been in fenced state", true,
|
||||
store.isFencedState());
|
||||
|
||||
// remove delegation master key;
|
||||
store.removeRMDTMasterKey(key);
|
||||
assertEquals("RMStateStore should have been in fenced state", true,
|
||||
store.isFencedState());
|
||||
|
||||
// store or update AMRMToken;
|
||||
store.storeOrUpdateAMRMTokenSecretManager(null, false);
|
||||
assertEquals("RMStateStore should have been in fenced state", true,
|
||||
store.isFencedState());
|
||||
|
||||
store.close();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue