HDFS-14611. Move handshake secret field from Token to BlockAccessToken. Contributed by Chen Liang
This commit is contained in:
parent
163cb4d5ae
commit
1ff68d0e47
|
@ -52,8 +52,6 @@ public class Token<T extends TokenIdentifier> implements Writable {
|
||||||
private Text service;
|
private Text service;
|
||||||
private TokenRenewer renewer;
|
private TokenRenewer renewer;
|
||||||
|
|
||||||
private byte[] dnHandshakeSecret;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct a token given a token identifier and a secret manager for the
|
* Construct a token given a token identifier and a secret manager for the
|
||||||
* type of the token identifier.
|
* type of the token identifier.
|
||||||
|
@ -65,7 +63,6 @@ public class Token<T extends TokenIdentifier> implements Writable {
|
||||||
identifier = id.getBytes();
|
identifier = id.getBytes();
|
||||||
kind = id.getKind();
|
kind = id.getKind();
|
||||||
service = new Text();
|
service = new Text();
|
||||||
dnHandshakeSecret = new byte[0];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -80,7 +77,6 @@ public class Token<T extends TokenIdentifier> implements Writable {
|
||||||
this.password = (password == null)? new byte[0] : password;
|
this.password = (password == null)? new byte[0] : password;
|
||||||
this.kind = (kind == null)? new Text() : kind;
|
this.kind = (kind == null)? new Text() : kind;
|
||||||
this.service = (service == null)? new Text() : service;
|
this.service = (service == null)? new Text() : service;
|
||||||
this.dnHandshakeSecret = new byte[0];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -91,7 +87,6 @@ public class Token<T extends TokenIdentifier> implements Writable {
|
||||||
password = new byte[0];
|
password = new byte[0];
|
||||||
kind = new Text();
|
kind = new Text();
|
||||||
service = new Text();
|
service = new Text();
|
||||||
dnHandshakeSecret = new byte[0];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -103,7 +98,6 @@ public class Token<T extends TokenIdentifier> implements Writable {
|
||||||
this.password = other.password.clone();
|
this.password = other.password.clone();
|
||||||
this.kind = new Text(other.kind);
|
this.kind = new Text(other.kind);
|
||||||
this.service = new Text(other.service);
|
this.service = new Text(other.service);
|
||||||
this.dnHandshakeSecret = other.dnHandshakeSecret.clone();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -114,14 +108,6 @@ public class Token<T extends TokenIdentifier> implements Writable {
|
||||||
return identifier;
|
return identifier;
|
||||||
}
|
}
|
||||||
|
|
||||||
public byte[] getDnHandshakeSecret() {
|
|
||||||
return dnHandshakeSecret;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setDNHandshakeSecret(byte[] secret) {
|
|
||||||
this.dnHandshakeSecret = secret;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Class<? extends TokenIdentifier>
|
private static Class<? extends TokenIdentifier>
|
||||||
getClassForIdentifier(Text kind) {
|
getClassForIdentifier(Text kind) {
|
||||||
Class<? extends TokenIdentifier> cls = null;
|
Class<? extends TokenIdentifier> cls = null;
|
||||||
|
@ -204,6 +190,14 @@ public class Token<T extends TokenIdentifier> implements Writable {
|
||||||
service = newService;
|
service = newService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setID(byte[] bytes) {
|
||||||
|
identifier = bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPassword(byte[] newPassword) {
|
||||||
|
password = newPassword;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Whether this is a private token.
|
* Whether this is a private token.
|
||||||
* @return false always for non-private tokens
|
* @return false always for non-private tokens
|
||||||
|
@ -304,11 +298,6 @@ public class Token<T extends TokenIdentifier> implements Writable {
|
||||||
in.readFully(password);
|
in.readFully(password);
|
||||||
kind.readFields(in);
|
kind.readFields(in);
|
||||||
service.readFields(in);
|
service.readFields(in);
|
||||||
len = WritableUtils.readVInt(in);
|
|
||||||
if (dnHandshakeSecret == null || dnHandshakeSecret.length != len) {
|
|
||||||
dnHandshakeSecret = new byte[len];
|
|
||||||
}
|
|
||||||
in.readFully(dnHandshakeSecret);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -319,8 +308,6 @@ public class Token<T extends TokenIdentifier> implements Writable {
|
||||||
out.write(password);
|
out.write(password);
|
||||||
kind.write(out);
|
kind.write(out);
|
||||||
service.write(out);
|
service.write(out);
|
||||||
WritableUtils.writeVInt(out, dnHandshakeSecret.length);
|
|
||||||
out.write(dnHandshakeSecret);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -36,7 +36,6 @@ message TokenProto {
|
||||||
required bytes password = 2;
|
required bytes password = 2;
|
||||||
required string kind = 3;
|
required string kind = 3;
|
||||||
required string service = 4;
|
required string service = 4;
|
||||||
optional bytes handshakeSecret = 5;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message GetDelegationTokenRequestProto {
|
message GetDelegationTokenRequestProto {
|
||||||
|
|
|
@ -236,7 +236,7 @@ public class SaslDataTransferClient {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
boolean localTrusted = trustedChannelResolver.isTrusted();
|
boolean localTrusted = trustedChannelResolver.isTrusted();
|
||||||
boolean remoteTrusted = trustedChannelResolver.isTrusted(addr);
|
boolean remoteTrusted = trustedChannelResolver.isTrusted(addr);
|
||||||
LOG.info("SASL encryption trust check: localHostTrusted = {}, "
|
LOG.debug("SASL encryption trust check: localHostTrusted = {}, "
|
||||||
+ "remoteHostTrusted = {}", localTrusted, remoteTrusted);
|
+ "remoteHostTrusted = {}", localTrusted, remoteTrusted);
|
||||||
if (!localTrusted || !remoteTrusted) {
|
if (!localTrusted || !remoteTrusted) {
|
||||||
// The encryption key factory only returns a key if encryption is enabled.
|
// The encryption key factory only returns a key if encryption is enabled.
|
||||||
|
@ -329,9 +329,7 @@ public class SaslDataTransferClient {
|
||||||
if (secretKey != null) {
|
if (secretKey != null) {
|
||||||
LOG.debug("DataNode overwriting downstream QOP" +
|
LOG.debug("DataNode overwriting downstream QOP" +
|
||||||
saslProps.get(Sasl.QOP));
|
saslProps.get(Sasl.QOP));
|
||||||
byte[] newSecret = SecretManager.createPassword(saslProps.get(Sasl.QOP)
|
updateToken(accessToken, secretKey, saslProps);
|
||||||
.getBytes(Charsets.UTF_8), secretKey);
|
|
||||||
accessToken.setDNHandshakeSecret(newSecret);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.debug("Client using encryption algorithm {}",
|
LOG.debug("Client using encryption algorithm {}",
|
||||||
|
@ -447,9 +445,7 @@ public class SaslDataTransferClient {
|
||||||
}
|
}
|
||||||
LOG.debug("DataNode overwriting downstream QOP " +
|
LOG.debug("DataNode overwriting downstream QOP " +
|
||||||
saslProps.get(Sasl.QOP));
|
saslProps.get(Sasl.QOP));
|
||||||
byte[] newSecret = SecretManager.createPassword(
|
updateToken(accessToken, secretKey, saslProps);
|
||||||
saslProps.get(Sasl.QOP).getBytes(Charsets.UTF_8), secretKey);
|
|
||||||
accessToken.setDNHandshakeSecret(newSecret);
|
|
||||||
}
|
}
|
||||||
targetQOP = saslProps.get(Sasl.QOP);
|
targetQOP = saslProps.get(Sasl.QOP);
|
||||||
String userName = buildUserName(accessToken);
|
String userName = buildUserName(accessToken);
|
||||||
|
@ -460,6 +456,18 @@ public class SaslDataTransferClient {
|
||||||
saslProps, callbackHandler, accessToken);
|
saslProps, callbackHandler, accessToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void updateToken(Token<BlockTokenIdentifier> accessToken,
|
||||||
|
SecretKey secretKey, Map<String, String> saslProps)
|
||||||
|
throws IOException {
|
||||||
|
byte[] newSecret = saslProps.get(Sasl.QOP).getBytes(Charsets.UTF_8);
|
||||||
|
BlockTokenIdentifier bkid = accessToken.decodeIdentifier();
|
||||||
|
bkid.setHandshakeMsg(newSecret);
|
||||||
|
byte[] bkidBytes = bkid.getBytes();
|
||||||
|
accessToken.setPassword(
|
||||||
|
SecretManager.createPassword(bkidBytes, secretKey));
|
||||||
|
accessToken.setID(bkidBytes);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Builds the client's user name for the general-purpose handshake, consisting
|
* Builds the client's user name for the general-purpose handshake, consisting
|
||||||
* of the base64-encoded serialized block access token identifier. Note that
|
* of the base64-encoded serialized block access token identifier. Note that
|
||||||
|
@ -516,12 +524,16 @@ public class SaslDataTransferClient {
|
||||||
try {
|
try {
|
||||||
// Start of handshake - "initial response" in SASL terminology.
|
// Start of handshake - "initial response" in SASL terminology.
|
||||||
// The handshake secret can be null, this happens when client is running
|
// The handshake secret can be null, this happens when client is running
|
||||||
// a new version but the cluster does not have this feature. In which case
|
// a new version but the cluster does not have this feature.
|
||||||
// there will be no encrypted secret sent from NN.
|
// In which case there will be no encrypted secret sent from NN.
|
||||||
byte[] handshakeSecret = accessToken.getDnHandshakeSecret();
|
BlockTokenIdentifier blockTokenIdentifier =
|
||||||
|
accessToken.decodeIdentifier();
|
||||||
|
if (blockTokenIdentifier != null) {
|
||||||
|
byte[] handshakeSecret =
|
||||||
|
accessToken.decodeIdentifier().getHandshakeMsg();
|
||||||
if (handshakeSecret == null || handshakeSecret.length == 0) {
|
if (handshakeSecret == null || handshakeSecret.length == 0) {
|
||||||
LOG.debug("Handshake secret is null, sending without "
|
LOG.debug("Handshake secret is null, "
|
||||||
+ "handshake secret.");
|
+ "sending without handshake secret.");
|
||||||
sendSaslMessage(out, new byte[0]);
|
sendSaslMessage(out, new byte[0]);
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Sending handshake secret.");
|
LOG.debug("Sending handshake secret.");
|
||||||
|
@ -529,7 +541,12 @@ public class SaslDataTransferClient {
|
||||||
identifier.readFields(new DataInputStream(
|
identifier.readFields(new DataInputStream(
|
||||||
new ByteArrayInputStream(accessToken.getIdentifier())));
|
new ByteArrayInputStream(accessToken.getIdentifier())));
|
||||||
String bpid = identifier.getBlockPoolId();
|
String bpid = identifier.getBlockPoolId();
|
||||||
sendSaslMessageHandshakeSecret(out, new byte[0], handshakeSecret, bpid);
|
sendSaslMessageHandshakeSecret(out, new byte[0],
|
||||||
|
handshakeSecret, bpid);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG.debug("Block token id is null, sending without handshake secret.");
|
||||||
|
sendSaslMessage(out, new byte[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
// step 1
|
// step 1
|
||||||
|
|
|
@ -271,16 +271,11 @@ public class PBHelperClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TokenProto convert(Token<?> tok) {
|
public static TokenProto convert(Token<?> tok) {
|
||||||
TokenProto.Builder builder = TokenProto.newBuilder().
|
return TokenProto.newBuilder().
|
||||||
setIdentifier(getByteString(tok.getIdentifier())).
|
setIdentifier(getByteString(tok.getIdentifier())).
|
||||||
setPassword(getByteString(tok.getPassword())).
|
setPassword(getByteString(tok.getPassword())).
|
||||||
setKindBytes(getFixedByteString(tok.getKind())).
|
setKindBytes(getFixedByteString(tok.getKind())).
|
||||||
setServiceBytes(getFixedByteString(tok.getService()));
|
setServiceBytes(getFixedByteString(tok.getService())).build();
|
||||||
if (tok.getDnHandshakeSecret() != null) {
|
|
||||||
builder.setHandshakeSecret(
|
|
||||||
ByteString.copyFrom(tok.getDnHandshakeSecret()));
|
|
||||||
}
|
|
||||||
return builder.build();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ShortCircuitShmIdProto convert(ShmId shmId) {
|
public static ShortCircuitShmIdProto convert(ShmId shmId) {
|
||||||
|
@ -652,14 +647,9 @@ public class PBHelperClient {
|
||||||
|
|
||||||
public static Token<BlockTokenIdentifier> convert(
|
public static Token<BlockTokenIdentifier> convert(
|
||||||
TokenProto blockToken) {
|
TokenProto blockToken) {
|
||||||
Token<BlockTokenIdentifier> token =
|
return new Token<>(blockToken.getIdentifier()
|
||||||
new Token<>(blockToken.getIdentifier()
|
|
||||||
.toByteArray(), blockToken.getPassword().toByteArray(), new Text(
|
.toByteArray(), blockToken.getPassword().toByteArray(), new Text(
|
||||||
blockToken.getKind()), new Text(blockToken.getService()));
|
blockToken.getKind()), new Text(blockToken.getService()));
|
||||||
if (blockToken.hasHandshakeSecret()) {
|
|
||||||
token.setDNHandshakeSecret(blockToken.getHandshakeSecret().toByteArray());
|
|
||||||
}
|
|
||||||
return token;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DatanodeId
|
// DatanodeId
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.security.token.block;
|
||||||
|
|
||||||
import java.io.DataInput;
|
import java.io.DataInput;
|
||||||
import java.io.DataOutput;
|
import java.io.DataOutput;
|
||||||
|
import java.io.EOFException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
|
||||||
|
@ -44,6 +45,7 @@ public class BlockTokenIdentifier extends TokenIdentifier {
|
||||||
private String blockPoolId;
|
private String blockPoolId;
|
||||||
private long blockId;
|
private long blockId;
|
||||||
private final EnumSet<AccessMode> modes;
|
private final EnumSet<AccessMode> modes;
|
||||||
|
private byte[] handshakeMsg;
|
||||||
|
|
||||||
private byte [] cache;
|
private byte [] cache;
|
||||||
|
|
||||||
|
@ -58,6 +60,7 @@ public class BlockTokenIdentifier extends TokenIdentifier {
|
||||||
this.blockPoolId = bpid;
|
this.blockPoolId = bpid;
|
||||||
this.blockId = blockId;
|
this.blockId = blockId;
|
||||||
this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
|
this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
|
||||||
|
this.handshakeMsg = new byte[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -108,6 +111,15 @@ public class BlockTokenIdentifier extends TokenIdentifier {
|
||||||
return modes;
|
return modes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public byte[] getHandshakeMsg() {
|
||||||
|
return handshakeMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setHandshakeMsg(byte[] bytes) {
|
||||||
|
handshakeMsg = bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "block_token_identifier (expiryDate=" + this.getExpiryDate()
|
return "block_token_identifier (expiryDate=" + this.getExpiryDate()
|
||||||
|
@ -157,6 +169,15 @@ public class BlockTokenIdentifier extends TokenIdentifier {
|
||||||
for (int i = 0; i < length; i++) {
|
for (int i = 0; i < length; i++) {
|
||||||
modes.add(WritableUtils.readEnum(in, AccessMode.class));
|
modes.add(WritableUtils.readEnum(in, AccessMode.class));
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
|
int handshakeMsgLen = WritableUtils.readVInt(in);
|
||||||
|
if (handshakeMsgLen != 0) {
|
||||||
|
handshakeMsg = new byte[handshakeMsgLen];
|
||||||
|
in.readFully(handshakeMsg);
|
||||||
|
}
|
||||||
|
} catch (EOFException eof) {
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -170,6 +191,10 @@ public class BlockTokenIdentifier extends TokenIdentifier {
|
||||||
for (AccessMode aMode : modes) {
|
for (AccessMode aMode : modes) {
|
||||||
WritableUtils.writeEnum(out, aMode);
|
WritableUtils.writeEnum(out, aMode);
|
||||||
}
|
}
|
||||||
|
if (handshakeMsg != null && handshakeMsg.length > 0) {
|
||||||
|
WritableUtils.writeVInt(out, handshakeMsg.length);
|
||||||
|
out.write(handshakeMsg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -28,11 +28,9 @@ import java.io.DataOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import javax.crypto.SecretKey;
|
|
||||||
import javax.security.auth.callback.Callback;
|
import javax.security.auth.callback.Callback;
|
||||||
import javax.security.auth.callback.CallbackHandler;
|
import javax.security.auth.callback.CallbackHandler;
|
||||||
import javax.security.auth.callback.NameCallback;
|
import javax.security.auth.callback.NameCallback;
|
||||||
|
@ -52,15 +50,12 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
|
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
|
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DNConf;
|
import org.apache.hadoop.hdfs.server.datanode.DNConf;
|
||||||
import org.apache.hadoop.security.SaslPropertiesResolver;
|
import org.apache.hadoop.security.SaslPropertiesResolver;
|
||||||
import org.apache.hadoop.security.SaslRpcServer;
|
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.SecretManager;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -348,21 +343,6 @@ public class SaslDataTransferServer {
|
||||||
return identifier;
|
return identifier;
|
||||||
}
|
}
|
||||||
|
|
||||||
private String examineSecret(byte[] secret, String bpid) {
|
|
||||||
BlockKey blockKey = blockPoolTokenSecretManager.get(bpid).getCurrentKey();
|
|
||||||
SecretKey secretKey = blockKey.getKey();
|
|
||||||
for (SaslRpcServer.QualityOfProtection qop :
|
|
||||||
SaslRpcServer.QualityOfProtection.values()) {
|
|
||||||
String qopString = qop.getSaslQop();
|
|
||||||
byte[] data = qopString.getBytes(Charsets.UTF_8);
|
|
||||||
byte[] encryptedData = SecretManager.createPassword(data, secretKey);
|
|
||||||
if (Arrays.equals(encryptedData, secret)) {
|
|
||||||
return qopString;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public String getNegotiatedQOP() {
|
public String getNegotiatedQOP() {
|
||||||
return negotiatedQOP;
|
return negotiatedQOP;
|
||||||
|
@ -399,12 +379,8 @@ public class SaslDataTransferServer {
|
||||||
if (secret != null || bpid != null) {
|
if (secret != null || bpid != null) {
|
||||||
// sanity check, if one is null, the other must also not be null
|
// sanity check, if one is null, the other must also not be null
|
||||||
assert(secret != null && bpid != null);
|
assert(secret != null && bpid != null);
|
||||||
String qop = examineSecret(secret, bpid);
|
String qop = new String(secret, Charsets.UTF_8);
|
||||||
if (qop != null) {
|
|
||||||
saslProps.put(Sasl.QOP, qop);
|
saslProps.put(Sasl.QOP, qop);
|
||||||
} else {
|
|
||||||
LOG.error("Unable to match secret to a QOP!");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(
|
SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(
|
||||||
saslProps, callbackHandler);
|
saslProps, callbackHandler);
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.security.token.block;
|
package org.apache.hadoop.hdfs.security.token.block;
|
||||||
|
|
||||||
|
import com.google.common.base.Charsets;
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -34,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
||||||
import org.apache.hadoop.io.WritableUtils;
|
import org.apache.hadoop.io.WritableUtils;
|
||||||
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.SecretManager;
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
@ -75,6 +77,7 @@ public class BlockTokenSecretManager extends
|
||||||
|
|
||||||
private final int intRange;
|
private final int intRange;
|
||||||
private final int nnRangeStart;
|
private final int nnRangeStart;
|
||||||
|
private final boolean shouldWrapQOP;
|
||||||
|
|
||||||
private final SecureRandom nonceGenerator = new SecureRandom();
|
private final SecureRandom nonceGenerator = new SecureRandom();
|
||||||
|
|
||||||
|
@ -92,7 +95,7 @@ public class BlockTokenSecretManager extends
|
||||||
public BlockTokenSecretManager(long keyUpdateInterval,
|
public BlockTokenSecretManager(long keyUpdateInterval,
|
||||||
long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
|
long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
|
||||||
this(false, keyUpdateInterval, tokenLifetime, blockPoolId,
|
this(false, keyUpdateInterval, tokenLifetime, blockPoolId,
|
||||||
encryptionAlgorithm, 0, 1);
|
encryptionAlgorithm, 0, 1, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -104,19 +107,29 @@ public class BlockTokenSecretManager extends
|
||||||
* @param blockPoolId block pool ID
|
* @param blockPoolId block pool ID
|
||||||
* @param encryptionAlgorithm encryption algorithm to use
|
* @param encryptionAlgorithm encryption algorithm to use
|
||||||
* @param numNNs number of namenodes possible
|
* @param numNNs number of namenodes possible
|
||||||
|
* @param shouldWrapQOP should wrap QOP in the block access token
|
||||||
*/
|
*/
|
||||||
public BlockTokenSecretManager(long keyUpdateInterval,
|
public BlockTokenSecretManager(long keyUpdateInterval,
|
||||||
long tokenLifetime, int nnIndex, int numNNs, String blockPoolId,
|
long tokenLifetime, int nnIndex, int numNNs, String blockPoolId,
|
||||||
String encryptionAlgorithm) {
|
String encryptionAlgorithm, boolean shouldWrapQOP) {
|
||||||
this(true, keyUpdateInterval, tokenLifetime, blockPoolId, encryptionAlgorithm, nnIndex, numNNs);
|
this(true, keyUpdateInterval, tokenLifetime, blockPoolId,
|
||||||
|
encryptionAlgorithm, nnIndex, numNNs, shouldWrapQOP);
|
||||||
Preconditions.checkArgument(nnIndex >= 0);
|
Preconditions.checkArgument(nnIndex >= 0);
|
||||||
Preconditions.checkArgument(numNNs > 0);
|
Preconditions.checkArgument(numNNs > 0);
|
||||||
setSerialNo(new SecureRandom().nextInt());
|
setSerialNo(new SecureRandom().nextInt());
|
||||||
generateKeys();
|
generateKeys();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public BlockTokenSecretManager(long keyUpdateInterval,
|
||||||
|
long tokenLifetime, int nnIndex, int numNNs, String blockPoolId,
|
||||||
|
String encryptionAlgorithm) {
|
||||||
|
this(keyUpdateInterval, tokenLifetime, nnIndex, numNNs, blockPoolId,
|
||||||
|
encryptionAlgorithm, false);
|
||||||
|
}
|
||||||
|
|
||||||
private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
|
private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
|
||||||
long tokenLifetime, String blockPoolId, String encryptionAlgorithm, int nnIndex, int numNNs) {
|
long tokenLifetime, String blockPoolId, String encryptionAlgorithm,
|
||||||
|
int nnIndex, int numNNs, boolean shouldWrapQOP) {
|
||||||
this.intRange = Integer.MAX_VALUE / numNNs;
|
this.intRange = Integer.MAX_VALUE / numNNs;
|
||||||
this.nnRangeStart = intRange * nnIndex;
|
this.nnRangeStart = intRange * nnIndex;
|
||||||
this.isMaster = isMaster;
|
this.isMaster = isMaster;
|
||||||
|
@ -125,6 +138,7 @@ public class BlockTokenSecretManager extends
|
||||||
this.allKeys = new HashMap<Integer, BlockKey>();
|
this.allKeys = new HashMap<Integer, BlockKey>();
|
||||||
this.blockPoolId = blockPoolId;
|
this.blockPoolId = blockPoolId;
|
||||||
this.encryptionAlgorithm = encryptionAlgorithm;
|
this.encryptionAlgorithm = encryptionAlgorithm;
|
||||||
|
this.shouldWrapQOP = shouldWrapQOP;
|
||||||
this.timer = new Timer();
|
this.timer = new Timer();
|
||||||
generateKeys();
|
generateKeys();
|
||||||
}
|
}
|
||||||
|
@ -253,6 +267,12 @@ public class BlockTokenSecretManager extends
|
||||||
ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> modes) throws IOException {
|
ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> modes) throws IOException {
|
||||||
BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
|
BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
|
||||||
.getBlockPoolId(), block.getBlockId(), modes);
|
.getBlockPoolId(), block.getBlockId(), modes);
|
||||||
|
if (shouldWrapQOP) {
|
||||||
|
String qop = Server.getEstablishedQOP();
|
||||||
|
if (qop != null) {
|
||||||
|
id.setHandshakeMsg(qop.getBytes(Charsets.UTF_8));
|
||||||
|
}
|
||||||
|
}
|
||||||
return new Token<BlockTokenIdentifier>(id, this);
|
return new Token<BlockTokenIdentifier>(id, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -431,18 +451,6 @@ public class BlockTokenSecretManager extends
|
||||||
return createPassword(nonce, key.getKey());
|
return createPassword(nonce, key.getKey());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Encrypt the given message with the current block key, using the current
|
|
||||||
* block key.
|
|
||||||
*
|
|
||||||
* @param message the message to be encrypted.
|
|
||||||
* @return the secret created by encrypting the given message.
|
|
||||||
*/
|
|
||||||
public byte[] secretGen(byte[] message) {
|
|
||||||
return createPassword(message, currentKey.getKey());
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public BlockKey getCurrentKey() {
|
public BlockKey getCurrentKey() {
|
||||||
return currentKey;
|
return currentKey;
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||||
import static org.apache.hadoop.util.Time.now;
|
import static org.apache.hadoop.util.Time.now;
|
||||||
|
|
||||||
|
@ -509,6 +509,9 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
|
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
|
||||||
boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId);
|
boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId);
|
||||||
|
|
||||||
|
boolean shouldWrapQOP = conf.getBoolean(
|
||||||
|
DFS_NAMENODE_SEND_QOP_ENABLED, DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT);
|
||||||
|
|
||||||
if (isHaEnabled) {
|
if (isHaEnabled) {
|
||||||
// figure out which index we are of the nns
|
// figure out which index we are of the nns
|
||||||
Collection<String> nnIds = DFSUtilClient.getNameNodeIds(conf, nsId);
|
Collection<String> nnIds = DFSUtilClient.getNameNodeIds(conf, nsId);
|
||||||
|
@ -521,10 +524,12 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
nnIndex++;
|
nnIndex++;
|
||||||
}
|
}
|
||||||
return new BlockTokenSecretManager(updateMin * 60 * 1000L,
|
return new BlockTokenSecretManager(updateMin * 60 * 1000L,
|
||||||
lifetimeMin * 60 * 1000L, nnIndex, nnIds.size(), null, encryptionAlgorithm);
|
lifetimeMin * 60 * 1000L, nnIndex, nnIds.size(),
|
||||||
|
null, encryptionAlgorithm, shouldWrapQOP);
|
||||||
} else {
|
} else {
|
||||||
return new BlockTokenSecretManager(updateMin*60*1000L,
|
return new BlockTokenSecretManager(updateMin*60*1000L,
|
||||||
lifetimeMin*60*1000L, 0, 1, null, encryptionAlgorithm);
|
lifetimeMin*60*1000L, 0, 1,
|
||||||
|
null, encryptionAlgorithm, shouldWrapQOP);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -92,8 +92,6 @@ import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.util.StopWatch;
|
import org.apache.hadoop.util.StopWatch;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import com.google.protobuf.ByteString;
|
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
|
|
|
@ -27,14 +27,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
|
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
|
||||||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
|
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
|
||||||
|
|
||||||
import static org.apache.hadoop.util.Time.now;
|
import static org.apache.hadoop.util.Time.now;
|
||||||
|
|
||||||
import com.google.common.base.Charsets;
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
@ -135,8 +132,6 @@ import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
|
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolPB;
|
import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolServerSideTranslatorPB;
|
import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
|
||||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
|
@ -255,8 +250,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
|
|
||||||
private final String minimumDataNodeVersion;
|
private final String minimumDataNodeVersion;
|
||||||
|
|
||||||
private final boolean shouldSendQOP;
|
|
||||||
|
|
||||||
public NameNodeRpcServer(Configuration conf, NameNode nn)
|
public NameNodeRpcServer(Configuration conf, NameNode nn)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.nn = nn;
|
this.nn = nn;
|
||||||
|
@ -534,8 +527,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
this.clientRpcServer.addAuxiliaryListener(auxiliaryPort);
|
this.clientRpcServer.addAuxiliaryListener(auxiliaryPort);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.shouldSendQOP = conf.getBoolean(
|
|
||||||
DFS_NAMENODE_SEND_QOP_ENABLED, DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Allow access to the lifeline RPC server for testing */
|
/** Allow access to the lifeline RPC server for testing */
|
||||||
|
@ -738,11 +729,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
metrics.incrGetBlockLocations();
|
metrics.incrGetBlockLocations();
|
||||||
LocatedBlocks locatedBlocks =
|
LocatedBlocks locatedBlocks =
|
||||||
namesystem.getBlockLocations(getClientMachine(), src, offset, length);
|
namesystem.getBlockLocations(getClientMachine(), src, offset, length);
|
||||||
if (shouldSendQOP) {
|
|
||||||
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
|
|
||||||
wrapEstablishedQOP(lb, getEstablishedClientQOP());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return locatedBlocks;
|
return locatedBlocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -816,9 +802,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
RetryCache.setState(cacheEntry, success, info);
|
RetryCache.setState(cacheEntry, success, info);
|
||||||
}
|
}
|
||||||
metrics.incrFilesAppended();
|
metrics.incrFilesAppended();
|
||||||
if (shouldSendQOP) {
|
|
||||||
wrapEstablishedQOP(info.getLastBlock(), getEstablishedClientQOP());
|
|
||||||
}
|
|
||||||
return info;
|
return info;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -887,9 +870,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
if (locatedBlock != null) {
|
if (locatedBlock != null) {
|
||||||
metrics.incrAddBlockOps();
|
metrics.incrAddBlockOps();
|
||||||
}
|
}
|
||||||
if (shouldSendQOP) {
|
|
||||||
wrapEstablishedQOP(locatedBlock, getEstablishedClientQOP());
|
|
||||||
}
|
|
||||||
return locatedBlock;
|
return locatedBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -923,9 +903,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
LocatedBlock locatedBlock = namesystem.getAdditionalDatanode(src, fileId,
|
LocatedBlock locatedBlock = namesystem.getAdditionalDatanode(src, fileId,
|
||||||
blk, existings, existingStorageIDs, excludeSet, numAdditionalNodes,
|
blk, existings, existingStorageIDs, excludeSet, numAdditionalNodes,
|
||||||
clientName);
|
clientName);
|
||||||
if (shouldSendQOP) {
|
|
||||||
wrapEstablishedQOP(locatedBlock, getEstablishedClientQOP());
|
|
||||||
}
|
|
||||||
return locatedBlock;
|
return locatedBlock;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
|
@ -1794,7 +1771,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
*
|
*
|
||||||
* @return the established QOP of this client.
|
* @return the established QOP of this client.
|
||||||
*/
|
*/
|
||||||
private static String getEstablishedClientQOP() {
|
public static String getEstablishedClientQOP() {
|
||||||
return Server.getEstablishedQOP();
|
return Server.getEstablishedQOP();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2344,26 +2321,4 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
namesystem.checkSuperuserPrivilege();
|
namesystem.checkSuperuserPrivilege();
|
||||||
return Lists.newArrayList(nn.getReconfigurableProperties());
|
return Lists.newArrayList(nn.getReconfigurableProperties());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrapping the QOP information into the LocatedBlock instance.
|
|
||||||
* The wrapped QOP will be used by DataNode, i.e. DataNode will simply use
|
|
||||||
* this QOP to accept client calls, because this this QOP is viewed
|
|
||||||
* as the QOP that NameNode has accepted.
|
|
||||||
*
|
|
||||||
* @param locatedBlock the LocatedBlock instance
|
|
||||||
* @param qop the QOP to wrap in
|
|
||||||
* @throws RuntimeException
|
|
||||||
*/
|
|
||||||
private void wrapEstablishedQOP(LocatedBlock locatedBlock, String qop) {
|
|
||||||
if (qop == null || locatedBlock == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
BlockTokenSecretManager btsm = namesystem.getBlockManager()
|
|
||||||
.getBlockTokenSecretManager();
|
|
||||||
Token<BlockTokenIdentifier> token = locatedBlock.getBlockToken();
|
|
||||||
byte[] secret = btsm.secretGen(qop.getBytes(Charsets.UTF_8));
|
|
||||||
token.setDNHandshakeSecret(secret);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import javax.crypto.Mac;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
|
@ -32,7 +31,6 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
|
|
||||||
import org.apache.hadoop.io.EnumSetWritable;
|
import org.apache.hadoop.io.EnumSetWritable;
|
||||||
import org.apache.hadoop.security.TestPermission;
|
import org.apache.hadoop.security.TestPermission;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -41,8 +39,11 @@ import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
|
||||||
import static org.junit.Assert.*;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -55,7 +56,6 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
|
||||||
|
|
||||||
private HdfsConfiguration conf;
|
private HdfsConfiguration conf;
|
||||||
private MiniDFSCluster cluster;
|
private MiniDFSCluster cluster;
|
||||||
private String encryptionAlgorithm;
|
|
||||||
private DistributedFileSystem dfs;
|
private DistributedFileSystem dfs;
|
||||||
|
|
||||||
private String configKey;
|
private String configKey;
|
||||||
|
@ -84,7 +84,6 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
|
||||||
conf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
|
conf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
|
||||||
conf.set(HADOOP_RPC_PROTECTION, this.configKey);
|
conf.set(HADOOP_RPC_PROTECTION, this.configKey);
|
||||||
cluster = null;
|
cluster = null;
|
||||||
encryptionAlgorithm = "HmacSHA1";
|
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
}
|
}
|
||||||
|
@ -109,12 +108,8 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
|
||||||
|
|
||||||
LocatedBlock lb = client.namenode.addBlock(src, clientName, null, null,
|
LocatedBlock lb = client.namenode.addBlock(src, clientName, null, null,
|
||||||
HdfsConstants.GRANDFATHER_INODE_ID, null, null);
|
HdfsConstants.GRANDFATHER_INODE_ID, null, null);
|
||||||
byte[] secret = lb.getBlockToken().getDnHandshakeSecret();
|
byte[] secret = lb.getBlockToken().decodeIdentifier().getHandshakeMsg();
|
||||||
BlockKey currentKey = cluster.getNamesystem().getBlockManager()
|
assertEquals(this.qopValue, new String(secret));
|
||||||
.getBlockTokenSecretManager().getCurrentKey();
|
|
||||||
String decrypted = decryptMessage(secret, currentKey,
|
|
||||||
encryptionAlgorithm);
|
|
||||||
assertEquals(this.qopValue, decrypted);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -137,12 +132,8 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
|
||||||
new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
|
new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
|
||||||
|
|
||||||
byte[] secret = lastBlock.getLastBlock().getBlockToken()
|
byte[] secret = lastBlock.getLastBlock().getBlockToken()
|
||||||
.getDnHandshakeSecret();
|
.decodeIdentifier().getHandshakeMsg();
|
||||||
BlockKey currentKey = cluster.getNamesystem().getBlockManager()
|
assertEquals(this.qopValue, new String(secret));
|
||||||
.getBlockTokenSecretManager().getCurrentKey();
|
|
||||||
String decrypted = decryptMessage(secret, currentKey,
|
|
||||||
encryptionAlgorithm);
|
|
||||||
assertEquals(this.qopValue, decrypted);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -164,27 +155,10 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
|
||||||
|
|
||||||
assertTrue(lbs.getLocatedBlocks().size() > 0);
|
assertTrue(lbs.getLocatedBlocks().size() > 0);
|
||||||
|
|
||||||
BlockKey currentKey = cluster.getNamesystem().getBlockManager()
|
|
||||||
.getBlockTokenSecretManager().getCurrentKey();
|
|
||||||
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
|
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
|
||||||
byte[] secret = lb.getBlockToken().getDnHandshakeSecret();
|
byte[] secret = lb.getBlockToken()
|
||||||
String decrypted = decryptMessage(secret, currentKey,
|
.decodeIdentifier().getHandshakeMsg();
|
||||||
encryptionAlgorithm);
|
assertEquals(this.qopValue, new String(secret));
|
||||||
assertEquals(this.qopValue, decrypted);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String decryptMessage(byte[] secret, BlockKey key,
|
|
||||||
String algorithm) throws Exception {
|
|
||||||
String[] qops = {"auth", "auth-conf", "auth-int"};
|
|
||||||
Mac mac = Mac.getInstance(algorithm);
|
|
||||||
mac.init(key.getKey());
|
|
||||||
for (String qop : qops) {
|
|
||||||
byte[] encrypted = mac.doFinal(qop.getBytes());
|
|
||||||
if (Arrays.equals(encrypted, secret)) {
|
|
||||||
return qop;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue