From 2627e352d630d19c35b97eea9ef603342feb379f Mon Sep 17 00:00:00 2001 From: Jason Darrell Lowe Date: Fri, 27 Sep 2013 18:19:41 +0000 Subject: [PATCH] MAPREDUCE-5332. Support token-preserving restart of history server. Contributed by Jason Lowe git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1527015 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 2 + .../v2/jobhistory/JHAdminConfig.java | 21 + .../src/main/resources/mapred-default.xml | 24 ++ ...toryServerFileSystemStateStoreService.java | 370 ++++++++++++++++++ .../HistoryServerNullStateStoreService.java | 82 ++++ .../v2/hs/HistoryServerStateStoreService.java | 184 +++++++++ ...HistoryServerStateStoreServiceFactory.java | 48 +++ .../hs/JHSDelegationTokenSecretManager.java | 91 ++++- .../mapreduce/v2/hs/JobHistoryServer.java | 67 +++- .../hs/HistoryServerMemStateStoreService.java | 92 +++++ ...toryServerFileSystemStateStoreService.java | 164 ++++++++ .../TestJHSDelegationTokenSecretManager.java | 122 ++++++ .../mapreduce/v2/hs/TestJobHistoryServer.java | 2 +- .../mapreduce/security/TestJHSSecurity.java | 6 +- 14 files changed, 1258 insertions(+), 17 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerNullStateStoreService.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreService.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreServiceFactory.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerMemStateStoreService.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJHSDelegationTokenSecretManager.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 42d2084f20f..cf29dabfb0f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -155,6 +155,8 @@ Release 2.3.0 - UNRELEASED MAPREDUCE-5411. Refresh size of loaded job cache on history server (Ashwin Shankar via jlowe) + MAPREDUCE-5332. Support token-preserving restart of history server (jlowe) + IMPROVEMENTS MAPREDUCE-434. LocalJobRunner limited to single reducer (Sandy Ryza and diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java index bfe282bd0ee..411170af6c0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java @@ -164,6 +164,27 @@ public class JHAdminConfig { public static final String MR_HISTORY_STORAGE = MR_HISTORY_PREFIX + "store.class"; + /** + * Enable the history server to store server state and recover server state + * upon startup. + */ + public static final String MR_HS_RECOVERY_ENABLE = + MR_HISTORY_PREFIX + "recovery.enable"; + public static final boolean DEFAULT_MR_HS_RECOVERY_ENABLE = false; + + /** + * The HistoryServerStateStoreService class to store and recover server state + */ + public static final String MR_HS_STATE_STORE = + MR_HISTORY_PREFIX + "recovery.store.class"; + + /** + * The URI where server state will be stored when + * HistoryServerFileSystemStateStoreService is configured as the state store + */ + public static final String MR_HS_FS_STATE_STORE_URI = + MR_HISTORY_PREFIX + "recovery.store.fs.uri"; + /** Whether to use fixed ports with the minicluster. */ public static final String MR_HISTORY_MINICLUSTER_FIXED_PORTS = MR_HISTORY_PREFIX + "minicluster.fixed.ports"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index b9796cb798a..dad0264832e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1181,4 +1181,28 @@ ACL of who can be admin of the History server. + + mapreduce.jobhistory.recovery.enable + false + Enable the history server to store server state and recover + server state upon startup. If enabled then + mapreduce.jobhistory.recovery.store.class must be specified. + + + + mapreduce.jobhistory.recovery.store.class + org.apache.hadoop.mapreduce.v2.hs.HistoryServerFileSystemStateStoreService + The HistoryServerStateStoreService class to store history server + state for recovery. + + + + mapreduce.jobhistory.recovery.store.fs.uri + ${hadoop.tmp.dir}/mapred/history/recoverystore + + The URI where history server state will be stored if + HistoryServerFileSystemStateStoreService is configured as the recovery + storage class. + + diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java new file mode 100644 index 00000000000..33984cbabbc --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java @@ -0,0 +1,370 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.hs; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.apache.hadoop.security.token.delegation.DelegationKey; + +@Private +@Unstable +/** + * A history server state storage implementation that supports any persistent + * storage that adheres to the FileSystem interface. + */ +public class HistoryServerFileSystemStateStoreService + extends HistoryServerStateStoreService { + + public static final Log LOG = + LogFactory.getLog(HistoryServerFileSystemStateStoreService.class); + + private static final String ROOT_STATE_DIR_NAME = "HistoryServerState"; + private static final String TOKEN_STATE_DIR_NAME = "tokens"; + private static final String TOKEN_KEYS_DIR_NAME = "keys"; + private static final String TOKEN_BUCKET_DIR_PREFIX = "tb_"; + private static final String TOKEN_BUCKET_NAME_FORMAT = + TOKEN_BUCKET_DIR_PREFIX + "%03d"; + private static final String TOKEN_MASTER_KEY_FILE_PREFIX = "key_"; + private static final String TOKEN_FILE_PREFIX = "token_"; + private static final String TMP_FILE_PREFIX = "tmp-"; + private static final FsPermission DIR_PERMISSIONS = + new FsPermission((short)0700); + private static final FsPermission FILE_PERMISSIONS = + new FsPermission((short)0400); + private static final int NUM_TOKEN_BUCKETS = 1000; + + private FileSystem fs; + private Path rootStatePath; + private Path tokenStatePath; + private Path tokenKeysStatePath; + + @Override + protected void initStorage(Configuration conf) + throws IOException { + final String storeUri = conf.get(JHAdminConfig.MR_HS_FS_STATE_STORE_URI); + if (storeUri == null) { + throw new IOException("No store location URI configured in " + + JHAdminConfig.MR_HS_FS_STATE_STORE_URI); + } + + LOG.info("Using " + storeUri + " for history server state storage"); + rootStatePath = new Path(storeUri, ROOT_STATE_DIR_NAME); + } + + @Override + protected void startStorage() throws IOException { + fs = rootStatePath.getFileSystem(getConfig()); + createDir(rootStatePath); + tokenStatePath = new Path(rootStatePath, TOKEN_STATE_DIR_NAME); + createDir(tokenStatePath); + tokenKeysStatePath = new Path(tokenStatePath, TOKEN_KEYS_DIR_NAME); + createDir(tokenKeysStatePath); + for (int i=0; i < NUM_TOKEN_BUCKETS; ++i) { + createDir(getTokenBucketPath(i)); + } + } + + @Override + protected void closeStorage() throws IOException { + // don't close the filesystem as it's part of the filesystem cache + // and other clients may still be using it + } + + @Override + public HistoryServerState loadState() throws IOException { + LOG.info("Loading history server state from " + rootStatePath); + HistoryServerState state = new HistoryServerState(); + loadTokenState(state); + return state; + } + + @Override + public void storeToken(MRDelegationTokenIdentifier tokenId, + Long renewDate) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Storing token " + tokenId.getSequenceNumber()); + } + + Path tokenPath = getTokenPath(tokenId); + if (fs.exists(tokenPath)) { + throw new IOException(tokenPath + " already exists"); + } + + createFile(tokenPath, buildTokenData(tokenId, renewDate)); + } + + @Override + public void updateToken(MRDelegationTokenIdentifier tokenId, + Long renewDate) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Updating token " + tokenId.getSequenceNumber()); + } + createFile(getTokenPath(tokenId), buildTokenData(tokenId, renewDate)); + } + + @Override + public void removeToken(MRDelegationTokenIdentifier tokenId) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Removing token " + tokenId.getSequenceNumber()); + } + deleteFile(getTokenPath(tokenId)); + } + + @Override + public void storeTokenMasterKey(DelegationKey key) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Storing master key " + key.getKeyId()); + } + + Path keyPath = new Path(tokenKeysStatePath, + TOKEN_MASTER_KEY_FILE_PREFIX + key.getKeyId()); + if (fs.exists(keyPath)) { + throw new IOException(keyPath + " already exists"); + } + + ByteArrayOutputStream memStream = new ByteArrayOutputStream(); + DataOutputStream dataStream = new DataOutputStream(memStream); + try { + key.write(dataStream); + } finally { + IOUtils.cleanup(LOG, dataStream); + } + + createFile(keyPath, memStream.toByteArray()); + } + + @Override + public void removeTokenMasterKey(DelegationKey key) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Removing master key " + key.getKeyId()); + } + + Path keyPath = new Path(tokenKeysStatePath, + TOKEN_MASTER_KEY_FILE_PREFIX + key.getKeyId()); + deleteFile(keyPath); + } + + private static int getBucketId(MRDelegationTokenIdentifier tokenId) { + return tokenId.getSequenceNumber() % NUM_TOKEN_BUCKETS; + } + + private Path getTokenBucketPath(int bucketId) { + return new Path(tokenStatePath, + String.format(TOKEN_BUCKET_NAME_FORMAT, bucketId)); + } + + private Path getTokenPath(MRDelegationTokenIdentifier tokenId) { + Path bucketPath = getTokenBucketPath(getBucketId(tokenId)); + return new Path(bucketPath, + TOKEN_FILE_PREFIX + tokenId.getSequenceNumber()); + } + + private void createDir(Path dir) throws IOException { + try { + FileStatus status = fs.getFileStatus(dir); + if (!status.isDirectory()) { + throw new FileAlreadyExistsException("Unexpected file in store: " + + dir); + } + if (!status.getPermission().equals(DIR_PERMISSIONS)) { + fs.setPermission(dir, DIR_PERMISSIONS); + } + } catch (FileNotFoundException e) { + fs.mkdirs(dir, DIR_PERMISSIONS); + } + } + + private void createFile(Path file, byte[] data) throws IOException { + final int WRITE_BUFFER_SIZE = 4096; + Path tmp = new Path(file.getParent(), TMP_FILE_PREFIX + file.getName()); + FSDataOutputStream out = fs.create(tmp, FILE_PERMISSIONS, true, + WRITE_BUFFER_SIZE, fs.getDefaultReplication(tmp), + fs.getDefaultBlockSize(tmp), null); + try { + try { + out.write(data); + } finally { + IOUtils.cleanup(LOG, out); + } + if (!fs.rename(tmp, file)) { + throw new IOException("Could not rename " + tmp + " to " + file); + } + } catch (IOException e) { + fs.delete(tmp, false); + throw e; + } + } + + private byte[] readFile(Path file, long numBytes) throws IOException { + byte[] data = new byte[(int)numBytes]; + FSDataInputStream in = fs.open(file); + try { + in.readFully(data); + } finally { + IOUtils.cleanup(LOG, in); + } + return data; + } + + private void deleteFile(Path file) throws IOException { + boolean deleted; + try { + deleted = fs.delete(file, false); + } catch (FileNotFoundException e) { + deleted = true; + } + if (!deleted) { + throw new IOException("Unable to delete " + file); + } + } + + private byte[] buildTokenData(MRDelegationTokenIdentifier tokenId, + Long renewDate) throws IOException { + ByteArrayOutputStream memStream = new ByteArrayOutputStream(); + DataOutputStream dataStream = new DataOutputStream(memStream); + try { + tokenId.write(dataStream); + dataStream.writeLong(renewDate); + } finally { + IOUtils.cleanup(LOG, dataStream); + } + return memStream.toByteArray(); + } + + private void loadTokenMasterKey(HistoryServerState state, Path keyFile, + long numKeyFileBytes) throws IOException { + DelegationKey key = new DelegationKey(); + byte[] keyData = readFile(keyFile, numKeyFileBytes); + DataInputStream in = + new DataInputStream(new ByteArrayInputStream(keyData)); + try { + key.readFields(in); + } finally { + IOUtils.cleanup(LOG, in); + } + state.tokenMasterKeyState.add(key); + } + + private MRDelegationTokenIdentifier loadToken(HistoryServerState state, + Path tokenFile, long numTokenFileBytes) throws IOException { + MRDelegationTokenIdentifier tokenId = new MRDelegationTokenIdentifier(); + long renewDate; + byte[] tokenData = readFile(tokenFile, numTokenFileBytes); + DataInputStream in = + new DataInputStream(new ByteArrayInputStream(tokenData)); + try { + tokenId.readFields(in); + renewDate = in.readLong(); + } finally { + IOUtils.cleanup(LOG, in); + } + state.tokenState.put(tokenId, renewDate); + return tokenId; + } + + private int loadTokensFromBucket(HistoryServerState state, Path bucket) + throws IOException { + String numStr = + bucket.getName().substring(TOKEN_BUCKET_DIR_PREFIX.length()); + final int bucketId = Integer.parseInt(numStr); + int numTokens = 0; + FileStatus[] tokenStats = fs.listStatus(bucket); + for (FileStatus stat : tokenStats) { + String name = stat.getPath().getName(); + if (name.startsWith(TOKEN_FILE_PREFIX)) { + MRDelegationTokenIdentifier token = + loadToken(state, stat.getPath(), stat.getLen()); + int tokenBucketId = getBucketId(token); + if (tokenBucketId != bucketId) { + throw new IOException("Token " + stat.getPath() + + " should be in bucket " + tokenBucketId + ", found in bucket " + + bucketId); + } + ++numTokens; + } else { + LOG.warn("Skipping unexpected file in history server token bucket: " + + stat.getPath()); + } + } + return numTokens; + } + + private int loadKeys(HistoryServerState state) throws IOException { + FileStatus[] stats = fs.listStatus(tokenKeysStatePath); + int numKeys = 0; + for (FileStatus stat : stats) { + String name = stat.getPath().getName(); + if (name.startsWith(TOKEN_MASTER_KEY_FILE_PREFIX)) { + loadTokenMasterKey(state, stat.getPath(), stat.getLen()); + ++numKeys; + } else { + LOG.warn("Skipping unexpected file in history server token state: " + + stat.getPath()); + } + } + return numKeys; + } + + private int loadTokens(HistoryServerState state) throws IOException { + FileStatus[] stats = fs.listStatus(tokenStatePath); + int numTokens = 0; + for (FileStatus stat : stats) { + String name = stat.getPath().getName(); + if (name.startsWith(TOKEN_BUCKET_DIR_PREFIX)) { + numTokens += loadTokensFromBucket(state, stat.getPath()); + } else if (name.equals(TOKEN_KEYS_DIR_NAME)) { + // key loading is done elsewhere + continue; + } else { + LOG.warn("Skipping unexpected file in history server token state: " + + stat.getPath()); + } + } + return numTokens; + } + + private void loadTokenState(HistoryServerState state) throws IOException { + int numKeys = loadKeys(state); + int numTokens = loadTokens(state); + LOG.info("Loaded " + numKeys + " master keys and " + numTokens + + " tokens from " + tokenStatePath); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerNullStateStoreService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerNullStateStoreService.java new file mode 100644 index 00000000000..19151881ff7 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerNullStateStoreService.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.hs; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.DelegationKey; + +@Private +@Unstable +public class HistoryServerNullStateStoreService + extends HistoryServerStateStoreService { + + @Override + protected void initStorage(Configuration conf) throws IOException { + // Do nothing + } + + @Override + protected void startStorage() throws IOException { + // Do nothing + } + + @Override + protected void closeStorage() throws IOException { + // Do nothing + } + + @Override + public HistoryServerState loadState() throws IOException { + throw new UnsupportedOperationException( + "Cannot load state from null store"); + } + + @Override + public void storeToken(MRDelegationTokenIdentifier tokenId, Long renewDate) + throws IOException { + // Do nothing + } + + @Override + public void updateToken(MRDelegationTokenIdentifier tokenId, Long renewDate) + throws IOException { + // Do nothing + } + + @Override + public void removeToken(MRDelegationTokenIdentifier tokenId) + throws IOException { + // Do nothing + } + + @Override + public void storeTokenMasterKey(DelegationKey key) throws IOException { + // Do nothing + } + + @Override + public void removeTokenMasterKey(DelegationKey key) throws IOException { + // Do nothing + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreService.java new file mode 100644 index 00000000000..95718bf9122 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreService.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.hs; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.service.AbstractService; + +@Private +@Unstable +/** + * Base class for history server state storage. + * Storage implementations need to implement blocking store and load methods + * to actually store and load the state. + */ +public abstract class HistoryServerStateStoreService extends AbstractService { + + public static class HistoryServerState { + Map tokenState = + new HashMap(); + Set tokenMasterKeyState = new HashSet(); + + public Map getTokenState() { + return tokenState; + } + + public Set getTokenMasterKeyState() { + return tokenMasterKeyState; + } + } + + public HistoryServerStateStoreService() { + super(HistoryServerStateStoreService.class.getName()); + } + + /** + * Initialize the state storage + * + * @param conf the configuration + * @throws IOException + */ + @Override + public void serviceInit(Configuration conf) throws IOException { + initStorage(conf); + } + + /** + * Start the state storage for use + * + * @throws IOException + */ + @Override + public void serviceStart() throws IOException { + startStorage(); + } + + /** + * Shutdown the state storage. + * + * @throws IOException + */ + @Override + public void serviceStop() throws IOException { + closeStorage(); + } + + /** + * Implementation-specific initialization. + * + * @param conf the configuration + * @throws IOException + */ + protected abstract void initStorage(Configuration conf) throws IOException; + + /** + * Implementation-specific startup. + * + * @throws IOException + */ + protected abstract void startStorage() throws IOException; + + /** + * Implementation-specific shutdown. + * + * @throws IOException + */ + protected abstract void closeStorage() throws IOException; + + /** + * Load the history server state from the state storage. + * + * @throws IOException + */ + public abstract HistoryServerState loadState() throws IOException; + + /** + * Blocking method to store a delegation token along with the current token + * sequence number to the state storage. + * + * Implementations must not return from this method until the token has been + * committed to the state store. + * + * @param tokenId the token to store + * @param renewDate the token renewal deadline + * @throws IOException + */ + public abstract void storeToken(MRDelegationTokenIdentifier tokenId, + Long renewDate) throws IOException; + + /** + * Blocking method to update the expiration of a delegation token + * in the state storage. + * + * Implementations must not return from this method until the expiration + * date of the token has been updated in the state store. + * + * @param tokenId the token to update + * @param renewDate the new token renewal deadline + * @throws IOException + */ + public abstract void updateToken(MRDelegationTokenIdentifier tokenId, + Long renewDate) throws IOException; + + /** + * Blocking method to remove a delegation token from the state storage. + * + * Implementations must not return from this method until the token has been + * removed from the state store. + * + * @param tokenId the token to remove + * @throws IOException + */ + public abstract void removeToken(MRDelegationTokenIdentifier tokenId) + throws IOException; + + /** + * Blocking method to store a delegation token master key. + * + * Implementations must not return from this method until the key has been + * committed to the state store. + * + * @param key the master key to store + * @throws IOException + */ + public abstract void storeTokenMasterKey( + DelegationKey key) throws IOException; + + /** + * Blocking method to remove a delegation token master key. + * + * Implementations must not return from this method until the key has been + * removed from the state store. + * + * @param key the master key to remove + * @throws IOException + */ + public abstract void removeTokenMasterKey(DelegationKey key) + throws IOException; +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreServiceFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreServiceFactory.java new file mode 100644 index 00000000000..8a9b765cba3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreServiceFactory.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.v2.hs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.apache.hadoop.util.ReflectionUtils; + +public class HistoryServerStateStoreServiceFactory { + + /** + * Constructs an instance of the configured storage class + * + * @param conf the configuration + * @return the state storage instance + */ + public static HistoryServerStateStoreService getStore(Configuration conf) { + Class storeClass = + HistoryServerNullStateStoreService.class; + boolean recoveryEnabled = conf.getBoolean( + JHAdminConfig.MR_HS_RECOVERY_ENABLE, + JHAdminConfig.DEFAULT_MR_HS_RECOVERY_ENABLE); + if (recoveryEnabled) { + storeClass = conf.getClass(JHAdminConfig.MR_HS_STATE_STORE, null, + HistoryServerStateStoreService.class); + if (storeClass == null) { + throw new RuntimeException("Unable to locate storage class, check " + + JHAdminConfig.MR_HS_STATE_STORE); + } + } + return ReflectionUtils.newInstance(storeClass, conf); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JHSDelegationTokenSecretManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JHSDelegationTokenSecretManager.java index 118bf4e375b..7fac44df8cf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JHSDelegationTokenSecretManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JHSDelegationTokenSecretManager.java @@ -18,10 +18,17 @@ package org.apache.hadoop.mapreduce.v2.hs; +import java.io.IOException; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier; +import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.DelegationKey; /** * A MapReduce specific delegation token secret manager. @@ -33,6 +40,11 @@ public class JHSDelegationTokenSecretManager extends AbstractDelegationTokenSecretManager { + private static final Log LOG = LogFactory.getLog( + JHSDelegationTokenSecretManager.class); + + private HistoryServerStateStoreService store; + /** * Create a secret manager * @param delegationKeyUpdateInterval the number of seconds for rolling new @@ -42,17 +54,94 @@ public class JHSDelegationTokenSecretManager * @param delegationTokenRenewInterval how often the tokens must be renewed * @param delegationTokenRemoverScanInterval how often the tokens are scanned * for expired tokens + * @param store history server state store for persisting state */ public JHSDelegationTokenSecretManager(long delegationKeyUpdateInterval, long delegationTokenMaxLifetime, long delegationTokenRenewInterval, - long delegationTokenRemoverScanInterval) { + long delegationTokenRemoverScanInterval, + HistoryServerStateStoreService store) { super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval, delegationTokenRemoverScanInterval); + this.store = store; } @Override public MRDelegationTokenIdentifier createIdentifier() { return new MRDelegationTokenIdentifier(); } + + @Override + protected void storeNewMasterKey(DelegationKey key) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Storing master key " + key.getKeyId()); + } + try { + store.storeTokenMasterKey(key); + } catch (IOException e) { + LOG.error("Unable to store master key " + key.getKeyId(), e); + } + } + + @Override + protected void removeStoredMasterKey(DelegationKey key) { + if (LOG.isDebugEnabled()) { + LOG.debug("Removing master key " + key.getKeyId()); + } + try { + store.removeTokenMasterKey(key); + } catch (IOException e) { + LOG.error("Unable to remove master key " + key.getKeyId(), e); + } + } + + @Override + protected void storeNewToken(MRDelegationTokenIdentifier tokenId, + long renewDate) { + if (LOG.isDebugEnabled()) { + LOG.debug("Storing token " + tokenId.getSequenceNumber()); + } + try { + store.storeToken(tokenId, renewDate); + } catch (IOException e) { + LOG.error("Unable to store token " + tokenId.getSequenceNumber(), e); + } + } + + @Override + protected void removeStoredToken(MRDelegationTokenIdentifier tokenId) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Storing token " + tokenId.getSequenceNumber()); + } + try { + store.removeToken(tokenId); + } catch (IOException e) { + LOG.error("Unable to remove token " + tokenId.getSequenceNumber(), e); + } + } + + @Override + protected void updateStoredToken(MRDelegationTokenIdentifier tokenId, + long renewDate) { + if (LOG.isDebugEnabled()) { + LOG.debug("Updating token " + tokenId.getSequenceNumber()); + } + try { + store.updateToken(tokenId, renewDate); + } catch (IOException e) { + LOG.error("Unable to update token " + tokenId.getSequenceNumber(), e); + } + } + + public void recover(HistoryServerState state) throws IOException { + LOG.info("Recovering " + getClass().getSimpleName()); + for (DelegationKey key : state.tokenMasterKeyState) { + addKey(key); + } + for (Entry entry : + state.tokenState.entrySet()) { + addPersistedDelegationToken(entry.getKey(), entry.getValue()); + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java index 0e610d34e89..19c3f054a77 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java @@ -28,11 +28,13 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil; +import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState; import org.apache.hadoop.mapreduce.v2.hs.server.HSAdminServer; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ShutdownHookManager; @@ -64,6 +66,46 @@ public class JobHistoryServer extends CompositeService { private JHSDelegationTokenSecretManager jhsDTSecretManager; private AggregatedLogDeletionService aggLogDelService; private HSAdminServer hsAdminServer; + private HistoryServerStateStoreService stateStore; + + // utility class to start and stop secret manager as part of service + // framework and implement state recovery for secret manager on startup + private class HistoryServerSecretManagerService + extends AbstractService { + + public HistoryServerSecretManagerService() { + super(HistoryServerSecretManagerService.class.getName()); + } + + @Override + protected void serviceStart() throws Exception { + boolean recoveryEnabled = getConfig().getBoolean( + JHAdminConfig.MR_HS_RECOVERY_ENABLE, + JHAdminConfig.DEFAULT_MR_HS_RECOVERY_ENABLE); + if (recoveryEnabled) { + assert stateStore.isInState(STATE.STARTED); + HistoryServerState state = stateStore.loadState(); + jhsDTSecretManager.recover(state); + } + + try { + jhsDTSecretManager.startThreads(); + } catch(IOException io) { + LOG.error("Error while starting the Secret Manager threads", io); + throw io; + } + + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (jhsDTSecretManager != null) { + jhsDTSecretManager.stopThreads(); + } + super.serviceStop(); + } + } public JobHistoryServer() { super(JobHistoryServer.class.getName()); @@ -86,11 +128,14 @@ protected void serviceInit(Configuration conf) throws Exception { } jobHistoryService = new JobHistory(); historyContext = (HistoryContext)jobHistoryService; - this.jhsDTSecretManager = createJHSSecretManager(conf); + stateStore = createStateStore(conf); + this.jhsDTSecretManager = createJHSSecretManager(conf, stateStore); clientService = new HistoryClientService(historyContext, this.jhsDTSecretManager); aggLogDelService = new AggregatedLogDeletionService(); hsAdminServer = new HSAdminServer(aggLogDelService, jobHistoryService); + addService(stateStore); + addService(new HistoryServerSecretManagerService()); addService(jobHistoryService); addService(clientService); addService(aggLogDelService); @@ -99,7 +144,7 @@ protected void serviceInit(Configuration conf) throws Exception { } protected JHSDelegationTokenSecretManager createJHSSecretManager( - Configuration conf) { + Configuration conf, HistoryServerStateStoreService store) { long secretKeyInterval = conf.getLong(MRConfig.DELEGATION_KEY_UPDATE_INTERVAL_KEY, MRConfig.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); @@ -111,9 +156,14 @@ protected JHSDelegationTokenSecretManager createJHSSecretManager( MRConfig.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); return new JHSDelegationTokenSecretManager(secretKeyInterval, - tokenMaxLifetime, tokenRenewInterval, 3600000); + tokenMaxLifetime, tokenRenewInterval, 3600000, store); } - + + protected HistoryServerStateStoreService createStateStore( + Configuration conf) { + return HistoryServerStateStoreServiceFactory.getStore(conf); + } + protected void doSecureLogin(Configuration conf) throws IOException { SecurityUtil.login(conf, JHAdminConfig.MR_HISTORY_KEYTAB, JHAdminConfig.MR_HISTORY_PRINCIPAL); @@ -123,20 +173,11 @@ protected void doSecureLogin(Configuration conf) throws IOException { protected void serviceStart() throws Exception { DefaultMetricsSystem.initialize("JobHistoryServer"); JvmMetrics.initSingleton("JobHistoryServer", null); - try { - jhsDTSecretManager.startThreads(); - } catch(IOException io) { - LOG.error("Error while starting the Secret Manager threads", io); - throw io; - } super.serviceStart(); } @Override protected void serviceStop() throws Exception { - if (jhsDTSecretManager != null) { - jhsDTSecretManager.stopThreads(); - } DefaultMetricsSystem.shutdown(); super.serviceStop(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerMemStateStoreService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerMemStateStoreService.java new file mode 100644 index 00000000000..3bc1715d708 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerMemStateStoreService.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.v2.hs; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.DelegationKey; + +/** + * A state store backed by memory for unit tests + */ +class HistoryServerMemStateStoreService + extends HistoryServerStateStoreService { + + HistoryServerState state; + + @Override + protected void initStorage(Configuration conf) throws IOException { + } + + @Override + protected void startStorage() throws IOException { + state = new HistoryServerState(); + } + + @Override + protected void closeStorage() throws IOException { + state = null; + } + + @Override + public HistoryServerState loadState() throws IOException { + HistoryServerState result = new HistoryServerState(); + result.tokenState.putAll(state.tokenState); + result.tokenMasterKeyState.addAll(state.tokenMasterKeyState); + return result; + } + + @Override + public void storeToken(MRDelegationTokenIdentifier tokenId, Long renewDate) + throws IOException { + if (state.tokenState.containsKey(tokenId)) { + throw new IOException("token " + tokenId + " was stored twice"); + } + state.tokenState.put(tokenId, renewDate); + } + + @Override + public void updateToken(MRDelegationTokenIdentifier tokenId, Long renewDate) + throws IOException { + if (!state.tokenState.containsKey(tokenId)) { + throw new IOException("token " + tokenId + " not in store"); + } + state.tokenState.put(tokenId, renewDate); + } + + @Override + public void removeToken(MRDelegationTokenIdentifier tokenId) + throws IOException { + state.tokenState.remove(tokenId); + } + + @Override + public void storeTokenMasterKey(DelegationKey key) throws IOException { + if (state.tokenMasterKeyState.contains(key)) { + throw new IOException("token master key " + key + " was stored twice"); + } + state.tokenMasterKeyState.add(key); + } + + @Override + public void removeTokenMasterKey(DelegationKey key) throws IOException { + state.tokenMasterKeyState.remove(key); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java new file mode 100644 index 00000000000..c9159963310 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.v2.hs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier; +import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestHistoryServerFileSystemStateStoreService { + + private static final File testDir = new File( + System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir")), + "TestHistoryServerFileSystemStateStoreService"); + + private Configuration conf; + + @Before + public void setup() { + FileUtil.fullyDelete(testDir); + testDir.mkdirs(); + conf = new Configuration(); + conf.setBoolean(JHAdminConfig.MR_HS_RECOVERY_ENABLE, true); + conf.setClass(JHAdminConfig.MR_HS_STATE_STORE, + HistoryServerFileSystemStateStoreService.class, + HistoryServerStateStoreService.class); + conf.set(JHAdminConfig.MR_HS_FS_STATE_STORE_URI, + testDir.getAbsoluteFile().toURI().toString()); + } + + @After + public void cleanup() { + FileUtil.fullyDelete(testDir); + } + + private HistoryServerStateStoreService createAndStartStore() + throws IOException { + HistoryServerStateStoreService store = + HistoryServerStateStoreServiceFactory.getStore(conf); + assertTrue("Factory did not create a filesystem store", + store instanceof HistoryServerFileSystemStateStoreService); + store.init(conf); + store.start(); + return store; + } + + @Test + public void testTokenStore() throws IOException { + HistoryServerStateStoreService store = createAndStartStore(); + + HistoryServerState state = store.loadState(); + assertTrue("token state not empty", state.tokenState.isEmpty()); + assertTrue("key state not empty", state.tokenMasterKeyState.isEmpty()); + + final DelegationKey key1 = new DelegationKey(1, 2, "keyData1".getBytes()); + final MRDelegationTokenIdentifier token1 = + new MRDelegationTokenIdentifier(new Text("tokenOwner1"), + new Text("tokenRenewer1"), new Text("tokenUser1")); + token1.setSequenceNumber(1); + final Long tokenDate1 = 1L; + final MRDelegationTokenIdentifier token2 = + new MRDelegationTokenIdentifier(new Text("tokenOwner2"), + new Text("tokenRenewer2"), new Text("tokenUser2")); + token2.setSequenceNumber(12345678); + final Long tokenDate2 = 87654321L; + + store.storeTokenMasterKey(key1); + try { + store.storeTokenMasterKey(key1); + fail("redundant store of key undetected"); + } catch (IOException e) { + // expected + } + store.storeToken(token1, tokenDate1); + store.storeToken(token2, tokenDate2); + try { + store.storeToken(token1, tokenDate1); + fail("redundant store of token undetected"); + } catch (IOException e) { + // expected + } + store.close(); + + store = createAndStartStore(); + state = store.loadState(); + assertEquals("incorrect loaded token count", 2, state.tokenState.size()); + assertTrue("missing token 1", state.tokenState.containsKey(token1)); + assertEquals("incorrect token 1 date", tokenDate1, + state.tokenState.get(token1)); + assertTrue("missing token 2", state.tokenState.containsKey(token2)); + assertEquals("incorrect token 2 date", tokenDate2, + state.tokenState.get(token2)); + assertEquals("incorrect master key count", 1, + state.tokenMasterKeyState.size()); + assertTrue("missing master key 1", + state.tokenMasterKeyState.contains(key1)); + + final DelegationKey key2 = new DelegationKey(3, 4, "keyData2".getBytes()); + final DelegationKey key3 = new DelegationKey(5, 6, "keyData3".getBytes()); + final MRDelegationTokenIdentifier token3 = + new MRDelegationTokenIdentifier(new Text("tokenOwner3"), + new Text("tokenRenewer3"), new Text("tokenUser3")); + token3.setSequenceNumber(12345679); + final Long tokenDate3 = 87654321L; + + store.removeToken(token1); + store.storeTokenMasterKey(key2); + final Long newTokenDate2 = 975318642L; + store.updateToken(token2, newTokenDate2); + store.removeTokenMasterKey(key1); + store.storeTokenMasterKey(key3); + store.storeToken(token3, tokenDate3); + store.close(); + + store = createAndStartStore(); + state = store.loadState(); + assertEquals("incorrect loaded token count", 2, state.tokenState.size()); + assertFalse("token 1 not removed", state.tokenState.containsKey(token1)); + assertTrue("missing token 2", state.tokenState.containsKey(token2)); + assertEquals("incorrect token 2 date", newTokenDate2, + state.tokenState.get(token2)); + assertTrue("missing token 3", state.tokenState.containsKey(token3)); + assertEquals("incorrect token 3 date", tokenDate3, + state.tokenState.get(token3)); + assertEquals("incorrect master key count", 2, + state.tokenMasterKeyState.size()); + assertFalse("master key 1 not removed", + state.tokenMasterKeyState.contains(key1)); + assertTrue("missing master key 2", + state.tokenMasterKeyState.contains(key2)); + assertTrue("missing master key 3", + state.tokenMasterKeyState.contains(key3)); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJHSDelegationTokenSecretManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJHSDelegationTokenSecretManager.java new file mode 100644 index 00000000000..3fa25b9ed69 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJHSDelegationTokenSecretManager.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.v2.hs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.junit.Test; + +public class TestJHSDelegationTokenSecretManager { + + @Test + public void testRecovery() throws IOException { + Configuration conf = new Configuration(); + HistoryServerStateStoreService store = + new HistoryServerMemStateStoreService(); + store.init(conf); + store.start(); + JHSDelegationTokenSecretManagerForTest mgr = + new JHSDelegationTokenSecretManagerForTest(store); + mgr.startThreads(); + + MRDelegationTokenIdentifier tokenId1 = new MRDelegationTokenIdentifier( + new Text("tokenOwner"), new Text("tokenRenewer"), + new Text("tokenUser")); + Token token1 = + new Token(tokenId1, mgr); + + MRDelegationTokenIdentifier tokenId2 = new MRDelegationTokenIdentifier( + new Text("tokenOwner"), new Text("tokenRenewer"), + new Text("tokenUser")); + Token token2 = + new Token(tokenId2, mgr); + DelegationKey[] keys = mgr.getAllKeys(); + long tokenRenewDate1 = mgr.getAllTokens().get(tokenId1).getRenewDate(); + long tokenRenewDate2 = mgr.getAllTokens().get(tokenId2).getRenewDate(); + mgr.stopThreads(); + + mgr = new JHSDelegationTokenSecretManagerForTest(store); + mgr.recover(store.loadState()); + List recoveredKeys = Arrays.asList(mgr.getAllKeys()); + for (DelegationKey key : keys) { + assertTrue("key missing after recovery", recoveredKeys.contains(key)); + } + assertTrue("token1 missing", mgr.getAllTokens().containsKey(tokenId1)); + assertEquals("token1 renew date", tokenRenewDate1, + mgr.getAllTokens().get(tokenId1).getRenewDate()); + assertTrue("token2 missing", mgr.getAllTokens().containsKey(tokenId2)); + assertEquals("token2 renew date", tokenRenewDate2, + mgr.getAllTokens().get(tokenId2).getRenewDate()); + + mgr.startThreads(); + mgr.verifyToken(tokenId1, token1.getPassword()); + mgr.verifyToken(tokenId2, token2.getPassword()); + MRDelegationTokenIdentifier tokenId3 = new MRDelegationTokenIdentifier( + new Text("tokenOwner"), new Text("tokenRenewer"), + new Text("tokenUser")); + Token token3 = + new Token(tokenId3, mgr); + assertEquals("sequence number restore", tokenId2.getSequenceNumber() + 1, + tokenId3.getSequenceNumber()); + mgr.cancelToken(token1, "tokenOwner"); + long tokenRenewDate3 = mgr.getAllTokens().get(tokenId3).getRenewDate(); + mgr.stopThreads(); + + mgr = new JHSDelegationTokenSecretManagerForTest(store); + mgr.recover(store.loadState()); + assertFalse("token1 should be missing", + mgr.getAllTokens().containsKey(tokenId1)); + assertTrue("token2 missing", mgr.getAllTokens().containsKey(tokenId2)); + assertEquals("token2 renew date", tokenRenewDate2, + mgr.getAllTokens().get(tokenId2).getRenewDate()); + assertTrue("token3 missing", mgr.getAllTokens().containsKey(tokenId3)); + assertEquals("token3 renew date", tokenRenewDate3, + mgr.getAllTokens().get(tokenId3).getRenewDate()); + + mgr.startThreads(); + mgr.verifyToken(tokenId2, token2.getPassword()); + mgr.verifyToken(tokenId3, token3.getPassword()); + mgr.stopThreads(); + } + + private static class JHSDelegationTokenSecretManagerForTest + extends JHSDelegationTokenSecretManager { + + public JHSDelegationTokenSecretManagerForTest( + HistoryServerStateStoreService store) { + super(10000, 10000, 10000, 10000, store); + } + + public Map getAllTokens() { + return new HashMap(currentTokens); + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java index 109601e41b5..010e4b6c95a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java @@ -72,7 +72,7 @@ public void testStartStopServer() throws Exception { Configuration config = new Configuration(); historyServer.init(config); assertEquals(STATE.INITED, historyServer.getServiceState()); - assertEquals(4, historyServer.getServices().size()); + assertEquals(6, historyServer.getServices().size()); HistoryClientService historyService = historyServer.getClientService(); assertNotNull(historyServer.getClientService()); assertEquals(STATE.INITED, historyService.getServiceState()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java index d7936674af9..f3b9821f85d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java @@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService; import org.apache.hadoop.mapreduce.v2.hs.JHSDelegationTokenSecretManager; import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; @@ -87,10 +88,11 @@ protected void doSecureLogin(Configuration conf) throws IOException { // no keytab based login }; + @Override protected JHSDelegationTokenSecretManager createJHSSecretManager( - Configuration conf) { + Configuration conf, HistoryServerStateStoreService store) { return new JHSDelegationTokenSecretManager(initialInterval, - maxLifetime, renewInterval, 3600000); + maxLifetime, renewInterval, 3600000, store); } }; // final JobHistoryServer jobHistoryServer = jhServer;