YARN-6127. Add support for work preserving NM restart when AMRMProxy is enabled. (Botong Huang via asuresh).

This commit is contained in:
Arun Suresh 2017-06-22 15:07:38 -07:00
parent 897d5a4d24
commit 66f780e6ab
15 changed files with 1005 additions and 85 deletions

View File

@ -46,11 +46,12 @@ public class AMRMProxyApplicationContextImpl implements
/**
* Create an instance of the AMRMProxyApplicationContext.
*
* @param nmContext
* @param conf
* @param applicationAttemptId
* @param user
* @param amrmToken
* @param nmContext NM context
* @param conf configuration
* @param applicationAttemptId attempt id
* @param user user name of the application
* @param amrmToken amrmToken issued by RM
* @param localToken amrmToken issued by AMRMProxy
*/
public AMRMProxyApplicationContextImpl(Context nmContext,
Configuration conf, ApplicationAttemptId applicationAttemptId,
@ -86,6 +87,8 @@ public class AMRMProxyApplicationContextImpl implements
/**
* Sets the application's AMRMToken.
*
* @param amrmToken amrmToken issued by RM
*/
public synchronized void setAMRMToken(
Token<AMRMTokenIdentifier> amrmToken) {
@ -99,6 +102,8 @@ public class AMRMProxyApplicationContextImpl implements
/**
* Sets the application's AMRMToken.
*
* @param localToken amrmToken issued by AMRMProxy
*/
public synchronized void setLocalAMRMToken(
Token<AMRMTokenIdentifier> localToken) {

View File

@ -64,7 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -86,6 +86,10 @@ public class AMRMProxyService extends AbstractService implements
ApplicationMasterProtocol {
private static final Logger LOG = LoggerFactory
.getLogger(AMRMProxyService.class);
private static final String NMSS_USER_KEY = "user";
private static final String NMSS_AMRMTOKEN_KEY = "amrmtoken";
private Server server;
private final Context nmContext;
private final AsyncDispatcher dispatcher;
@ -96,8 +100,8 @@ public class AMRMProxyService extends AbstractService implements
/**
* Creates an instance of the service.
*
* @param nmContext
* @param dispatcher
* @param nmContext NM context
* @param dispatcher NM dispatcher
*/
public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) {
super(AMRMProxyService.class.getName());
@ -112,6 +116,14 @@ public class AMRMProxyService extends AbstractService implements
new ApplicationEventHandler());
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
this.secretManager =
new AMRMProxyTokenSecretManager(this.nmContext.getNMStateStore());
this.secretManager.init(conf);
}
@Override
protected void serviceStart() throws Exception {
LOG.info("Starting AMRMProxyService");
@ -134,7 +146,6 @@ public class AMRMProxyService extends AbstractService implements
YarnConfiguration.AMRM_PROXY_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT);
this.secretManager = new AMRMProxyTokenSecretManager(serverConf);
this.secretManager.start();
this.server =
@ -160,6 +171,62 @@ public class AMRMProxyService extends AbstractService implements
super.serviceStop();
}
/**
* Recover from NM state store. Called after serviceInit before serviceStart.
*
* @throws IOException if recover fails
*/
public void recover() throws IOException {
LOG.info("Recovering AMRMProxyService");
RecoveredAMRMProxyState state =
this.nmContext.getNMStateStore().loadAMRMProxyState();
this.secretManager.recover(state);
LOG.info("Recovering {} running applications for AMRMProxy",
state.getAppContexts().size());
for (Map.Entry<ApplicationAttemptId, Map<String, byte[]>> entry : state
.getAppContexts().entrySet()) {
ApplicationAttemptId attemptId = entry.getKey();
LOG.info("Recovering app attempt {}", attemptId);
// Try recover for the running application attempt
try {
String user = null;
Token<AMRMTokenIdentifier> amrmToken = null;
for (Map.Entry<String, byte[]> contextEntry : entry.getValue()
.entrySet()) {
if (contextEntry.getKey().equals(NMSS_USER_KEY)) {
user = new String(contextEntry.getValue(), "UTF-8");
} else if (contextEntry.getKey().equals(NMSS_AMRMTOKEN_KEY)) {
amrmToken = new Token<>();
amrmToken.decodeFromUrlString(
new String(contextEntry.getValue(), "UTF-8"));
}
}
if (amrmToken == null) {
throw new IOException(
"No amrmToken found for app attempt " + attemptId);
}
if (user == null) {
throw new IOException("No user found for app attempt " + attemptId);
}
Token<AMRMTokenIdentifier> localToken =
this.secretManager.createAndGetAMRMToken(attemptId);
initializePipeline(attemptId, user, amrmToken, localToken,
entry.getValue(), true);
} catch (Exception e) {
LOG.error("Exception when recovering " + attemptId
+ ", removing it from NMStateStore and move on", e);
this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId);
}
}
}
/**
* This is called by the AMs started on this node to register with the RM.
* This method does the initial authorization and then forwards the request to
@ -221,8 +288,8 @@ public class AMRMProxyService extends AbstractService implements
* application request processing pipeline.
*
* @param request - encapsulates information for starting an AM
* @throws IOException
* @throws YarnException
* @throws IOException if fails
* @throws YarnException if fails
*/
public void processApplicationStartRequest(StartContainerRequest request)
throws IOException, YarnException {
@ -257,22 +324,25 @@ public class AMRMProxyService extends AbstractService implements
request.getContainerLaunchContext().setTokens(
ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
initializePipeline(containerTokenIdentifierForKey.getContainerID()
.getApplicationAttemptId(),
containerTokenIdentifierForKey.getApplicationSubmitter(),
amrmToken, localToken);
initializePipeline(appAttemptId,
containerTokenIdentifierForKey.getApplicationSubmitter(), amrmToken,
localToken, null, false);
}
/**
* Initializes the request intercepter pipeline for the specified application.
*
* @param applicationAttemptId
* @param user
* @param amrmToken
* @param applicationAttemptId attempt id
* @param user user name
* @param amrmToken amrmToken issued by RM
* @param localToken amrmToken issued by AMRMProxy
* @param recoveredDataMap the recovered states for AMRMProxy from NMSS
* @param isRecovery whether this is to recover a previously existing pipeline
*/
protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
String user, Token<AMRMTokenIdentifier> amrmToken,
Token<AMRMTokenIdentifier> localToken) {
Token<AMRMTokenIdentifier> localToken,
Map<String, byte[]> recoveredDataMap, boolean isRecovery) {
RequestInterceptorChainWrapper chainWrapper = null;
synchronized (applPipelineMap) {
if (applPipelineMap
@ -288,11 +358,23 @@ public class AMRMProxyService extends AbstractService implements
&& chainWrapperBackup.getApplicationAttemptId() != null
&& !chainWrapperBackup.getApplicationAttemptId()
.equals(applicationAttemptId)) {
// TODO: revisit in AMRMProxy HA in YARN-6128
// Remove the existing pipeline
LOG.info("Remove the previous pipeline for ApplicationId: "
+ applicationAttemptId.toString());
RequestInterceptorChainWrapper pipeline =
applPipelineMap.remove(applicationAttemptId.getApplicationId());
if (!isRecovery && this.nmContext.getNMStateStore() != null) {
try {
this.nmContext.getNMStateStore()
.removeAMRMProxyAppContext(applicationAttemptId);
} catch (IOException e) {
LOG.error("Error removing AMRMProxy application context for "
+ applicationAttemptId, e);
}
}
try {
pipeline.getRootInterceptor().shutdown();
} catch (Throwable ex) {
@ -324,7 +406,27 @@ public class AMRMProxyService extends AbstractService implements
this.createRequestInterceptorChain();
interceptorChain.init(createApplicationMasterContext(this.nmContext,
applicationAttemptId, user, amrmToken, localToken));
if (isRecovery) {
if (recoveredDataMap == null) {
throw new YarnRuntimeException(
"null recoveredDataMap recieved for recover");
}
interceptorChain.recover(recoveredDataMap);
}
chainWrapper.init(interceptorChain, applicationAttemptId);
if (!isRecovery && this.nmContext.getNMStateStore() != null) {
try {
this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
applicationAttemptId, NMSS_USER_KEY, user.getBytes("UTF-8"));
this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
applicationAttemptId, NMSS_AMRMTOKEN_KEY,
amrmToken.encodeToUrlString().getBytes("UTF-8"));
} catch (IOException e) {
LOG.error("Error storing AMRMProxy application context entry for "
+ applicationAttemptId, e);
}
}
} catch (Exception e) {
this.applPipelineMap.remove(applicationAttemptId.getApplicationId());
throw e;
@ -335,7 +437,7 @@ public class AMRMProxyService extends AbstractService implements
* Shuts down the request processing pipeline for the specified application
* attempt id.
*
* @param applicationId
* @param applicationId application id
*/
protected void stopApplication(ApplicationId applicationId) {
Preconditions.checkArgument(applicationId != null,
@ -362,6 +464,17 @@ public class AMRMProxyService extends AbstractService implements
"Failed to shutdown the request processing pipeline for app:"
+ applicationId, ex);
}
// Remove the app context from NMSS after the interceptors are shutdown
if (this.nmContext.getNMStateStore() != null) {
try {
this.nmContext.getNMStateStore()
.removeAMRMProxyAppContext(pipeline.getApplicationAttemptId());
} catch (IOException e) {
LOG.error("Error removing AMRMProxy application context for "
+ applicationId, e);
}
}
}
}
@ -383,12 +496,24 @@ public class AMRMProxyService extends AbstractService implements
// Do not propagate this info back to AM
allocateResponse.setAMRMToken(null);
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newTokenId =
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newToken =
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
token.getIdentifier().array(), token.getPassword().array(),
new Text(token.getKind()), new Text(token.getService()));
context.setAMRMToken(newTokenId);
context.setAMRMToken(newToken);
// Update the AMRMToken in context map in NM state store
if (this.nmContext.getNMStateStore() != null) {
try {
this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
context.getApplicationAttemptId(), NMSS_AMRMTOKEN_KEY,
newToken.encodeToUrlString().getBytes("UTF-8"));
} catch (IOException e) {
LOG.error("Error storing AMRMProxy application context entry for "
+ context.getApplicationAttemptId(), e);
}
}
}
// Check if the local AMRMToken is rolled up and update the context and
@ -489,7 +614,7 @@ public class AMRMProxyService extends AbstractService implements
/**
* Returns the comma separated intercepter class names from the configuration.
*
* @param conf
* @param conf configuration
* @return the intercepter class names as an instance of ArrayList
*/
private List<String> getInterceptorClassNames(Configuration conf) {
@ -518,7 +643,7 @@ public class AMRMProxyService extends AbstractService implements
* processing pipeline.
*
* @return the the intercepter wrapper instance
* @throws YarnException
* @throws YarnException if fails
*/
private RequestInterceptorChainWrapper authorizeAndGetInterceptorChain()
throws YarnException {
@ -612,8 +737,8 @@ public class AMRMProxyService extends AbstractService implements
/**
* Initializes the wrapper with the specified parameters.
*
* @param rootInterceptor
* @param applicationAttemptId
* @param rootInterceptor the root request intercepter
* @param applicationAttemptId attempt id
*/
public synchronized void init(RequestInterceptor rootInterceptor,
ApplicationAttemptId applicationAttemptId) {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.HashSet;
import java.util.Set;
@ -37,6 +38,9 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import com.google.common.annotations.VisibleForTesting;
@ -60,17 +64,24 @@ public class AMRMProxyTokenSecretManager extends
private final Lock writeLock = readWriteLock.writeLock();
private final Timer timer;
private final long rollingInterval;
private final long activationDelay;
private long rollingInterval;
private long activationDelay;
private NMStateStoreService nmStateStore;
private final Set<ApplicationAttemptId> appAttemptSet =
new HashSet<ApplicationAttemptId>();
/**
* Create an {@link AMRMProxyTokenSecretManager}.
* @param nmStateStoreService NM state store
*/
public AMRMProxyTokenSecretManager(Configuration conf) {
public AMRMProxyTokenSecretManager(NMStateStoreService nmStateStoreService) {
this.timer = new Timer();
this.nmStateStore = nmStateStoreService;
}
public void init(Configuration conf) {
this.rollingInterval =
conf.getLong(
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
@ -94,6 +105,14 @@ public class AMRMProxyTokenSecretManager extends
public void start() {
if (this.currentMasterKey == null) {
this.currentMasterKey = createNewMasterKey();
if (this.nmStateStore != null) {
try {
this.nmStateStore.storeAMRMProxyCurrentMasterKey(
this.currentMasterKey.getMasterKey());
} catch (IOException e) {
LOG.error("Unable to update current master key in state store", e);
}
}
}
this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
rollingInterval);
@ -103,6 +122,11 @@ public class AMRMProxyTokenSecretManager extends
this.timer.cancel();
}
@VisibleForTesting
public void setNMStateStoreService(NMStateStoreService nmStateStoreService) {
this.nmStateStore = nmStateStoreService;
}
public void applicationMasterFinished(ApplicationAttemptId appAttemptId) {
this.writeLock.lock();
try {
@ -122,11 +146,21 @@ public class AMRMProxyTokenSecretManager extends
}
@Private
void rollMasterKey() {
@VisibleForTesting
public void rollMasterKey() {
this.writeLock.lock();
try {
LOG.info("Rolling master-key for amrm-tokens");
this.nextMasterKey = createNewMasterKey();
if (this.nmStateStore != null) {
try {
this.nmStateStore
.storeAMRMProxyNextMasterKey(this.nextMasterKey.getMasterKey());
} catch (IOException e) {
LOG.error("Unable to update next master key in state store", e);
}
}
this.timer.schedule(new NextKeyActivator(), this.activationDelay);
} finally {
this.writeLock.unlock();
@ -140,6 +174,8 @@ public class AMRMProxyTokenSecretManager extends
}
}
@Private
@VisibleForTesting
public void activateNextMasterKey() {
this.writeLock.lock();
try {
@ -147,6 +183,15 @@ public class AMRMProxyTokenSecretManager extends
+ this.nextMasterKey.getMasterKey().getKeyId());
this.currentMasterKey = this.nextMasterKey;
this.nextMasterKey = null;
if (this.nmStateStore != null) {
try {
this.nmStateStore.storeAMRMProxyCurrentMasterKey(
this.currentMasterKey.getMasterKey());
this.nmStateStore.storeAMRMProxyNextMasterKey(null);
} catch (IOException e) {
LOG.error("Unable to update current master key in state store", e);
}
}
} finally {
this.writeLock.unlock();
}
@ -237,6 +282,17 @@ public class AMRMProxyTokenSecretManager extends
return new AMRMTokenIdentifier();
}
@Private
@VisibleForTesting
public MasterKeyData getCurrentMasterKeyData() {
this.readLock.lock();
try {
return this.currentMasterKey;
} finally {
this.readLock.unlock();
}
}
@Private
@VisibleForTesting
public MasterKeyData getNextMasterKeyData() {
@ -262,4 +318,33 @@ public class AMRMProxyTokenSecretManager extends
this.readLock.unlock();
}
}
/**
* Recover secretManager from state store. Called after serviceInit before
* serviceStart.
*
* @param state the state to recover from
*/
public void recover(RecoveredAMRMProxyState state) {
if (state != null) {
// recover the current master key
MasterKey currentKey = state.getCurrentMasterKey();
if (currentKey != null) {
this.currentMasterKey = new MasterKeyData(currentKey,
createSecretKey(currentKey.getBytes().array()));
} else {
LOG.warn("No current master key recovered from NM StateStore"
+ " for AMRMProxyTokenSecretManager");
}
// recover the next master key if not null
MasterKey nextKey = state.getNextMasterKey();
if (nextKey != null) {
this.nextMasterKey = new MasterKeyData(nextKey,
createSecretKey(nextKey.getBytes().array()));
this.timer.schedule(new NextKeyActivator(), this.activationDelay);
}
}
}
}

View File

@ -18,16 +18,17 @@
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.util.Map;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import java.io.IOException;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
/**
* Implements the RequestInterceptor interface and provides common functionality
@ -82,6 +83,16 @@ public abstract class AbstractRequestInterceptor implements
}
}
/**
* Recover {@link RequestInterceptor} state from store.
*/
@Override
public void recover(Map<String, byte[]> recoveredDataMap) {
if (this.nextInterceptor != null) {
this.nextInterceptor.recover(recoveredDataMap);
}
}
/**
* Disposes the {@link RequestInterceptor}.
*/
@ -113,8 +124,8 @@ public abstract class AbstractRequestInterceptor implements
*
* @param request ApplicationMaster allocate request
* @return Distribtued Scheduler Allocate Response
* @throws YarnException
* @throws IOException
* @throws YarnException if fails
* @throws IOException if fails
*/
@Override
public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
@ -130,8 +141,8 @@ public abstract class AbstractRequestInterceptor implements
*
* @param request ApplicationMaster registration request
* @return Distributed Scheduler Register Response
* @throws YarnException
* @throws IOException
* @throws YarnException if fails
* @throws IOException if fails
*/
@Override
public RegisterDistributedSchedulingAMResponse
@ -141,4 +152,16 @@ public abstract class AbstractRequestInterceptor implements
return (this.nextInterceptor != null) ? this.nextInterceptor
.registerApplicationMasterForDistributedScheduling(request) : null;
}
/**
* A helper method for getting NM state store.
*
* @return the NMSS instance
*/
public NMStateStoreService getNMStateStore() {
if (this.appContext == null || this.appContext.getNMCotext() == null) {
return null;
}
return this.appContext.getNMCotext().getNMStateStore();
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
@ -32,10 +34,24 @@ public interface RequestInterceptor extends DistributedSchedulingAMProtocol,
* This method is called for initializing the intercepter. This is guaranteed
* to be called only once in the lifetime of this instance.
*
* @param ctx
* @param ctx AMRMProxy application context
*/
void init(AMRMProxyApplicationContext ctx);
/**
* Recover intercepter state when NM recovery is enabled. AMRMProxy will
* recover the data map into
* AMRMProxyApplicationContext.getRecoveredDataMap(). All intercepters should
* recover state from it.
*
* For example, registerRequest has to be saved by the last intercepter (i.e.
* the one that actually connects to RM), in order to re-register when RM
* fails over.
*
* @param recoveredDataMap states for all intercepters recovered from NMSS
*/
void recover(Map<String, byte[]> recoveredDataMap);
/**
* This method is called to release the resources held by the intercepter.
* This will be called when the application pipeline is being destroyed. The
@ -51,7 +67,7 @@ public interface RequestInterceptor extends DistributedSchedulingAMProtocol,
* send the messages to the resource manager service and so the last
* intercepter will not receive this method call.
*
* @param nextInterceptor
* @param nextInterceptor the next intercepter to set
*/
void setNextInterceptor(RequestInterceptor nextInterceptor);

View File

@ -324,6 +324,10 @@ public class ContainerManagerImpl extends CompositeService implements
rsrcLocalizationSrvc.recoverLocalizedResources(
stateStore.loadLocalizationState());
if (this.amrmProxyEnabled) {
this.getAMRMProxyService().recover();
}
RecoveredApplicationsState appsState = stateStore.loadApplicationsState();
for (ContainerManagerApplicationProto proto :
appsState.getApplications()) {

View File

@ -24,12 +24,15 @@ import static org.fusesource.leveldbjni.JniDBFactory.bytes;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -82,8 +85,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String DB_NAME = "yarn-nm-state";
private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version";
private static final Version CURRENT_VERSION_INFO = Version
.newInstance(1, 0);
private static final Version CURRENT_VERSION_INFO = Version.newInstance(3, 0);
private static final String DELETION_TASK_KEY_PREFIX =
"DeletionService/deltask_";
@ -121,6 +123,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey";
private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey";
private static final String NEXT_MASTER_KEY_SUFFIX = "NextMasterKey";
private static final String NM_TOKENS_KEY_PREFIX = "NMTokens/";
private static final String NM_TOKENS_CURRENT_MASTER_KEY =
NM_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX;
@ -135,6 +138,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String LOG_DELETER_KEY_PREFIX = "LogDeleters/";
private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/";
private static final byte[] EMPTY_VALUE = new byte[0];
private DB db;
@ -1103,6 +1108,177 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
return LOG_DELETER_KEY_PREFIX + appId;
}
@Override
public RecoveredAMRMProxyState loadAMRMProxyState() throws IOException {
RecoveredAMRMProxyState result = new RecoveredAMRMProxyState();
Set<String> unknownKeys = new HashSet<>();
LeveldbIterator iter = null;
try {
iter = new LeveldbIterator(db);
iter.seek(bytes(AMRMPROXY_KEY_PREFIX));
while (iter.hasNext()) {
Entry<byte[], byte[]> entry = iter.peekNext();
String key = asString(entry.getKey());
if (!key.startsWith(AMRMPROXY_KEY_PREFIX)) {
break;
}
String suffix = key.substring(AMRMPROXY_KEY_PREFIX.length());
if (suffix.equals(CURRENT_MASTER_KEY_SUFFIX)) {
iter.next();
result.setCurrentMasterKey(parseMasterKey(entry.getValue()));
LOG.info("Recovered for AMRMProxy: current master key id "
+ result.getCurrentMasterKey().getKeyId());
} else if (suffix.equals(NEXT_MASTER_KEY_SUFFIX)) {
iter.next();
result.setNextMasterKey(parseMasterKey(entry.getValue()));
LOG.info("Recovered for AMRMProxy: next master key id "
+ result.getNextMasterKey().getKeyId());
} else { // Load AMRMProxy application context map for an app attempt
// Parse appAttemptId, also handle the unknown keys
int idEndPos;
ApplicationAttemptId attemptId;
try {
idEndPos = key.indexOf('/', AMRMPROXY_KEY_PREFIX.length());
if (idEndPos < 0) {
throw new IOException(
"Unable to determine attemptId in key: " + key);
}
attemptId = ApplicationAttemptId.fromString(
key.substring(AMRMPROXY_KEY_PREFIX.length(), idEndPos));
} catch (Exception e) {
// Try to move on for back-forward compatibility
LOG.warn("Unknown key " + key + ", remove and move on", e);
// Do this because iter.remove() is not supported here
unknownKeys.add(key);
continue;
}
// Parse the context map for the appAttemptId
Map<String, byte[]> appContext =
loadAMRMProxyAppContextMap(iter, key.substring(0, idEndPos + 1));
result.getAppContexts().put(attemptId, appContext);
LOG.info("Recovered for AMRMProxy: " + attemptId + ", map size "
+ appContext.size());
}
}
} catch (DBException e) {
throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
}
}
// Delete all unknown keys
try {
for (String key : unknownKeys) {
db.delete(bytes(key));
}
} catch (DBException e) {
throw new IOException(e);
}
return result;
}
private Map<String, byte[]> loadAMRMProxyAppContextMap(LeveldbIterator iter,
String keyPrefix) throws IOException {
Map<String, byte[]> appContextMap = new HashMap<>();
while (iter.hasNext()) {
Entry<byte[], byte[]> entry = iter.peekNext();
String key = asString(entry.getKey());
if (!key.startsWith(keyPrefix)) {
break;
}
iter.next();
String suffix = key.substring(keyPrefix.length());
byte[] data = entry.getValue();
appContextMap.put(suffix, Arrays.copyOf(data, data.length));
}
return appContextMap;
}
@Override
public void storeAMRMProxyCurrentMasterKey(MasterKey key) throws IOException {
storeMasterKey(AMRMPROXY_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX, key);
}
@Override
public void storeAMRMProxyNextMasterKey(MasterKey key) throws IOException {
String dbkey = AMRMPROXY_KEY_PREFIX + NEXT_MASTER_KEY_SUFFIX;
if (key == null) {
// When key is null, delete the entry instead
try {
db.delete(bytes(dbkey));
} catch (DBException e) {
throw new IOException(e);
}
return;
}
storeMasterKey(dbkey, key);
}
@Override
public void storeAMRMProxyAppContextEntry(ApplicationAttemptId attempt,
String key, byte[] data) throws IOException {
String fullkey = AMRMPROXY_KEY_PREFIX + attempt + "/" + key;
try {
db.put(bytes(fullkey), data);
} catch (DBException e) {
throw new IOException(e);
}
}
@Override
public void removeAMRMProxyAppContextEntry(ApplicationAttemptId attempt,
String key) throws IOException {
String fullkey = AMRMPROXY_KEY_PREFIX + attempt + "/" + key;
try {
db.delete(bytes(fullkey));
} catch (DBException e) {
throw new IOException(e);
}
}
@Override
public void removeAMRMProxyAppContext(ApplicationAttemptId attempt)
throws IOException {
Set<String> candidates = new HashSet<>();
String keyPrefix = AMRMPROXY_KEY_PREFIX + attempt + "/";
LeveldbIterator iter = null;
try {
iter = new LeveldbIterator(db);
iter.seek(bytes(keyPrefix));
while (iter.hasNext()) {
Entry<byte[], byte[]> entry = iter.next();
String key = asString(entry.getKey());
if (!key.startsWith(keyPrefix)) {
break;
}
// Do this because iter.remove() is not supported here
candidates.add(key);
}
} catch (DBException e) {
throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
}
}
// Delete all candidate keys
try {
for (String key : candidates) {
db.delete(bytes(key));
}
} catch (DBException e) {
throw new IOException(e);
}
}
@Override
protected void initStorage(Configuration conf)
throws IOException {

View File

@ -224,6 +224,35 @@ public class NMNullStateStoreService extends NMStateStoreService {
public void removeLogDeleter(ApplicationId appId) throws IOException {
}
@Override
public RecoveredAMRMProxyState loadAMRMProxyState() throws IOException {
throw new UnsupportedOperationException(
"Recovery not supported by this state store");
}
@Override
public void storeAMRMProxyCurrentMasterKey(MasterKey key) throws IOException {
}
@Override
public void storeAMRMProxyNextMasterKey(MasterKey key) throws IOException {
}
@Override
public void storeAMRMProxyAppContextEntry(ApplicationAttemptId attempt,
String key, byte[] data) throws IOException {
}
@Override
public void removeAMRMProxyAppContextEntry(ApplicationAttemptId attempt,
String key) throws IOException {
}
@Override
public void removeAMRMProxyAppContext(ApplicationAttemptId attempt)
throws IOException {
}
@Override
protected void initStorage(Configuration conf) throws IOException {
}

View File

@ -265,6 +265,41 @@ public abstract class NMStateStoreService extends AbstractService {
}
}
/**
* Recovered states for AMRMProxy.
*/
public static class RecoveredAMRMProxyState {
private MasterKey currentMasterKey;
private MasterKey nextMasterKey;
// For each app, stores amrmToken, user name, as well as various AMRMProxy
// intercepter states
private Map<ApplicationAttemptId, Map<String, byte[]>> appContexts;
public RecoveredAMRMProxyState() {
appContexts = new HashMap<>();
}
public MasterKey getCurrentMasterKey() {
return currentMasterKey;
}
public MasterKey getNextMasterKey() {
return nextMasterKey;
}
public Map<ApplicationAttemptId, Map<String, byte[]>> getAppContexts() {
return appContexts;
}
public void setCurrentMasterKey(MasterKey currentKey) {
currentMasterKey = currentKey;
}
public void setNextMasterKey(MasterKey nextKey) {
nextMasterKey = nextKey;
}
}
/** Initialize the state storage */
@Override
public void serviceInit(Configuration conf) throws IOException {
@ -592,6 +627,57 @@ public abstract class NMStateStoreService extends AbstractService {
public abstract void removeLogDeleter(ApplicationId appId)
throws IOException;
/**
* Load the state of AMRMProxy.
* @return recovered state of AMRMProxy
* @throws IOException if fails
*/
public abstract RecoveredAMRMProxyState loadAMRMProxyState()
throws IOException;
/**
* Record the current AMRMProxyTokenSecretManager master key.
* @param key the current master key
* @throws IOException if fails
*/
public abstract void storeAMRMProxyCurrentMasterKey(MasterKey key)
throws IOException;
/**
* Record the next AMRMProxyTokenSecretManager master key.
* @param key the next master key
* @throws IOException if fails
*/
public abstract void storeAMRMProxyNextMasterKey(MasterKey key)
throws IOException;
/**
* Add a context entry for an application attempt in AMRMProxyService.
* @param attempt app attempt ID
* @param key key string
* @param data state data to store
* @throws IOException if fails
*/
public abstract void storeAMRMProxyAppContextEntry(
ApplicationAttemptId attempt, String key, byte[] data) throws IOException;
/**
* Remove a context entry for an application attempt in AMRMProxyService.
* @param attempt attempt ID
* @param key key string
* @throws IOException if fails
*/
public abstract void removeAMRMProxyAppContextEntry(
ApplicationAttemptId attempt, String key) throws IOException;
/**
* Remove the entire context map for an application attempt in
* AMRMProxyService.
* @param attempt attempt ID
* @throws IOException if fails
*/
public abstract void removeAMRMProxyAppContext(ApplicationAttemptId attempt)
throws IOException;
protected abstract void initStorage(Configuration conf) throws IOException;

View File

@ -39,6 +39,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@ -62,12 +63,13 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
@ -86,17 +88,15 @@ import org.junit.Before;
public abstract class BaseAMRMProxyTest {
private static final Log LOG = LogFactory
.getLog(BaseAMRMProxyTest.class);
/**
* The AMRMProxyService instance that will be used by all the test cases
*/
// The AMRMProxyService instance that will be used by all the test cases
private MockAMRMProxyService amrmProxyService;
/**
* Thread pool used for asynchronous operations
*/
// Thread pool used for asynchronous operations
private static ExecutorService threadpool = Executors
.newCachedThreadPool();
private Configuration conf;
private AsyncDispatcher dispatcher;
private Context nmContext;
protected MockAMRMProxyService getAMRMProxyService() {
Assert.assertNotNull(this.amrmProxyService);
@ -104,32 +104,40 @@ public abstract class BaseAMRMProxyTest {
}
@Before
public void setUp() {
this.conf = new YarnConfiguration();
this.conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
String mockPassThroughInterceptorClass =
PassThroughRequestInterceptor.class.getName();
// Create a request intercepter pipeline for testing. The last one in the
// chain will call the mock resource manager. The others in the chain will
// simply forward it to the next one in the chain
this.conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + ","
+ mockPassThroughInterceptorClass + ","
+ mockPassThroughInterceptorClass + ","
+ MockRequestInterceptor.class.getName());
public void setUp() throws IOException {
this.conf = createConfiguration();
this.dispatcher = new AsyncDispatcher();
this.dispatcher.init(this.conf);
this.dispatcher.start();
createAndStartAMRMProxyService(this.conf);
}
protected YarnConfiguration createConfiguration() {
YarnConfiguration config = new YarnConfiguration();
config.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
String mockPassThroughInterceptorClass =
PassThroughRequestInterceptor.class.getName();
// Create a request intercepter pipeline for testing. The last one in the
// chain will call the mock resource manager. The others in the chain will
// simply forward it to the next one in the chain
config.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ "," + mockPassThroughInterceptorClass + ","
+ MockRequestInterceptor.class.getName());
config.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
return config;
}
@After
public void tearDown() {
amrmProxyService.stop();
amrmProxyService = null;
this.amrmProxyService.stop();
this.amrmProxyService = null;
this.dispatcher.stop();
if (this.nmContext.getNMStateStore() != null) {
this.nmContext.getNMStateStore().stop();
}
}
protected ExecutorService getThreadPool() {
@ -140,17 +148,33 @@ public abstract class BaseAMRMProxyTest {
return this.conf;
}
protected void createAndStartAMRMProxyService(Configuration config) {
protected AsyncDispatcher getDispatcher() {
return this.dispatcher;
}
protected void createAndStartAMRMProxyService(Configuration config)
throws IOException {
// Stop the existing instance first if not null
if (this.amrmProxyService != null) {
this.amrmProxyService.stop();
}
if (this.nmContext == null) {
this.nmContext = createContext();
}
this.amrmProxyService =
new MockAMRMProxyService(new NullContext(), dispatcher);
new MockAMRMProxyService(this.nmContext, this.dispatcher);
this.amrmProxyService.init(config);
this.amrmProxyService.recover();
this.amrmProxyService.start();
}
protected Context createContext() {
NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService();
stateStore.init(this.conf);
stateStore.start();
return new NMContext(null, null, null, null, stateStore, false, this.conf);
}
/**
* This helper method will invoke the specified function in parallel for each
* end point in the specified list using a thread pool and return the
@ -578,6 +602,13 @@ public abstract class BaseAMRMProxyTest {
super(nmContext, dispatcher);
}
@Override
protected void serviceStart() throws Exception {
// Override this method and do nothing to avoid the base class from
// listening to server end point
getSecretManager().start();
}
/**
* This method is used by the test code to initialize the pipeline. In the
* actual service, the initialization is called by the
@ -587,7 +618,8 @@ public abstract class BaseAMRMProxyTest {
* @param user
*/
public void initApp(ApplicationAttemptId applicationId, String user) {
super.initializePipeline(applicationId, user, null, null);
super.initializePipeline(applicationId, user,
new Token<AMRMTokenIdentifier>(), null, null, false);
}
public void stopApp(ApplicationId applicationId) {

View File

@ -26,9 +26,12 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -38,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
@ -48,6 +52,8 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
private static final Log LOG = LogFactory
.getLog(TestAMRMProxyService.class);
private static MockResourceManagerFacade mockRM;
/**
* Test if the pipeline is created properly.
*/
@ -99,9 +105,11 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
/**
* Tests the case when interceptor pipeline initialization fails.
*
* @throws IOException
*/
@Test
public void testInterceptorInitFailure() {
public void testInterceptorInitFailure() throws IOException {
Configuration conf = this.getConf();
// Override with a bad interceptor configuration
conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
@ -434,8 +442,8 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
// Second Attempt
applicationAttemptId = ApplicationAttemptId.newInstance(appId, 2);
getAMRMProxyService().initializePipeline(applicationAttemptId, user, null,
null);
getAMRMProxyService().initializePipeline(applicationAttemptId, user,
new Token<AMRMTokenIdentifier>(), null, null, false);
RequestInterceptorChainWrapper chain2 =
getAMRMProxyService().getPipelines().get(appId);
@ -559,4 +567,109 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
Assert.assertEquals(relList.size(),
containersForReleasedContainerIds.size());
}
/**
* Test AMRMProxy restart with recovery.
*/
@Test
public void testRecovery() throws YarnException, Exception {
Configuration conf = createConfiguration();
// Use the MockRequestInterceptorAcrossRestart instead for the chain
conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
MockRequestInterceptorAcrossRestart.class.getName());
mockRM = new MockResourceManagerFacade(new YarnConfiguration(conf), 0);
createAndStartAMRMProxyService(conf);
int testAppId1 = 1;
RegisterApplicationMasterResponse registerResponse =
registerApplicationMaster(testAppId1);
Assert.assertNotNull(registerResponse);
Assert.assertEquals(Integer.toString(testAppId1),
registerResponse.getQueue());
int testAppId2 = 2;
registerResponse = registerApplicationMaster(testAppId2);
Assert.assertNotNull(registerResponse);
Assert.assertEquals(Integer.toString(testAppId2),
registerResponse.getQueue());
AllocateResponse allocateResponse = allocate(testAppId2);
Assert.assertNotNull(allocateResponse);
// At the time of kill, app1 just registerAM, app2 already did one allocate.
// Both application should be recovered
createAndStartAMRMProxyService(conf);
Assert.assertTrue(getAMRMProxyService().getPipelines().size() == 2);
allocateResponse = allocate(testAppId1);
Assert.assertNotNull(allocateResponse);
FinishApplicationMasterResponse finshResponse =
finishApplicationMaster(testAppId1, FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
allocateResponse = allocate(testAppId2);
Assert.assertNotNull(allocateResponse);
finshResponse =
finishApplicationMaster(testAppId2, FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
int testAppId3 = 3;
try {
// Try to finish an application master that is not registered.
finishApplicationMaster(testAppId3, FinalApplicationStatus.SUCCEEDED);
Assert
.fail("The Mock RM should complain about not knowing the third app");
} catch (Throwable ex) {
}
mockRM = null;
}
/**
* A mock intercepter implementation that uses the same mockRM instance across
* restart.
*/
public static class MockRequestInterceptorAcrossRestart
extends AbstractRequestInterceptor {
public MockRequestInterceptorAcrossRestart() {
}
@Override
public void init(AMRMProxyApplicationContext appContext) {
super.init(appContext);
if (mockRM == null) {
throw new RuntimeException("mockRM not initialized yet");
}
}
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request)
throws YarnException, IOException {
return mockRM.registerApplicationMaster(request);
}
@Override
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request)
throws YarnException, IOException {
return mockRM.finishApplicationMaster(request);
}
@Override
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {
return mockRM.allocate(request);
}
}
}

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -40,11 +41,19 @@ public class TestAMRMProxyTokenSecretManager {
private YarnConfiguration conf;
private AMRMProxyTokenSecretManager secretManager;
private NMMemoryStateStoreService stateStore;
@Before
public void setup() {
conf = new YarnConfiguration();
secretManager = new AMRMProxyTokenSecretManager(conf);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
stateStore = new NMMemoryStateStoreService();
stateStore.init(conf);
stateStore.start();
secretManager = new AMRMProxyTokenSecretManager(stateStore);
secretManager.init(conf);
secretManager.start();
}
@ -53,6 +62,9 @@ public class TestAMRMProxyTokenSecretManager {
if (secretManager != null) {
secretManager.stop();
}
if (stateStore != null) {
stateStore.stop();
}
}
@Test
@ -78,4 +90,52 @@ public class TestAMRMProxyTokenSecretManager {
}
}
@Test
public void testRecovery() throws IOException {
ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
Token<AMRMTokenIdentifier> localToken =
secretManager.createAndGetAMRMToken(attemptId);
AMRMTokenIdentifier identifier = secretManager.createIdentifier();
identifier.readFields(new DataInputStream(
new ByteArrayInputStream(localToken.getIdentifier())));
secretManager.retrievePassword(identifier);
// Generate next master key
secretManager.rollMasterKey();
// Restart and recover
secretManager.stop();
secretManager = new AMRMProxyTokenSecretManager(stateStore);
secretManager.init(conf);
secretManager.recover(stateStore.loadAMRMProxyState());
secretManager.start();
// Recover the app
secretManager.createAndGetAMRMToken(attemptId);
// Current master key should be recovered, and thus pass here
secretManager.retrievePassword(identifier);
// Roll key, current master key will be replaced
secretManager.activateNextMasterKey();
// Restart and recover
secretManager.stop();
secretManager = new AMRMProxyTokenSecretManager(stateStore);
secretManager.init(conf);
secretManager.recover(stateStore.loadAMRMProxyState());
secretManager.start();
// Recover the app
secretManager.createAndGetAMRMToken(attemptId);
try {
secretManager.retrievePassword(identifier);
Assert.fail("Expect InvalidToken exception because the "
+ "old master key should have expired");
} catch (InvalidToken e) {
}
}
}

View File

@ -20,11 +20,10 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@ -50,6 +49,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
private RecoveredNMTokensState nmTokenState;
private RecoveredContainerTokensState containerTokenState;
private Map<ApplicationId, LogDeleterProto> logDeleterState;
private RecoveredAMRMProxyState amrmProxyState;
public NMMemoryStateStoreService() {
super(NMMemoryStateStoreService.class.getName());
@ -67,6 +67,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
trackerStates = new HashMap<TrackerKey, TrackerState>();
deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
logDeleterState = new HashMap<ApplicationId, LogDeleterProto>();
amrmProxyState = new RecoveredAMRMProxyState();
}
@Override
@ -411,6 +412,66 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
logDeleterState.remove(appId);
}
@Override
public synchronized RecoveredAMRMProxyState loadAMRMProxyState()
throws IOException {
// return a copy so caller can't modify our state
RecoveredAMRMProxyState result = new RecoveredAMRMProxyState();
result.setCurrentMasterKey(amrmProxyState.getCurrentMasterKey());
result.setNextMasterKey(amrmProxyState.getNextMasterKey());
for (Map.Entry<ApplicationAttemptId, Map<String, byte[]>> entry :
amrmProxyState.getAppContexts().entrySet()) {
result.getAppContexts().put(entry.getKey(),
new HashMap<String, byte[]>(entry.getValue()));
}
return result;
}
@Override
public synchronized void storeAMRMProxyCurrentMasterKey(MasterKey key)
throws IOException {
MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
amrmProxyState.setCurrentMasterKey(new MasterKeyPBImpl(keypb.getProto()));
}
@Override
public synchronized void storeAMRMProxyNextMasterKey(MasterKey key)
throws IOException {
if (key == null) {
amrmProxyState.setNextMasterKey(null);
return;
}
MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
amrmProxyState.setNextMasterKey(new MasterKeyPBImpl(keypb.getProto()));
}
@Override
public synchronized void storeAMRMProxyAppContextEntry(
ApplicationAttemptId attempt, String key, byte[] data)
throws IOException {
Map<String, byte[]> entryMap = amrmProxyState.getAppContexts().get(attempt);
if (entryMap == null) {
entryMap = new HashMap<>();
amrmProxyState.getAppContexts().put(attempt, entryMap);
}
entryMap.put(key, Arrays.copyOf(data, data.length));
}
@Override
public synchronized void removeAMRMProxyAppContextEntry(
ApplicationAttemptId attempt, String key) throws IOException {
Map<String, byte[]> entryMap = amrmProxyState.getAppContexts().get(attempt);
if (entryMap != null) {
entryMap.remove(key);
}
}
@Override
public synchronized void removeAMRMProxyAppContext(
ApplicationAttemptId attempt) throws IOException {
amrmProxyState.getAppContexts().remove(attempt);
}
private static class TrackerState {
Map<Path, LocalResourceProto> inProgressMap =
new HashMap<Path, LocalResourceProto>();

View File

@ -20,10 +20,11 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
@ -33,6 +34,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -65,7 +67,9 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Localize
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
@ -1004,6 +1008,105 @@ public class TestNMLeveldbStateStoreService {
assertNull(stateStore.getDB().get(invalidKey));
}
@Test
public void testAMRMProxyStorage() throws IOException {
RecoveredAMRMProxyState state = stateStore.loadAMRMProxyState();
assertEquals(state.getCurrentMasterKey(), null);
assertEquals(state.getNextMasterKey(), null);
assertEquals(state.getAppContexts().size(), 0);
ApplicationId appId1 = ApplicationId.newInstance(1, 1);
ApplicationId appId2 = ApplicationId.newInstance(1, 2);
ApplicationAttemptId attemptId1 =
ApplicationAttemptId.newInstance(appId1, 1);
ApplicationAttemptId attemptId2 =
ApplicationAttemptId.newInstance(appId2, 2);
String key1 = "key1";
String key2 = "key2";
byte[] data1 = "data1".getBytes();
byte[] data2 = "data2".getBytes();
AMRMProxyTokenSecretManager secretManager =
new AMRMProxyTokenSecretManager(stateStore);
secretManager.init(conf);
// Generate currentMasterKey
secretManager.start();
try {
// Add two applications, each with two data entries
stateStore.storeAMRMProxyAppContextEntry(attemptId1, key1, data1);
stateStore.storeAMRMProxyAppContextEntry(attemptId2, key1, data1);
stateStore.storeAMRMProxyAppContextEntry(attemptId1, key2, data2);
stateStore.storeAMRMProxyAppContextEntry(attemptId2, key2, data2);
// restart state store and verify recovered
restartStateStore();
secretManager.setNMStateStoreService(stateStore);
state = stateStore.loadAMRMProxyState();
assertEquals(state.getCurrentMasterKey(),
secretManager.getCurrentMasterKeyData().getMasterKey());
assertEquals(state.getNextMasterKey(), null);
assertEquals(state.getAppContexts().size(), 2);
// app1
Map<String, byte[]> map = state.getAppContexts().get(attemptId1);
assertNotEquals(map, null);
assertEquals(map.size(), 2);
assertTrue(Arrays.equals(map.get(key1), data1));
assertTrue(Arrays.equals(map.get(key2), data2));
// app2
map = state.getAppContexts().get(attemptId2);
assertNotEquals(map, null);
assertEquals(map.size(), 2);
assertTrue(Arrays.equals(map.get(key1), data1));
assertTrue(Arrays.equals(map.get(key2), data2));
// Generate next master key and remove one entry of app2
secretManager.rollMasterKey();
stateStore.removeAMRMProxyAppContextEntry(attemptId2, key1);
// restart state store and verify recovered
restartStateStore();
secretManager.setNMStateStoreService(stateStore);
state = stateStore.loadAMRMProxyState();
assertEquals(state.getCurrentMasterKey(),
secretManager.getCurrentMasterKeyData().getMasterKey());
assertEquals(state.getNextMasterKey(),
secretManager.getNextMasterKeyData().getMasterKey());
assertEquals(state.getAppContexts().size(), 2);
// app1
map = state.getAppContexts().get(attemptId1);
assertNotEquals(map, null);
assertEquals(map.size(), 2);
assertTrue(Arrays.equals(map.get(key1), data1));
assertTrue(Arrays.equals(map.get(key2), data2));
// app2
map = state.getAppContexts().get(attemptId2);
assertNotEquals(map, null);
assertEquals(map.size(), 1);
assertTrue(Arrays.equals(map.get(key2), data2));
// Activate next master key and remove all entries of app1
secretManager.activateNextMasterKey();
stateStore.removeAMRMProxyAppContext(attemptId1);
// restart state store and verify recovered
restartStateStore();
secretManager.setNMStateStoreService(stateStore);
state = stateStore.loadAMRMProxyState();
assertEquals(state.getCurrentMasterKey(),
secretManager.getCurrentMasterKeyData().getMasterKey());
assertEquals(state.getNextMasterKey(), null);
assertEquals(state.getAppContexts().size(), 1);
// app2 only
map = state.getAppContexts().get(attemptId2);
assertNotEquals(map, null);
assertEquals(map.size(), 1);
assertTrue(Arrays.equals(map.get(key2), data2));
} finally {
secretManager.stop();
}
}
private static class NMTokenSecretManagerForTest extends
BaseNMTokenSecretManager {
public MasterKey generateKey() {

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -935,9 +936,10 @@ public class MiniYARNCluster extends CompositeService {
@Override
protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
String user, Token<AMRMTokenIdentifier> amrmToken,
Token<AMRMTokenIdentifier> localToken) {
Token<AMRMTokenIdentifier> localToken,
Map<String, byte[]> recoveredDataMap, boolean isRecovery) {
super.initializePipeline(applicationAttemptId, user, amrmToken,
localToken);
localToken, recoveredDataMap, isRecovery);
RequestInterceptor rt = getPipelines()
.get(applicationAttemptId.getApplicationId()).getRootInterceptor();
// The DefaultRequestInterceptor will generally be the last