HDFS-13617. Allow wrapping NN QOP into token in encrypted message. Contributed by Chen Liang

This commit is contained in:
Chen Liang 2019-02-13 12:40:31 -08:00
parent a06e35a197
commit e63619a6dd
9 changed files with 325 additions and 18 deletions

View File

@ -1777,6 +1777,7 @@ public abstract class Server {
IpcConnectionContextProto connectionContext;
String protocolName;
SaslServer saslServer;
private String establishedQOP;
private AuthMethod authMethod;
private AuthProtocol authProtocol;
private boolean saslContextEstablished;
@ -1851,14 +1852,7 @@ public abstract class Server {
}
public String getEstablishedQOP() {
// In practice, saslServer should not be null when this is
// called. If it is null, it must be either some
// configuration mistake or it is called from unit test.
if (saslServer == null) {
LOG.warn("SASL server should not be null!");
return null;
}
return (String)saslServer.getNegotiatedProperty(Sasl.QOP);
return establishedQOP;
}
public void setLastContact(long lastContact) {
@ -1998,6 +1992,7 @@ public abstract class Server {
// do NOT enable wrapping until the last auth response is sent
if (saslContextEstablished) {
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
establishedQOP = qop;
// SASL wrapping is only used if the connection has a QOP, and
// the value is not auth. ex. auth-int & auth-priv
useWrap = (qop != null && !"auth".equalsIgnoreCase(qop));

View File

@ -52,6 +52,8 @@ public class Token<T extends TokenIdentifier> implements Writable {
private Text service;
private TokenRenewer renewer;
private byte[] dnHandshakeSecret;
/**
* Construct a token given a token identifier and a secret manager for the
* type of the token identifier.
@ -63,6 +65,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
identifier = id.getBytes();
kind = id.getKind();
service = new Text();
dnHandshakeSecret = new byte[0];
}
/**
@ -77,6 +80,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
this.password = (password == null)? new byte[0] : password;
this.kind = (kind == null)? new Text() : kind;
this.service = (service == null)? new Text() : service;
this.dnHandshakeSecret = new byte[0];
}
/**
@ -87,6 +91,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
password = new byte[0];
kind = new Text();
service = new Text();
dnHandshakeSecret = new byte[0];
}
/**
@ -98,6 +103,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
this.password = other.password.clone();
this.kind = new Text(other.kind);
this.service = new Text(other.service);
this.dnHandshakeSecret = other.dnHandshakeSecret.clone();
}
/**
@ -108,6 +114,14 @@ public class Token<T extends TokenIdentifier> implements Writable {
return identifier;
}
public byte[] getDnHandshakeSecret() {
return dnHandshakeSecret;
}
public void setDNHandshakeSecret(byte[] secret) {
this.dnHandshakeSecret = secret;
}
private static Class<? extends TokenIdentifier>
getClassForIdentifier(Text kind) {
Class<? extends TokenIdentifier> cls = null;
@ -290,6 +304,11 @@ public class Token<T extends TokenIdentifier> implements Writable {
in.readFully(password);
kind.readFields(in);
service.readFields(in);
len = WritableUtils.readVInt(in);
if (dnHandshakeSecret == null || dnHandshakeSecret.length != len) {
dnHandshakeSecret = new byte[len];
}
in.readFully(dnHandshakeSecret);
}
@Override
@ -300,6 +319,8 @@ public class Token<T extends TokenIdentifier> implements Writable {
out.write(password);
kind.write(out);
service.write(out);
WritableUtils.writeVInt(out, dnHandshakeSecret.length);
out.write(dnHandshakeSecret);
}
/**

View File

@ -36,6 +36,7 @@ message TokenProto {
required bytes password = 2;
required string kind = 3;
required string service = 4;
optional bytes handshakeSecret = 5;
}
message GetDelegationTokenRequestProto {

View File

@ -271,11 +271,16 @@ public class PBHelperClient {
}
public static TokenProto convert(Token<?> tok) {
return TokenProto.newBuilder().
TokenProto.Builder builder = TokenProto.newBuilder().
setIdentifier(getByteString(tok.getIdentifier())).
setPassword(getByteString(tok.getPassword())).
setKindBytes(getFixedByteString(tok.getKind())).
setServiceBytes(getFixedByteString(tok.getService())).build();
setServiceBytes(getFixedByteString(tok.getService()));
if (tok.getDnHandshakeSecret() != null) {
builder.setHandshakeSecret(
ByteString.copyFrom(tok.getDnHandshakeSecret()));
}
return builder.build();
}
public static ShortCircuitShmIdProto convert(ShmId shmId) {
@ -647,9 +652,14 @@ public class PBHelperClient {
public static Token<BlockTokenIdentifier> convert(
TokenProto blockToken) {
return new Token<>(blockToken.getIdentifier()
Token<BlockTokenIdentifier> token =
new Token<>(blockToken.getIdentifier()
.toByteArray(), blockToken.getPassword().toByteArray(), new Text(
blockToken.getKind()), new Text(blockToken.getService()));
if (blockToken.hasHandshakeSecret()) {
token.setDNHandshakeSecret(blockToken.getHandshakeSecret().toByteArray());
}
return token;
}
// DatanodeId

View File

@ -1057,6 +1057,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY =
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
public static final String DFS_NAMENODE_SEND_QOP_ENABLED =
"dfs.namenode.send.qop.enabled";
public static final boolean DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT = false;
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY

View File

@ -431,6 +431,22 @@ public class BlockTokenSecretManager extends
return createPassword(nonce, key.getKey());
}
/**
* Encrypt the given message with the current block key, using the current
* block key.
*
* @param message the message to be encrypted.
* @return the secret created by encrypting the given message.
*/
public byte[] secretGen(byte[] message) {
return createPassword(message, currentKey.getKey());
}
@VisibleForTesting
public BlockKey getCurrentKey() {
return currentKey;
}
@VisibleForTesting
public synchronized void setKeyUpdateIntervalForTesting(long millis) {
this.keyUpdateInterval = millis;

View File

@ -27,11 +27,14 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
import static org.apache.hadoop.util.Time.now;
import com.google.common.base.Charsets;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -132,6 +135,8 @@ import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@ -250,6 +255,8 @@ public class NameNodeRpcServer implements NamenodeProtocols {
private final String minimumDataNodeVersion;
private final boolean shouldSendQOP;
public NameNodeRpcServer(Configuration conf, NameNode nn)
throws IOException {
this.nn = nn;
@ -527,6 +534,8 @@ public class NameNodeRpcServer implements NamenodeProtocols {
this.clientRpcServer.addAuxiliaryListener(auxiliaryPort);
}
}
this.shouldSendQOP = conf.getBoolean(
DFS_NAMENODE_SEND_QOP_ENABLED, DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT);
}
/** Allow access to the lifeline RPC server for testing */
@ -727,8 +736,14 @@ public class NameNodeRpcServer implements NamenodeProtocols {
throws IOException {
checkNNStartup();
metrics.incrGetBlockLocations();
return namesystem.getBlockLocations(getClientMachine(),
src, offset, length);
LocatedBlocks locatedBlocks =
namesystem.getBlockLocations(getClientMachine(), src, offset, length);
if (shouldSendQOP) {
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
wrapEstablishedQOP(lb, getEstablishedClientQOP());
}
}
return locatedBlocks;
}
@Override // ClientProtocol
@ -801,6 +816,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
RetryCache.setState(cacheEntry, success, info);
}
metrics.incrFilesAppended();
if (shouldSendQOP) {
wrapEstablishedQOP(info.getLastBlock(), getEstablishedClientQOP());
}
return info;
}
@ -869,6 +887,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
if (locatedBlock != null) {
metrics.incrAddBlockOps();
}
if (shouldSendQOP) {
wrapEstablishedQOP(locatedBlock, getEstablishedClientQOP());
}
return locatedBlock;
}
@ -899,8 +920,13 @@ public class NameNodeRpcServer implements NamenodeProtocols {
excludeSet.add(node);
}
}
return namesystem.getAdditionalDatanode(src, fileId, blk, existings,
existingStorageIDs, excludeSet, numAdditionalNodes, clientName);
LocatedBlock locatedBlock = namesystem.getAdditionalDatanode(src, fileId,
blk, existings, existingStorageIDs, excludeSet, numAdditionalNodes,
clientName);
if (shouldSendQOP) {
wrapEstablishedQOP(locatedBlock, getEstablishedClientQOP());
}
return locatedBlock;
}
/**
* The client needs to give up on the block.
@ -1761,6 +1787,17 @@ public class NameNodeRpcServer implements NamenodeProtocols {
return clientMachine;
}
/**
* Return the QOP of the client that the current handler thread
* is handling. Assuming the negotiation is done at this point,
* otherwise returns null.
*
* @return the established QOP of this client.
*/
private static String getEstablishedClientQOP() {
return Server.getEstablishedQOP();
}
@Override
public DataEncryptionKey getDataEncryptionKey() throws IOException {
checkNNStartup();
@ -2307,4 +2344,26 @@ public class NameNodeRpcServer implements NamenodeProtocols {
namesystem.checkSuperuserPrivilege();
return Lists.newArrayList(nn.getReconfigurableProperties());
}
/**
* Wrapping the QOP information into the LocatedBlock instance.
* The wrapped QOP will be used by DataNode, i.e. DataNode will simply use
* this QOP to accept client calls, because this this QOP is viewed
* as the QOP that NameNode has accepted.
*
* @param locatedBlock the LocatedBlock instance
* @param qop the QOP to wrap in
* @throws RuntimeException
*/
private void wrapEstablishedQOP(LocatedBlock locatedBlock, String qop) {
if (qop == null || locatedBlock == null) {
return;
}
BlockTokenSecretManager btsm = namesystem.getBlockManager()
.getBlockTokenSecretManager();
Token<BlockTokenIdentifier> token = locatedBlock.getBlockToken();
byte[] secret = btsm.secretGen(qop.getBytes(Charsets.UTF_8));
token.setDNHandshakeSecret(secret);
}
}

View File

@ -4556,4 +4556,15 @@
Empty list indicates that auxiliary ports are disabled.
</description>
</property>
<property>
<name>dfs.namenode.send.qop.enabled</name>
<value>false</value>
<description>
A boolean specifies whether NameNode should encrypt the established QOP
and include it in block token. The encrypted QOP will be used by DataNode
as target QOP, overwriting DataNode configuration. This ensures DataNode
will use exactly the same QOP NameNode and client has already agreed on.
</description>
</property>
</configuration>

View File

@ -0,0 +1,190 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import javax.crypto.Mac;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.security.TestPermission;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.junit.Assert.*;
/**
* This tests enabling NN sending the established QOP back to client,
* in encrypted message, using block access token key.
*/
@RunWith(Parameterized.class)
public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
public static final Log LOG = LogFactory.getLog(TestPermission.class);
private HdfsConfiguration conf;
private MiniDFSCluster cluster;
private String encryptionAlgorithm;
private DistributedFileSystem dfs;
private String configKey;
private String qopValue;
@Parameterized.Parameters
public static Collection<Object[]> qopSettings() {
// if configured with privacy, the negotiated QOP should auth-conf
// similarly for the other two
return Arrays.asList(new Object[][] {
{"privacy", "auth-conf"},
{"integrity", "auth-int"},
{"authentication", "auth"}
});
}
public TestBlockTokenWrappingQOP(String configKey, String qopValue) {
this.configKey = configKey;
this.qopValue = qopValue;
}
@Before
public void setup() throws Exception {
conf = createSecureConfig(this.configKey);
conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
conf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
conf.set(HADOOP_RPC_PROTECTION, this.configKey);
cluster = null;
encryptionAlgorithm = "HmacSHA1";
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
}
@After
public void tearDown() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testAddBlockWrappingQOP() throws Exception {
final String src = "/testAddBlockWrappingQOP";
final Path path = new Path(src);
dfs = cluster.getFileSystem();
dfs.create(path);
DFSClient client = dfs.getClient();
String clientName = client.getClientName();
LocatedBlock lb = client.namenode.addBlock(src, clientName, null, null,
HdfsConstants.GRANDFATHER_INODE_ID, null, null);
byte[] secret = lb.getBlockToken().getDnHandshakeSecret();
BlockKey currentKey = cluster.getNamesystem().getBlockManager()
.getBlockTokenSecretManager().getCurrentKey();
String decrypted = decryptMessage(secret, currentKey,
encryptionAlgorithm);
assertEquals(this.qopValue, decrypted);
}
@Test
public void testAppendWrappingQOP() throws Exception {
final String src = "/testAppendWrappingQOP";
final Path path = new Path(src);
dfs = cluster.getFileSystem();
FSDataOutputStream out = dfs.create(path);
// NameNode append call returns a last block instance. If there is nothing
// it returns as a null. So write something, so that lastBlock has
// something
out.write(0);
out.close();
DFSClient client = dfs.getClient();
String clientName = client.getClientName();
LastBlockWithStatus lastBlock = client.namenode.append(src, clientName,
new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
byte[] secret = lastBlock.getLastBlock().getBlockToken()
.getDnHandshakeSecret();
BlockKey currentKey = cluster.getNamesystem().getBlockManager()
.getBlockTokenSecretManager().getCurrentKey();
String decrypted = decryptMessage(secret, currentKey,
encryptionAlgorithm);
assertEquals(this.qopValue, decrypted);
}
@Test
public void testGetBlockLocationWrappingQOP() throws Exception {
final String src = "/testGetBlockLocationWrappingQOP";
final Path path = new Path(src);
dfs = cluster.getFileSystem();
FSDataOutputStream out = dfs.create(path);
// if the file is empty, there will be no blocks returned. Write something
// so that getBlockLocations actually returns some block.
out.write(0);
out.close();
FileStatus status = dfs.getFileStatus(path);
DFSClient client = dfs.getClient();
LocatedBlocks lbs = client.namenode.getBlockLocations(
src, 0, status.getLen());
assertTrue(lbs.getLocatedBlocks().size() > 0);
BlockKey currentKey = cluster.getNamesystem().getBlockManager()
.getBlockTokenSecretManager().getCurrentKey();
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
byte[] secret = lb.getBlockToken().getDnHandshakeSecret();
String decrypted = decryptMessage(secret, currentKey,
encryptionAlgorithm);
assertEquals(this.qopValue, decrypted);
}
}
private String decryptMessage(byte[] secret, BlockKey key,
String algorithm) throws Exception {
String[] qops = {"auth", "auth-conf", "auth-int"};
Mac mac = Mac.getInstance(algorithm);
mac.init(key.getKey());
for (String qop : qops) {
byte[] encrypted = mac.doFinal(qop.getBytes());
if (Arrays.equals(encrypted, secret)) {
return qop;
}
}
return null;
}
}