HDFS-6708. StorageType should be encoded in the block token. Contributed by Ewan Higgs

This commit is contained in:
Chris Douglas 2017-04-25 23:57:00 -07:00
parent 8a99eba96d
commit 2f73396b59
14 changed files with 401 additions and 100 deletions

View File

@ -635,6 +635,9 @@ public class PBHelperClient {
blockTokenSecret.getAccessModes()) {
builder.addModes(convert(aMode));
}
for (StorageType storageType : blockTokenSecret.getStorageTypes()) {
builder.addStorageTypes(convertStorageType(storageType));
}
return builder.build();
}

View File

@ -22,10 +22,13 @@ import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Optional;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AccessModeProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenSecretProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
@ -49,21 +52,24 @@ public class BlockTokenIdentifier extends TokenIdentifier {
private String blockPoolId;
private long blockId;
private final EnumSet<AccessMode> modes;
private StorageType[] storageTypes;
private boolean useProto;
private byte [] cache;
public BlockTokenIdentifier() {
this(null, null, 0, EnumSet.noneOf(AccessMode.class), false);
this(null, null, 0, EnumSet.noneOf(AccessMode.class), null, false);
}
public BlockTokenIdentifier(String userId, String bpid, long blockId,
EnumSet<AccessMode> modes, boolean useProto) {
EnumSet<AccessMode> modes, StorageType[] storageTypes, boolean useProto) {
this.cache = null;
this.userId = userId;
this.blockPoolId = bpid;
this.blockId = blockId;
this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
this.storageTypes = Optional.ofNullable(storageTypes)
.orElse(StorageType.EMPTY_ARRAY);
this.useProto = useProto;
}
@ -115,13 +121,18 @@ public class BlockTokenIdentifier extends TokenIdentifier {
return modes;
}
public StorageType[] getStorageTypes(){
return storageTypes;
}
@Override
public String toString() {
return "block_token_identifier (expiryDate=" + this.getExpiryDate()
+ ", keyId=" + this.getKeyId() + ", userId=" + this.getUserId()
+ ", blockPoolId=" + this.getBlockPoolId()
+ ", blockId=" + this.getBlockId() + ", access modes="
+ this.getAccessModes() + ")";
+ this.getAccessModes() + ", storageTypes= "
+ Arrays.toString(this.getStorageTypes()) + ")";
}
static boolean isEqual(Object a, Object b) {
@ -139,7 +150,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
&& isEqual(this.userId, that.userId)
&& isEqual(this.blockPoolId, that.blockPoolId)
&& this.blockId == that.blockId
&& isEqual(this.modes, that.modes);
&& isEqual(this.modes, that.modes)
&& Arrays.equals(this.storageTypes, that.storageTypes);
}
return false;
}
@ -148,7 +160,8 @@ public class BlockTokenIdentifier extends TokenIdentifier {
public int hashCode() {
return (int) expiryDate ^ keyId ^ (int) blockId ^ modes.hashCode()
^ (userId == null ? 0 : userId.hashCode())
^ (blockPoolId == null ? 0 : blockPoolId.hashCode());
^ (blockPoolId == null ? 0 : blockPoolId.hashCode())
^ (storageTypes == null ? 0 : Arrays.hashCode(storageTypes));
}
/**
@ -200,6 +213,13 @@ public class BlockTokenIdentifier extends TokenIdentifier {
for (int i = 0; i < length; i++) {
modes.add(WritableUtils.readEnum(in, AccessMode.class));
}
length = WritableUtils.readVInt(in);
StorageType[] readStorageTypes = new StorageType[length];
for (int i = 0; i < length; i++) {
readStorageTypes[i] = WritableUtils.readEnum(in, StorageType.class);
}
storageTypes = readStorageTypes;
useProto = false;
}
@ -224,6 +244,10 @@ public class BlockTokenIdentifier extends TokenIdentifier {
AccessModeProto accessModeProto = blockTokenSecretProto.getModes(i);
modes.add(PBHelperClient.convert(accessModeProto));
}
storageTypes = blockTokenSecretProto.getStorageTypesList().stream()
.map(PBHelperClient::convertStorageType)
.toArray(StorageType[]::new);
useProto = true;
}
@ -247,6 +271,10 @@ public class BlockTokenIdentifier extends TokenIdentifier {
for (AccessMode aMode : modes) {
WritableUtils.writeEnum(out, aMode);
}
WritableUtils.writeVInt(out, storageTypes.length);
for (StorageType type: storageTypes){
WritableUtils.writeEnum(out, type);
}
}
@VisibleForTesting

View File

@ -550,4 +550,5 @@ message BlockTokenSecretProto {
optional string blockPoolId = 4;
optional uint64 blockId = 5;
repeated AccessModeProto modes = 6;
repeated StorageTypeProto storageTypes = 7;
}

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.StorageType;
/**
* Manages a {@link BlockTokenSecretManager} per block pool. Routes the requests
@ -82,20 +83,26 @@ public class BlockPoolTokenSecretManager extends
/**
* See {@link BlockTokenSecretManager#checkAccess(BlockTokenIdentifier,
* String, ExtendedBlock, BlockTokenIdentifier.AccessMode)}
* String, ExtendedBlock, BlockTokenIdentifier.AccessMode,
* StorageType[])}
*/
public void checkAccess(BlockTokenIdentifier id, String userId,
ExtendedBlock block, AccessMode mode) throws InvalidToken {
get(block.getBlockPoolId()).checkAccess(id, userId, block, mode);
ExtendedBlock block, AccessMode mode,
StorageType[] storageTypes) throws InvalidToken {
get(block.getBlockPoolId()).checkAccess(id, userId, block, mode,
storageTypes);
}
/**
* See {@link BlockTokenSecretManager#checkAccess(Token, String,
* ExtendedBlock, BlockTokenIdentifier.AccessMode)}
* ExtendedBlock, BlockTokenIdentifier.AccessMode,
* StorageType[])}.
*/
public void checkAccess(Token<BlockTokenIdentifier> token,
String userId, ExtendedBlock block, AccessMode mode) throws InvalidToken {
get(block.getBlockPoolId()).checkAccess(token, userId, block, mode);
String userId, ExtendedBlock block, AccessMode mode,
StorageType[] storageTypes) throws InvalidToken {
get(block.getBlockPoolId()).checkAccess(token, userId, block, mode,
storageTypes);
}
/**
@ -107,11 +114,12 @@ public class BlockPoolTokenSecretManager extends
}
/**
* See {@link BlockTokenSecretManager#generateToken(ExtendedBlock, EnumSet)}
* See {@link BlockTokenSecretManager#generateToken(ExtendedBlock, EnumSet,
* StorageType[])}
*/
public Token<BlockTokenIdentifier> generateToken(ExtendedBlock b,
EnumSet<AccessMode> of) throws IOException {
return get(b.getBlockPoolId()).generateToken(b, of);
EnumSet<AccessMode> of, StorageType[] storageTypes) throws IOException {
return get(b.getBlockPoolId()).generateToken(b, of, storageTypes);
}
@VisibleForTesting

View File

@ -22,10 +22,13 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
@ -41,6 +44,7 @@ import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.StorageType;
/**
* BlockTokenSecretManager can be instantiated in 2 modes, master mode
@ -242,17 +246,19 @@ public class BlockTokenSecretManager extends
/** Generate an block token for current user */
public Token<BlockTokenIdentifier> generateToken(ExtendedBlock block,
EnumSet<BlockTokenIdentifier.AccessMode> modes) throws IOException {
EnumSet<BlockTokenIdentifier.AccessMode> modes,
StorageType[] storageTypes) throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
String userID = (ugi == null ? null : ugi.getShortUserName());
return generateToken(userID, block, modes);
return generateToken(userID, block, modes, storageTypes);
}
/** Generate a block token for a specified user */
public Token<BlockTokenIdentifier> generateToken(String userId,
ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> modes) throws IOException {
ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> modes,
StorageType[] storageTypes) throws IOException {
BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
.getBlockPoolId(), block.getBlockId(), modes, useProto);
.getBlockPoolId(), block.getBlockId(), modes, storageTypes, useProto);
return new Token<BlockTokenIdentifier>(id, this);
}
@ -260,9 +266,22 @@ public class BlockTokenSecretManager extends
* Check if access should be allowed. userID is not checked if null. This
* method doesn't check if token password is correct. It should be used only
* when token password has already been verified (e.g., in the RPC layer).
*
* Some places need to check the access using StorageTypes and for other
* places the StorageTypes is not relevant.
*/
public void checkAccess(BlockTokenIdentifier id, String userId,
ExtendedBlock block, BlockTokenIdentifier.AccessMode mode) throws InvalidToken {
ExtendedBlock block, BlockTokenIdentifier.AccessMode mode,
StorageType[] storageTypes) throws InvalidToken {
checkAccess(id, userId, block, mode);
if (storageTypes != null && storageTypes.length > 0) {
checkAccess(id.getStorageTypes(), storageTypes);
}
}
public void checkAccess(BlockTokenIdentifier id, String userId,
ExtendedBlock block, BlockTokenIdentifier.AccessMode mode)
throws InvalidToken {
if (LOG.isDebugEnabled()) {
LOG.debug("Checking access for user=" + userId + ", block=" + block
+ ", access mode=" + mode + " using " + id.toString());
@ -289,9 +308,41 @@ public class BlockTokenSecretManager extends
}
}
/**
* Check if the requested StorageTypes match the StorageTypes in the
* BlockTokenIdentifier.
* Empty candidateStorageTypes specifiers mean 'all is permitted'. They
* would otherwise be nonsensical.
*/
public static void checkAccess(StorageType[] candidateStorageTypes,
StorageType[] storageTypesRequested) throws InvalidToken {
if (storageTypesRequested.length == 0) {
throw new InvalidToken("The request has no StorageTypes. "
+ "This is probably a configuration error.");
}
if (candidateStorageTypes.length == 0) {
return;
}
List<StorageType> unseenCandidates = new ArrayList<StorageType>();
unseenCandidates.addAll(Arrays.asList(candidateStorageTypes));
for (StorageType storageType : storageTypesRequested) {
final int index = unseenCandidates.indexOf(storageType);
if (index == -1) {
throw new InvalidToken("Block token with StorageTypes "
+ Arrays.toString(candidateStorageTypes)
+ " not valid for access with StorageTypes "
+ Arrays.toString(storageTypesRequested));
}
Collections.swap(unseenCandidates, index, unseenCandidates.size()-1);
unseenCandidates.remove(unseenCandidates.size()-1);
}
}
/** Check if access should be allowed. userID is not checked if null */
public void checkAccess(Token<BlockTokenIdentifier> token, String userId,
ExtendedBlock block, BlockTokenIdentifier.AccessMode mode) throws InvalidToken {
ExtendedBlock block, BlockTokenIdentifier.AccessMode mode,
StorageType[] storageTypes) throws InvalidToken {
BlockTokenIdentifier id = new BlockTokenIdentifier();
try {
id.readFields(new DataInputStream(new ByteArrayInputStream(token
@ -301,7 +352,7 @@ public class BlockTokenSecretManager extends
"Unable to de-serialize block token identifier for user=" + userId
+ ", block=" + block + ", access mode=" + mode);
}
checkAccess(id, userId, block, mode);
checkAccess(id, userId, block, mode, storageTypes);
if (!Arrays.equals(retrievePassword(id), token.getPassword())) {
throw new InvalidToken("Block token with " + id.toString()
+ " doesn't have the correct token password");

View File

@ -355,7 +355,8 @@ public class Dispatcher {
ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
reportedBlock.getBlock());
final KeyManager km = nnc.getKeyManager();
Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb,
new StorageType[]{target.storageType});
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
unbufIn, km, accessToken, target.getDatanodeInfo());
unbufOut = saslStreams.out;

View File

@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
@ -93,8 +94,8 @@ public class KeyManager implements Closeable, DataEncryptionKeyFactory {
}
/** Get an access token for a block. */
public Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb
) throws IOException {
public Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb,
StorageType[] storageTypes) throws IOException {
if (!isBlockTokenEnabled) {
return BlockTokenSecretManager.DUMMY_TOKEN;
} else {
@ -103,7 +104,8 @@ public class KeyManager implements Closeable, DataEncryptionKeyFactory {
"Cannot get access token since BlockKeyUpdater is not running");
}
return blockTokenSecretManager.generateToken(null, eb,
EnumSet.of(BlockTokenIdentifier.AccessMode.REPLACE, BlockTokenIdentifier.AccessMode.COPY));
EnumSet.of(BlockTokenIdentifier.AccessMode.REPLACE,
BlockTokenIdentifier.AccessMode.COPY), storageTypes);
}
}

