HDFS-12359. Re-encryption should operate with minimum KMS ACL requirements.

(cherry picked from commit 0ba8ff4b77)
Xiao Chen 2017-09-05 10:07:40 -07:00 committed by Andrew Wang
parent b9d3e2eb70
commit 663e4eac27
5 changed files with 188 additions and 81 deletions
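Context for the change: before this patch, submitting a zone for re-encryption resolved the target key version through KeyProvider#getCurrentKey (visible in the removed code below), an operation the KMS serves under its GET ACLs; afterwards the NameNode drains the provider's local EDEK cache and generates a fresh EDEK, so the GENERATE_EEK ACL it already relies on is sufficient. A minimal sketch of the two provider calls, using only APIs that appear in this diff (the class and method names here are illustrative, not part of the patch):

import java.io.IOException;
import java.security.GeneralSecurityException;

import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;

public class TargetKeyVersionSketch {
  /** Old approach: getCurrentKey needs the KMS GET ACL on the key. */
  static String viaGetCurrentKey(KeyProvider provider, String keyName)
      throws IOException {
    KeyProvider.KeyVersion kv = provider.getCurrentKey(keyName);
    return kv.getVersionName();
  }

  /** New approach: drain cached EDEKs, then generate one; only GENERATE_EEK is needed. */
  static String viaGenerateEek(KeyProviderCryptoExtension provider, String keyName)
      throws IOException, GeneralSecurityException {
    provider.drain(keyName); // discard EDEKs cached under the old key version
    EncryptedKeyVersion edek = provider.generateEncryptedKey(keyName);
    return edek.getEncryptionKeyVersionName();
  }
}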

EncryptionZoneManager.java

@@ -587,13 +587,14 @@ public class EncryptionZoneManager {
* Re-encrypts the given encryption zone path. If the given path is not the
* root of an encryption zone, an exception is thrown.
*/
XAttr reencryptEncryptionZone(final INodesInPath zoneIIP,
List<XAttr> reencryptEncryptionZone(final INodesInPath zoneIIP,
final String keyVersionName) throws IOException {
assert dir.hasWriteLock();
if (reencryptionHandler == null) {
throw new IOException("No key provider configured, re-encryption "
+ "operation is rejected");
}
final List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
final INode inode = zoneIIP.getLastINode();
final String zoneName = zoneIIP.getPath();
checkEncryptionZoneRoot(inode, zoneName);
@@ -603,10 +604,11 @@ public class EncryptionZoneManager {
}
LOG.info("Zone {}({}) is submitted for re-encryption.", zoneName,
inode.getId());
XAttr ret = FSDirEncryptionZoneOp
final XAttr xattr = FSDirEncryptionZoneOp
.updateReencryptionSubmitted(dir, zoneIIP, keyVersionName);
xAttrs.add(xattr);
reencryptionHandler.notifyNewSubmission();
return ret;
return xAttrs;
}
/**

FSDirEncryptionZoneOp.java

@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
@@ -32,8 +31,8 @@ import java.util.Map;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.CryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileStatus;
@@ -225,37 +224,15 @@ final class FSDirEncryptionZoneOp {
}
}
static void reencryptEncryptionZone(final FSDirectory fsd,
final String zone, final String keyVersionName,
final boolean logRetryCache) throws IOException {
final List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
final FSPermissionChecker pc = fsd.getPermissionChecker();
fsd.writeLock();
try {
final INodesInPath iip = fsd.resolvePath(pc, zone, DirOp.WRITE);
final XAttr xattr = fsd.ezManager
.reencryptEncryptionZone(iip, keyVersionName);
xAttrs.add(xattr);
} finally {
fsd.writeUnlock();
}
fsd.getEditLog().logSetXAttrs(zone, xAttrs, logRetryCache);
static List<XAttr> reencryptEncryptionZone(final FSDirectory fsd,
final INodesInPath iip, final String keyVersionName) throws IOException {
assert keyVersionName != null;
return fsd.ezManager.reencryptEncryptionZone(iip, keyVersionName);
}
static void cancelReencryptEncryptionZone(final FSDirectory fsd,
final String zone, final boolean logRetryCache) throws IOException {
final List<XAttr> xattrs;
final FSPermissionChecker pc = fsd.getPermissionChecker();
fsd.writeLock();
try {
final INodesInPath iip = fsd.resolvePath(pc, zone, DirOp.WRITE);
xattrs = fsd.ezManager.cancelReencryptEncryptionZone(iip);
} finally {
fsd.writeUnlock();
}
if (xattrs != null && !xattrs.isEmpty()) {
fsd.getEditLog().logSetXAttrs(zone, xattrs, logRetryCache);
}
static List<XAttr> cancelReencryptEncryptionZone(final FSDirectory fsd,
final INodesInPath iip) throws IOException {
return fsd.ezManager.cancelReencryptEncryptionZone(iip);
}
static BatchedListEntries<ZoneReencryptionStatus> listReencryptionStatus(
@@ -698,32 +675,58 @@ final class FSDirEncryptionZoneOp {
}
/**
* Get the last key version name for the given EZ. This will contact
* the KMS to getKeyVersions.
* @param zone the encryption zone
* @param pc the permission checker
* @return the last element from the list of keyVersionNames returned by KMS.
* @throws IOException
* Get the current key version name for the given EZ. This will first drain
* the provider's local cache, then generate a new edek.
* <p>
* The encryption key version of the newly generated edek will be used as
* the target key version of this re-encryption - meaning all edeks'
* keyVersion are compared with it, and only sent to the KMS for re-encryption
* when the version is different.
* <p>
* Note: KeyProvider has a getCurrentKey interface, but that is under
* a different ACL. HDFS should not try to operate on additional ACLs, but
* rather use the generate ACL it already has.
*/
static KeyVersion getLatestKeyVersion(final FSDirectory dir,
final String zone, final FSPermissionChecker pc) throws IOException {
final EncryptionZone ez;
static String getCurrentKeyVersion(final FSDirectory dir, final String zone)
throws IOException {
assert dir.getProvider() != null;
assert !dir.hasReadLock();
final String keyName = FSDirEncryptionZoneOp.getKeyNameForZone(dir, zone);
if (keyName == null) {
throw new IOException(zone + " is not an encryption zone.");
}
// drain the local cache of the key provider.
// Do not invalidateCache on the server, since that's the responsibility
// when rolling the key version.
if (dir.getProvider() instanceof CryptoExtension) {
((CryptoExtension) dir.getProvider()).drain(keyName);
}
final EncryptedKeyVersion edek;
try {
edek = dir.getProvider().generateEncryptedKey(keyName);
} catch (GeneralSecurityException gse) {
throw new IOException(gse);
}
Preconditions.checkNotNull(edek);
return edek.getEncryptionKeyVersionName();
}
/**
* Resolve the zone to an inode, find the encryption zone info associated with
* that inode, and return the key name. Does not contact the KMS.
*/
static String getKeyNameForZone(final FSDirectory dir, final String zone)
throws IOException {
assert dir.getProvider() != null;
final INodesInPath iip;
final FSPermissionChecker pc = dir.getPermissionChecker();
dir.readLock();
try {
final INodesInPath iip = dir.resolvePath(pc, zone, DirOp.READ);
if (iip.getLastINode() == null) {
throw new FileNotFoundException(zone + " does not exist.");
}
dir.ezManager.checkEncryptionZoneRoot(iip.getLastINode(), iip.getPath());
ez = FSDirEncryptionZoneOp.getEZForPath(dir, iip);
iip = dir.resolvePath(pc, zone, DirOp.READ);
dir.ezManager.checkEncryptionZoneRoot(iip.getLastINode(), zone);
return dir.ezManager.getKeyName(iip);
} finally {
dir.readUnlock();
}
// Contact KMS out of locks.
KeyVersion currKv = dir.getProvider().getCurrentKey(ez.getKeyName());
Preconditions.checkNotNull(currKv,
"No current key versions for key name " + ez.getKeyName());
return currKv;
}
}
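The target key version produced by getCurrentKeyVersion above is consumed by the re-encryption handler, which is not part of this diff: each file's EDEK version is compared against it and only mismatching EDEKs go back to the KMS, as the Javadoc describes. A rough sketch of that comparison, assuming KeyProviderCryptoExtension#reencryptEncryptedKey and using illustrative names:

import java.io.IOException;
import java.security.GeneralSecurityException;

import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;

final class EdekComparisonSketch {
  /**
   * Returns the EDEK to persist for a file: the original one if it already
   * carries the target key version, otherwise a re-encrypted copy from the KMS.
   */
  static EncryptedKeyVersion maybeReencrypt(KeyProviderCryptoExtension provider,
      EncryptedKeyVersion edek, String targetKeyVersionName)
      throws IOException, GeneralSecurityException {
    if (targetKeyVersionName.equals(edek.getEncryptionKeyVersionName())) {
      return edek; // already at the target version, no KMS round trip
    }
    // Re-encrypt against the current key version; the KMS GET ACLs are not involved.
    return provider.reencryptEncryptedKey(edek);
  }
}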

FSNamesystem.java

@@ -89,7 +89,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.hdfs.protocol.BlocksStats;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
@@ -7105,34 +7104,46 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throw new IOException("No key provider configured, re-encryption "
+ "operation is rejected");
}
FSPermissionChecker pc = getPermissionChecker();
// get keyVersionName out of the lock. This keyVersionName will be used
// as the target keyVersion for the entire re-encryption.
// This means all edek's keyVersion will be compared with this one, and
// kms is only contacted if the edek's keyVersion is different.
final KeyVersion kv =
FSDirEncryptionZoneOp.getLatestKeyVersion(dir, zone, pc);
provider.invalidateCache(kv.getName());
String keyVersionName = null;
if (action == ReencryptAction.START) {
// get zone's latest key version name out of the lock.
keyVersionName = FSDirEncryptionZoneOp.getCurrentKeyVersion(dir, zone);
if (keyVersionName == null) {
throw new IOException("Failed to get key version name for " + zone);
}
}
writeLock();
try {
checkSuperuserPrivilege();
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode(
"NameNode in safemode, cannot " + action + " re-encryption on zone "
+ zone);
switch (action) {
case START:
FSDirEncryptionZoneOp
.reencryptEncryptionZone(dir, zone, kv.getVersionName(),
logRetryCache);
break;
case CANCEL:
FSDirEncryptionZoneOp
.cancelReencryptEncryptionZone(dir, zone, logRetryCache);
break;
default:
throw new IOException(
"Re-encryption action " + action + " is not supported");
checkNameNodeSafeMode("NameNode in safemode, cannot " + action
+ " re-encryption on zone " + zone);
final FSPermissionChecker pc = dir.getPermissionChecker();
List<XAttr> xattrs;
dir.writeLock();
try {
final INodesInPath iip = dir.resolvePath(pc, zone, DirOp.WRITE);
if (iip.getLastINode() == null) {
throw new FileNotFoundException(zone + " does not exist.");
}
switch (action) {
case START:
xattrs = FSDirEncryptionZoneOp
.reencryptEncryptionZone(dir, iip, keyVersionName);
break;
case CANCEL:
xattrs =
FSDirEncryptionZoneOp.cancelReencryptEncryptionZone(dir, iip);
break;
default:
throw new IOException(
"Re-encryption action " + action + " is not supported");
}
} finally {
dir.writeUnlock();
}
if (xattrs != null && !xattrs.isEmpty()) {
getEditLog().logSetXAttrs(zone, xattrs, logRetryCache);
}
} finally {
writeUnlock();
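The restructured FSNamesystem path above is what serves a client's re-encrypt request. A hedged usage sketch of triggering it, assuming the HdfsAdmin#reencryptEncryptionZone API and the HdfsConstants.ReencryptAction enum that ship with the re-encryption feature (URI and paths are placeholders):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;

public class ReencryptZoneSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder NameNode URI; the path must be the root of an existing encryption zone.
    HdfsAdmin admin = new HdfsAdmin(new URI("hdfs://namenode:8020"), conf);
    Path zone = new Path("/zones/zone1");
    // Submits the zone for re-encryption (superuser only); roughly the programmatic
    // equivalent of 'hdfs crypto -reencryptZone -start -path /zones/zone1'.
    admin.reencryptEncryptionZone(zone, ReencryptAction.START);
    // A pending or running submission can be cancelled the same way:
    // admin.reencryptEncryptionZone(zone, ReencryptAction.CANCEL);
  }
}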

TestReencryption.java

@@ -103,7 +103,7 @@ public class TestReencryption {
private static final EnumSet<CreateEncryptionZoneFlag> NO_TRASH =
EnumSet.of(CreateEncryptionZoneFlag.NO_TRASH);
private String getKeyProviderURI() {
protected String getKeyProviderURI() {
return JavaKeyStoreProvider.SCHEME_NAME + "://file" + new Path(
testRootDir.toString(), "test.jks").toUri();
}
@@ -149,7 +149,7 @@ public class TestReencryption {
GenericTestUtils.setLogLevel(ReencryptionUpdater.LOG, Level.TRACE);
}
private void setProvider() {
protected void setProvider() {
// Need to set the client's KeyProvider to the NN's for JKS,
// else the updates do not get flushed properly
fs.getClient()

TestReencryptionWithKMS.java (new file)

@@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
import org.apache.hadoop.crypto.key.kms.server.KMSACLs;
import org.apache.hadoop.crypto.key.kms.server.KMSConfiguration;
import org.apache.hadoop.crypto.key.kms.server.KMSWebApp;
import org.apache.hadoop.crypto.key.kms.server.MiniKMS;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileWriter;
import java.io.Writer;
import java.util.UUID;
import static org.junit.Assert.assertTrue;
/**
* Test class for re-encryption with minikms.
*/
public class TestReencryptionWithKMS extends TestReencryption {
private MiniKMS miniKMS;
private String kmsDir;
@Override
protected String getKeyProviderURI() {
return KMSClientProvider.SCHEME_NAME + "://" +
miniKMS.getKMSUrl().toExternalForm().replace("://", "@");
}
@Before
public void setup() throws Exception {
kmsDir = "target/test-classes/" + UUID.randomUUID().toString();
final File dir = new File(kmsDir);
assertTrue(dir.mkdirs());
MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder();
miniKMS = miniKMSBuilder.setKmsConfDir(dir).build();
miniKMS.start();
super.setup();
}
@After
public void teardown() {
super.teardown();
if (miniKMS != null) {
miniKMS.stop();
}
}
@Override
protected void setProvider() {
}
@Test
public void testReencryptionKMSACLs() throws Exception {
final Path aclPath = new Path(kmsDir, KMSConfiguration.KMS_ACLS_XML);
final Configuration acl = new Configuration(false);
acl.addResource(aclPath);
// should not require any of the get ACLs.
acl.set(KMSACLs.Type.GET.getBlacklistConfigKey(), "*");
acl.set(KMSACLs.Type.GET_KEYS.getBlacklistConfigKey(), "*");
final File kmsAcl = new File(aclPath.toString());
assertTrue(kmsAcl.exists());
try (Writer writer = new FileWriter(kmsAcl)) {
acl.writeXml(writer);
}
KMSWebApp.getACLs().run();
testReencryptionBasic();
}
}