MAPREDUCE-5332. Support token-preserving restart of history server. Contributed by Jason Lowe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1527015 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe 2013-09-27 18:19:41 +00:00
parent 3caca924bc
commit 2627e352d6
14 changed files with 1258 additions and 17 deletions

@ -155,6 +155,8 @@ Release 2.3.0 - UNRELEASED
MAPREDUCE-5411. Refresh size of loaded job cache on history server (Ashwin
Shankar via jlowe)
MAPREDUCE-5332. Support token-preserving restart of history server (jlowe)
IMPROVEMENTS
MAPREDUCE-434. LocalJobRunner limited to single reducer (Sandy Ryza and

@ -164,6 +164,27 @@ public class JHAdminConfig {
public static final String MR_HISTORY_STORAGE =
MR_HISTORY_PREFIX + "store.class";
/**
* Enable the history server to store server state and recover server state
* upon startup.
*/
public static final String MR_HS_RECOVERY_ENABLE =
MR_HISTORY_PREFIX + "recovery.enable";
public static final boolean DEFAULT_MR_HS_RECOVERY_ENABLE = false;
/**
* The HistoryServerStateStoreService class to store and recover server state
*/
public static final String MR_HS_STATE_STORE =
MR_HISTORY_PREFIX + "recovery.store.class";
/**
* The URI where server state will be stored when
* HistoryServerFileSystemStateStoreService is configured as the state store
*/
public static final String MR_HS_FS_STATE_STORE_URI =
MR_HISTORY_PREFIX + "recovery.store.fs.uri";
/** Whether to use fixed ports with the minicluster. */
public static final String MR_HISTORY_MINICLUSTER_FIXED_PORTS = MR_HISTORY_PREFIX
+ "minicluster.fixed.ports";

@ -1181,4 +1181,28 @@
<description>ACL of who can be admin of the History server.</description>
</property>
<property>
<name>mapreduce.jobhistory.recovery.enable</name>
<value>false</value>
<description>Enable the history server to store server state and recover
server state upon startup. If enabled then
mapreduce.jobhistory.recovery.store.class must be specified.</description>
</property>
<property>
<name>mapreduce.jobhistory.recovery.store.class</name>
<value>org.apache.hadoop.mapreduce.v2.hs.HistoryServerFileSystemStateStoreService</value>
<description>The HistoryServerStateStoreService class to store history server
state for recovery.</description>
</property>
<property>
<name>mapreduce.jobhistory.recovery.store.fs.uri</name>
<value>${hadoop.tmp.dir}/mapred/history/recoverystore</value>
<!--value>hdfs://localhost:9000/mapred/history/recoverystore</value-->
<description>The URI where history server state will be stored if
HistoryServerFileSystemStateStoreService is configured as the recovery
storage class.</description>
</property>
</configuration>
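
For reference, the same settings can also be applied programmatically, for example when building the Configuration for an embedded history server in a test. This is a minimal sketch; the class name and the HDFS URI shown are assumed placeholders, not values mandated by this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.hs.HistoryServerFileSystemStateStoreService;
import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;

public class RecoveryConfSketch {
  public static Configuration buildRecoveryConf() {
    Configuration conf = new Configuration();
    // Recovery is off by default; turning it on requires a store class as well.
    conf.setBoolean(JHAdminConfig.MR_HS_RECOVERY_ENABLE, true);
    conf.setClass(JHAdminConfig.MR_HS_STATE_STORE,
        HistoryServerFileSystemStateStoreService.class,
        HistoryServerStateStoreService.class);
    // Assumed URI; any FileSystem location the history server user can write to.
    conf.set(JHAdminConfig.MR_HS_FS_STATE_STORE_URI,
        "hdfs://localhost:9000/mapred/history/recoverystore");
    return conf;
  }
}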

@ -0,0 +1,370 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@Private
@Unstable
/**
* A history server state storage implementation that supports any persistent
* storage that adheres to the FileSystem interface.
*/
public class HistoryServerFileSystemStateStoreService
extends HistoryServerStateStoreService {
public static final Log LOG =
LogFactory.getLog(HistoryServerFileSystemStateStoreService.class);
private static final String ROOT_STATE_DIR_NAME = "HistoryServerState";
private static final String TOKEN_STATE_DIR_NAME = "tokens";
private static final String TOKEN_KEYS_DIR_NAME = "keys";
private static final String TOKEN_BUCKET_DIR_PREFIX = "tb_";
private static final String TOKEN_BUCKET_NAME_FORMAT =
TOKEN_BUCKET_DIR_PREFIX + "%03d";
private static final String TOKEN_MASTER_KEY_FILE_PREFIX = "key_";
private static final String TOKEN_FILE_PREFIX = "token_";
private static final String TMP_FILE_PREFIX = "tmp-";
private static final FsPermission DIR_PERMISSIONS =
new FsPermission((short)0700);
private static final FsPermission FILE_PERMISSIONS =
new FsPermission((short)0400);
private static final int NUM_TOKEN_BUCKETS = 1000;
private FileSystem fs;
private Path rootStatePath;
private Path tokenStatePath;
private Path tokenKeysStatePath;
@Override
protected void initStorage(Configuration conf)
throws IOException {
final String storeUri = conf.get(JHAdminConfig.MR_HS_FS_STATE_STORE_URI);
if (storeUri == null) {
throw new IOException("No store location URI configured in " +
JHAdminConfig.MR_HS_FS_STATE_STORE_URI);
}
LOG.info("Using " + storeUri + " for history server state storage");
rootStatePath = new Path(storeUri, ROOT_STATE_DIR_NAME);
}
@Override
protected void startStorage() throws IOException {
fs = rootStatePath.getFileSystem(getConfig());
createDir(rootStatePath);
tokenStatePath = new Path(rootStatePath, TOKEN_STATE_DIR_NAME);
createDir(tokenStatePath);
tokenKeysStatePath = new Path(tokenStatePath, TOKEN_KEYS_DIR_NAME);
createDir(tokenKeysStatePath);
for (int i=0; i < NUM_TOKEN_BUCKETS; ++i) {
createDir(getTokenBucketPath(i));
}
}
@Override
protected void closeStorage() throws IOException {
// don't close the filesystem as it's part of the filesystem cache
// and other clients may still be using it
}
@Override
public HistoryServerState loadState() throws IOException {
LOG.info("Loading history server state from " + rootStatePath);
HistoryServerState state = new HistoryServerState();
loadTokenState(state);
return state;
}
@Override
public void storeToken(MRDelegationTokenIdentifier tokenId,
Long renewDate) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Storing token " + tokenId.getSequenceNumber());
}
Path tokenPath = getTokenPath(tokenId);
if (fs.exists(tokenPath)) {
throw new IOException(tokenPath + " already exists");
}
createFile(tokenPath, buildTokenData(tokenId, renewDate));
}
@Override
public void updateToken(MRDelegationTokenIdentifier tokenId,
Long renewDate) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Updating token " + tokenId.getSequenceNumber());
}
createFile(getTokenPath(tokenId), buildTokenData(tokenId, renewDate));
}
@Override
public void removeToken(MRDelegationTokenIdentifier tokenId)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing token " + tokenId.getSequenceNumber());
}
deleteFile(getTokenPath(tokenId));
}
@Override
public void storeTokenMasterKey(DelegationKey key) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Storing master key " + key.getKeyId());
}
Path keyPath = new Path(tokenKeysStatePath,
TOKEN_MASTER_KEY_FILE_PREFIX + key.getKeyId());
if (fs.exists(keyPath)) {
throw new IOException(keyPath + " already exists");
}
ByteArrayOutputStream memStream = new ByteArrayOutputStream();
DataOutputStream dataStream = new DataOutputStream(memStream);
try {
key.write(dataStream);
} finally {
IOUtils.cleanup(LOG, dataStream);
}
createFile(keyPath, memStream.toByteArray());
}
@Override
public void removeTokenMasterKey(DelegationKey key)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing master key " + key.getKeyId());
}
Path keyPath = new Path(tokenKeysStatePath,
TOKEN_MASTER_KEY_FILE_PREFIX + key.getKeyId());
deleteFile(keyPath);
}
private static int getBucketId(MRDelegationTokenIdentifier tokenId) {
return tokenId.getSequenceNumber() % NUM_TOKEN_BUCKETS;
}
private Path getTokenBucketPath(int bucketId) {
return new Path(tokenStatePath,
String.format(TOKEN_BUCKET_NAME_FORMAT, bucketId));
}
private Path getTokenPath(MRDelegationTokenIdentifier tokenId) {
Path bucketPath = getTokenBucketPath(getBucketId(tokenId));
return new Path(bucketPath,
TOKEN_FILE_PREFIX + tokenId.getSequenceNumber());
}
private void createDir(Path dir) throws IOException {
try {
FileStatus status = fs.getFileStatus(dir);
if (!status.isDirectory()) {
throw new FileAlreadyExistsException("Unexpected file in store: "
+ dir);
}
if (!status.getPermission().equals(DIR_PERMISSIONS)) {
fs.setPermission(dir, DIR_PERMISSIONS);
}
} catch (FileNotFoundException e) {
fs.mkdirs(dir, DIR_PERMISSIONS);
}
}
private void createFile(Path file, byte[] data) throws IOException {
final int WRITE_BUFFER_SIZE = 4096;
Path tmp = new Path(file.getParent(), TMP_FILE_PREFIX + file.getName());
FSDataOutputStream out = fs.create(tmp, FILE_PERMISSIONS, true,
WRITE_BUFFER_SIZE, fs.getDefaultReplication(tmp),
fs.getDefaultBlockSize(tmp), null);
try {
try {
out.write(data);
} finally {
IOUtils.cleanup(LOG, out);
}
if (!fs.rename(tmp, file)) {
throw new IOException("Could not rename " + tmp + " to " + file);
}
} catch (IOException e) {
fs.delete(tmp, false);
throw e;
}
}
private byte[] readFile(Path file, long numBytes) throws IOException {
byte[] data = new byte[(int)numBytes];
FSDataInputStream in = fs.open(file);
try {
in.readFully(data);
} finally {
IOUtils.cleanup(LOG, in);
}
return data;
}
private void deleteFile(Path file) throws IOException {
boolean deleted;
try {
deleted = fs.delete(file, false);
} catch (FileNotFoundException e) {
deleted = true;
}
if (!deleted) {
throw new IOException("Unable to delete " + file);
}
}
private byte[] buildTokenData(MRDelegationTokenIdentifier tokenId,
Long renewDate) throws IOException {
ByteArrayOutputStream memStream = new ByteArrayOutputStream();
DataOutputStream dataStream = new DataOutputStream(memStream);
try {
tokenId.write(dataStream);
dataStream.writeLong(renewDate);
} finally {
IOUtils.cleanup(LOG, dataStream);
}
return memStream.toByteArray();
}
private void loadTokenMasterKey(HistoryServerState state, Path keyFile,
long numKeyFileBytes) throws IOException {
DelegationKey key = new DelegationKey();
byte[] keyData = readFile(keyFile, numKeyFileBytes);
DataInputStream in =
new DataInputStream(new ByteArrayInputStream(keyData));
try {
key.readFields(in);
} finally {
IOUtils.cleanup(LOG, in);
}
state.tokenMasterKeyState.add(key);
}
private MRDelegationTokenIdentifier loadToken(HistoryServerState state,
Path tokenFile, long numTokenFileBytes) throws IOException {
MRDelegationTokenIdentifier tokenId = new MRDelegationTokenIdentifier();
long renewDate;
byte[] tokenData = readFile(tokenFile, numTokenFileBytes);
DataInputStream in =
new DataInputStream(new ByteArrayInputStream(tokenData));
try {
tokenId.readFields(in);
renewDate = in.readLong();
} finally {
IOUtils.cleanup(LOG, in);
}
state.tokenState.put(tokenId, renewDate);
return tokenId;
}
private int loadTokensFromBucket(HistoryServerState state, Path bucket)
throws IOException {
String numStr =
bucket.getName().substring(TOKEN_BUCKET_DIR_PREFIX.length());
final int bucketId = Integer.parseInt(numStr);
int numTokens = 0;
FileStatus[] tokenStats = fs.listStatus(bucket);
for (FileStatus stat : tokenStats) {
String name = stat.getPath().getName();
if (name.startsWith(TOKEN_FILE_PREFIX)) {
MRDelegationTokenIdentifier token =
loadToken(state, stat.getPath(), stat.getLen());
int tokenBucketId = getBucketId(token);
if (tokenBucketId != bucketId) {
throw new IOException("Token " + stat.getPath()
+ " should be in bucket " + tokenBucketId + ", found in bucket "
+ bucketId);
}
++numTokens;
} else {
LOG.warn("Skipping unexpected file in history server token bucket: "
+ stat.getPath());
}
}
return numTokens;
}
private int loadKeys(HistoryServerState state) throws IOException {
FileStatus[] stats = fs.listStatus(tokenKeysStatePath);
int numKeys = 0;
for (FileStatus stat : stats) {
String name = stat.getPath().getName();
if (name.startsWith(TOKEN_MASTER_KEY_FILE_PREFIX)) {
loadTokenMasterKey(state, stat.getPath(), stat.getLen());
++numKeys;
} else {
LOG.warn("Skipping unexpected file in history server token state: "
+ stat.getPath());
}
}
return numKeys;
}
private int loadTokens(HistoryServerState state) throws IOException {
FileStatus[] stats = fs.listStatus(tokenStatePath);
int numTokens = 0;
for (FileStatus stat : stats) {
String name = stat.getPath().getName();
if (name.startsWith(TOKEN_BUCKET_DIR_PREFIX)) {
numTokens += loadTokensFromBucket(state, stat.getPath());
} else if (name.equals(TOKEN_KEYS_DIR_NAME)) {
// key loading is done elsewhere
continue;
} else {
LOG.warn("Skipping unexpected file in history server token state: "
+ stat.getPath());
}
}
return numTokens;
}
private void loadTokenState(HistoryServerState state) throws IOException {
int numKeys = loadKeys(state);
int numTokens = loadTokens(state);
LOG.info("Loaded " + numKeys + " master keys and " + numTokens
+ " tokens from " + tokenStatePath);
}
}
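
A brief, hedged usage sketch of this store in isolation, outside the history server; the local file URI, the class name, and the key/token values are illustrative assumptions only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.hs.HistoryServerFileSystemStateStoreService;
import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService;
import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.security.token.delegation.DelegationKey;

public class FileSystemStateStoreSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumed local directory for the demo; production would normally use HDFS.
    conf.set(JHAdminConfig.MR_HS_FS_STATE_STORE_URI, "file:///tmp/jhs-recovery");
    HistoryServerStateStoreService store =
        new HistoryServerFileSystemStateStoreService();
    store.init(conf);   // resolves the root state path from the configured URI
    store.start();      // lays out HistoryServerState/tokens and the token buckets
    store.storeTokenMasterKey(new DelegationKey(1, 2L, "keyData".getBytes()));
    MRDelegationTokenIdentifier token = new MRDelegationTokenIdentifier(
        new Text("owner"), new Text("renewer"), new Text("realUser"));
    token.setSequenceNumber(1);
    store.storeToken(token, System.currentTimeMillis());
    HistoryServerState state = store.loadState();
    System.out.println(state.getTokenState().size() + " token(s), "
        + state.getTokenMasterKeyState().size() + " key(s) recovered");
    store.stop();
  }
}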

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@Private
@Unstable
public class HistoryServerNullStateStoreService
extends HistoryServerStateStoreService {
@Override
protected void initStorage(Configuration conf) throws IOException {
// Do nothing
}
@Override
protected void startStorage() throws IOException {
// Do nothing
}
@Override
protected void closeStorage() throws IOException {
// Do nothing
}
@Override
public HistoryServerState loadState() throws IOException {
throw new UnsupportedOperationException(
"Cannot load state from null store");
}
@Override
public void storeToken(MRDelegationTokenIdentifier tokenId, Long renewDate)
throws IOException {
// Do nothing
}
@Override
public void updateToken(MRDelegationTokenIdentifier tokenId, Long renewDate)
throws IOException {
// Do nothing
}
@Override
public void removeToken(MRDelegationTokenIdentifier tokenId)
throws IOException {
// Do nothing
}
@Override
public void storeTokenMasterKey(DelegationKey key) throws IOException {
// Do nothing
}
@Override
public void removeTokenMasterKey(DelegationKey key) throws IOException {
// Do nothing
}
}

@ -0,0 +1,184 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.AbstractService;
@Private
@Unstable
/**
* Base class for history server state storage.
* Storage implementations need to implement blocking store and load methods
* to actually store and load the state.
*/
public abstract class HistoryServerStateStoreService extends AbstractService {
public static class HistoryServerState {
Map<MRDelegationTokenIdentifier, Long> tokenState =
new HashMap<MRDelegationTokenIdentifier, Long>();
Set<DelegationKey> tokenMasterKeyState = new HashSet<DelegationKey>();
public Map<MRDelegationTokenIdentifier, Long> getTokenState() {
return tokenState;
}
public Set<DelegationKey> getTokenMasterKeyState() {
return tokenMasterKeyState;
}
}
public HistoryServerStateStoreService() {
super(HistoryServerStateStoreService.class.getName());
}
/**
* Initialize the state storage
*
* @param conf the configuration
* @throws IOException
*/
@Override
public void serviceInit(Configuration conf) throws IOException {
initStorage(conf);
}
/**
* Start the state storage for use
*
* @throws IOException
*/
@Override
public void serviceStart() throws IOException {
startStorage();
}
/**
* Shutdown the state storage.
*
* @throws IOException
*/
@Override
public void serviceStop() throws IOException {
closeStorage();
}
/**
* Implementation-specific initialization.
*
* @param conf the configuration
* @throws IOException
*/
protected abstract void initStorage(Configuration conf) throws IOException;
/**
* Implementation-specific startup.
*
* @throws IOException
*/
protected abstract void startStorage() throws IOException;
/**
* Implementation-specific shutdown.
*
* @throws IOException
*/
protected abstract void closeStorage() throws IOException;
/**
* Load the history server state from the state storage.
*
* @throws IOException
*/
public abstract HistoryServerState loadState() throws IOException;
/**
* Blocking method to store a delegation token along with the current token
* sequence number to the state storage.
*
* Implementations must not return from this method until the token has been
* committed to the state store.
*
* @param tokenId the token to store
* @param renewDate the token renewal deadline
* @throws IOException
*/
public abstract void storeToken(MRDelegationTokenIdentifier tokenId,
Long renewDate) throws IOException;
/**
* Blocking method to update the expiration of a delegation token
* in the state storage.
*
* Implementations must not return from this method until the expiration
* date of the token has been updated in the state store.
*
* @param tokenId the token to update
* @param renewDate the new token renewal deadline
* @throws IOException
*/
public abstract void updateToken(MRDelegationTokenIdentifier tokenId,
Long renewDate) throws IOException;
/**
* Blocking method to remove a delegation token from the state storage.
*
* Implementations must not return from this method until the token has been
* removed from the state store.
*
* @param tokenId the token to remove
* @throws IOException
*/
public abstract void removeToken(MRDelegationTokenIdentifier tokenId)
throws IOException;
/**
* Blocking method to store a delegation token master key.
*
* Implementations must not return from this method until the key has been
* committed to the state store.
*
* @param key the master key to store
* @throws IOException
*/
public abstract void storeTokenMasterKey(
DelegationKey key) throws IOException;
/**
* Blocking method to remove a delegation token master key.
*
* Implementations must not return from this method until the key has been
* removed from the state store.
*
* @param key the master key to remove
* @throws IOException
*/
public abstract void removeTokenMasterKey(DelegationKey key)
throws IOException;
}

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.util.ReflectionUtils;
public class HistoryServerStateStoreServiceFactory {
/**
* Constructs an instance of the configured storage class
*
* @param conf the configuration
* @return the state storage instance
*/
public static HistoryServerStateStoreService getStore(Configuration conf) {
Class<? extends HistoryServerStateStoreService> storeClass =
HistoryServerNullStateStoreService.class;
boolean recoveryEnabled = conf.getBoolean(
JHAdminConfig.MR_HS_RECOVERY_ENABLE,
JHAdminConfig.DEFAULT_MR_HS_RECOVERY_ENABLE);
if (recoveryEnabled) {
storeClass = conf.getClass(JHAdminConfig.MR_HS_STATE_STORE, null,
HistoryServerStateStoreService.class);
if (storeClass == null) {
throw new RuntimeException("Unable to locate storage class, check "
+ JHAdminConfig.MR_HS_STATE_STORE);
}
}
return ReflectionUtils.newInstance(storeClass, conf);
}
}
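
A short sketch of driving the factory; when recovery is disabled it hands back the null store, so callers can follow the same lifecycle either way. The helper class and method names here are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService;
import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreServiceFactory;

public class StateStoreBootstrapSketch {
  // Hypothetical helper mirroring how the history server wires up its store.
  public static HistoryServerStateStoreService createAndStart(Configuration conf) {
    HistoryServerStateStoreService store =
        HistoryServerStateStoreServiceFactory.getStore(conf);
    store.init(conf);   // standard AbstractService lifecycle: init, then start
    store.start();
    return store;
  }
}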

@ -18,10 +18,17 @@
package org.apache.hadoop.mapreduce.v2.hs;
import java.io.IOException;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.security.token.delegation.DelegationKey;
/**
* A MapReduce specific delegation token secret manager.
@ -33,6 +40,11 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecret
public class JHSDelegationTokenSecretManager
extends AbstractDelegationTokenSecretManager<MRDelegationTokenIdentifier> {
private static final Log LOG = LogFactory.getLog(
JHSDelegationTokenSecretManager.class);
private HistoryServerStateStoreService store;
/**
* Create a secret manager
* @param delegationKeyUpdateInterval the number of seconds for rolling new
@ -42,17 +54,94 @@ public class JHSDelegationTokenSecretManager
* @param delegationTokenRenewInterval how often the tokens must be renewed
* @param delegationTokenRemoverScanInterval how often the tokens are scanned
* for expired tokens
* @param store history server state store for persisting state
*/
public JHSDelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime,
long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval) {
long delegationTokenRemoverScanInterval,
HistoryServerStateStoreService store) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
this.store = store;
}
@Override
public MRDelegationTokenIdentifier createIdentifier() {
return new MRDelegationTokenIdentifier();
}
@Override
protected void storeNewMasterKey(DelegationKey key) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Storing master key " + key.getKeyId());
}
try {
store.storeTokenMasterKey(key);
} catch (IOException e) {
LOG.error("Unable to store master key " + key.getKeyId(), e);
}
}
@Override
protected void removeStoredMasterKey(DelegationKey key) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing master key " + key.getKeyId());
}
try {
store.removeTokenMasterKey(key);
} catch (IOException e) {
LOG.error("Unable to remove master key " + key.getKeyId(), e);
}
}
@Override
protected void storeNewToken(MRDelegationTokenIdentifier tokenId,
long renewDate) {
if (LOG.isDebugEnabled()) {
LOG.debug("Storing token " + tokenId.getSequenceNumber());
}
try {
store.storeToken(tokenId, renewDate);
} catch (IOException e) {
LOG.error("Unable to store token " + tokenId.getSequenceNumber(), e);
}
}
@Override
protected void removeStoredToken(MRDelegationTokenIdentifier tokenId)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Storing token " + tokenId.getSequenceNumber());
}
try {
store.removeToken(tokenId);
} catch (IOException e) {
LOG.error("Unable to remove token " + tokenId.getSequenceNumber(), e);
}
}
@Override
protected void updateStoredToken(MRDelegationTokenIdentifier tokenId,
long renewDate) {
if (LOG.isDebugEnabled()) {
LOG.debug("Updating token " + tokenId.getSequenceNumber());
}
try {
store.updateToken(tokenId, renewDate);
} catch (IOException e) {
LOG.error("Unable to update token " + tokenId.getSequenceNumber(), e);
}
}
public void recover(HistoryServerState state) throws IOException {
LOG.info("Recovering " + getClass().getSimpleName());
for (DelegationKey key : state.tokenMasterKeyState) {
addKey(key);
}
for (Entry<MRDelegationTokenIdentifier, Long> entry :
state.tokenState.entrySet()) {
addPersistedDelegationToken(entry.getKey(), entry.getValue());
}
}
}
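
For context, a hedged sketch of the recovery sequence a host service would perform with this secret manager, mirroring the JobHistoryServer changes below: load the persisted state, replay it, then start the rolling/expiry threads. The class name and interval values are placeholders, not defaults from this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService;
import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState;
import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreServiceFactory;
import org.apache.hadoop.mapreduce.v2.hs.JHSDelegationTokenSecretManager;

public class SecretManagerRecoverySketch {
  public static JHSDelegationTokenSecretManager createAndRecover(Configuration conf)
      throws Exception {
    HistoryServerStateStoreService store =
        HistoryServerStateStoreServiceFactory.getStore(conf);
    store.init(conf);
    store.start();
    // Placeholder intervals in milliseconds: key roll, max lifetime, renew, scan.
    JHSDelegationTokenSecretManager mgr = new JHSDelegationTokenSecretManager(
        86400000L, 604800000L, 86400000L, 3600000L, store);
    // loadState() is only valid when a real store is configured (recovery enabled).
    HistoryServerState state = store.loadState();
    mgr.recover(state);   // re-adds persisted master keys and tokens
    mgr.startThreads();   // resume key rolling and token expiry after recovery
    return mgr;
  }
}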

@ -28,11 +28,13 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil;
import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState;
import org.apache.hadoop.mapreduce.v2.hs.server.HSAdminServer;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ShutdownHookManager;
@ -64,6 +66,46 @@ public class JobHistoryServer extends CompositeService {
private JHSDelegationTokenSecretManager jhsDTSecretManager;
private AggregatedLogDeletionService aggLogDelService;
private HSAdminServer hsAdminServer;
private HistoryServerStateStoreService stateStore;
// utility class to start and stop secret manager as part of service
// framework and implement state recovery for secret manager on startup
private class HistoryServerSecretManagerService
extends AbstractService {
public HistoryServerSecretManagerService() {
super(HistoryServerSecretManagerService.class.getName());
}
@Override
protected void serviceStart() throws Exception {
boolean recoveryEnabled = getConfig().getBoolean(
JHAdminConfig.MR_HS_RECOVERY_ENABLE,
JHAdminConfig.DEFAULT_MR_HS_RECOVERY_ENABLE);
if (recoveryEnabled) {
assert stateStore.isInState(STATE.STARTED);
HistoryServerState state = stateStore.loadState();
jhsDTSecretManager.recover(state);
}
try {
jhsDTSecretManager.startThreads();
} catch(IOException io) {
LOG.error("Error while starting the Secret Manager threads", io);
throw io;
}
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
if (jhsDTSecretManager != null) {
jhsDTSecretManager.stopThreads();
}
super.serviceStop();
}
}
public JobHistoryServer() {
super(JobHistoryServer.class.getName());
@ -86,11 +128,14 @@ public class JobHistoryServer extends CompositeService {
}
jobHistoryService = new JobHistory();
historyContext = (HistoryContext)jobHistoryService;
this.jhsDTSecretManager = createJHSSecretManager(conf);
stateStore = createStateStore(conf);
this.jhsDTSecretManager = createJHSSecretManager(conf, stateStore);
clientService = new HistoryClientService(historyContext,
this.jhsDTSecretManager);
aggLogDelService = new AggregatedLogDeletionService();
hsAdminServer = new HSAdminServer(aggLogDelService, jobHistoryService);
addService(stateStore);
addService(new HistoryServerSecretManagerService());
addService(jobHistoryService);
addService(clientService);
addService(aggLogDelService);
@ -99,7 +144,7 @@ public class JobHistoryServer extends CompositeService {
}
protected JHSDelegationTokenSecretManager createJHSSecretManager(
Configuration conf) {
Configuration conf, HistoryServerStateStoreService store) {
long secretKeyInterval =
conf.getLong(MRConfig.DELEGATION_KEY_UPDATE_INTERVAL_KEY,
MRConfig.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
@ -111,9 +156,14 @@ public class JobHistoryServer extends CompositeService {
MRConfig.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
return new JHSDelegationTokenSecretManager(secretKeyInterval,
tokenMaxLifetime, tokenRenewInterval, 3600000);
tokenMaxLifetime, tokenRenewInterval, 3600000, store);
}
protected HistoryServerStateStoreService createStateStore(
Configuration conf) {
return HistoryServerStateStoreServiceFactory.getStore(conf);
}
protected void doSecureLogin(Configuration conf) throws IOException {
SecurityUtil.login(conf, JHAdminConfig.MR_HISTORY_KEYTAB,
JHAdminConfig.MR_HISTORY_PRINCIPAL);
@ -123,20 +173,11 @@ public class JobHistoryServer extends CompositeService {
protected void serviceStart() throws Exception {
DefaultMetricsSystem.initialize("JobHistoryServer");
JvmMetrics.initSingleton("JobHistoryServer", null);
try {
jhsDTSecretManager.startThreads();
} catch(IOException io) {
LOG.error("Error while starting the Secret Manager threads", io);
throw io;
}
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
if (jhsDTSecretManager != null) {
jhsDTSecretManager.stopThreads();
}
DefaultMetricsSystem.shutdown();
super.serviceStop();
}
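
The new createStateStore and createJHSSecretManager methods are protected hooks; as the TestJHSSecurity change further below does for the secret manager, a test could override createStateStore to wire in the in-memory store added later in this patch. A hypothetical sketch, placed in the same package because that store is package-private:

package org.apache.hadoop.mapreduce.v2.hs;

import org.apache.hadoop.conf.Configuration;

// Hypothetical test-only subclass, not part of the patch.
public class InMemoryRecoveryJobHistoryServer extends JobHistoryServer {
  @Override
  protected HistoryServerStateStoreService createStateStore(Configuration conf) {
    // Substitute the package-private in-memory store so recovery can be
    // exercised without touching a real FileSystem or HDFS cluster.
    return new HistoryServerMemStateStoreService();
  }
}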

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.DelegationKey;
/**
* A state store backed by memory for unit tests
*/
class HistoryServerMemStateStoreService
extends HistoryServerStateStoreService {
HistoryServerState state;
@Override
protected void initStorage(Configuration conf) throws IOException {
}
@Override
protected void startStorage() throws IOException {
state = new HistoryServerState();
}
@Override
protected void closeStorage() throws IOException {
state = null;
}
@Override
public HistoryServerState loadState() throws IOException {
HistoryServerState result = new HistoryServerState();
result.tokenState.putAll(state.tokenState);
result.tokenMasterKeyState.addAll(state.tokenMasterKeyState);
return result;
}
@Override
public void storeToken(MRDelegationTokenIdentifier tokenId, Long renewDate)
throws IOException {
if (state.tokenState.containsKey(tokenId)) {
throw new IOException("token " + tokenId + " was stored twice");
}
state.tokenState.put(tokenId, renewDate);
}
@Override
public void updateToken(MRDelegationTokenIdentifier tokenId, Long renewDate)
throws IOException {
if (!state.tokenState.containsKey(tokenId)) {
throw new IOException("token " + tokenId + " not in store");
}
state.tokenState.put(tokenId, renewDate);
}
@Override
public void removeToken(MRDelegationTokenIdentifier tokenId)
throws IOException {
state.tokenState.remove(tokenId);
}
@Override
public void storeTokenMasterKey(DelegationKey key) throws IOException {
if (state.tokenMasterKeyState.contains(key)) {
throw new IOException("token master key " + key + " was stored twice");
}
state.tokenMasterKeyState.add(key);
}
@Override
public void removeTokenMasterKey(DelegationKey key) throws IOException {
state.tokenMasterKeyState.remove(key);
}
}

@ -0,0 +1,164 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestHistoryServerFileSystemStateStoreService {
private static final File testDir = new File(
System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
"TestHistoryServerFileSystemStateStoreService");
private Configuration conf;
@Before
public void setup() {
FileUtil.fullyDelete(testDir);
testDir.mkdirs();
conf = new Configuration();
conf.setBoolean(JHAdminConfig.MR_HS_RECOVERY_ENABLE, true);
conf.setClass(JHAdminConfig.MR_HS_STATE_STORE,
HistoryServerFileSystemStateStoreService.class,
HistoryServerStateStoreService.class);
conf.set(JHAdminConfig.MR_HS_FS_STATE_STORE_URI,
testDir.getAbsoluteFile().toURI().toString());
}
@After
public void cleanup() {
FileUtil.fullyDelete(testDir);
}
private HistoryServerStateStoreService createAndStartStore()
throws IOException {
HistoryServerStateStoreService store =
HistoryServerStateStoreServiceFactory.getStore(conf);
assertTrue("Factory did not create a filesystem store",
store instanceof HistoryServerFileSystemStateStoreService);
store.init(conf);
store.start();
return store;
}
@Test
public void testTokenStore() throws IOException {
HistoryServerStateStoreService store = createAndStartStore();
HistoryServerState state = store.loadState();
assertTrue("token state not empty", state.tokenState.isEmpty());
assertTrue("key state not empty", state.tokenMasterKeyState.isEmpty());
final DelegationKey key1 = new DelegationKey(1, 2, "keyData1".getBytes());
final MRDelegationTokenIdentifier token1 =
new MRDelegationTokenIdentifier(new Text("tokenOwner1"),
new Text("tokenRenewer1"), new Text("tokenUser1"));
token1.setSequenceNumber(1);
final Long tokenDate1 = 1L;
final MRDelegationTokenIdentifier token2 =
new MRDelegationTokenIdentifier(new Text("tokenOwner2"),
new Text("tokenRenewer2"), new Text("tokenUser2"));
token2.setSequenceNumber(12345678);
final Long tokenDate2 = 87654321L;
store.storeTokenMasterKey(key1);
try {
store.storeTokenMasterKey(key1);
fail("redundant store of key undetected");
} catch (IOException e) {
// expected
}
store.storeToken(token1, tokenDate1);
store.storeToken(token2, tokenDate2);
try {
store.storeToken(token1, tokenDate1);
fail("redundant store of token undetected");
} catch (IOException e) {
// expected
}
store.close();
store = createAndStartStore();
state = store.loadState();
assertEquals("incorrect loaded token count", 2, state.tokenState.size());
assertTrue("missing token 1", state.tokenState.containsKey(token1));
assertEquals("incorrect token 1 date", tokenDate1,
state.tokenState.get(token1));
assertTrue("missing token 2", state.tokenState.containsKey(token2));
assertEquals("incorrect token 2 date", tokenDate2,
state.tokenState.get(token2));
assertEquals("incorrect master key count", 1,
state.tokenMasterKeyState.size());
assertTrue("missing master key 1",
state.tokenMasterKeyState.contains(key1));
final DelegationKey key2 = new DelegationKey(3, 4, "keyData2".getBytes());
final DelegationKey key3 = new DelegationKey(5, 6, "keyData3".getBytes());
final MRDelegationTokenIdentifier token3 =
new MRDelegationTokenIdentifier(new Text("tokenOwner3"),
new Text("tokenRenewer3"), new Text("tokenUser3"));
token3.setSequenceNumber(12345679);
final Long tokenDate3 = 87654321L;
store.removeToken(token1);
store.storeTokenMasterKey(key2);
final Long newTokenDate2 = 975318642L;
store.updateToken(token2, newTokenDate2);
store.removeTokenMasterKey(key1);
store.storeTokenMasterKey(key3);
store.storeToken(token3, tokenDate3);
store.close();
store = createAndStartStore();
state = store.loadState();
assertEquals("incorrect loaded token count", 2, state.tokenState.size());
assertFalse("token 1 not removed", state.tokenState.containsKey(token1));
assertTrue("missing token 2", state.tokenState.containsKey(token2));
assertEquals("incorrect token 2 date", newTokenDate2,
state.tokenState.get(token2));
assertTrue("missing token 3", state.tokenState.containsKey(token3));
assertEquals("incorrect token 3 date", tokenDate3,
state.tokenState.get(token3));
assertEquals("incorrect master key count", 2,
state.tokenMasterKeyState.size());
assertFalse("master key 1 not removed",
state.tokenMasterKeyState.contains(key1));
assertTrue("missing master key 2",
state.tokenMasterKeyState.contains(key2));
assertTrue("missing master key 3",
state.tokenMasterKeyState.contains(key3));
}
}

@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.junit.Test;
public class TestJHSDelegationTokenSecretManager {
@Test
public void testRecovery() throws IOException {
Configuration conf = new Configuration();
HistoryServerStateStoreService store =
new HistoryServerMemStateStoreService();
store.init(conf);
store.start();
JHSDelegationTokenSecretManagerForTest mgr =
new JHSDelegationTokenSecretManagerForTest(store);
mgr.startThreads();
MRDelegationTokenIdentifier tokenId1 = new MRDelegationTokenIdentifier(
new Text("tokenOwner"), new Text("tokenRenewer"),
new Text("tokenUser"));
Token<MRDelegationTokenIdentifier> token1 =
new Token<MRDelegationTokenIdentifier>(tokenId1, mgr);
MRDelegationTokenIdentifier tokenId2 = new MRDelegationTokenIdentifier(
new Text("tokenOwner"), new Text("tokenRenewer"),
new Text("tokenUser"));
Token<MRDelegationTokenIdentifier> token2 =
new Token<MRDelegationTokenIdentifier>(tokenId2, mgr);
DelegationKey[] keys = mgr.getAllKeys();
long tokenRenewDate1 = mgr.getAllTokens().get(tokenId1).getRenewDate();
long tokenRenewDate2 = mgr.getAllTokens().get(tokenId2).getRenewDate();
mgr.stopThreads();
mgr = new JHSDelegationTokenSecretManagerForTest(store);
mgr.recover(store.loadState());
List<DelegationKey> recoveredKeys = Arrays.asList(mgr.getAllKeys());
for (DelegationKey key : keys) {
assertTrue("key missing after recovery", recoveredKeys.contains(key));
}
assertTrue("token1 missing", mgr.getAllTokens().containsKey(tokenId1));
assertEquals("token1 renew date", tokenRenewDate1,
mgr.getAllTokens().get(tokenId1).getRenewDate());
assertTrue("token2 missing", mgr.getAllTokens().containsKey(tokenId2));
assertEquals("token2 renew date", tokenRenewDate2,
mgr.getAllTokens().get(tokenId2).getRenewDate());
mgr.startThreads();
mgr.verifyToken(tokenId1, token1.getPassword());
mgr.verifyToken(tokenId2, token2.getPassword());
MRDelegationTokenIdentifier tokenId3 = new MRDelegationTokenIdentifier(
new Text("tokenOwner"), new Text("tokenRenewer"),
new Text("tokenUser"));
Token<MRDelegationTokenIdentifier> token3 =
new Token<MRDelegationTokenIdentifier>(tokenId3, mgr);
assertEquals("sequence number restore", tokenId2.getSequenceNumber() + 1,
tokenId3.getSequenceNumber());
mgr.cancelToken(token1, "tokenOwner");
long tokenRenewDate3 = mgr.getAllTokens().get(tokenId3).getRenewDate();
mgr.stopThreads();
mgr = new JHSDelegationTokenSecretManagerForTest(store);
mgr.recover(store.loadState());
assertFalse("token1 should be missing",
mgr.getAllTokens().containsKey(tokenId1));
assertTrue("token2 missing", mgr.getAllTokens().containsKey(tokenId2));
assertEquals("token2 renew date", tokenRenewDate2,
mgr.getAllTokens().get(tokenId2).getRenewDate());
assertTrue("token3 missing", mgr.getAllTokens().containsKey(tokenId3));
assertEquals("token3 renew date", tokenRenewDate3,
mgr.getAllTokens().get(tokenId3).getRenewDate());
mgr.startThreads();
mgr.verifyToken(tokenId2, token2.getPassword());
mgr.verifyToken(tokenId3, token3.getPassword());
mgr.stopThreads();
}
private static class JHSDelegationTokenSecretManagerForTest
extends JHSDelegationTokenSecretManager {
public JHSDelegationTokenSecretManagerForTest(
HistoryServerStateStoreService store) {
super(10000, 10000, 10000, 10000, store);
}
public Map<MRDelegationTokenIdentifier, DelegationTokenInformation> getAllTokens() {
return new HashMap<MRDelegationTokenIdentifier, DelegationTokenInformation>(currentTokens);
}
}
}

@ -72,7 +72,7 @@ public class TestJobHistoryServer {
Configuration config = new Configuration();
historyServer.init(config);
assertEquals(STATE.INITED, historyServer.getServiceState());
assertEquals(4, historyServer.getServices().size());
assertEquals(6, historyServer.getServices().size());
HistoryClientService historyService = historyServer.getClientService();
assertNotNull(historyServer.getClientService());
assertEquals(STATE.INITED, historyService.getServiceState());

@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenR
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService;
import org.apache.hadoop.mapreduce.v2.hs.JHSDelegationTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
@ -87,10 +88,11 @@ public class TestJHSSecurity {
// no keytab based login
};
@Override
protected JHSDelegationTokenSecretManager createJHSSecretManager(
Configuration conf) {
Configuration conf, HistoryServerStateStoreService store) {
return new JHSDelegationTokenSecretManager(initialInterval,
maxLifetime, renewInterval, 3600000);
maxLifetime, renewInterval, 3600000, store);
}
};
// final JobHistoryServer jobHistoryServer = jhServer;