View File

@ -1283,13 +1283,13 @@ public class BlockManager implements BlockStatsMXBean {
internalBlock.setBlockId(b.getBlock().getBlockId() + indices[i]);
blockTokens[i] = blockTokenSecretManager.generateToken(
NameNode.getRemoteUser().getShortUserName(),
internalBlock, EnumSet.of(mode));
internalBlock, EnumSet.of(mode), b.getStorageTypes());
}
sb.setBlockTokens(blockTokens);
} else {
b.setBlockToken(blockTokenSecretManager.generateToken(
NameNode.getRemoteUser().getShortUserName(),
b.getBlock(), EnumSet.of(mode)));
b.getBlock(), EnumSet.of(mode), b.getStorageTypes()));
}
}
}

View File

@ -1929,8 +1929,9 @@ public class DataNode extends ReconfigurableBase
return fis;
}
private void checkBlockToken(ExtendedBlock block, Token<BlockTokenIdentifier> token,
AccessMode accessMode) throws IOException {
private void checkBlockToken(ExtendedBlock block,
Token<BlockTokenIdentifier> token, AccessMode accessMode)
throws IOException {
if (isBlockTokenEnabled) {
BlockTokenIdentifier id = new BlockTokenIdentifier();
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
@ -1939,7 +1940,8 @@ public class DataNode extends ReconfigurableBase
if (LOG.isDebugEnabled()) {
LOG.debug("Got: " + id.toString());
}
blockPoolTokenSecretManager.checkAccess(id, null, block, accessMode);
blockPoolTokenSecretManager.checkAccess(id, null, block, accessMode,
null);
}
}
@ -2451,7 +2453,8 @@ public class DataNode extends ReconfigurableBase
// Header info
//
Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
targetStorageTypes);
long writeTimeout = dnConf.socketWriteTimeout +
HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
@ -2534,11 +2537,13 @@ public class DataNode extends ReconfigurableBase
* Use BlockTokenSecretManager to generate block token for current user.
*/
public Token<BlockTokenIdentifier> getBlockAccessToken(ExtendedBlock b,
EnumSet<AccessMode> mode) throws IOException {
EnumSet<AccessMode> mode,
StorageType[] storageTypes) throws IOException {
Token<BlockTokenIdentifier> accessToken =
BlockTokenSecretManager.DUMMY_TOKEN;
if (isBlockTokenEnabled) {
accessToken = blockPoolTokenSecretManager.generateToken(b, mode);
accessToken = blockPoolTokenSecretManager.generateToken(b, mode,
storageTypes);
}
return accessToken;
}
@ -2911,7 +2916,7 @@ public class DataNode extends ReconfigurableBase
LOG.debug("Got: " + id.toString());
}
blockPoolTokenSecretManager.checkAccess(id, null, block,
BlockTokenIdentifier.AccessMode.READ);
BlockTokenIdentifier.AccessMode.READ, null);
}
}
}

