diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 7bdfd17b5dd..1f01fd8c349 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -186,6 +186,9 @@ Trunk (Unreleased) HADOOP-10750. KMSKeyProviderCache should be in hadoop-common. (asuresh via tucu) + HADOOP-10720. KMS: Implement generateEncryptedKey and decryptEncryptedKey + in the REST API. (asuresh via tucu) + BUG FIXES HADOOP-9451. Fault single-layer config if node group topology is enabled. @@ -438,6 +441,9 @@ Release 2.6.0 - UNRELEASED HADOOP-10817. ProxyUsers configuration should support configurable prefixes. (tucu) + HADOOP-10755. Support negative caching of user-group mapping. + (Lei Xu via wang) + OPTIMIZATIONS BUG FIXES @@ -459,6 +465,13 @@ Release 2.6.0 - UNRELEASED HADOOP-10591. Compression codecs must used pooled direct buffers or deallocate direct buffers when stream is closed (cmccabe) + HADOOP-10857. Native Libraries Guide doen't mention a dependency on + openssl-development package (ozawa via cmccabe) + + HADOOP-10866. RawLocalFileSystem fails to read symlink targets via the stat + command when the format of the stat command uses non-curly quotes (yzhang + via cmccabe) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java index 204af819a4c..e4b822d2c6c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java @@ -27,17 +27,19 @@ import javax.crypto.spec.IvParameterSpec; import javax.crypto.spec.SecretKeySpec; import com.google.common.base.Preconditions; +import org.apache.hadoop.classification.InterfaceAudience; /** * A KeyProvider with Cytographic Extensions specifically for generating * Encrypted Keys as well as decrypting them * */ +@InterfaceAudience.Private public class KeyProviderCryptoExtension extends KeyProviderExtension { - protected static final String EEK = "EEK"; - protected static final String EK = "EK"; + public static final String EEK = "EEK"; + public static final String EK = "EK"; /** * This is a holder class whose instance contains the keyVersionName, iv @@ -81,6 +83,14 @@ public class KeyProviderCryptoExtension extends */ public interface CryptoExtension extends KeyProviderExtension.Extension { + /** + * Calls to this method allows the underlying KeyProvider to warm-up any + * implementation specific caches used to store the Encrypted Keys. + * @param keyNames Array of Key Names + */ + public void warmUpEncryptedKeys(String... keyNames) + throws IOException; + /** * Generates a key material and encrypts it using the given key version name * and initialization vector. The generated key material is of the same @@ -180,13 +190,35 @@ public class KeyProviderCryptoExtension extends return new KeyVersion(keyVer.getName(), EK, ek); } + @Override + public void warmUpEncryptedKeys(String... 
keyNames) + throws IOException { + // NO-OP since the default version does not cache any keys + } + } - private KeyProviderCryptoExtension(KeyProvider keyProvider, + /** + * This constructor is to be used by sub classes that provide + * delegating/proxying functionality to the {@link KeyProviderCryptoExtension} + * @param keyProvider + * @param extension + */ + protected KeyProviderCryptoExtension(KeyProvider keyProvider, CryptoExtension extension) { super(keyProvider, extension); } + /** + * Notifies the Underlying CryptoExtension implementation to warm up any + * implementation specific caches for the specified KeyVersions + * @param keyNames Arrays of key Names + */ + public void warmUpEncryptedKeys(String... keyNames) + throws IOException { + getExtension().warmUpEncryptedKeys(keyNames); + } + /** * Generates a key material and encrypts it using the given key version name * and initialization vector. The generated key material is of the same diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java index 7d52854845a..808b1bb102d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java @@ -21,7 +21,9 @@ import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.key.KeyProvider; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; import org.apache.hadoop.crypto.key.KeyProviderFactory; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.ProviderUtils; import org.apache.hadoop.security.authentication.client.AuthenticatedURL; @@ -33,6 +35,7 @@ import org.apache.http.client.utils.URIBuilder; import org.codehaus.jackson.map.ObjectMapper; import javax.net.ssl.HttpsURLConnection; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -40,6 +43,7 @@ import java.io.OutputStreamWriter; import java.io.Writer; import java.lang.reflect.Constructor; import java.net.HttpURLConnection; +import java.net.SocketTimeoutException; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; @@ -50,14 +54,22 @@ import java.text.MessageFormat; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ExecutionException; + +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.CryptoExtension; + +import com.google.common.base.Preconditions; /** * KMS client KeyProvider implementation. 
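Before the KMS client implementation below, it is worth sketching how a consumer drives the extended KeyProviderCryptoExtension API added above. This is a minimal, illustrative example only: the key name "mykey" is an assumption, and the provider wiring via createKeyProviderCryptoExtension simply mirrors what KMSWebApp does later in this patch.

+---+
import java.io.IOException;
import java.security.GeneralSecurityException;

import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;

public class EekWorkflowSketch {
  // Round-trips an encrypted key through any KeyProvider.
  static KeyProvider.KeyVersion roundTrip(KeyProvider underlying)
      throws IOException, GeneralSecurityException {
    KeyProviderCryptoExtension kp =
        KeyProviderCryptoExtension.createKeyProviderCryptoExtension(underlying);
    // New in this patch: let the provider pre-fill any implementation
    // specific caches for the keys we are about to use.
    kp.warmUpEncryptedKeys("mykey");
    // Generate an encrypted key (EEK) under the current version of "mykey" ...
    EncryptedKeyVersion eek = kp.generateEncryptedKey("mykey");
    // ... and decrypt it back into usable key material (version name "EK").
    return kp.decryptEncryptedKey(eek);
  }
}
+---+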
*/ @InterfaceAudience.Private -public class KMSClientProvider extends KeyProvider { +public class KMSClientProvider extends KeyProvider implements CryptoExtension { public static final String SCHEME_NAME = "kms"; @@ -78,6 +90,73 @@ public class KMSClientProvider extends KeyProvider { public static final String TIMEOUT_ATTR = CONFIG_PREFIX + "timeout"; public static final int DEFAULT_TIMEOUT = 60; + private final ValueQueue encKeyVersionQueue; + + private class EncryptedQueueRefiller implements + ValueQueue.QueueRefiller { + + @Override + public void fillQueueForKey(String keyName, + Queue keyQueue, int numEKVs) throws IOException { + checkNotNull(keyName, "keyName"); + Map params = new HashMap(); + params.put(KMSRESTConstants.EEK_OP, KMSRESTConstants.EEK_GENERATE); + params.put(KMSRESTConstants.EEK_NUM_KEYS, "" + numEKVs); + URL url = createURL(KMSRESTConstants.KEY_RESOURCE, keyName, + KMSRESTConstants.EEK_SUB_RESOURCE, params); + HttpURLConnection conn = createConnection(url, HTTP_GET); + conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON_MIME); + List response = call(conn, null, + HttpURLConnection.HTTP_OK, List.class); + List ekvs = + parseJSONEncKeyVersion(keyName, response); + keyQueue.addAll(ekvs); + } + } + + public static class KMSEncryptedKeyVersion extends EncryptedKeyVersion { + public KMSEncryptedKeyVersion(String keyName, String keyVersionName, + byte[] iv, String encryptedVersionName, byte[] keyMaterial) { + super(keyName, keyVersionName, iv, new KMSKeyVersion(null, + encryptedVersionName, keyMaterial)); + } + } + + @SuppressWarnings("rawtypes") + private static List + parseJSONEncKeyVersion(String keyName, List valueList) { + List ekvs = new LinkedList(); + if (!valueList.isEmpty()) { + for (Object values : valueList) { + Map valueMap = (Map) values; + + String versionName = checkNotNull( + (String) valueMap.get(KMSRESTConstants.VERSION_NAME_FIELD), + KMSRESTConstants.VERSION_NAME_FIELD); + + byte[] iv = Base64.decodeBase64(checkNotNull( + (String) valueMap.get(KMSRESTConstants.IV_FIELD), + KMSRESTConstants.IV_FIELD)); + + Map encValueMap = checkNotNull((Map) + valueMap.get(KMSRESTConstants.ENCRYPTED_KEY_VERSION_FIELD), + KMSRESTConstants.ENCRYPTED_KEY_VERSION_FIELD); + + String encVersionName = checkNotNull((String) + encValueMap.get(KMSRESTConstants.VERSION_NAME_FIELD), + KMSRESTConstants.VERSION_NAME_FIELD); + + byte[] encKeyMaterial = Base64.decodeBase64(checkNotNull((String) + encValueMap.get(KMSRESTConstants.MATERIAL_FIELD), + KMSRESTConstants.MATERIAL_FIELD)); + + ekvs.add(new KMSEncryptedKeyVersion(keyName, versionName, iv, + encVersionName, encKeyMaterial)); + } + } + return ekvs; + } + private static KeyVersion parseJSONKeyVersion(Map valueMap) { KeyVersion keyVersion = null; if (!valueMap.isEmpty()) { @@ -208,6 +287,28 @@ public class KMSClientProvider extends KeyProvider { } int timeout = conf.getInt(TIMEOUT_ATTR, DEFAULT_TIMEOUT); configurator = new TimeoutConnConfigurator(timeout, sslFactory); + encKeyVersionQueue = + new ValueQueue( + conf.getInt( + CommonConfigurationKeysPublic.KMS_CLIENT_ENC_KEY_CACHE_SIZE, + CommonConfigurationKeysPublic. + KMS_CLIENT_ENC_KEY_CACHE_SIZE_DEFAULT), + conf.getFloat( + CommonConfigurationKeysPublic. + KMS_CLIENT_ENC_KEY_CACHE_LOW_WATERMARK, + CommonConfigurationKeysPublic. + KMS_CLIENT_ENC_KEY_CACHE_LOW_WATERMARK_DEFAULT), + conf.getInt( + CommonConfigurationKeysPublic. + KMS_CLIENT_ENC_KEY_CACHE_EXPIRY_MS, + CommonConfigurationKeysPublic. 
+ KMS_CLIENT_ENC_KEY_CACHE_EXPIRY_DEFAULT), + conf.getInt( + CommonConfigurationKeysPublic. + KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS, + CommonConfigurationKeysPublic. + KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT), + new EncryptedQueueRefiller()); } private String createServiceURL(URL url) throws IOException { @@ -527,6 +628,51 @@ public class KMSClientProvider extends KeyProvider { } } + @Override + public EncryptedKeyVersion generateEncryptedKey( + String encryptionKeyName) throws IOException, GeneralSecurityException { + try { + return encKeyVersionQueue.getNext(encryptionKeyName); + } catch (ExecutionException e) { + if (e.getCause() instanceof SocketTimeoutException) { + throw (SocketTimeoutException)e.getCause(); + } + throw new IOException(e); + } + } + + @SuppressWarnings("rawtypes") + @Override + public KeyVersion decryptEncryptedKey( + EncryptedKeyVersion encryptedKeyVersion) throws IOException, + GeneralSecurityException { + checkNotNull(encryptedKeyVersion.getKeyVersionName(), "versionName"); + checkNotNull(encryptedKeyVersion.getIv(), "iv"); + Preconditions.checkArgument(encryptedKeyVersion.getEncryptedKey() + .getVersionName().equals(KeyProviderCryptoExtension.EEK), + "encryptedKey version name must be '%s', is '%s'", + KeyProviderCryptoExtension.EK, encryptedKeyVersion.getEncryptedKey() + .getVersionName()); + checkNotNull(encryptedKeyVersion.getEncryptedKey(), "encryptedKey"); + Map params = new HashMap(); + params.put(KMSRESTConstants.EEK_OP, KMSRESTConstants.EEK_DECRYPT); + Map jsonPayload = new HashMap(); + jsonPayload.put(KMSRESTConstants.NAME_FIELD, + encryptedKeyVersion.getKeyName()); + jsonPayload.put(KMSRESTConstants.IV_FIELD, Base64.encodeBase64String( + encryptedKeyVersion.getIv())); + jsonPayload.put(KMSRESTConstants.MATERIAL_FIELD, Base64.encodeBase64String( + encryptedKeyVersion.getEncryptedKey().getMaterial())); + URL url = createURL(KMSRESTConstants.KEY_VERSION_RESOURCE, + encryptedKeyVersion.getKeyVersionName(), + KMSRESTConstants.EEK_SUB_RESOURCE, params); + HttpURLConnection conn = createConnection(url, HTTP_POST); + conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON_MIME); + Map response = + call(conn, jsonPayload, HttpURLConnection.HTTP_OK, Map.class); + return parseJSONKeyVersion(response); + } + @Override public List getKeyVersions(String name) throws IOException { checkNotEmpty(name, "name"); @@ -570,4 +716,14 @@ public class KMSClientProvider extends KeyProvider { // the server should not keep in memory state on behalf of clients either. } + @Override + public void warmUpEncryptedKeys(String... 
keyNames) + throws IOException { + try { + encKeyVersionQueue.initializeQueuesForKeys(keyNames); + } catch (ExecutionException e) { + throw new IOException(e); + } + } + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSRESTConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSRESTConstants.java index 807cba7fbba..b949ab91b52 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSRESTConstants.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSRESTConstants.java @@ -34,10 +34,16 @@ public class KMSRESTConstants { public static final String KEY_VERSION_RESOURCE = "keyversion"; public static final String METADATA_SUB_RESOURCE = "_metadata"; public static final String VERSIONS_SUB_RESOURCE = "_versions"; + public static final String EEK_SUB_RESOURCE = "_eek"; public static final String CURRENT_VERSION_SUB_RESOURCE = "_currentversion"; public static final String KEY_OP = "key"; + public static final String EEK_OP = "eek_op"; + public static final String EEK_GENERATE = "generate"; + public static final String EEK_DECRYPT = "decrypt"; + public static final String EEK_NUM_KEYS = "num_keys"; + public static final String IV_FIELD = "iv"; public static final String NAME_FIELD = "name"; public static final String CIPHER_FIELD = "cipher"; public static final String LENGTH_FIELD = "length"; @@ -47,6 +53,8 @@ public class KMSRESTConstants { public static final String VERSIONS_FIELD = "versions"; public static final String MATERIAL_FIELD = "material"; public static final String VERSION_NAME_FIELD = "versionName"; + public static final String ENCRYPTED_KEY_VERSION_FIELD = + "encryptedKeyVersion"; public static final String ERROR_EXCEPTION_JSON = "exception"; public static final String ERROR_MESSAGE_JSON = "message"; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java new file mode 100644 index 00000000000..a415e2ea93e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.crypto.key.kms; + +import java.io.IOException; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Preconditions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A Utility class that maintains a Queue of entries for a given key. It tries + * to ensure that there is are always at-least numValues entries + * available for the client to consume for a particular key. + * It also uses an underlying Cache to evict queues for keys that have not been + * accessed for a configurable period of time. + * Implementing classes are required to implement the + * QueueRefiller interface that exposes a method to refill the + * queue, when empty + */ +@InterfaceAudience.Private +public class ValueQueue { + + /** + * QueueRefiller interface a client must implement to use this class + */ + public interface QueueRefiller { + /** + * Method that has to be implemented by implementing classes to fill the + * Queue. + * @param keyName Key name + * @param keyQueue Queue that needs to be filled + * @param numValues number of Values to be added to the queue. + * @throws IOException + */ + public void fillQueueForKey(String keyName, + Queue keyQueue, int numValues) throws IOException; + } + + private static final String REFILL_THREAD = + ValueQueue.class.getName() + "_thread"; + + private final LoadingCache> keyQueues; + private final ThreadPoolExecutor executor; + private final UniqueKeyBlockingQueue queue = new UniqueKeyBlockingQueue(); + private final QueueRefiller refiller; + private final SyncGenerationPolicy policy; + + private final int numValues; + private final float lowWatermark; + + /** + * A Runnable which takes a string name. + */ + private abstract static class NamedRunnable implements Runnable { + final String name; + private NamedRunnable(String keyName) { + this.name = keyName; + } + } + + /** + * This backing blocking queue used in conjunction with the + * ThreadPoolExecutor used by the ValueQueue. This + * Queue accepts a task only if the task is not currently in the process + * of being run by a thread which is implied by the presence of the key + * in the keysInProgress set. + * + * NOTE: Only methods that ware explicitly called by the + * ThreadPoolExecutor need to be over-ridden. 
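Before the queue internals below, a concrete illustration of the contract described in the class and QueueRefiller javadoc above. This is a minimal sketch, not production code; it assumes the generic type parameters (ValueQueue&lt;E&gt;, QueueRefiller&lt;E&gt;) that this rendering of the patch has stripped, and it consumes values through initializeQueuesForKeys/getNext/getAtMost, which are defined further down in this class.

+---+
import java.io.IOException;
import java.util.Queue;

import org.apache.hadoop.crypto.key.kms.ValueQueue;
import org.apache.hadoop.crypto.key.kms.ValueQueue.QueueRefiller;

public class ValueQueueSketch {
  public static void main(String[] args) throws Exception {
    // Refiller that "generates" numValues dummy strings for the given key.
    QueueRefiller<String> refiller = new QueueRefiller<String>() {
      @Override
      public void fillQueueForKey(String keyName, Queue<String> keyQueue,
          int numValues) throws IOException {
        for (int i = 0; i < numValues; i++) {
          keyQueue.add(keyName + "-value-" + i);
        }
      }
    };
    // Target of 10 cached values per key, refill below the 30% watermark,
    // 30s expiry, one filler thread; the 5-argument constructor defaults to
    // SyncGenerationPolicy.ALL.
    ValueQueue<String> vq = new ValueQueue<String>(10, 0.3f, 30000, 1, refiller);
    vq.initializeQueuesForKeys("k1", "k2");            // warm up ahead of time
    System.out.println(vq.getNext("k1"));              // one value
    System.out.println(vq.getAtMost("k1", 5).size());  // up to 5, policy-dependent
    vq.shutdown();
  }
}
+---+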
+ */ + private static class UniqueKeyBlockingQueue extends + LinkedBlockingQueue { + + private static final long serialVersionUID = -2152747693695890371L; + private HashSet keysInProgress = new HashSet(); + + @Override + public synchronized void put(Runnable e) throws InterruptedException { + if (keysInProgress.add(((NamedRunnable)e).name)) { + super.put(e); + } + } + + @Override + public Runnable take() throws InterruptedException { + Runnable k = super.take(); + if (k != null) { + keysInProgress.remove(((NamedRunnable)k).name); + } + return k; + } + + @Override + public Runnable poll(long timeout, TimeUnit unit) + throws InterruptedException { + Runnable k = super.poll(timeout, unit); + if (k != null) { + keysInProgress.remove(((NamedRunnable)k).name); + } + return k; + } + + } + + /** + * Policy to decide how many values to return to client when client asks for + * "n" values and Queue is empty. + * This decides how many values to return when client calls "getAtMost" + */ + public static enum SyncGenerationPolicy { + ATLEAST_ONE, // Return atleast 1 value + LOW_WATERMARK, // Return min(n, lowWatermark * numValues) values + ALL // Return n values + } + + /** + * Constructor takes the following tunable configuration parameters + * @param numValues The number of values cached in the Queue for a + * particular key. + * @param lowWatermark The ratio of (number of current entries/numValues) + * below which the fillQueueForKey() funciton will be + * invoked to fill the Queue. + * @param expiry Expiry time after which the Key and associated Queue are + * evicted from the cache. + * @param numFillerThreads Number of threads to use for the filler thread + * @param policy The SyncGenerationPolicy to use when client + * calls "getAtMost" + * @param refiller implementation of the QueueRefiller + */ + public ValueQueue(final int numValues, final float lowWatermark, + long expiry, int numFillerThreads, SyncGenerationPolicy policy, + final QueueRefiller refiller) { + Preconditions.checkArgument(numValues > 0, "\"numValues\" must be > 0"); + Preconditions.checkArgument(((lowWatermark > 0)&&(lowWatermark <= 1)), + "\"lowWatermark\" must be > 0 and <= 1"); + Preconditions.checkArgument(expiry > 0, "\"expiry\" must be > 0"); + Preconditions.checkArgument(numFillerThreads > 0, + "\"numFillerThreads\" must be > 0"); + Preconditions.checkNotNull(policy, "\"policy\" must not be null"); + this.refiller = refiller; + this.policy = policy; + this.numValues = numValues; + this.lowWatermark = lowWatermark; + keyQueues = CacheBuilder.newBuilder() + .expireAfterAccess(expiry, TimeUnit.MILLISECONDS) + .build(new CacheLoader>() { + @Override + public LinkedBlockingQueue load(String keyName) + throws Exception { + LinkedBlockingQueue keyQueue = + new LinkedBlockingQueue(); + refiller.fillQueueForKey(keyName, keyQueue, + (int)(lowWatermark * numValues)); + return keyQueue; + } + }); + + executor = + new ThreadPoolExecutor(numFillerThreads, numFillerThreads, 0L, + TimeUnit.MILLISECONDS, queue, new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat(REFILL_THREAD).build()); + // To ensure all requests are first queued, make coreThreads = maxThreads + // and pre-start all the Core Threads. 
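+    // Note: refill tasks are handed to the executor by putting them directly
+    // on the backing queue (see submitRefillTask() below) rather than via
+    // execute()/submit(), and execute() is what would normally start core
+    // threads lazily -- hence the explicit pre-start here.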
+ executor.prestartAllCoreThreads(); + } + + public ValueQueue(final int numValues, final float lowWaterMark, long expiry, + int numFillerThreads, QueueRefiller fetcher) { + this(numValues, lowWaterMark, expiry, numFillerThreads, + SyncGenerationPolicy.ALL, fetcher); + } + + /** + * Initializes the Value Queues for the provided keys by calling the + * fill Method with "numInitValues" values + * @param keyNames Array of key Names + * @throws ExecutionException + */ + public void initializeQueuesForKeys(String... keyNames) + throws ExecutionException { + for (String keyName : keyNames) { + keyQueues.get(keyName); + } + } + + /** + * This removes the value currently at the head of the Queue for the + * provided key. Will immediately fire the Queue filler function if key + * does not exist. + * If Queue exists but all values are drained, It will ask the generator + * function to add 1 value to Queue and then drain it. + * @param keyName String key name + * @return E the next value in the Queue + * @throws IOException + * @throws ExecutionException + */ + public E getNext(String keyName) + throws IOException, ExecutionException { + return getAtMost(keyName, 1).get(0); + } + + /** + * This removes the "num" values currently at the head of the Queue for the + * provided key. Will immediately fire the Queue filler function if key + * does not exist + * How many values are actually returned is governed by the + * SyncGenerationPolicy specified by the user. + * @param keyName String key name + * @param num Minimum number of values to return. + * @return List values returned + * @throws IOException + * @throws ExecutionException + */ + public List getAtMost(String keyName, int num) throws IOException, + ExecutionException { + LinkedBlockingQueue keyQueue = keyQueues.get(keyName); + // Using poll to avoid race condition.. + LinkedList ekvs = new LinkedList(); + try { + for (int i = 0; i < num; i++) { + E val = keyQueue.poll(); + // If queue is empty now, Based on the provided SyncGenerationPolicy, + // figure out how many new values need to be generated synchronously + if (val == null) { + // Synchronous call to get remaining values + int numToFill = 0; + switch (policy) { + case ATLEAST_ONE: + numToFill = (ekvs.size() < 1) ? 1 : 0; + break; + case LOW_WATERMARK: + numToFill = + Math.min(num, (int) (lowWatermark * numValues)) - ekvs.size(); + break; + case ALL: + numToFill = num - ekvs.size(); + break; + } + // Synchronous fill if not enough values found + if (numToFill > 0) { + refiller.fillQueueForKey(keyName, ekvs, numToFill); + } + // Asynch task to fill > lowWatermark + if (i <= (int) (lowWatermark * numValues)) { + submitRefillTask(keyName, keyQueue); + } + return ekvs; + } + ekvs.add(val); + } + } catch (Exception e) { + throw new IOException("Exeption while contacting value generator ", e); + } + return ekvs; + } + + private void submitRefillTask(final String keyName, + final Queue keyQueue) throws InterruptedException { + // The submit/execute method of the ThreadPoolExecutor is bypassed and + // the Runnable is directly put in the backing BlockingQueue so that we + // can control exactly how the runnable is inserted into the queue. 
+ queue.put( + new NamedRunnable(keyName) { + @Override + public void run() { + int cacheSize = numValues; + int threshold = (int) (lowWatermark * (float) cacheSize); + // Need to ensure that only one refill task per key is executed + try { + if (keyQueue.size() < threshold) { + refiller.fillQueueForKey(name, keyQueue, + cacheSize - keyQueue.size()); + } + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + } + ); + } + + /** + * Cleanly shutdown + */ + public void shutdown() { + executor.shutdownNow(); + } + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index a2d7d96ceb7..57d4eec23be 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -250,6 +250,12 @@ public class CommonConfigurationKeysPublic { public static final long HADOOP_SECURITY_GROUPS_CACHE_SECS_DEFAULT = 300; /** See core-default.xml */ + public static final String HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS = + "hadoop.security.groups.negative-cache.secs"; + /** See core-default.xml */ + public static final long HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS_DEFAULT = + 30; + /** See core-default.xml */ public static final String HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS = "hadoop.security.groups.cache.warn.after.ms"; public static final long HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS_DEFAULT = @@ -285,5 +291,32 @@ public class CommonConfigurationKeysPublic { /** Class to override Impersonation provider */ public static final String HADOOP_SECURITY_IMPERSONATION_PROVIDER_CLASS = "hadoop.security.impersonation.provider.class"; + + // + + hadoop.security.kms.client.encrypted.key.cache.size + 500 + + Size of the EncryptedKeyVersion cache Queue for each key + + + + hadoop.security.kms.client.encrypted.key.cache.low-watermark + 0.3f + + If size of the EncryptedKeyVersion cache Queue falls below the + low watermark, this cache queue will be scheduled for a refill + + + + hadoop.security.kms.client.encrypted.key.cache.num.refill.threads + 2 + + Number of threads to use for refilling depleted EncryptedKeyVersion + cache Queues + + + + "hadoop.security.kms.client.encrypted.key.cache.expiry + 43200000 + + Cache expiry time for a Key, after which the cache Queue for this + key will be dropped. Default = 12hrs + + diff --git a/hadoop-common-project/hadoop-common/src/site/apt/NativeLibraries.apt.vm b/hadoop-common-project/hadoop-common/src/site/apt/NativeLibraries.apt.vm index 5b315ee3580..27325194100 100644 --- a/hadoop-common-project/hadoop-common/src/site/apt/NativeLibraries.apt.vm +++ b/hadoop-common-project/hadoop-common/src/site/apt/NativeLibraries.apt.vm @@ -116,6 +116,8 @@ Native Libraries Guide * zlib-development package (stable version >= 1.2.0) + * openssl-development package(e.g. 
libssl-dev) + Once you installed the prerequisite packages use the standard hadoop pom.xml file and pass along the native flag to build the native hadoop library: diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java new file mode 100644 index 00000000000..7946588a309 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.crypto.key; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.crypto.key.kms.ValueQueue; +import org.apache.hadoop.crypto.key.kms.ValueQueue.QueueRefiller; +import org.apache.hadoop.crypto.key.kms.ValueQueue.SyncGenerationPolicy; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Sets; + +public class TestValueQueue { + + private static class FillInfo { + final int num; + final String key; + FillInfo(int num, String key) { + this.num = num; + this.key = key; + } + } + + private static class MockFiller implements QueueRefiller { + final LinkedBlockingQueue fillCalls = + new LinkedBlockingQueue(); + @Override + public void fillQueueForKey(String keyName, Queue keyQueue, + int numValues) throws IOException { + fillCalls.add(new FillInfo(numValues, keyName)); + for(int i = 0; i < numValues; i++) { + keyQueue.add("test"); + } + } + public FillInfo getTop() throws InterruptedException { + return fillCalls.poll(500, TimeUnit.MILLISECONDS); + } + } + + /** + * Verifies that Queue is initially filled to "numInitValues" + */ + @Test + public void testInitFill() throws Exception { + MockFiller filler = new MockFiller(); + ValueQueue vq = + new ValueQueue(10, 0.1f, 300, 1, + SyncGenerationPolicy.ALL, filler); + Assert.assertEquals("test", vq.getNext("k1")); + Assert.assertEquals(1, filler.getTop().num); + vq.shutdown(); + } + + /** + * Verifies that Queue is initialized (Warmed-up) for provided keys + */ + @Test + public void testWarmUp() throws Exception { + MockFiller filler = new MockFiller(); + ValueQueue vq = + new ValueQueue(10, 0.5f, 300, 1, + SyncGenerationPolicy.ALL, filler); + vq.initializeQueuesForKeys("k1", "k2", "k3"); + FillInfo[] fillInfos = + {filler.getTop(), filler.getTop(), filler.getTop()}; + Assert.assertEquals(5, fillInfos[0].num); + Assert.assertEquals(5, fillInfos[1].num); + Assert.assertEquals(5, fillInfos[2].num); + Assert.assertEquals(Sets.newHashSet("k1", "k2", "k3"), + Sets.newHashSet(fillInfos[0].key, + fillInfos[1].key, + 
fillInfos[2].key)); + vq.shutdown(); + } + + /** + * Verifies that the refill task is executed after "checkInterval" if + * num values below "lowWatermark" + */ + @Test + public void testRefill() throws Exception { + MockFiller filler = new MockFiller(); + ValueQueue vq = + new ValueQueue(10, 0.1f, 300, 1, + SyncGenerationPolicy.ALL, filler); + Assert.assertEquals("test", vq.getNext("k1")); + Assert.assertEquals(1, filler.getTop().num); + // Trigger refill + vq.getNext("k1"); + Assert.assertEquals(1, filler.getTop().num); + Assert.assertEquals(10, filler.getTop().num); + vq.shutdown(); + } + + /** + * Verifies that the No refill Happens after "checkInterval" if + * num values above "lowWatermark" + */ + @Test + public void testNoRefill() throws Exception { + MockFiller filler = new MockFiller(); + ValueQueue vq = + new ValueQueue(10, 0.5f, 300, 1, + SyncGenerationPolicy.ALL, filler); + Assert.assertEquals("test", vq.getNext("k1")); + Assert.assertEquals(5, filler.getTop().num); + Assert.assertEquals(null, filler.getTop()); + vq.shutdown(); + } + + /** + * Verify getAtMost when SyncGeneration Policy = ALL + */ + @Test + public void testgetAtMostPolicyALL() throws Exception { + MockFiller filler = new MockFiller(); + ValueQueue vq = + new ValueQueue(10, 0.1f, 300, 1, + SyncGenerationPolicy.ALL, filler); + Assert.assertEquals("test", vq.getNext("k1")); + Assert.assertEquals(1, filler.getTop().num); + // Drain completely + Assert.assertEquals(10, vq.getAtMost("k1", 10).size()); + // Synchronous call + Assert.assertEquals(10, filler.getTop().num); + // Ask for more... return all + Assert.assertEquals(19, vq.getAtMost("k1", 19).size()); + // Synchronous call (No Async call since num > lowWatermark) + Assert.assertEquals(19, filler.getTop().num); + vq.shutdown(); + } + + /** + * Verify getAtMost when SyncGeneration Policy = ALL + */ + @Test + public void testgetAtMostPolicyATLEAST_ONE() throws Exception { + MockFiller filler = new MockFiller(); + ValueQueue vq = + new ValueQueue(10, 0.3f, 300, 1, + SyncGenerationPolicy.ATLEAST_ONE, filler); + Assert.assertEquals("test", vq.getNext("k1")); + Assert.assertEquals(3, filler.getTop().num); + // Drain completely + Assert.assertEquals(2, vq.getAtMost("k1", 10).size()); + // Asynch Refill call + Assert.assertEquals(10, filler.getTop().num); + vq.shutdown(); + } + + /** + * Verify getAtMost when SyncGeneration Policy = LOW_WATERMARK + */ + @Test + public void testgetAtMostPolicyLOW_WATERMARK() throws Exception { + MockFiller filler = new MockFiller(); + ValueQueue vq = + new ValueQueue(10, 0.3f, 300, 1, + SyncGenerationPolicy.LOW_WATERMARK, filler); + Assert.assertEquals("test", vq.getNext("k1")); + Assert.assertEquals(3, filler.getTop().num); + // Drain completely + Assert.assertEquals(3, vq.getAtMost("k1", 10).size()); + // Synchronous call + Assert.assertEquals(1, filler.getTop().num); + // Asynch Refill call + Assert.assertEquals(10, filler.getTop().num); + vq.shutdown(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestStat.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestStat.java index 6dadb01e1f5..62650122f93 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestStat.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestStat.java @@ -45,15 +45,15 @@ public class TestStat extends FileSystemTestHelper { final String doesNotExist; final String directory; final String file; - final String symlink; + final 
String[] symlinks; final String stickydir; StatOutput(String doesNotExist, String directory, String file, - String symlink, String stickydir) { + String[] symlinks, String stickydir) { this.doesNotExist = doesNotExist; this.directory = directory; this.file = file; - this.symlink = symlink; + this.symlinks = symlinks; this.stickydir = stickydir; } @@ -78,10 +78,12 @@ public class TestStat extends FileSystemTestHelper { status = stat.getFileStatusForTesting(); assertTrue(status.isFile()); - br = new BufferedReader(new StringReader(symlink)); - stat.parseExecResult(br); - status = stat.getFileStatusForTesting(); - assertTrue(status.isSymlink()); + for (String symlink : symlinks) { + br = new BufferedReader(new StringReader(symlink)); + stat.parseExecResult(br); + status = stat.getFileStatusForTesting(); + assertTrue(status.isSymlink()); + } br = new BufferedReader(new StringReader(stickydir)); stat.parseExecResult(br); @@ -93,22 +95,30 @@ public class TestStat extends FileSystemTestHelper { @Test(timeout=10000) public void testStatLinux() throws Exception { + String[] symlinks = new String[] { + "6,symbolic link,1373584236,1373584236,777,andrew,andrew,`link' -> `target'", + "6,symbolic link,1373584236,1373584236,777,andrew,andrew,'link' -> 'target'" + }; StatOutput linux = new StatOutput( "stat: cannot stat `watermelon': No such file or directory", "4096,directory,1373584236,1373586485,755,andrew,root,`.'", "0,regular empty file,1373584228,1373584228,644,andrew,andrew,`target'", - "6,symbolic link,1373584236,1373584236,777,andrew,andrew,`link' -> `target'", + symlinks, "4096,directory,1374622334,1375124212,1755,andrew,andrew,`stickydir'"); linux.test(); } @Test(timeout=10000) public void testStatFreeBSD() throws Exception { + String[] symlinks = new String[] { + "6,Symbolic Link,1373508941,1373508941,120755,awang,awang,`link' -> `target'" + }; + StatOutput freebsd = new StatOutput( "stat: symtest/link: stat: No such file or directory", "512,Directory,1373583695,1373583669,40755,awang,awang,`link' -> `'", "0,Regular File,1373508937,1373508937,100644,awang,awang,`link' -> `'", - "6,Symbolic Link,1373508941,1373508941,120755,awang,awang,`link' -> `target'", + symlinks, "512,Directory,1375139537,1375139537,41755,awang,awang,`link' -> `'"); freebsd.test(); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestGroupsCaching.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestGroupsCaching.java index 44134cc755d..a814b0d9e68 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestGroupsCaching.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestGroupsCaching.java @@ -26,8 +26,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Set; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.FakeTimer; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; @@ -94,6 +97,9 @@ public class TestGroupsCaching { @Test public void testGroupsCaching() throws Exception { + // Disable negative cache. 
+ conf.setLong( + CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 0); Groups groups = new Groups(conf); groups.cacheGroupsAdd(Arrays.asList(myGroups)); groups.refresh(); @@ -163,4 +169,54 @@ public class TestGroupsCaching { FakeunPrivilegedGroupMapping.invoked); } + + @Test + public void testNegativeGroupCaching() throws Exception { + final String user = "negcache"; + final String failMessage = "Did not throw IOException: "; + conf.setLong( + CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 2); + FakeTimer timer = new FakeTimer(); + Groups groups = new Groups(conf, timer); + groups.cacheGroupsAdd(Arrays.asList(myGroups)); + groups.refresh(); + FakeGroupMapping.addToBlackList(user); + + // In the first attempt, the user will be put in the negative cache. + try { + groups.getGroups(user); + fail(failMessage + "Failed to obtain groups from FakeGroupMapping."); + } catch (IOException e) { + // Expects to raise exception for the first time. But the user will be + // put into the negative cache + GenericTestUtils.assertExceptionContains("No groups found for user", e); + } + + // The second time, the user is in the negative cache. + try { + groups.getGroups(user); + fail(failMessage + "The user is in the negative cache."); + } catch (IOException e) { + GenericTestUtils.assertExceptionContains("No groups found for user", e); + } + + // Brings back the backend user-group mapping service. + FakeGroupMapping.clearBlackList(); + + // It should still get groups from the negative cache. + try { + groups.getGroups(user); + fail(failMessage + "The user is still in the negative cache, even " + + "FakeGroupMapping has resumed."); + } catch (IOException e) { + GenericTestUtils.assertExceptionContains("No groups found for user", e); + } + + // Let the elements in the negative cache expire. + timer.advance(4 * 1000); + + // The groups for the user is expired in the negative cache, a new copy of + // groups for the user is fetched. + assertEquals(Arrays.asList(myGroups), groups.getGroups(user)); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/FakeTimer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/FakeTimer.java new file mode 100644 index 00000000000..66386fd2b2c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/FakeTimer.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * FakeTimer can be used for test purposes to control the return values + * from {{@link Timer}}. 
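Stepping back from the test utilities for a moment: the testNegativeGroupCaching case above exercises the new hadoop.security.groups.negative-cache.secs setting end to end. Below is a minimal sketch of enabling it in ordinary (non-test) code; the 60-second value and the user name are illustrative, and, as the test shows, a value of 0 disables negative caching.

+---+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.Groups;

public class NegativeCacheSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Remember "no groups found" results for 60 seconds; 0 disables the cache.
    conf.setLong(
        CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS, 60);
    Groups groups = new Groups(conf);
    // While "alice" sits in the negative cache, getGroups() keeps throwing
    // "No groups found for user" without consulting the mapping service again.
    System.out.println(groups.getGroups("alice"));
  }
}
+---+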
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class FakeTimer extends Timer { + private long nowMillis; + + /** Constructs a FakeTimer with a non-zero value */ + public FakeTimer() { + nowMillis = 1000; // Initialize with a non-trivial value. + } + + @Override + public long now() { + return nowMillis; + } + + @Override + public long monotonicNow() { + return nowMillis; + } + + /** Increases the time by milliseconds */ + public void advance(long advMillis) { + nowMillis += advMillis; + } +} diff --git a/hadoop-common-project/hadoop-kms/src/main/conf/kms-acls.xml b/hadoop-common-project/hadoop-kms/src/main/conf/kms-acls.xml index ab6c993e72c..cdff629128f 100644 --- a/hadoop-common-project/hadoop-kms/src/main/conf/kms-acls.xml +++ b/hadoop-common-project/hadoop-kms/src/main/conf/kms-acls.xml @@ -79,4 +79,19 @@ + + hadoop.kms.acl.GENERATE_EEK + * + + ACL for generateEncryptedKey CryptoExtension operations + + + + + hadoop.kms.acl.DECRYPT_EEK + * + + ACL for decrypt EncryptedKey CryptoExtension operations + + diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/EagerKeyGeneratorKeyProviderCryptoExtension.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/EagerKeyGeneratorKeyProviderCryptoExtension.java new file mode 100644 index 00000000000..a952cfeb9b9 --- /dev/null +++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/EagerKeyGeneratorKeyProviderCryptoExtension.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.crypto.key.kms.server; + +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ExecutionException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; +import org.apache.hadoop.crypto.key.kms.ValueQueue; +import org.apache.hadoop.crypto.key.kms.ValueQueue.SyncGenerationPolicy; + +/** + * A {@link KeyProviderCryptoExtension} that pre-generates and caches encrypted + * keys. 
+ */ +@InterfaceAudience.Private +public class EagerKeyGeneratorKeyProviderCryptoExtension + extends KeyProviderCryptoExtension { + + private static final String KEY_CACHE_PREFIX = + "hadoop.security.kms.encrypted.key.cache."; + + public static final String KMS_KEY_CACHE_SIZE = + KEY_CACHE_PREFIX + "size"; + public static final int KMS_KEY_CACHE_SIZE_DEFAULT = 100; + + public static final String KMS_KEY_CACHE_LOW_WATERMARK = + KEY_CACHE_PREFIX + "low.watermark"; + public static final float KMS_KEY_CACHE_LOW_WATERMARK_DEFAULT = 0.30f; + + public static final String KMS_KEY_CACHE_EXPIRY_MS = + KEY_CACHE_PREFIX + "expiry"; + public static final int KMS_KEY_CACHE_EXPIRY_DEFAULT = 43200000; + + public static final String KMS_KEY_CACHE_NUM_REFILL_THREADS = + KEY_CACHE_PREFIX + "num.fill.threads"; + public static final int KMS_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT = 2; + + + private static class CryptoExtension + implements KeyProviderCryptoExtension.CryptoExtension { + + private class EncryptedQueueRefiller implements + ValueQueue.QueueRefiller { + + @Override + public void fillQueueForKey(String keyName, + Queue keyQueue, int numKeys) throws IOException { + List retEdeks = + new LinkedList(); + for (int i = 0; i < numKeys; i++) { + try { + retEdeks.add(keyProviderCryptoExtension.generateEncryptedKey( + keyName)); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + } + keyQueue.addAll(retEdeks); + } + } + + private KeyProviderCryptoExtension keyProviderCryptoExtension; + private final ValueQueue encKeyVersionQueue; + + public CryptoExtension(Configuration conf, + KeyProviderCryptoExtension keyProviderCryptoExtension) { + this.keyProviderCryptoExtension = keyProviderCryptoExtension; + encKeyVersionQueue = + new ValueQueue( + conf.getInt(KMS_KEY_CACHE_SIZE, + KMS_KEY_CACHE_SIZE_DEFAULT), + conf.getFloat(KMS_KEY_CACHE_LOW_WATERMARK, + KMS_KEY_CACHE_LOW_WATERMARK_DEFAULT), + conf.getInt(KMS_KEY_CACHE_EXPIRY_MS, + KMS_KEY_CACHE_EXPIRY_DEFAULT), + conf.getInt(KMS_KEY_CACHE_NUM_REFILL_THREADS, + KMS_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT), + SyncGenerationPolicy.LOW_WATERMARK, new EncryptedQueueRefiller() + ); + } + + @Override + public void warmUpEncryptedKeys(String... keyNames) throws + IOException { + try { + encKeyVersionQueue.initializeQueuesForKeys(keyNames); + } catch (ExecutionException e) { + throw new IOException(e); + } + } + + @Override + public EncryptedKeyVersion generateEncryptedKey(String encryptionKeyName) + throws IOException, GeneralSecurityException { + try { + return encKeyVersionQueue.getNext(encryptionKeyName); + } catch (ExecutionException e) { + throw new IOException(e); + } + } + + @Override + public KeyVersion + decryptEncryptedKey(EncryptedKeyVersion encryptedKeyVersion) + throws IOException, GeneralSecurityException { + return keyProviderCryptoExtension.decryptEncryptedKey( + encryptedKeyVersion); + } + } + + /** + * This class is a proxy for a KeyProviderCryptoExtension that + * decorates the underlying CryptoExtension with one that eagerly + * caches pre-generated Encrypted Keys using a ValueQueue + * + * @param conf Configuration object to load parameters from + * @param keyProviderCryptoExtension KeyProviderCryptoExtension + * to delegate calls to. 
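On the server side, the cache behind this extension is tuned through the hadoop.security.kms.encrypted.key.cache.* keys defined above. A minimal wiring sketch using the constructor declared immediately below; the values are illustrative and baseProvider stands in for whatever KeyProvider the KMS has loaded, as in KMSWebApp later in this patch.

+---+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.kms.server.EagerKeyGeneratorKeyProviderCryptoExtension;

public class EagerCacheWiringSketch {
  static KeyProviderCryptoExtension wrap(Configuration kmsConf,
      KeyProvider baseProvider) {
    // Illustrative tuning of the server-side EEK cache.
    kmsConf.setInt("hadoop.security.kms.encrypted.key.cache.size", 200);
    kmsConf.setFloat("hadoop.security.kms.encrypted.key.cache.low.watermark", 0.5f);
    kmsConf.setInt("hadoop.security.kms.encrypted.key.cache.num.fill.threads", 2);
    KeyProviderCryptoExtension base =
        KeyProviderCryptoExtension.createKeyProviderCryptoExtension(baseProvider);
    // Decorate it so generateEncryptedKey() is served from a pre-filled ValueQueue.
    return new EagerKeyGeneratorKeyProviderCryptoExtension(kmsConf, base);
  }
}
+---+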
+ */ + public EagerKeyGeneratorKeyProviderCryptoExtension(Configuration conf, + KeyProviderCryptoExtension keyProviderCryptoExtension) { + super(keyProviderCryptoExtension, + new CryptoExtension(conf, keyProviderCryptoExtension)); + } + +} diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMS.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMS.java index 3574bf43b74..2b663368737 100644 --- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMS.java +++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMS.java @@ -20,6 +20,8 @@ package org.apache.hadoop.crypto.key.kms.server; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.crypto.key.KeyProvider; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; import org.apache.hadoop.crypto.key.kms.KMSRESTConstants; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.authentication.client.AuthenticationException; @@ -29,6 +31,7 @@ import org.apache.hadoop.util.StringUtils; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; @@ -39,10 +42,14 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.SecurityContext; + +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.security.Principal; import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -61,8 +68,10 @@ public class KMS { private static final String GET_CURRENT_KEY = "GET_CURRENT_KEY"; private static final String GET_KEY_VERSIONS = "GET_KEY_VERSIONS"; private static final String GET_METADATA = "GET_METADATA"; + private static final String GENERATE_EEK = "GENERATE_EEK"; + private static final String DECRYPT_EEK = "DECRYPT_EEK"; - private KeyProvider provider; + private KeyProviderCryptoExtension provider; public KMS() throws Exception { provider = KMSWebApp.getKeyProvider(); @@ -289,6 +298,92 @@ public class KMS { return Response.ok().type(MediaType.APPLICATION_JSON).entity(json).build(); } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @GET + @Path(KMSRESTConstants.KEY_RESOURCE + "/{name:.*}/" + + KMSRESTConstants.EEK_SUB_RESOURCE) + @Produces(MediaType.APPLICATION_JSON) + public Response generateEncryptedKeys( + @Context SecurityContext securityContext, + @PathParam("name") String name, + @QueryParam(KMSRESTConstants.EEK_OP) String edekOp, + @DefaultValue("1") + @QueryParam(KMSRESTConstants.EEK_NUM_KEYS) int numKeys) + throws Exception { + Principal user = getPrincipal(securityContext); + KMSClientProvider.checkNotEmpty(name, "name"); + KMSClientProvider.checkNotNull(edekOp, "eekOp"); + + Object retJSON; + if (edekOp.equals(KMSRESTConstants.EEK_GENERATE)) { + assertAccess(KMSACLs.Type.GENERATE_EEK, user, GENERATE_EEK, name); + + List retEdeks = + new LinkedList(); + try { + for (int i = 0; i < numKeys; i ++) { + retEdeks.add(provider.generateEncryptedKey(name)); + } + } catch (Exception e) { + throw new IOException(e); + } + KMSAudit.ok(user, GENERATE_EEK, name, ""); + retJSON = new ArrayList(); + 
for (EncryptedKeyVersion edek : retEdeks) { + ((ArrayList)retJSON).add(KMSServerJSONUtils.toJSON(edek)); + } + } else { + throw new IllegalArgumentException("Wrong " + KMSRESTConstants.EEK_OP + + " value, it must be " + KMSRESTConstants.EEK_GENERATE + " or " + + KMSRESTConstants.EEK_DECRYPT); + } + KMSWebApp.getGenerateEEKCallsMeter().mark(); + return Response.ok().type(MediaType.APPLICATION_JSON).entity(retJSON) + .build(); + } + + @SuppressWarnings("rawtypes") + @POST + @Path(KMSRESTConstants.KEY_VERSION_RESOURCE + "/{versionName:.*}/" + + KMSRESTConstants.EEK_SUB_RESOURCE) + @Produces(MediaType.APPLICATION_JSON) + public Response decryptEncryptedKey(@Context SecurityContext securityContext, + @PathParam("versionName") String versionName, + @QueryParam(KMSRESTConstants.EEK_OP) String eekOp, + Map jsonPayload) + throws Exception { + Principal user = getPrincipal(securityContext); + KMSClientProvider.checkNotEmpty(versionName, "versionName"); + KMSClientProvider.checkNotNull(eekOp, "eekOp"); + + String keyName = (String) jsonPayload.get(KMSRESTConstants.NAME_FIELD); + String ivStr = (String) jsonPayload.get(KMSRESTConstants.IV_FIELD); + String encMaterialStr = + (String) jsonPayload.get(KMSRESTConstants.MATERIAL_FIELD); + Object retJSON; + if (eekOp.equals(KMSRESTConstants.EEK_DECRYPT)) { + assertAccess(KMSACLs.Type.DECRYPT_EEK, user, DECRYPT_EEK, versionName); + KMSClientProvider.checkNotNull(ivStr, KMSRESTConstants.IV_FIELD); + byte[] iv = Base64.decodeBase64(ivStr); + KMSClientProvider.checkNotNull(encMaterialStr, + KMSRESTConstants.MATERIAL_FIELD); + byte[] encMaterial = Base64.decodeBase64(encMaterialStr); + KeyProvider.KeyVersion retKeyVersion = + provider.decryptEncryptedKey( + new KMSClientProvider.KMSEncryptedKeyVersion(keyName, versionName, + iv, KeyProviderCryptoExtension.EEK, encMaterial)); + retJSON = KMSServerJSONUtils.toJSON(retKeyVersion); + KMSAudit.ok(user, DECRYPT_EEK, versionName, ""); + } else { + throw new IllegalArgumentException("Wrong " + KMSRESTConstants.EEK_OP + + " value, it must be " + KMSRESTConstants.EEK_GENERATE + " or " + + KMSRESTConstants.EEK_DECRYPT); + } + KMSWebApp.getDecryptEEKCallsMeter().mark(); + return Response.ok().type(MediaType.APPLICATION_JSON).entity(retJSON) + .build(); + } + @GET @Path(KMSRESTConstants.KEY_RESOURCE + "/{name:.*}/" + KMSRESTConstants.VERSIONS_SUB_RESOURCE) diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSACLs.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSACLs.java index e3e6ce09007..58e91475f73 100644 --- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSACLs.java +++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSACLs.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.crypto.key.kms.server; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; @@ -34,12 +35,14 @@ import java.util.concurrent.TimeUnit; * hot-reloading them if the kms-acls.xml file where the ACLs * are defined has been updated. 
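For reference, the two KMS.java handlers added above expose the following wire protocol on the _eek sub-resource. The generate form also appears in the site documentation further down; the decrypt form is an illustrative schematic reconstructed from the handler and from KMSClientProvider.decryptEncryptedKey, with host, port, names and payloads as placeholders.

+---+
GET http://HOST:PORT/kms/v1/key/<key-name>/_eek?eek_op=generate&num_keys=<number-of-keys>
  -> 200 OK, JSON list of
     { "versionName", "iv" /*base64*/,
       "encryptedKeyVersion" : { "versionName" : "EEK", "material" /*base64*/ } }

POST http://HOST:PORT/kms/v1/keyversion/<version-name>/_eek?eek_op=decrypt
Content-Type: application/json
{
  "name"     : "<key-name>",
  "iv"       : "<iv>",          //base64
  "material" : "<material>"     //base64
}
  -> 200 OK, JSON key version { "name", "versionName", "material" /*base64*/ }
+---+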
*/ +@InterfaceAudience.Private public class KMSACLs implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(KMSACLs.class); public enum Type { - CREATE, DELETE, ROLLOVER, GET, GET_KEYS, GET_METADATA, SET_KEY_MATERIAL; + CREATE, DELETE, ROLLOVER, GET, GET_KEYS, GET_METADATA, + SET_KEY_MATERIAL, GENERATE_EEK, DECRYPT_EEK; public String getConfigKey() { return KMSConfiguration.CONFIG_PREFIX + "acl." + this.toString(); diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSServerJSONUtils.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSServerJSONUtils.java index 94501ecf3d4..aafb7046fc8 100644 --- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSServerJSONUtils.java +++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSServerJSONUtils.java @@ -17,8 +17,10 @@ */ package org.apache.hadoop.crypto.key.kms.server; +import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.crypto.key.KeyProvider; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; import org.apache.hadoop.crypto.key.kms.KMSRESTConstants; import java.util.ArrayList; @@ -39,7 +41,9 @@ public class KMSServerJSONUtils { keyVersion.getName()); json.put(KMSRESTConstants.VERSION_NAME_FIELD, keyVersion.getVersionName()); - json.put(KMSRESTConstants.MATERIAL_FIELD, keyVersion.getMaterial()); + json.put(KMSRESTConstants.MATERIAL_FIELD, + Base64.encodeBase64URLSafeString( + keyVersion.getMaterial())); } return json; } @@ -55,6 +59,21 @@ public class KMSServerJSONUtils { return json; } + @SuppressWarnings("unchecked") + public static Map toJSON(EncryptedKeyVersion encryptedKeyVersion) { + Map json = new LinkedHashMap(); + if (encryptedKeyVersion != null) { + json.put(KMSRESTConstants.VERSION_NAME_FIELD, + encryptedKeyVersion.getKeyVersionName()); + json.put(KMSRESTConstants.IV_FIELD, + Base64.encodeBase64URLSafeString( + encryptedKeyVersion.getIv())); + json.put(KMSRESTConstants.ENCRYPTED_KEY_VERSION_FIELD, + toJSON(encryptedKeyVersion.getEncryptedKey())); + } + return json; + } + @SuppressWarnings("unchecked") public static Map toJSON(String keyName, KeyProvider.Metadata meta) { Map json = new LinkedHashMap(); diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java index 88ea8c4fa42..d794463ac32 100644 --- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java +++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java @@ -20,10 +20,12 @@ package org.apache.hadoop.crypto.key.kms.server; import com.codahale.metrics.JmxReporter; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.key.CachingKeyProvider; import org.apache.hadoop.crypto.key.KeyProvider; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; import org.apache.hadoop.crypto.key.KeyProviderFactory; import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.security.authorize.AccessControlList; @@ -35,6 +37,7 @@ import 
org.slf4j.bridge.SLF4JBridgeHandler; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; + import java.io.File; import java.net.URL; import java.util.List; @@ -55,6 +58,10 @@ public class KMSWebApp implements ServletContextListener { "unauthorized.calls.meter"; private static final String UNAUTHENTICATED_CALLS_METER = METRICS_PREFIX + "unauthenticated.calls.meter"; + private static final String GENERATE_EEK_METER = METRICS_PREFIX + + "generate_eek.calls.meter"; + private static final String DECRYPT_EEK_METER = METRICS_PREFIX + + "decrypt_eek.calls.meter"; private static Logger LOG; private static MetricRegistry metricRegistry; @@ -66,8 +73,10 @@ public class KMSWebApp implements ServletContextListener { private static Meter keyCallsMeter; private static Meter unauthorizedCallsMeter; private static Meter unauthenticatedCallsMeter; + private static Meter decryptEEKCallsMeter; + private static Meter generateEEKCallsMeter; private static Meter invalidCallsMeter; - private static KeyProvider keyProvider; + private static KeyProviderCryptoExtension keyProviderCryptoExtension; static { SLF4JBridgeHandler.removeHandlersForRootLogger(); @@ -122,6 +131,10 @@ public class KMSWebApp implements ServletContextListener { metricRegistry = new MetricRegistry(); jmxReporter = JmxReporter.forRegistry(metricRegistry).build(); jmxReporter.start(); + generateEEKCallsMeter = metricRegistry.register(GENERATE_EEK_METER, + new Meter()); + decryptEEKCallsMeter = metricRegistry.register(DECRYPT_EEK_METER, + new Meter()); adminCallsMeter = metricRegistry.register(ADMIN_CALLS_METER, new Meter()); keyCallsMeter = metricRegistry.register(KEY_CALLS_METER, new Meter()); invalidCallsMeter = metricRegistry.register(INVALID_CALLS_METER, @@ -150,7 +163,7 @@ public class KMSWebApp implements ServletContextListener { "the first provider", kmsConf.get(KeyProviderFactory.KEY_PROVIDER_PATH)); } - keyProvider = providers.get(0); + KeyProvider keyProvider = providers.get(0); if (kmsConf.getBoolean(KMSConfiguration.KEY_CACHE_ENABLE, KMSConfiguration.KEY_CACHE_ENABLE_DEFAULT)) { long keyTimeOutMillis = @@ -162,6 +175,11 @@ public class KMSWebApp implements ServletContextListener { keyProvider = new CachingKeyProvider(keyProvider, keyTimeOutMillis, currKeyTimeOutMillis); } + keyProviderCryptoExtension = KeyProviderCryptoExtension. 
+ createKeyProviderCryptoExtension(keyProvider); + keyProviderCryptoExtension = + new EagerKeyGeneratorKeyProviderCryptoExtension(kmsConf, + keyProviderCryptoExtension); LOG.info("KMS Started"); } catch (Throwable ex) { @@ -208,6 +226,14 @@ public class KMSWebApp implements ServletContextListener { return invalidCallsMeter; } + public static Meter getGenerateEEKCallsMeter() { + return generateEEKCallsMeter; + } + + public static Meter getDecryptEEKCallsMeter() { + return decryptEEKCallsMeter; + } + public static Meter getUnauthorizedCallsMeter() { return unauthorizedCallsMeter; } @@ -216,7 +242,7 @@ return unauthenticatedCallsMeter; } - public static KeyProvider getKeyProvider() { - return keyProvider; + public static KeyProviderCryptoExtension getKeyProvider() { + return keyProviderCryptoExtension; } } diff --git a/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm b/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm index 297d0325d01..41a2cd968af 100644 --- a/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm +++ b/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm @@ -279,6 +279,25 @@ $ keytool -genkey -alias tomcat -keyalg RSA to provide the key material when creating or rolling a key. + + <property> + <name>hadoop.kms.acl.GENERATE_EEK</name> + <value>*</value> + <description> + ACL for generateEncryptedKey + CryptoExtension operations + </description> + </property> + + <property> + <name>hadoop.kms.acl.DECRYPT_EEK</name> + <value>*</value> + <description> + ACL for decryptEncryptedKey + CryptoExtension operations + </description> + </property> + +---+ ** KMS HTTP REST API @@ -396,6 +415,70 @@ Content-Type: application/json } +---+ + +*** Generate Encrypted Key for Current KeyVersion + + + ++---+ +GET http://HOST:PORT/kms/v1/key/<key-name>/_eek?eek_op=generate&num_keys=<number-of-keys> ++---+ + + + ++---+ +200 OK +Content-Type: application/json +[ + { + "versionName" : "encryptionVersionName", + "iv" : "<iv>", //base64 + "encryptedKeyVersion" : { + "versionName" : "EEK", + "material" : "<material>", //base64 + } + }, + { + "versionName" : "encryptionVersionName", + "iv" : "<iv>", //base64 + "encryptedKeyVersion" : { + "versionName" : "EEK", + "material" : "<material>", //base64 + } + }, + ...
+] ++---+ + +*** Decrypt Encrypted Key + + + ++---+ +POST http://HOST:PORT/kms/v1/keyversion/<version-name>/_eek?eek_op=decrypt +Content-Type: application/json + +{ + "name" : "<key-name>", + "iv" : "<iv>", //base64 + "material" : "<material>", //base64 +} + ++---+ + + + ++---+ +200 OK +Content-Type: application/json + +{ + "name" : "EK", + "material" : "<material>", //base64 +} ++---+ + + *** Get Key Version diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java index a3cd29db7b5..26b334df455 100644 --- a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java +++ b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java @@ -19,6 +19,9 @@ package org.apache.hadoop.crypto.key.kms.server; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.key.KeyProvider; +import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; import org.apache.hadoop.crypto.key.kms.KMSClientProvider; import org.apache.hadoop.minikdc.MiniKdc; import org.apache.hadoop.security.authorize.AuthorizationException; @@ -36,6 +39,7 @@ import javax.security.auth.Subject; import javax.security.auth.kerberos.KerberosPrincipal; import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.LoginContext; + import java.io.File; import java.io.FileWriter; import java.io.IOException; @@ -267,7 +271,7 @@ public class TestKMS { } } - private void doAs(String user, final PrivilegedExceptionAction action) + private T doAs(String user, final PrivilegedExceptionAction action) throws Exception { Set principals = new HashSet(); principals.add(new KerberosPrincipal(user)); @@ -280,7 +284,7 @@ public class TestKMS { try { loginContext.login(); subject = loginContext.getSubject(); - Subject.doAs(subject, action); + return Subject.doAs(subject, action); } finally { loginContext.logout(); } @@ -474,6 +478,32 @@ public class TestKMS { Assert.assertNotNull(kms1[0].getCreated()); Assert.assertTrue(started.before(kms1[0].getCreated())); + // test generate and decryption of EEK + KeyProvider.KeyVersion kv = kp.getCurrentKey("k1"); + KeyProviderCryptoExtension kpExt = + KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp); + + EncryptedKeyVersion ek1 = kpExt.generateEncryptedKey(kv.getName()); + Assert.assertEquals(KeyProviderCryptoExtension.EEK, + ek1.getEncryptedKey().getVersionName()); + Assert.assertNotNull(ek1.getEncryptedKey().getMaterial()); + Assert.assertEquals(kv.getMaterial().length, + ek1.getEncryptedKey().getMaterial().length); + KeyProvider.KeyVersion k1 = kpExt.decryptEncryptedKey(ek1); + Assert.assertEquals(KeyProviderCryptoExtension.EK, k1.getVersionName()); + KeyProvider.KeyVersion k1a = kpExt.decryptEncryptedKey(ek1); + Assert.assertArrayEquals(k1.getMaterial(), k1a.getMaterial()); + Assert.assertEquals(kv.getMaterial().length, k1.getMaterial().length); + + EncryptedKeyVersion ek2 = kpExt.generateEncryptedKey(kv.getName()); + KeyProvider.KeyVersion k2 = kpExt.decryptEncryptedKey(ek2); + boolean isEq = true; + for (int i = 0; isEq && i < ek2.getEncryptedKey().getMaterial().length; + i++) { + isEq = k2.getMaterial()[i] == k1.getMaterial()[i]; + } + Assert.assertFalse(isEq); + // deleteKey() kp.deleteKey("k1"); @@ -565,7 +595,7 @@
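
The two REST operations documented above are normally driven through the KeyProvider API rather than hand-built HTTP requests. The following is a minimal client-side sketch, illustrative only and not taken from this patch: it assumes a KMS is already running at the (placeholder) provider URI shown and that an encryption key named k1 already exists.

+---+
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;

public class EEKClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative KMS endpoint; substitute the URI of a real KMS instance.
    KeyProvider kp =
        new KMSClientProvider(new URI("kms://http@localhost:16000/kms"), conf);
    // Wrap the client provider so generateEncryptedKey/decryptEncryptedKey are
    // available; for KMSClientProvider these map to the eek_op=generate and
    // eek_op=decrypt REST calls documented above.
    KeyProviderCryptoExtension kpExt =
        KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);
    EncryptedKeyVersion eek = kpExt.generateEncryptedKey("k1");
    KeyProvider.KeyVersion ek = kpExt.decryptEncryptedKey(eek);
    System.out.println(ek.getVersionName()); // expected to be "EK"
  }
}
+---+

The TestKMS changes in this patch exercise the same generateEncryptedKey/decryptEncryptedKey round trip against a test KMS instance.
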
public class TestKMS { @Override public Void call() throws Exception { final Configuration conf = new Configuration(); - conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 64); + conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128); URI uri = createKMSUri(getKMSUrl()); final KeyProvider kp = new KMSClientProvider(uri, conf); @@ -582,7 +612,7 @@ public class TestKMS { Assert.fail(ex.toString()); } try { - kp.createKey("k", new byte[8], new KeyProvider.Options(conf)); + kp.createKey("k", new byte[16], new KeyProvider.Options(conf)); Assert.fail(); } catch (AuthorizationException ex) { //NOP @@ -598,7 +628,7 @@ public class TestKMS { Assert.fail(ex.toString()); } try { - kp.rollNewVersion("k", new byte[8]); + kp.rollNewVersion("k", new byte[16]); Assert.fail(); } catch (AuthorizationException ex) { //NOP @@ -690,7 +720,7 @@ public class TestKMS { @Override public Void run() throws Exception { try { - KeyProvider.KeyVersion kv = kp.createKey("k1", new byte[8], + KeyProvider.KeyVersion kv = kp.createKey("k1", new byte[16], new KeyProvider.Options(conf)); Assert.assertNull(kv.getMaterial()); } catch (Exception ex) { @@ -717,7 +747,8 @@ public class TestKMS { @Override public Void run() throws Exception { try { - KeyProvider.KeyVersion kv = kp.rollNewVersion("k1", new byte[8]); + KeyProvider.KeyVersion kv = + kp.rollNewVersion("k1", new byte[16]); Assert.assertNull(kv.getMaterial()); } catch (Exception ex) { Assert.fail(ex.toString()); @@ -726,12 +757,46 @@ public class TestKMS { } }); - doAs("GET", new PrivilegedExceptionAction() { + final KeyVersion currKv = + doAs("GET", new PrivilegedExceptionAction() { + @Override + public KeyVersion run() throws Exception { + try { + kp.getKeyVersion("k1@0"); + KeyVersion kv = kp.getCurrentKey("k1"); + return kv; + } catch (Exception ex) { + Assert.fail(ex.toString()); + } + return null; + } + }); + + final EncryptedKeyVersion encKv = + doAs("GENERATE_EEK", + new PrivilegedExceptionAction() { + @Override + public EncryptedKeyVersion run() throws Exception { + try { + KeyProviderCryptoExtension kpCE = KeyProviderCryptoExtension. + createKeyProviderCryptoExtension(kp); + EncryptedKeyVersion ek1 = + kpCE.generateEncryptedKey(currKv.getName()); + return ek1; + } catch (Exception ex) { + Assert.fail(ex.toString()); + } + return null; + } + }); + + doAs("DECRYPT_EEK", new PrivilegedExceptionAction() { @Override public Void run() throws Exception { try { - kp.getKeyVersion("k1@0"); - kp.getCurrentKey("k1"); + KeyProviderCryptoExtension kpCE = KeyProviderCryptoExtension. 
+ createKeyProviderCryptoExtension(kp); + kpCE.decryptEncryptedKey(encKv); } catch (Exception ex) { Assert.fail(ex.toString()); } @@ -817,7 +882,7 @@ public class TestKMS { @Override public Void call() throws Exception { final Configuration conf = new Configuration(); - conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 64); + conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128); URI uri = createKMSUri(getKMSUrl()); final KeyProvider kp = new KMSClientProvider(uri, conf); @@ -889,6 +954,30 @@ public class TestKMS { Assert.assertTrue("Caught unexpected exception" + e.toString(), false); } + caughtTimeout = false; + try { + KeyProvider kp = new KMSClientProvider(uri, conf); + KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp) + .generateEncryptedKey("a"); + } catch (SocketTimeoutException e) { + caughtTimeout = true; + } catch (IOException e) { + Assert.assertTrue("Caught unexpected exception" + e.toString(), false); + } + + caughtTimeout = false; + try { + KeyProvider kp = new KMSClientProvider(uri, conf); + KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp) + .decryptEncryptedKey( + new KMSClientProvider.KMSEncryptedKeyVersion("a", + "a", new byte[] {1, 2}, "EEK", new byte[] {1, 2})); + } catch (SocketTimeoutException e) { + caughtTimeout = true; + } catch (IOException e) { + Assert.assertTrue("Caught unexpected exception" + e.toString(), false); + } + Assert.assertTrue(caughtTimeout); sock.close(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 3ee4c827b54..f39a7544325 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -606,6 +606,9 @@ Release 2.5.0 - UNRELEASED HDFS-6493. Change dfs.namenode.startup.delay.block.deletion to second instead of millisecond. (Juan Yu via wang) + HDFS-6680. BlockPlacementPolicyDefault does not choose favored nodes + correctly. (szetszwo) + OPTIMIZATIONS HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 406f065929c..9932bff2aa0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -148,14 +148,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { List results = new ArrayList(); boolean avoidStaleNodes = stats != null && stats.isAvoidingStaleDataNodesForWrite(); - for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) { + for (int i = 0; i < favoredNodes.size() && results.size() < numOfReplicas; i++) { DatanodeDescriptor favoredNode = favoredNodes.get(i); // Choose a single node which is local to favoredNode. 
// 'results' is updated within chooseLocalNode final DatanodeStorageInfo target = chooseLocalStorage(favoredNode, favoriteAndExcludedNodes, blocksize, getMaxNodesPerRack(results.size(), numOfReplicas)[1], - results, avoidStaleNodes, storageTypes.get(0)); + results, avoidStaleNodes, storageTypes.get(0), false); if (target == null) { LOG.warn("Could not find a target for file " + src + " with favored node " + favoredNode); @@ -277,7 +277,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { try { if (numOfResults == 0) { writer = chooseLocalStorage(writer, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageTypes.remove(0)) + maxNodesPerRack, results, avoidStaleNodes, storageTypes.remove(0), true) .getDatanodeDescriptor(); if (--numOfReplicas == 0) { return writer; @@ -351,12 +351,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { int maxNodesPerRack, List results, boolean avoidStaleNodes, - StorageType storageType) + StorageType storageType, + boolean fallbackToLocalRack) throws NotEnoughReplicasException { // if no local machine, randomly choose one node - if (localMachine == null) + if (localMachine == null) { return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType); + } if (preferLocalNode && localMachine instanceof DatanodeDescriptor) { DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine; // otherwise try local machine first @@ -369,7 +371,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } } } - } + } + + if (!fallbackToLocalRack) { + return null; + } // try a node on local rack return chooseLocalRack(localMachine, excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java index 1069b4e3d75..b3ff6b9b1f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java @@ -70,7 +70,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau protected DatanodeStorageInfo chooseLocalStorage(Node localMachine, Set excludedNodes, long blocksize, int maxNodesPerRack, List results, boolean avoidStaleNodes, - StorageType storageType) throws NotEnoughReplicasException { + StorageType storageType, boolean fallbackToLocalRack + ) throws NotEnoughReplicasException { // if no local machine, randomly choose one node if (localMachine == null) return chooseRandom(NodeBase.ROOT, excludedNodes, @@ -97,6 +98,10 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau if (chosenStorage != null) { return chosenStorage; } + + if (!fallbackToLocalRack) { + return null; + } // try a node on local rack return chooseLocalRack(localMachine, excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java index 
ea5bb7a91ed..4f110372be8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java @@ -18,32 +18,41 @@ */ package org.apache.hadoop.hdfs.server.namenode; -import static org.junit.Assert.*; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; -import java.util.ArrayList; -import java.util.Random; -import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Random; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.junit.Test; +import org.apache.log4j.Level; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Test; public class TestFavoredNodesEndToEnd { + { + ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class)).getLogger().setLevel(Level.ALL); + } + private static MiniDFSCluster cluster; private static Configuration conf; private final static int NUM_DATA_NODES = 10; @@ -79,7 +88,7 @@ public class TestFavoredNodesEndToEnd { InetSocketAddress datanode[] = getDatanodes(rand); Path p = new Path("/filename"+i); FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true, - 4096, (short)3, (long)4096, null, datanode); + 4096, (short)3, 4096L, null, datanode); out.write(SOME_BYTES); out.close(); BlockLocation[] locations = getBlockLocations(p); @@ -98,14 +107,13 @@ public class TestFavoredNodesEndToEnd { //get some other nodes. 
In other words, the write to hdfs should not fail //and if we do getBlockLocations on the file, we should see one blklocation //and three hosts for that - Random rand = new Random(System.currentTimeMillis()); InetSocketAddress arbitraryAddrs[] = new InetSocketAddress[3]; for (int i = 0; i < 3; i++) { arbitraryAddrs[i] = getArbitraryLocalHostAddr(); } Path p = new Path("/filename-foo-bar"); FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true, - 4096, (short)3, (long)4096, null, arbitraryAddrs); + 4096, (short)3, 4096L, null, arbitraryAddrs); out.write(SOME_BYTES); out.close(); getBlockLocations(p); @@ -113,35 +121,41 @@ public class TestFavoredNodesEndToEnd { @Test(timeout=180000) public void testWhenSomeNodesAreNotGood() throws Exception { + // 4 favored nodes + final InetSocketAddress addrs[] = new InetSocketAddress[4]; + final String[] hosts = new String[addrs.length]; + for (int i = 0; i < addrs.length; i++) { + addrs[i] = datanodes.get(i).getXferAddress(); + hosts[i] = addrs[i].getAddress().getHostAddress() + ":" + addrs[i].getPort(); + } + //make some datanode not "good" so that even if the client prefers it, //the namenode would not give it as a replica to write to DatanodeInfo d = cluster.getNameNode().getNamesystem().getBlockManager() .getDatanodeManager().getDatanodeByXferAddr( - datanodes.get(0).getXferAddress().getAddress().getHostAddress(), - datanodes.get(0).getXferAddress().getPort()); + addrs[0].getAddress().getHostAddress(), addrs[0].getPort()); //set the decommission status to true so that //BlockPlacementPolicyDefault.isGoodTarget returns false for this dn d.setDecommissioned(); - InetSocketAddress addrs[] = new InetSocketAddress[3]; - for (int i = 0; i < 3; i++) { - addrs[i] = datanodes.get(i).getXferAddress(); - } Path p = new Path("/filename-foo-bar-baz"); + final short replication = (short)3; FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true, - 4096, (short)3, (long)4096, null, addrs); + 4096, replication, 4096L, null, addrs); out.write(SOME_BYTES); out.close(); //reset the state d.stopDecommission(); + BlockLocation[] locations = getBlockLocations(p); + Assert.assertEquals(replication, locations[0].getNames().length);; //also make sure that the datanode[0] is not in the list of hosts - String datanode0 = - datanodes.get(0).getXferAddress().getAddress().getHostAddress() - + ":" + datanodes.get(0).getXferAddress().getPort(); - for (int i = 0; i < 3; i++) { - if (locations[0].getNames()[i].equals(datanode0)) { - fail(datanode0 + " not supposed to be a replica for the block"); - } + for (int i = 0; i < replication; i++) { + final String loc = locations[0].getNames()[i]; + int j = 0; + for(; j < hosts.length && !loc.equals(hosts[j]); j++); + Assert.assertTrue("j=" + j, j > 0); + Assert.assertTrue("loc=" + loc + " not in host list " + + Arrays.asList(hosts) + ", j=" + j, j < hosts.length); } } diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 7da3de0f8ff..84295f3d238 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -169,6 +169,12 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-5956. Made MR AM not use maxAttempts to determine if the current attempt is the last retry. (Wangda Tan via zjshen) + MAPREDUCE-5957. AM throws ClassNotFoundException with job classloader + enabled if custom output format/committer is used (Sangjin Lee via jlowe) + + MAPREDUCE-5756. 
CombineFileInputFormat.getSplits() including directories + in its results (Jason Dere via jlowe) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index dceb9b15fed..8c1892af392 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -41,7 +41,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.mapred.FileOutputCommitter; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.LocalContainerLauncher; @@ -200,6 +199,7 @@ public class MRAppMaster extends CompositeService { new JobTokenSecretManager(); private JobId jobId; private boolean newApiCommitter; + private ClassLoader jobClassLoader; private OutputCommitter committer; private JobEventDispatcher jobEventDispatcher; private JobHistoryEventHandler jobHistoryEventHandler; @@ -250,6 +250,9 @@ public class MRAppMaster extends CompositeService { @Override protected void serviceInit(final Configuration conf) throws Exception { + // create the job classloader if enabled + createJobClassLoader(conf); + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); initJobCredentialsAndUGI(conf); @@ -387,8 +390,13 @@ public class MRAppMaster extends CompositeService { addIfService(committerEventHandler); //policy handling preemption requests from RM - preemptionPolicy = createPreemptionPolicy(conf); - preemptionPolicy.init(context); + callWithJobClassLoader(conf, new Action() { + public Void call(Configuration conf) { + preemptionPolicy = createPreemptionPolicy(conf); + preemptionPolicy.init(context); + return null; + } + }); //service to handle requests to TaskUmbilicalProtocol taskAttemptListener = createTaskAttemptListener(context, preemptionPolicy); @@ -453,33 +461,37 @@ public class MRAppMaster extends CompositeService { } private OutputCommitter createOutputCommitter(Configuration conf) { - OutputCommitter committer = null; + return callWithJobClassLoader(conf, new Action() { + public OutputCommitter call(Configuration conf) { + OutputCommitter committer = null; - LOG.info("OutputCommitter set in config " - + conf.get("mapred.output.committer.class")); + LOG.info("OutputCommitter set in config " + + conf.get("mapred.output.committer.class")); - if (newApiCommitter) { - org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils - .newTaskId(jobId, 0, TaskType.MAP); - org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils - .newTaskAttemptId(taskID, 0); - TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf, - TypeConverter.fromYarn(attemptID)); - OutputFormat outputFormat; - try { - outputFormat = ReflectionUtils.newInstance(taskContext - .getOutputFormatClass(), conf); - committer = outputFormat.getOutputCommitter(taskContext); - } catch (Exception e) { - throw new YarnRuntimeException(e); + if (newApiCommitter) { + 
org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = + MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); + org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = + MRBuilderUtils.newTaskAttemptId(taskID, 0); + TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf, + TypeConverter.fromYarn(attemptID)); + OutputFormat outputFormat; + try { + outputFormat = ReflectionUtils.newInstance(taskContext + .getOutputFormatClass(), conf); + committer = outputFormat.getOutputCommitter(taskContext); + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } else { + committer = ReflectionUtils.newInstance(conf.getClass( + "mapred.output.committer.class", FileOutputCommitter.class, + org.apache.hadoop.mapred.OutputCommitter.class), conf); + } + LOG.info("OutputCommitter is " + committer.getClass().getName()); + return committer; } - } else { - committer = ReflectionUtils.newInstance(conf.getClass( - "mapred.output.committer.class", FileOutputCommitter.class, - org.apache.hadoop.mapred.OutputCommitter.class), conf); - } - LOG.info("OutputCommitter is " + committer.getClass().getName()); - return committer; + }); } protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) { @@ -667,38 +679,42 @@ public class MRAppMaster extends CompositeService { return new StagingDirCleaningService(); } - protected Speculator createSpeculator(Configuration conf, AppContext context) { - Class speculatorClass; + protected Speculator createSpeculator(Configuration conf, + final AppContext context) { + return callWithJobClassLoader(conf, new Action() { + public Speculator call(Configuration conf) { + Class speculatorClass; + try { + speculatorClass + // "yarn.mapreduce.job.speculator.class" + = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR, + DefaultSpeculator.class, + Speculator.class); + Constructor speculatorConstructor + = speculatorClass.getConstructor + (Configuration.class, AppContext.class); + Speculator result = speculatorConstructor.newInstance(conf, context); - try { - speculatorClass - // "yarn.mapreduce.job.speculator.class" - = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR, - DefaultSpeculator.class, - Speculator.class); - Constructor speculatorConstructor - = speculatorClass.getConstructor - (Configuration.class, AppContext.class); - Speculator result = speculatorConstructor.newInstance(conf, context); - - return result; - } catch (InstantiationException ex) { - LOG.error("Can't make a speculator -- check " - + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); - throw new YarnRuntimeException(ex); - } catch (IllegalAccessException ex) { - LOG.error("Can't make a speculator -- check " - + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); - throw new YarnRuntimeException(ex); - } catch (InvocationTargetException ex) { - LOG.error("Can't make a speculator -- check " - + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); - throw new YarnRuntimeException(ex); - } catch (NoSuchMethodException ex) { - LOG.error("Can't make a speculator -- check " - + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); - throw new YarnRuntimeException(ex); - } + return result; + } catch (InstantiationException ex) { + LOG.error("Can't make a speculator -- check " + + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); + throw new YarnRuntimeException(ex); + } catch (IllegalAccessException ex) { + LOG.error("Can't make a speculator -- check " + + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); + throw new YarnRuntimeException(ex); + } catch (InvocationTargetException ex) { + LOG.error("Can't make a speculator -- check " + + 
MRJobConfig.MR_AM_JOB_SPECULATOR, ex); + throw new YarnRuntimeException(ex); + } catch (NoSuchMethodException ex) { + LOG.error("Can't make a speculator -- check " + + MRJobConfig.MR_AM_JOB_SPECULATOR, ex); + throw new YarnRuntimeException(ex); + } + } + }); } protected TaskAttemptListener createTaskAttemptListener(AppContext context, @@ -712,7 +728,7 @@ public class MRAppMaster extends CompositeService { protected EventHandler createCommitterEventHandler( AppContext context, OutputCommitter committer) { return new CommitterEventHandler(context, committer, - getRMHeartbeatHandler()); + getRMHeartbeatHandler(), jobClassLoader); } protected ContainerAllocator createContainerAllocator( @@ -1083,8 +1099,8 @@ public class MRAppMaster extends CompositeService { //start all the components super.serviceStart(); - // set job classloader if configured - MRApps.setJobClassLoader(getConfig()); + // finally set the job classloader + MRApps.setClassLoader(jobClassLoader, getConfig()); if (initFailed) { JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED); @@ -1101,19 +1117,24 @@ public class MRAppMaster extends CompositeService { TaskLog.syncLogsShutdown(logSyncer); } - private boolean isRecoverySupported(OutputCommitter committer2) - throws IOException { + private boolean isRecoverySupported() throws IOException { boolean isSupported = false; - JobContext _jobContext; + Configuration conf = getConfig(); if (committer != null) { + final JobContext _jobContext; if (newApiCommitter) { _jobContext = new JobContextImpl( - getConfig(), TypeConverter.fromYarn(getJobId())); + conf, TypeConverter.fromYarn(getJobId())); } else { _jobContext = new org.apache.hadoop.mapred.JobContextImpl( - new JobConf(getConfig()), TypeConverter.fromYarn(getJobId())); + new JobConf(conf), TypeConverter.fromYarn(getJobId())); } - isSupported = committer.isRecoverySupported(_jobContext); + isSupported = callWithJobClassLoader(conf, + new ExceptionAction() { + public Boolean call(Configuration conf) throws IOException { + return committer.isRecoverySupported(_jobContext); + } + }); } return isSupported; } @@ -1127,7 +1148,7 @@ public class MRAppMaster extends CompositeService { MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT); - boolean recoverySupportedByCommitter = isRecoverySupported(committer); + boolean recoverySupportedByCommitter = isRecoverySupported(); // If a shuffle secret was not provided by the job client then this app // attempt will generate one. However that disables recovery if there @@ -1312,7 +1333,7 @@ public class MRAppMaster extends CompositeService { this.conf = config; } @Override - public void handle(SpeculatorEvent event) { + public void handle(final SpeculatorEvent event) { if (disabled) { return; } @@ -1339,7 +1360,12 @@ public class MRAppMaster extends CompositeService { if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP)) || (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) { // Speculator IS enabled, direct the event to there. - speculator.handle(event); + callWithJobClassLoader(conf, new Action() { + public Void call(Configuration conf) { + speculator.handle(event); + return null; + } + }); } } @@ -1499,6 +1525,102 @@ public class MRAppMaster extends CompositeService { }); } + /** + * Creates a job classloader based on the configuration if the job classloader + * is enabled. It is a no-op if the job classloader is not enabled. 
+ */ + private void createJobClassLoader(Configuration conf) throws IOException { + jobClassLoader = MRApps.createJobClassLoader(conf); + } + + /** + * Executes the given action with the job classloader set as the configuration + * classloader as well as the thread context class loader if the job + * classloader is enabled. After the call, the original classloader is + * restored. + * + * If the job classloader is enabled and the code needs to load user-supplied + * classes via configuration or thread context classloader, this method should + * be used in order to load them. + * + * @param conf the configuration on which the classloader will be set + * @param action the callable action to be executed + */ + T callWithJobClassLoader(Configuration conf, Action action) { + // if the job classloader is enabled, we may need it to load the (custom) + // classes; we make the job classloader available and unset it once it is + // done + ClassLoader currentClassLoader = conf.getClassLoader(); + boolean setJobClassLoader = + jobClassLoader != null && currentClassLoader != jobClassLoader; + if (setJobClassLoader) { + MRApps.setClassLoader(jobClassLoader, conf); + } + try { + return action.call(conf); + } finally { + if (setJobClassLoader) { + // restore the original classloader + MRApps.setClassLoader(currentClassLoader, conf); + } + } + } + + /** + * Executes the given action that can throw a checked exception with the job + * classloader set as the configuration classloader as well as the thread + * context class loader if the job classloader is enabled. After the call, the + * original classloader is restored. + * + * If the job classloader is enabled and the code needs to load user-supplied + * classes via configuration or thread context classloader, this method should + * be used in order to load them. 
+ * + * @param conf the configuration on which the classloader will be set + * @param action the callable action to be executed + * @throws IOException if the underlying action throws an IOException + * @throws YarnRuntimeException if the underlying action throws an exception + * other than an IOException + */ + T callWithJobClassLoader(Configuration conf, ExceptionAction action) + throws IOException { + // if the job classloader is enabled, we may need it to load the (custom) + // classes; we make the job classloader available and unset it once it is + // done + ClassLoader currentClassLoader = conf.getClassLoader(); + boolean setJobClassLoader = + jobClassLoader != null && currentClassLoader != jobClassLoader; + if (setJobClassLoader) { + MRApps.setClassLoader(jobClassLoader, conf); + } + try { + return action.call(conf); + } catch (IOException e) { + throw e; + } catch (YarnRuntimeException e) { + throw e; + } catch (Exception e) { + // wrap it with a YarnRuntimeException + throw new YarnRuntimeException(e); + } finally { + if (setJobClassLoader) { + // restore the original classloader + MRApps.setClassLoader(currentClassLoader, conf); + } + } + } + + /** + * Action to be wrapped with setting and unsetting the job classloader + */ + private static interface Action { + T call(Configuration conf); + } + + private static interface ExceptionAction { + T call(Configuration conf) throws Exception; + } + @Override protected void serviceStop() throws Exception { super.serviceStop(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java index f15bce23577..8c3be86cb11 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java @@ -68,6 +68,7 @@ public class CommitterEventHandler extends AbstractService private BlockingQueue eventQueue = new LinkedBlockingQueue(); private final AtomicBoolean stopped; + private final ClassLoader jobClassLoader; private Thread jobCommitThread = null; private int commitThreadCancelTimeoutMs; private long commitWindowMs; @@ -79,11 +80,17 @@ public class CommitterEventHandler extends AbstractService public CommitterEventHandler(AppContext context, OutputCommitter committer, RMHeartbeatHandler rmHeartbeatHandler) { + this(context, committer, rmHeartbeatHandler, null); + } + + public CommitterEventHandler(AppContext context, OutputCommitter committer, + RMHeartbeatHandler rmHeartbeatHandler, ClassLoader jobClassLoader) { super("CommitterEventHandler"); this.context = context; this.committer = committer; this.rmHeartbeatHandler = rmHeartbeatHandler; this.stopped = new AtomicBoolean(false); + this.jobClassLoader = jobClassLoader; } @Override @@ -109,9 +116,23 @@ public class CommitterEventHandler extends AbstractService @Override protected void serviceStart() throws Exception { - ThreadFactory tf = new ThreadFactoryBuilder() - .setNameFormat("CommitterEvent Processor #%d") - .build(); + ThreadFactoryBuilder tfBuilder = new ThreadFactoryBuilder() + .setNameFormat("CommitterEvent Processor #%d"); + if (jobClassLoader != null) 
{ + // if the job classloader is enabled, we need to use the job classloader + // as the thread context classloader (TCCL) of these threads in case the + // committer needs to load another class via TCCL + ThreadFactory backingTf = new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setContextClassLoader(jobClassLoader); + return thread; + } + }; + tfBuilder.setThreadFactory(backingTf); + } + ThreadFactory tf = tfBuilder.build(); launcherPool = new ThreadPoolExecutor(5, 5, 1, TimeUnit.HOURS, new LinkedBlockingQueue(), tf); eventHandlingThread = new Thread(new Runnable() { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java index 4862810bfa5..423b842962f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java @@ -327,8 +327,8 @@ public class MRApps extends Apps { } /** - * Sets a {@link ApplicationClassLoader} on the given configuration and as - * the context classloader, if + * Creates and sets a {@link ApplicationClassLoader} on the given + * configuration and as the thread context classloader, if * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and * the APP_CLASSPATH environment variable is set. * @param conf @@ -336,24 +336,52 @@ public class MRApps extends Apps { */ public static void setJobClassLoader(Configuration conf) throws IOException { + setClassLoader(createJobClassLoader(conf), conf); + } + + /** + * Creates a {@link ApplicationClassLoader} if + * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and + * the APP_CLASSPATH environment variable is set. + * @param conf + * @returns the created job classloader, or null if the job classloader is not + * enabled or the APP_CLASSPATH environment variable is not set + * @throws IOException + */ + public static ClassLoader createJobClassLoader(Configuration conf) + throws IOException { + ClassLoader jobClassLoader = null; if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) { String appClasspath = System.getenv(Environment.APP_CLASSPATH.key()); if (appClasspath == null) { - LOG.warn("Not using job classloader since APP_CLASSPATH is not set."); + LOG.warn("Not creating job classloader since APP_CLASSPATH is not set."); } else { - LOG.info("Using job classloader"); + LOG.info("Creating job classloader"); if (LOG.isDebugEnabled()) { LOG.debug("APP_CLASSPATH=" + appClasspath); } String[] systemClasses = getSystemClasses(conf); - ClassLoader jobClassLoader = createJobClassLoader(appClasspath, + jobClassLoader = createJobClassLoader(appClasspath, systemClasses); - if (jobClassLoader != null) { - conf.setClassLoader(jobClassLoader); - Thread.currentThread().setContextClassLoader(jobClassLoader); - } } } + return jobClassLoader; + } + + /** + * Sets the provided classloader on the given configuration and as the thread + * context classloader if the classloader is not null. 
+ * @param classLoader + * @param conf + */ + public static void setClassLoader(ClassLoader classLoader, + Configuration conf) { + if (classLoader != null) { + LOG.info("Setting classloader " + classLoader.getClass().getName() + + " on the configuration and as the thread context classloader"); + conf.setClassLoader(classLoader); + Thread.currentThread().setContextClassLoader(classLoader); + } } @VisibleForTesting diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java index 4cf5c36f71b..040c54be975 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java @@ -579,7 +579,7 @@ public abstract class CombineFileInputFormat blocks = new OneBlockInfo[0]; } else { - if(locations.length == 0) { + if(locations.length == 0 && !stat.isDirectory()) { locations = new BlockLocation[] { new BlockLocation() }; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java index 1e7386fd60f..4290914728e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java @@ -1274,6 +1274,61 @@ public class TestCombineFileInputFormat extends TestCase { fileSys.delete(file.getParent(), true); } + /** + * Test that directories do not get included as part of getSplits() + */ + @Test + public void testGetSplitsWithDirectory() throws Exception { + MiniDFSCluster dfs = null; + try { + Configuration conf = new Configuration(); + dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1) + .build(); + dfs.waitActive(); + + dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1) + .build(); + dfs.waitActive(); + + FileSystem fileSys = dfs.getFileSystem(); + + // Set up the following directory structure: + // /dir1/: directory + // /dir1/file: regular file + // /dir1/dir2/: directory + Path dir1 = new Path("/dir1"); + Path file = new Path("/dir1/file1"); + Path dir2 = new Path("/dir1/dir2"); + if (!fileSys.mkdirs(dir1)) { + throw new IOException("Mkdirs failed to create " + dir1.toString()); + } + FSDataOutputStream out = fileSys.create(file); + out.write(new byte[0]); + out.close(); + if (!fileSys.mkdirs(dir2)) { + throw new IOException("Mkdirs failed to create " + dir2.toString()); + } + + // split it using a CombinedFile input format + DummyInputFormat inFormat = new DummyInputFormat(); + Job job = Job.getInstance(conf); + FileInputFormat.setInputPaths(job, "/dir1"); + List splits = inFormat.getSplits(job); + + // directories should be omitted from getSplits() - we should only see file1 and not 
dir2 + assertEquals(1, splits.size()); + CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(file.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(0, fileSplit.getLength(0)); + } finally { + if (dfs != null) { + dfs.shutdown(); + } + } + } + /** * Test when input files are from non-default file systems */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java index 2027d37b362..6b47554e8eb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java @@ -33,8 +33,8 @@ import java.util.HashMap; import java.util.Map; import java.util.jar.JarOutputStream; import java.util.zip.ZipEntry; -import org.apache.commons.io.FileUtils; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.FailingMapper; @@ -77,6 +77,10 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.MRAppMaster; +import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; +import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -86,6 +90,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.util.ApplicationClassLoader; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.Level; import org.junit.AfterClass; @@ -210,7 +215,19 @@ public class TestMRJobs { @Test(timeout = 300000) public void testJobClassloader() throws IOException, InterruptedException, ClassNotFoundException { - LOG.info("\n\n\nStarting testJobClassloader()."); + testJobClassloader(false); + } + + @Test(timeout = 300000) + public void testJobClassloaderWithCustomClasses() throws IOException, + InterruptedException, ClassNotFoundException { + testJobClassloader(true); + } + + private void testJobClassloader(boolean useCustomClasses) throws IOException, + InterruptedException, ClassNotFoundException { + LOG.info("\n\n\nStarting testJobClassloader()" + + " useCustomClasses=" + useCustomClasses); if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR @@ -221,6 +238,19 @@ public class TestMRJobs { // set master address to local to test that local mode applied iff framework == local sleepConf.set(MRConfig.MASTER_ADDRESS, "local"); sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true); + if (useCustomClasses) { 
+ // to test AM loading user classes such as output format class, we want + // to blacklist them from the system classes (they need to be prepended + // as the first match wins) + String systemClasses = + sleepConf.get(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES); + // exclude the custom classes from system classes + systemClasses = "-" + CustomOutputFormat.class.getName() + ",-" + + CustomSpeculator.class.getName() + "," + + systemClasses; + sleepConf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES, + systemClasses); + } sleepConf.set(MRJobConfig.IO_SORT_MB, TEST_IO_SORT_MB); sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString()); sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString()); @@ -233,12 +263,66 @@ public class TestMRJobs { job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. job.setJarByClass(SleepJob.class); job.setMaxMapAttempts(1); // speed up failures + if (useCustomClasses) { + // set custom output format class and speculator class + job.setOutputFormatClass(CustomOutputFormat.class); + final Configuration jobConf = job.getConfiguration(); + jobConf.setClass(MRJobConfig.MR_AM_JOB_SPECULATOR, CustomSpeculator.class, + Speculator.class); + // speculation needs to be enabled for the speculator to be loaded + jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true); + } job.submit(); boolean succeeded = job.waitForCompletion(true); Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(), succeeded); } + public static class CustomOutputFormat extends NullOutputFormat { + public CustomOutputFormat() { + verifyClassLoader(getClass()); + } + + /** + * Verifies that the class was loaded by the job classloader if it is in the + * context of the MRAppMaster, and if not throws an exception to fail the + * job. + */ + private void verifyClassLoader(Class cls) { + // to detect that it is instantiated in the context of the MRAppMaster, we + // inspect the stack trace and determine a caller is MRAppMaster + for (StackTraceElement e: new Throwable().getStackTrace()) { + if (e.getClassName().equals(MRAppMaster.class.getName()) && + !(cls.getClassLoader() instanceof ApplicationClassLoader)) { + throw new ExceptionInInitializerError("incorrect classloader used"); + } + } + } + } + + public static class CustomSpeculator extends DefaultSpeculator { + public CustomSpeculator(Configuration conf, AppContext context) { + super(conf, context); + verifyClassLoader(getClass()); + } + + /** + * Verifies that the class was loaded by the job classloader if it is in the + * context of the MRAppMaster, and if not throws an exception to fail the + * job. + */ + private void verifyClassLoader(Class cls) { + // to detect that it is instantiated in the context of the MRAppMaster, we + // inspect the stack trace and determine a caller is MRAppMaster + for (StackTraceElement e: new Throwable().getStackTrace()) { + if (e.getClassName().equals(MRAppMaster.class.getName()) && + !(cls.getClassLoader() instanceof ApplicationClassLoader)) { + throw new ExceptionInInitializerError("incorrect classloader used"); + } + } + } + } + protected void verifySleepJobCounters(Job job) throws InterruptedException, IOException { Counters counters = job.getCounters(); diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index e1fa4445d55..19bf0b4c455 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -54,6 +54,8 @@ Release 2.6.0 - UNRELEASED YARN-2323. 
FairShareComparator creates too many Resource objects (Hong Zhiguo via Sandy Ryza) + YARN-2045. Data persisted in NM should be versioned (Junping Du via jlowe) + OPTIMIZATIONS BUG FIXES @@ -77,6 +79,9 @@ Release 2.6.0 - UNRELEASED YARN-2244. FairScheduler missing handling of containers for unknown application attempts. (Anubhav Dhoot via kasha) + YARN-2321. NodeManager web UI can incorrectly report Pmem enforcement + (Leitao Guo via jlowe) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 1954fee4425..b905c1e5ad8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -42,8 +42,11 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion; +import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl; import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.apache.hadoop.yarn.util.ConverterUtils; import org.fusesource.leveldbjni.JniDBFactory; @@ -54,14 +57,18 @@ import org.iq80.leveldb.Logger; import org.iq80.leveldb.Options; import org.iq80.leveldb.WriteBatch; +import com.google.common.annotations.VisibleForTesting; + public class NMLeveldbStateStoreService extends NMStateStoreService { public static final Log LOG = LogFactory.getLog(NMLeveldbStateStoreService.class); private static final String DB_NAME = "yarn-nm-state"; - private static final String DB_SCHEMA_VERSION_KEY = "schema-version"; - private static final String DB_SCHEMA_VERSION = "1.0"; + private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version"; + + private static final NMDBSchemaVersion CURRENT_VERSION_INFO = NMDBSchemaVersion + .newInstance(1, 0); private static final String DELETION_TASK_KEY_PREFIX = "DeletionService/deltask_"; @@ -475,22 +482,16 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { options.logger(new LeveldbLogger()); LOG.info("Using state database at " + storeRoot + " for recovery"); File dbfile = new File(storeRoot.toString()); - byte[] schemaVersionData = null; try { db = JniDBFactory.factory.open(dbfile, options); - try { - schemaVersionData = db.get(bytes(DB_SCHEMA_VERSION_KEY)); - } catch (DBException e) { - throw new IOException(e.getMessage(), e); - } } catch (NativeDB.DBException e) { if (e.isNotFound() || e.getMessage().contains(" 
does not exist ")) { LOG.info("Creating state database at " + dbfile); options.createIfMissing(true); try { db = JniDBFactory.factory.open(dbfile, options); - schemaVersionData = bytes(DB_SCHEMA_VERSION); - db.put(bytes(DB_SCHEMA_VERSION_KEY), schemaVersionData); + // store version + storeVersion(); } catch (DBException dbErr) { throw new IOException(dbErr.getMessage(), dbErr); } @@ -498,16 +499,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { throw e; } } - if (schemaVersionData != null) { - String schemaVersion = asString(schemaVersionData); - // only support exact schema matches for now - if (!DB_SCHEMA_VERSION.equals(schemaVersion)) { - throw new IOException("Incompatible state database schema, found " - + schemaVersion + " expected " + DB_SCHEMA_VERSION); - } - } else { - throw new IOException("State database schema version not found"); - } + checkVersion(); } private Path createStorageDir(Configuration conf) throws IOException { @@ -532,4 +524,68 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { LOG.info(message); } } + + + NMDBSchemaVersion loadVersion() throws IOException { + byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY)); + // if version is not stored previously, treat it as 1.0. + if (data == null || data.length == 0) { + return NMDBSchemaVersion.newInstance(1, 0); + } + NMDBSchemaVersion version = + new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data)); + return version; + } + + private void storeVersion() throws IOException { + dbStoreVersion(CURRENT_VERSION_INFO); + } + + // Only used for test + @VisibleForTesting + void storeVersion(NMDBSchemaVersion state) throws IOException { + dbStoreVersion(state); + } + + private void dbStoreVersion(NMDBSchemaVersion state) throws IOException { + String key = DB_SCHEMA_VERSION_KEY; + byte[] data = + ((NMDBSchemaVersionPBImpl) state).getProto().toByteArray(); + try { + db.put(bytes(key), data); + } catch (DBException e) { + throw new IOException(e.getMessage(), e); + } + } + + NMDBSchemaVersion getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + /** + * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc. + * 2) Any incompatible change of state-store is a major upgrade, and any + * compatible change of state-store is a minor upgrade. + * 3) Within a minor upgrade, say 1.1 to 1.2: + * overwrite the version info and proceed as normal. + * 4) Within a major upgrade, say 1.2 to 2.0: + * throw exception and indicate user to use a separate upgrade tool to + * upgrade NM state or remove incompatible old state. 
+   */
+  private void checkVersion() throws IOException {
+    NMDBSchemaVersion loadedVersion = loadVersion();
+    LOG.info("Loaded NM state version info " + loadedVersion);
+    if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
+      return;
+    }
+    if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+      LOG.info("Storing NM state version info " + getCurrentVersion());
+      storeVersion();
+    } else {
+      throw new IOException(
+        "Incompatible version for NM state: expecting NM state version "
+            + getCurrentVersion() + ", but loading version " + loadedVersion);
+    }
+  }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/records/NMDBSchemaVersion.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/records/NMDBSchemaVersion.java
new file mode 100644
index 00000000000..1ee59ea4d83
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/records/NMDBSchemaVersion.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.recovery.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The version information of DB Schema for NM.
+ */
+@Private
+@Unstable
+public abstract class NMDBSchemaVersion {
+
+  public static NMDBSchemaVersion newInstance(int majorVersion, int minorVersion) {
+    NMDBSchemaVersion version = Records.newRecord(NMDBSchemaVersion.class);
+    version.setMajorVersion(majorVersion);
+    version.setMinorVersion(minorVersion);
+    return version;
+  }
+
+  public abstract int getMajorVersion();
+
+  public abstract void setMajorVersion(int majorVersion);
+
+  public abstract int getMinorVersion();
+
+  public abstract void setMinorVersion(int minorVersion);
+
+  public String toString() {
+    return getMajorVersion() + "." + getMinorVersion();
+  }
+
+  public boolean isCompatibleTo(NMDBSchemaVersion version) {
+    return getMajorVersion() == version.getMajorVersion();
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + getMajorVersion();
+    result = prime * result + getMinorVersion();
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    NMDBSchemaVersion other = (NMDBSchemaVersion) obj;
+    if (this.getMajorVersion() == other.getMajorVersion()
+        && this.getMinorVersion() == other.getMinorVersion()) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/records/impl/pb/NMDBSchemaVersionPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/records/impl/pb/NMDBSchemaVersionPBImpl.java
new file mode 100644
index 00000000000..f42c1bee331
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/records/impl/pb/NMDBSchemaVersionPBImpl.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProtoOrBuilder;
+
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
+
+@Private
+@Evolving
+public class NMDBSchemaVersionPBImpl extends NMDBSchemaVersion {
+
+  NMDBSchemaVersionProto proto = NMDBSchemaVersionProto.getDefaultInstance();
+  NMDBSchemaVersionProto.Builder builder = null;
+  boolean viaProto = false;
+
+  public NMDBSchemaVersionPBImpl() {
+    builder = NMDBSchemaVersionProto.newBuilder();
+  }
+
+  public NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public NMDBSchemaVersionProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = NMDBSchemaVersionProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public int getMajorVersion() {
+    NMDBSchemaVersionProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getMajorVersion();
+  }
+
+  @Override
+  public void setMajorVersion(int majorVersion) {
+    maybeInitBuilder();
+    builder.setMajorVersion(majorVersion);
+  }
+
+  @Override
+  public int getMinorVersion() {
+    NMDBSchemaVersionProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getMinorVersion();
+  }
+
+  @Override
+  public void setMinorVersion(int minorVersion) {
+    maybeInitBuilder();
+    builder.setMinorVersion(minorVersion);
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java
index 5e5408d8de5..92c4187c201 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java
@@ -72,7 +72,7 @@ public class NodePage extends NMView {
           ._("Total Pmem allocated for Container",
             StringUtils.byteDesc(info.getTotalPmemAllocated() * BYTES_IN_MB))
           ._("Pmem enforcement enabled",
-            info.isVmemCheckEnabled())
+            info.isPmemCheckEnabled())
           ._("Total VCores allocated for Containers",
             String.valueOf(info.getTotalVCoresAllocated()))
           ._("NodeHealthyStatus",
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
index 9546dbbe70d..a07e7ad6b2d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
@@ -38,3 +38,9 @@ message LocalizedResourceProto {
   optional string localPath = 2;
   optional int64 size = 3;
 }
+
+message NMDBSchemaVersionProto {
+  optional int32 majorVersion = 1;
+  optional int32 minorVersion = 2;
+}
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 89d1c237bd2..ca17a4e6e8a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -29,6 +29,7 @@ import java.util.Map;
 
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.ServiceStateException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -45,9 +46,11 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokenState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
 import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -102,6 +105,36 @@ public class TestNMLeveldbStateStoreService {
     assertTrue(stateStore.canRecover());
     verifyEmptyState();
   }
+
+  @Test
+  public void testCheckVersion() throws IOException {
+    // default version
+    NMDBSchemaVersion defaultVersion = stateStore.getCurrentVersion();
+    Assert.assertEquals(defaultVersion, stateStore.loadVersion());
+
+    // compatible version
+    NMDBSchemaVersion compatibleVersion =
+        NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion(),
+          defaultVersion.getMinorVersion() + 2);
+    stateStore.storeVersion(compatibleVersion);
+    Assert.assertEquals(compatibleVersion, stateStore.loadVersion());
+    restartStateStore();
+    // overwrite the compatible version
+    Assert.assertEquals(defaultVersion, stateStore.loadVersion());
+
+    // incompatible version
+    NMDBSchemaVersion incompatibleVersion =
+        NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion() + 1,
+          defaultVersion.getMinorVersion());
+    stateStore.storeVersion(incompatibleVersion);
+    try {
+      restartStateStore();
+      Assert.fail("Incompatible version, should expect fail here.");
+    } catch (ServiceStateException e) {
+      Assert.assertTrue("Exception message mismatch",
+        e.getMessage().contains("Incompatible version for NM state:"));
+    }
+  }
 
   @Test
   public void testStartResourceLocalization() throws IOException {