HDFS-7209. Populate EDEK cache when creating encryption zone. (Yi Liu via wang)

This commit is contained in:
Andrew Wang 2014-10-10 13:40:37 -07:00
parent 1ceb326933
commit d3d3d47202
6 changed files with 69 additions and 29 deletions

View File

@ -71,6 +71,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.CryptoExtension;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@ -773,6 +774,15 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
encKeyVersionQueue.drain(keyName);
}
@VisibleForTesting
public int getEncKeyQueueSize(String keyName) throws IOException {
try {
return encKeyVersionQueue.getSize(keyName);
} catch (ExecutionException e) {
throw new IOException(e);
}
}
@Override
public Token<?>[] addDelegationTokens(String renewer,
Credentials credentials) throws IOException {

View File

@ -239,6 +239,16 @@ public class ValueQueue <E> {
}
}
/**
* Get size of the Queue for keyName
* @param keyName the key name
* @return int queue size
* @throws ExecutionException
*/
public int getSize(String keyName) throws ExecutionException {
return keyQueues.get(keyName).size();
}
/**
* This removes the "num" values currently at the head of the Queue for the
* provided key. Will immediately fire the Queue filler function if key

View File

@ -379,6 +379,8 @@ Release 2.7.0 - UNRELEASED
HDFS-7026. Introduce a string constant for "Failed to obtain user group
info...". (Yongjun Zhang via atm)
HDFS-7209. Populate EDEK cache when creating encryption zone. (Yi Liu via wang)
OPTIMIZATIONS
BUG FIXES

View File

@ -2569,6 +2569,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
CryptoProtocolVersion protocolVersion = null;
CipherSuite suite = null;
String ezKeyName = null;
EncryptedKeyVersion edek = null;
if (provider != null) {
readLock();
try {
src = resolvePath(src, pathComponents);
@ -2596,9 +2599,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
"Both suite and ezKeyName should both be null or not null");
// Generate EDEK if necessary while not holding the lock
EncryptedKeyVersion edek =
generateEncryptedDataEncryptionKey(ezKeyName);
edek = generateEncryptedDataEncryptionKey(ezKeyName);
EncryptionFaultInjector.getInstance().startFileAfterGenerateKey();
}
// Proceed with the create, using the computed cipher suite and
// generated EDEK
@ -8844,6 +8847,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
throw new IOException("Key " + keyName + " doesn't exist.");
}
// If the provider supports pool for EDEKs, this will fill in the pool
generateEncryptedDataEncryptionKey(keyName);
createEncryptionZoneInt(src, metadata.getCipher(),
keyName, cacheEntry != null);
success = true;

View File

@ -117,10 +117,10 @@ public class TestEncryptionZones {
private FileSystemTestHelper fsHelper;
private MiniDFSCluster cluster;
private HdfsAdmin dfsAdmin;
protected HdfsAdmin dfsAdmin;
protected DistributedFileSystem fs;
private File testRootDir;
private final String TEST_KEY = "testKey";
protected final String TEST_KEY = "testKey";
protected FileSystemTestWrapper fsWrapper;
protected FileContextTestWrapper fcWrapper;

View File

@ -17,11 +17,15 @@
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
import org.apache.hadoop.crypto.key.kms.server.MiniKMS;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -62,6 +66,15 @@ public class TestEncryptionZonesWithKMS extends TestEncryptionZones {
protected void setProvider() {
}
@Test(timeout = 120000)
public void testCreateEZPopulatesEDEKCache() throws Exception {
final Path zonePath = new Path("/TestEncryptionZone");
fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false);
dfsAdmin.createEncryptionZone(zonePath, TEST_KEY);
assertTrue(((KMSClientProvider)fs.getClient().provider).
getEncKeyQueueSize(TEST_KEY) > 0);
}
@Test(timeout = 120000)
public void testDelegationToken() throws Exception {
final String renewer = "JobTracker";