View File

@ -573,8 +573,8 @@ class DataXceiver extends Receiver implements Runnable {
updateCurrentThreadName("Sending block " + block);
OutputStream baseStream = getOutputStream();
DataOutputStream out = getBufferedOutputStream();
checkAccess(out, true, block, blockToken,
Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ);
checkAccess(out, true, block, blockToken, Op.READ_BLOCK,
BlockTokenIdentifier.AccessMode.READ);
// send the block
BlockSender blockSender = null;
@ -685,8 +685,16 @@ class DataXceiver extends Receiver implements Runnable {
long size = 0;
// reply to upstream datanode or client
final DataOutputStream replyOut = getBufferedOutputStream();
checkAccess(replyOut, isClient, block, blockToken,
Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
int nst = targetStorageTypes.length;
StorageType[] storageTypes = new StorageType[nst + 1];
storageTypes[0] = storageType;
if (targetStorageTypes.length > 0) {
System.arraycopy(targetStorageTypes, 0, storageTypes, 1, nst);
}
checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK,
BlockTokenIdentifier.AccessMode.WRITE, storageTypes);
// check single target for transfer-RBW/Finalized
if (isTransfer && targets.length > 0) {
throw new IOException(stage + " does not support multiple targets "
@ -927,8 +935,8 @@ class DataXceiver extends Receiver implements Runnable {
final DataOutputStream out = new DataOutputStream(
getOutputStream());
checkAccess(out, true, blk, blockToken,
Op.TRANSFER_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
checkAccess(out, true, blk, blockToken, Op.TRANSFER_BLOCK,
BlockTokenIdentifier.AccessMode.COPY, targetStorageTypes);
try {
datanode.transferReplicaForPipelineRecovery(blk, targets,
targetStorageTypes, clientName);
@ -949,9 +957,8 @@ class DataXceiver extends Receiver implements Runnable {
updateCurrentThreadName("Getting checksum for block " + block);
final DataOutputStream out = new DataOutputStream(
getOutputStream());
checkAccess(out, true, block, blockToken,
Op.BLOCK_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM,
BlockTokenIdentifier.AccessMode.READ);
BlockChecksumComputer maker =
new ReplicatedBlockChecksumComputer(datanode, block);
@ -985,11 +992,12 @@ class DataXceiver extends Receiver implements Runnable {
public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo,
final Token<BlockTokenIdentifier> blockToken, long requestedNumBytes)
throws IOException {
final ExtendedBlock block = stripedBlockInfo.getBlock();
updateCurrentThreadName("Getting checksum for block group" +
stripedBlockInfo.getBlock());
block);
final DataOutputStream out = new DataOutputStream(getOutputStream());
checkAccess(out, true, stripedBlockInfo.getBlock(), blockToken,
Op.BLOCK_GROUP_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
checkAccess(out, true, block, blockToken, Op.BLOCK_GROUP_CHECKSUM,
BlockTokenIdentifier.AccessMode.READ);
AbstractBlockChecksumComputer maker =
new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo,
@ -1027,8 +1035,8 @@ class DataXceiver extends Receiver implements Runnable {
final Token<BlockTokenIdentifier> blockToken) throws IOException {
updateCurrentThreadName("Copying block " + block);
DataOutputStream reply = getBufferedOutputStream();
checkAccess(reply, true, block, blockToken,
Op.COPY_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
checkAccess(reply, true, block, blockToken, Op.COPY_BLOCK,
BlockTokenIdentifier.AccessMode.COPY);
if (datanode.data.getPinning(block)) {
String msg = "Not able to copy block " + block.getBlockId() + " " +
@ -1101,7 +1109,8 @@ class DataXceiver extends Receiver implements Runnable {
updateCurrentThreadName("Replacing block " + block + " from " + delHint);
DataOutputStream replyOut = new DataOutputStream(getOutputStream());
checkAccess(replyOut, true, block, blockToken,
Op.REPLACE_BLOCK, BlockTokenIdentifier.AccessMode.REPLACE);
Op.REPLACE_BLOCK, BlockTokenIdentifier.AccessMode.REPLACE,
new StorageType[]{ storageType });
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
String msg = "Not able to receive block " + block.getBlockId() +
@ -1353,11 +1362,18 @@ class DataXceiver extends Receiver implements Runnable {
throw new IOException("Not ready to serve the block pool, " + bpId + ".");
}
private void checkAccess(OutputStream out, final boolean reply,
ExtendedBlock blk, Token<BlockTokenIdentifier> t, Op op,
BlockTokenIdentifier.AccessMode mode) throws IOException {
checkAccess(out, reply, blk, t, op, mode, null);
}
private void checkAccess(OutputStream out, final boolean reply,
final ExtendedBlock blk,
final Token<BlockTokenIdentifier> t,
final Op op,
final BlockTokenIdentifier.AccessMode mode) throws IOException {
final BlockTokenIdentifier.AccessMode mode,
final StorageType[] storageTypes) throws IOException {
checkAndWaitForBP(blk);
if (datanode.isBlockTokenEnabled) {
if (LOG.isDebugEnabled()) {
@ -1365,7 +1381,8 @@ class DataXceiver extends Receiver implements Runnable {
+ "' with mode '" + mode + "'");
}
try {
datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode);
datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode,
storageTypes);
} catch(InvalidToken e) {
try {
if (reply) {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.erasurecode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
@ -108,7 +109,8 @@ class StripedBlockReader {
InetSocketAddress dnAddr =
stripedReader.getSocketAddress4Transfer(source);
Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken(
block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ));
block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ),
StorageType.EMPTY_ARRAY);
/*
* This can be further improved if the replica is local, then we can
* read directly from DN and need to check the replica is FINALIZED

View File

@ -116,7 +116,8 @@ class StripedBlockWriter {
Token<BlockTokenIdentifier> blockToken =
datanode.getBlockAccessToken(block,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
new StorageType[]{storageType});
long writeTimeout = datanode.getDnConf().getSocketWriteTimeout();
OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout);

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.*;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
@ -44,9 +45,11 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.junit.Assert;
import static org.junit.Assert.fail;
import org.junit.Test;
/** Test {@link BlockStoragePolicy} */
@ -1397,4 +1400,71 @@ public class TestBlockStoragePolicy {
}
}
@Test
public void testStorageTypeCheckAccess(){
testStorageTypeCheckAccessResult(new StorageType[]{StorageType.DEFAULT},
new StorageType[]{StorageType.DEFAULT}, true);
testStorageTypeCheckAccessResult(StorageType.EMPTY_ARRAY,
StorageType.EMPTY_ARRAY, false);
testStorageTypeCheckAccessResult(new StorageType[]{StorageType.DISK},
StorageType.EMPTY_ARRAY, false);
testStorageTypeCheckAccessResult(StorageType.EMPTY_ARRAY,
new StorageType[]{StorageType.RAM_DISK}, true);
testStorageTypeCheckAccessResult(new StorageType[]{StorageType.DISK},
new StorageType[]{StorageType.DISK}, true);
testStorageTypeCheckAccessResult(new StorageType[]{StorageType.DISK},
new StorageType[]{StorageType.DISK, StorageType.DISK, StorageType.DISK},
false);
testStorageTypeCheckAccessResult(
new StorageType[]{StorageType.DISK, StorageType.DISK, StorageType.DISK},
new StorageType[]{StorageType.DISK, StorageType.DISK, StorageType.DISK},
true);
testStorageTypeCheckAccessResult(
new StorageType[]{StorageType.RAM_DISK, StorageType.SSD},
new StorageType[]{StorageType.DISK, StorageType.RAM_DISK,
StorageType.SSD},
false);
testStorageTypeCheckAccessResult(
new StorageType[]{StorageType.DISK, StorageType.SSD},
new StorageType[]{StorageType.SSD},
true);
testStorageTypeCheckAccessResult(new StorageType[]{StorageType.DISK},
new StorageType[]{StorageType.RAM_DISK}, false);
testStorageTypeCheckAccessResult(new StorageType[]{StorageType.DISK},
new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
StorageType.ARCHIVE},
false);
testStorageTypeCheckAccessResult(new StorageType[]{StorageType.RAM_DISK,
StorageType.SSD, StorageType.ARCHIVE},
new StorageType[]{StorageType.DISK}, false);
}
private void testStorageTypeCheckAccessResult(StorageType[] requested,
StorageType[] allowed, boolean expAccess) {
try {
BlockTokenSecretManager.checkAccess(requested, allowed);
if (!expAccess) {
fail("No expected access with allowed StorageTypes "
+ Arrays.toString(allowed) + " and requested StorageTypes "
+ Arrays.toString(requested));
}
} catch (SecretManager.InvalidToken e) {
if (expAccess) {
fail("Expected access with allowed StorageTypes "
+ Arrays.toString(allowed) + " and requested StorageTypes "
+ Arrays.toString(requested));
}
}
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.security.token.block;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
@ -73,6 +74,7 @@ import org.apache.hadoop.security.SaslInputStream;
import org.apache.hadoop.security.SaslRpcClient;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils;
@ -88,6 +90,7 @@ import org.mockito.stubbing.Answer;
import com.google.protobuf.BlockingService;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.fs.StorageType;
/** Unit tests for block tokens */
public class TestBlockToken {
@ -102,7 +105,9 @@ public class TestBlockToken {
GenericTestUtils.setLogLevel(SaslInputStream.LOG, Level.ALL);
}
/** Directory where we can count our open file descriptors under Linux */
/**
* Directory where we can count our open file descriptors under Linux
*/
static final File FD_DIR = new File("/proc/self/fd/");
final long blockKeyUpdateInterval = 10 * 60 * 1000; // 10 mins
@ -124,7 +129,7 @@ public class TestBlockToken {
final BlockTokenIdentifier ident;
public GetLengthAnswer(BlockTokenSecretManager sm,
BlockTokenIdentifier ident) {
BlockTokenIdentifier ident) {
this.sm = sm;
this.ident = ident;
}
@ -145,7 +150,8 @@ public class TestBlockToken {
LOG.info("Got: " + id.toString());
assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id));
sm.checkAccess(id, null, PBHelperClient.convert(req.getBlock()),
BlockTokenIdentifier.AccessMode.WRITE);
BlockTokenIdentifier.AccessMode.WRITE,
new StorageType[]{StorageType.DEFAULT});
result = id.getBlockId();
}
return GetReplicaVisibleLengthResponseProto.newBuilder()
@ -154,10 +160,11 @@ public class TestBlockToken {
}
private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm,
ExtendedBlock block,
EnumSet<BlockTokenIdentifier.AccessMode> accessModes)
throws IOException {
Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes);
ExtendedBlock block,
EnumSet<BlockTokenIdentifier.AccessMode> accessModes,
StorageType... storageTypes) throws IOException {
Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes,
storageTypes);
BlockTokenIdentifier id = sm.createIdentifier();
id.readFields(new DataInputStream(new ByteArrayInputStream(token
.getIdentifier())));
@ -169,12 +176,31 @@ public class TestBlockToken {
BlockTokenSecretManager sm = new BlockTokenSecretManager(
blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
enableProtobuf);
TestWritable.testWritable(generateTokenId(sm, block1,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class)));
TestWritable.testWritable(generateTokenId(sm, block2,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE)));
TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class)));
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
StorageType.DEFAULT));
TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
StorageType.DEFAULT));
TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
StorageType.DEFAULT));
TestWritable.testWritable(generateTokenId(sm, block1,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
StorageType.DEFAULT));
TestWritable.testWritable(generateTokenId(sm, block2,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
StorageType.DEFAULT));
TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
StorageType.DEFAULT));
// We must be backwards compatible when adding storageType
TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
(StorageType[]) null));
TestWritable.testWritable(generateTokenId(sm, block3,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
StorageType.EMPTY_ARRAY));
}
@Test
@ -187,29 +213,37 @@ public class TestBlockToken {
testWritable(true);
}
private static void checkAccess(BlockTokenSecretManager m,
Token<BlockTokenIdentifier> t, ExtendedBlock blk,
BlockTokenIdentifier.AccessMode mode) throws SecretManager.InvalidToken {
m.checkAccess(t, null, blk, mode, new StorageType[]{ StorageType.DEFAULT });
}
private void tokenGenerationAndVerification(BlockTokenSecretManager master,
BlockTokenSecretManager slave) throws Exception {
BlockTokenSecretManager slave, StorageType... storageTypes)
throws Exception {
// single-mode tokens
for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode
.values()) {
// generated by master
Token<BlockTokenIdentifier> token1 = master.generateToken(block1,
EnumSet.of(mode));
master.checkAccess(token1, null, block1, mode);
slave.checkAccess(token1, null, block1, mode);
EnumSet.of(mode), storageTypes);
checkAccess(master, token1, block1, mode);
checkAccess(slave, token1, block1, mode);
// generated by slave
Token<BlockTokenIdentifier> token2 = slave.generateToken(block2,
EnumSet.of(mode));
master.checkAccess(token2, null, block2, mode);
slave.checkAccess(token2, null, block2, mode);
EnumSet.of(mode), storageTypes);
checkAccess(master, token2, block2, mode);
checkAccess(slave, token2, block2, mode);
}
// multi-mode tokens
Token<BlockTokenIdentifier> mtoken = master.generateToken(block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
storageTypes);
for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode
.values()) {
master.checkAccess(mtoken, null, block3, mode);
slave.checkAccess(mtoken, null, block3, mode);
checkAccess(master, mtoken, block3, mode);
checkAccess(slave, mtoken, block3, mode);
}
}
@ -224,13 +258,19 @@ public class TestBlockToken {
enableProtobuf);
ExportedBlockKeys keys = masterHandler.exportKeys();
slaveHandler.addKeys(keys);
tokenGenerationAndVerification(masterHandler, slaveHandler);
tokenGenerationAndVerification(masterHandler, slaveHandler,
StorageType.DEFAULT);
tokenGenerationAndVerification(masterHandler, slaveHandler, null);
// key updating
masterHandler.updateKeys();
tokenGenerationAndVerification(masterHandler, slaveHandler);
tokenGenerationAndVerification(masterHandler, slaveHandler,
StorageType.DEFAULT);
tokenGenerationAndVerification(masterHandler, slaveHandler, null);
keys = masterHandler.exportKeys();
slaveHandler.addKeys(keys);
tokenGenerationAndVerification(masterHandler, slaveHandler);
tokenGenerationAndVerification(masterHandler, slaveHandler,
StorageType.DEFAULT);
tokenGenerationAndVerification(masterHandler, slaveHandler, null);
}
@Test
@ -274,7 +314,8 @@ public class TestBlockToken {
blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
enableProtobuf);
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
new StorageType[]{StorageType.DEFAULT});
final Server server = createMockDatanode(sm, token, conf);
@ -323,7 +364,8 @@ public class TestBlockToken {
blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
enableProtobuf);
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
new StorageType[]{StorageType.DEFAULT});
final Server server = createMockDatanode(sm, token, conf);
server.start();
@ -409,14 +451,19 @@ public class TestBlockToken {
ExportedBlockKeys keys = masterHandler.exportKeys();
bpMgr.addKeys(bpid, keys);
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid));
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
StorageType.DEFAULT);
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null);
// Test key updating
masterHandler.updateKeys();
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid));
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
StorageType.DEFAULT);
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null);
keys = masterHandler.exportKeys();
bpMgr.addKeys(bpid, keys);
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid));
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
StorageType.DEFAULT);
tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null);
}
}
@ -492,7 +539,8 @@ public class TestBlockToken {
blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
useProto);
Token<BlockTokenIdentifier> token = sm.generateToken(block1,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class));
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
new StorageType[]{StorageType.DEFAULT});
final byte[] tokenBytes = token.getIdentifier();
BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
@ -547,7 +595,6 @@ public class TestBlockToken {
dib.reset(emptyIdentBytes, emptyIdentBytes.length);
readToken.readFields(dib);
assertTrue(invalidProtobufMessage);
}
@Test
@ -557,7 +604,8 @@ public class TestBlockToken {
blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
useProto);
Token<BlockTokenIdentifier> token = sm.generateToken(block1,
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class));
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
StorageType.EMPTY_ARRAY);
final byte[] tokenBytes = token.getIdentifier();
BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
@ -594,7 +642,7 @@ public class TestBlockToken {
assertEquals(protobufToken, readToken);
}
public void testCraftedProtobufBlockTokenIdentifier(
private void testCraftedProtobufBlockTokenIdentifier(
BlockTokenIdentifier identifier, boolean expectIOE,
boolean expectRTE) throws IOException {
DataOutputBuffer dob = new DataOutputBuffer(4096);
@ -631,20 +679,27 @@ public class TestBlockToken {
dib.reset(identBytes, identBytes.length);
readToken.readFieldsProtobuf(dib);
assertEquals(protobufToken, readToken);
assertEquals(identifier, readToken);
}
@Test
public void testEmptyProtobufBlockTokenBytesIsProtobuf() throws IOException {
// Empty BlockTokenIdentifiers throw IOException
BlockTokenIdentifier identifier = new BlockTokenIdentifier();
testCraftedProtobufBlockTokenIdentifier(identifier, true, false);
}
@Test
public void testCraftedProtobufBlockTokenBytesIsProtobuf() throws
IOException {
// Empty BlockTokenIdentifiers throw IOException
BlockTokenIdentifier identifier = new BlockTokenIdentifier();
testCraftedProtobufBlockTokenIdentifier(identifier, true, false);
/* Parsing BlockTokenIdentifier with expiryDate
* 2017-02-09 00:12:35,072+0100 will throw IOException.
* However, expiryDate of
* 2017-02-09 00:12:35,071+0100 will throw NegativeArraySizeException.
*/
BlockTokenIdentifier identifier = new BlockTokenIdentifier("user",
"blockpool", 123, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, true);
Calendar cal = new GregorianCalendar();
cal.set(2017, 1, 9, 0, 12, 35);
long datetime = cal.getTimeInMillis();
@ -656,4 +711,61 @@ public class TestBlockToken {
identifier.setExpiryDate(datetime);
testCraftedProtobufBlockTokenIdentifier(identifier, true, false);
}
private BlockTokenIdentifier writeAndReadBlockToken(
BlockTokenIdentifier identifier) throws IOException {
DataOutputBuffer dob = new DataOutputBuffer(4096);
DataInputBuffer dib = new DataInputBuffer();
identifier.write(dob);
byte[] identBytes = Arrays.copyOf(dob.getData(), dob.getLength());
BlockTokenIdentifier readToken = new BlockTokenIdentifier();
dib.reset(identBytes, identBytes.length);
readToken.readFields(dib);
assertEquals(identifier, readToken);
return readToken;
}
@Test
public void testEmptyBlockTokenSerialization() throws IOException {
BlockTokenIdentifier ident = new BlockTokenIdentifier();
BlockTokenIdentifier ret = writeAndReadBlockToken(ident);
assertEquals(ret.getExpiryDate(), 0);
assertEquals(ret.getKeyId(), 0);
assertEquals(ret.getUserId(), null);
assertEquals(ret.getBlockPoolId(), null);
assertEquals(ret.getBlockId(), 0);
assertEquals(ret.getAccessModes(),
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class));
assertArrayEquals(ret.getStorageTypes(), StorageType.EMPTY_ARRAY);
}
private void testBlockTokenSerialization(boolean useProto) throws
IOException {
EnumSet<BlockTokenIdentifier.AccessMode> accessModes =
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class);
StorageType[] storageTypes =
new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
StorageType.DISK, StorageType.ARCHIVE};
BlockTokenIdentifier ident = new BlockTokenIdentifier("user", "bpool",
123, accessModes, storageTypes, useProto);
ident.setExpiryDate(1487080345L);
BlockTokenIdentifier ret = writeAndReadBlockToken(ident);
assertEquals(ret.getExpiryDate(), 1487080345L);
assertEquals(ret.getKeyId(), 0);
assertEquals(ret.getUserId(), "user");
assertEquals(ret.getBlockPoolId(), "bpool");
assertEquals(ret.getBlockId(), 123);
assertEquals(ret.getAccessModes(),
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
assertArrayEquals(ret.getStorageTypes(), storageTypes);
}
@Test
public void testBlockTokenSerialization() throws IOException {
testBlockTokenSerialization(false);
testBlockTokenSerialization(true);
}
}