HDFS-8103. Move BlockTokenSecretManager.AccessMode into BlockTokenIdentifier. Contributed by Haohui Mai.
This commit is contained in:
parent
2cc9514ad6
commit
36e4cd3be6
|
@ -418,6 +418,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-8100. Refactor DFSClient.Conf to a standalone class and separates
|
HDFS-8100. Refactor DFSClient.Conf to a standalone class and separates
|
||||||
short-circuit related conf to ShortCircuitConf. (szetszwo)
|
short-circuit related conf to ShortCircuitConf. (szetszwo)
|
||||||
|
|
||||||
|
HDFS-8103. Move BlockTokenSecretManager.AccessMode into
|
||||||
|
BlockTokenIdentifier. (wheat9)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -23,7 +23,7 @@ import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode;
|
||||||
import org.apache.hadoop.security.token.SecretManager;
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
|
||||||
|
@ -81,7 +81,7 @@ public class BlockPoolTokenSecretManager extends
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* See {@link BlockTokenSecretManager#checkAccess(BlockTokenIdentifier,
|
* See {@link BlockTokenSecretManager#checkAccess(BlockTokenIdentifier,
|
||||||
* String, ExtendedBlock, AccessMode)}
|
* String, ExtendedBlock, BlockTokenIdentifier.AccessMode)}
|
||||||
*/
|
*/
|
||||||
public void checkAccess(BlockTokenIdentifier id, String userId,
|
public void checkAccess(BlockTokenIdentifier id, String userId,
|
||||||
ExtendedBlock block, AccessMode mode) throws InvalidToken {
|
ExtendedBlock block, AccessMode mode) throws InvalidToken {
|
||||||
|
@ -90,7 +90,7 @@ public class BlockPoolTokenSecretManager extends
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* See {@link BlockTokenSecretManager#checkAccess(Token, String,
|
* See {@link BlockTokenSecretManager#checkAccess(Token, String,
|
||||||
* ExtendedBlock, AccessMode)}
|
* ExtendedBlock, BlockTokenIdentifier.AccessMode)}
|
||||||
*/
|
*/
|
||||||
public void checkAccess(Token<BlockTokenIdentifier> token,
|
public void checkAccess(Token<BlockTokenIdentifier> token,
|
||||||
String userId, ExtendedBlock block, AccessMode mode) throws InvalidToken {
|
String userId, ExtendedBlock block, AccessMode mode) throws InvalidToken {
|
||||||
|
|
|
@ -24,7 +24,6 @@ import java.io.IOException;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
|
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.io.WritableUtils;
|
import org.apache.hadoop.io.WritableUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
@ -35,6 +34,10 @@ import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
public class BlockTokenIdentifier extends TokenIdentifier {
|
public class BlockTokenIdentifier extends TokenIdentifier {
|
||||||
static final Text KIND_NAME = new Text("HDFS_BLOCK_TOKEN");
|
static final Text KIND_NAME = new Text("HDFS_BLOCK_TOKEN");
|
||||||
|
|
||||||
|
public enum AccessMode {
|
||||||
|
READ, WRITE, COPY, REPLACE
|
||||||
|
}
|
||||||
|
|
||||||
private long expiryDate;
|
private long expiryDate;
|
||||||
private int keyId;
|
private int keyId;
|
||||||
private String userId;
|
private String userId;
|
||||||
|
@ -175,7 +178,7 @@ public class BlockTokenIdentifier extends TokenIdentifier {
|
||||||
|
|
||||||
return cache;
|
return cache;
|
||||||
}
|
}
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public static class Renewer extends Token.TrivialRenewer {
|
public static class Renewer extends Token.TrivialRenewer {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -80,9 +80,7 @@ public class BlockTokenSecretManager extends
|
||||||
|
|
||||||
private final SecureRandom nonceGenerator = new SecureRandom();
|
private final SecureRandom nonceGenerator = new SecureRandom();
|
||||||
|
|
||||||
public static enum AccessMode {
|
;
|
||||||
READ, WRITE, COPY, REPLACE
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor for slaves.
|
* Constructor for slaves.
|
||||||
|
@ -239,7 +237,7 @@ public class BlockTokenSecretManager extends
|
||||||
|
|
||||||
/** Generate an block token for current user */
|
/** Generate an block token for current user */
|
||||||
public Token<BlockTokenIdentifier> generateToken(ExtendedBlock block,
|
public Token<BlockTokenIdentifier> generateToken(ExtendedBlock block,
|
||||||
EnumSet<AccessMode> modes) throws IOException {
|
EnumSet<BlockTokenIdentifier.AccessMode> modes) throws IOException {
|
||||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||||
String userID = (ugi == null ? null : ugi.getShortUserName());
|
String userID = (ugi == null ? null : ugi.getShortUserName());
|
||||||
return generateToken(userID, block, modes);
|
return generateToken(userID, block, modes);
|
||||||
|
@ -247,7 +245,7 @@ public class BlockTokenSecretManager extends
|
||||||
|
|
||||||
/** Generate a block token for a specified user */
|
/** Generate a block token for a specified user */
|
||||||
public Token<BlockTokenIdentifier> generateToken(String userId,
|
public Token<BlockTokenIdentifier> generateToken(String userId,
|
||||||
ExtendedBlock block, EnumSet<AccessMode> modes) throws IOException {
|
ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> modes) throws IOException {
|
||||||
BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
|
BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
|
||||||
.getBlockPoolId(), block.getBlockId(), modes);
|
.getBlockPoolId(), block.getBlockId(), modes);
|
||||||
return new Token<BlockTokenIdentifier>(id, this);
|
return new Token<BlockTokenIdentifier>(id, this);
|
||||||
|
@ -259,7 +257,7 @@ public class BlockTokenSecretManager extends
|
||||||
* when token password has already been verified (e.g., in the RPC layer).
|
* when token password has already been verified (e.g., in the RPC layer).
|
||||||
*/
|
*/
|
||||||
public void checkAccess(BlockTokenIdentifier id, String userId,
|
public void checkAccess(BlockTokenIdentifier id, String userId,
|
||||||
ExtendedBlock block, AccessMode mode) throws InvalidToken {
|
ExtendedBlock block, BlockTokenIdentifier.AccessMode mode) throws InvalidToken {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Checking access for user=" + userId + ", block=" + block
|
LOG.debug("Checking access for user=" + userId + ", block=" + block
|
||||||
+ ", access mode=" + mode + " using " + id.toString());
|
+ ", access mode=" + mode + " using " + id.toString());
|
||||||
|
@ -288,7 +286,7 @@ public class BlockTokenSecretManager extends
|
||||||
|
|
||||||
/** Check if access should be allowed. userID is not checked if null */
|
/** Check if access should be allowed. userID is not checked if null */
|
||||||
public void checkAccess(Token<BlockTokenIdentifier> token, String userId,
|
public void checkAccess(Token<BlockTokenIdentifier> token, String userId,
|
||||||
ExtendedBlock block, AccessMode mode) throws InvalidToken {
|
ExtendedBlock block, BlockTokenIdentifier.AccessMode mode) throws InvalidToken {
|
||||||
BlockTokenIdentifier id = new BlockTokenIdentifier();
|
BlockTokenIdentifier id = new BlockTokenIdentifier();
|
||||||
try {
|
try {
|
||||||
id.readFields(new DataInputStream(new ByteArrayInputStream(token
|
id.readFields(new DataInputStream(new ByteArrayInputStream(token
|
||||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
|
|
||||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||||
|
@ -100,7 +99,7 @@ public class KeyManager implements Closeable, DataEncryptionKeyFactory {
|
||||||
"Cannot get access token since BlockKeyUpdater is not running");
|
"Cannot get access token since BlockKeyUpdater is not running");
|
||||||
}
|
}
|
||||||
return blockTokenSecretManager.generateToken(null, eb,
|
return blockTokenSecretManager.generateToken(null, eb,
|
||||||
EnumSet.of(AccessMode.REPLACE, AccessMode.COPY));
|
EnumSet.of(BlockTokenIdentifier.AccessMode.REPLACE, BlockTokenIdentifier.AccessMode.COPY));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -55,8 +55,9 @@ import org.apache.hadoop.fs.FileEncryptionInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
|
import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
|
||||||
|
@ -747,7 +748,7 @@ public class BlockManager {
|
||||||
|
|
||||||
final long fileLength = bc.computeContentSummary(getStoragePolicySuite()).getLength();
|
final long fileLength = bc.computeContentSummary(getStoragePolicySuite()).getLength();
|
||||||
final long pos = fileLength - ucBlock.getNumBytes();
|
final long pos = fileLength - ucBlock.getNumBytes();
|
||||||
return createLocatedBlock(ucBlock, pos, AccessMode.WRITE);
|
return createLocatedBlock(ucBlock, pos, BlockTokenIdentifier.AccessMode.WRITE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -813,7 +814,7 @@ public class BlockManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
private LocatedBlock createLocatedBlock(final BlockInfoContiguous blk, final long pos,
|
private LocatedBlock createLocatedBlock(final BlockInfoContiguous blk, final long pos,
|
||||||
final BlockTokenSecretManager.AccessMode mode) throws IOException {
|
final AccessMode mode) throws IOException {
|
||||||
final LocatedBlock lb = createLocatedBlock(blk, pos);
|
final LocatedBlock lb = createLocatedBlock(blk, pos);
|
||||||
if (mode != null) {
|
if (mode != null) {
|
||||||
setBlockToken(lb, mode);
|
setBlockToken(lb, mode);
|
||||||
|
@ -886,7 +887,7 @@ public class BlockManager {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
|
LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
|
||||||
}
|
}
|
||||||
final AccessMode mode = needBlockToken? AccessMode.READ: null;
|
final AccessMode mode = needBlockToken? BlockTokenIdentifier.AccessMode.READ: null;
|
||||||
final List<LocatedBlock> locatedblocks = createLocatedBlockList(
|
final List<LocatedBlock> locatedblocks = createLocatedBlockList(
|
||||||
blocks, offset, length, Integer.MAX_VALUE, mode);
|
blocks, offset, length, Integer.MAX_VALUE, mode);
|
||||||
|
|
||||||
|
@ -918,7 +919,7 @@ public class BlockManager {
|
||||||
|
|
||||||
/** Generate a block token for the located block. */
|
/** Generate a block token for the located block. */
|
||||||
public void setBlockToken(final LocatedBlock b,
|
public void setBlockToken(final LocatedBlock b,
|
||||||
final BlockTokenSecretManager.AccessMode mode) throws IOException {
|
final AccessMode mode) throws IOException {
|
||||||
if (isBlockTokenEnabled()) {
|
if (isBlockTokenEnabled()) {
|
||||||
// Use cached UGI if serving RPC calls.
|
// Use cached UGI if serving RPC calls.
|
||||||
b.setBlockToken(blockTokenSecretManager.generateToken(
|
b.setBlockToken(blockTokenSecretManager.generateToken(
|
||||||
|
|
|
@ -143,7 +143,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
|
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
||||||
|
@ -157,7 +157,6 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
|
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
|
import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||||
|
@ -1540,7 +1539,7 @@ public class DataNode extends ReconfigurableBase
|
||||||
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
|
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
|
||||||
Token<BlockTokenIdentifier> token) throws IOException {
|
Token<BlockTokenIdentifier> token) throws IOException {
|
||||||
checkBlockLocalPathAccess();
|
checkBlockLocalPathAccess();
|
||||||
checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ);
|
checkBlockToken(block, token, BlockTokenIdentifier.AccessMode.READ);
|
||||||
Preconditions.checkNotNull(data, "Storage not yet initialized");
|
Preconditions.checkNotNull(data, "Storage not yet initialized");
|
||||||
BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
|
BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -1585,7 +1584,7 @@ public class DataNode extends ReconfigurableBase
|
||||||
throw new ShortCircuitFdsUnsupportedException(
|
throw new ShortCircuitFdsUnsupportedException(
|
||||||
fileDescriptorPassingDisabledReason);
|
fileDescriptorPassingDisabledReason);
|
||||||
}
|
}
|
||||||
checkBlockToken(blk, token, BlockTokenSecretManager.AccessMode.READ);
|
checkBlockToken(blk, token, BlockTokenIdentifier.AccessMode.READ);
|
||||||
int blkVersion = CURRENT_BLOCK_FORMAT_VERSION;
|
int blkVersion = CURRENT_BLOCK_FORMAT_VERSION;
|
||||||
if (maxVersion < blkVersion) {
|
if (maxVersion < blkVersion) {
|
||||||
throw new ShortCircuitFdsVersionException("Your client is too old " +
|
throw new ShortCircuitFdsVersionException("Your client is too old " +
|
||||||
|
@ -1622,7 +1621,7 @@ public class DataNode extends ReconfigurableBase
|
||||||
// Check access for each block
|
// Check access for each block
|
||||||
for (int i = 0; i < blockIds.length; i++) {
|
for (int i = 0; i < blockIds.length; i++) {
|
||||||
checkBlockToken(new ExtendedBlock(bpId, blockIds[i]),
|
checkBlockToken(new ExtendedBlock(bpId, blockIds[i]),
|
||||||
tokens.get(i), BlockTokenSecretManager.AccessMode.READ);
|
tokens.get(i), BlockTokenIdentifier.AccessMode.READ);
|
||||||
}
|
}
|
||||||
|
|
||||||
DataNodeFaultInjector.get().getHdfsBlocksMetadata();
|
DataNodeFaultInjector.get().getHdfsBlocksMetadata();
|
||||||
|
@ -2124,7 +2123,7 @@ public class DataNode extends ReconfigurableBase
|
||||||
Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
|
Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
|
||||||
if (isBlockTokenEnabled) {
|
if (isBlockTokenEnabled) {
|
||||||
accessToken = blockPoolTokenSecretManager.generateToken(b,
|
accessToken = blockPoolTokenSecretManager.generateToken(b,
|
||||||
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
|
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
|
||||||
}
|
}
|
||||||
|
|
||||||
long writeTimeout = dnConf.socketWriteTimeout +
|
long writeTimeout = dnConf.socketWriteTimeout +
|
||||||
|
@ -2847,7 +2846,7 @@ public class DataNode extends ReconfigurableBase
|
||||||
LOG.debug("Got: " + id.toString());
|
LOG.debug("Got: " + id.toString());
|
||||||
}
|
}
|
||||||
blockPoolTokenSecretManager.checkAccess(id, null, block,
|
blockPoolTokenSecretManager.checkAccess(id, null, block,
|
||||||
BlockTokenSecretManager.AccessMode.READ);
|
BlockTokenIdentifier.AccessMode.READ);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,7 +70,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmR
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
|
||||||
|
@ -507,7 +506,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
||||||
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
|
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
|
||||||
checkAccess(out, true, block, blockToken,
|
checkAccess(out, true, block, blockToken,
|
||||||
Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);
|
Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ);
|
||||||
|
|
||||||
// send the block
|
// send the block
|
||||||
BlockSender blockSender = null;
|
BlockSender blockSender = null;
|
||||||
|
@ -651,7 +650,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
getOutputStream(),
|
getOutputStream(),
|
||||||
HdfsConstants.SMALL_BUFFER_SIZE));
|
HdfsConstants.SMALL_BUFFER_SIZE));
|
||||||
checkAccess(replyOut, isClient, block, blockToken,
|
checkAccess(replyOut, isClient, block, blockToken,
|
||||||
Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
|
Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
|
||||||
|
|
||||||
DataOutputStream mirrorOut = null; // stream to next target
|
DataOutputStream mirrorOut = null; // stream to next target
|
||||||
DataInputStream mirrorIn = null; // reply from next target
|
DataInputStream mirrorIn = null; // reply from next target
|
||||||
|
@ -849,7 +848,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
final DatanodeInfo[] targets,
|
final DatanodeInfo[] targets,
|
||||||
final StorageType[] targetStorageTypes) throws IOException {
|
final StorageType[] targetStorageTypes) throws IOException {
|
||||||
checkAccess(socketOut, true, blk, blockToken,
|
checkAccess(socketOut, true, blk, blockToken,
|
||||||
Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
|
Op.TRANSFER_BLOCK, BlockTokenIdentifier.AccessMode.COPY);
|
||||||
previousOpClientName = clientName;
|
previousOpClientName = clientName;
|
||||||
updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
|
updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
|
||||||
|
|
||||||
|
@ -911,7 +910,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
final DataOutputStream out = new DataOutputStream(
|
final DataOutputStream out = new DataOutputStream(
|
||||||
getOutputStream());
|
getOutputStream());
|
||||||
checkAccess(out, true, block, blockToken,
|
checkAccess(out, true, block, blockToken,
|
||||||
Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
|
Op.BLOCK_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
|
||||||
// client side now can specify a range of the block for checksum
|
// client side now can specify a range of the block for checksum
|
||||||
long requestLength = block.getNumBytes();
|
long requestLength = block.getNumBytes();
|
||||||
Preconditions.checkArgument(requestLength >= 0);
|
Preconditions.checkArgument(requestLength >= 0);
|
||||||
|
@ -976,7 +975,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
if (datanode.isBlockTokenEnabled) {
|
if (datanode.isBlockTokenEnabled) {
|
||||||
try {
|
try {
|
||||||
datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
|
datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
|
||||||
BlockTokenSecretManager.AccessMode.COPY);
|
BlockTokenIdentifier.AccessMode.COPY);
|
||||||
} catch (InvalidToken e) {
|
} catch (InvalidToken e) {
|
||||||
LOG.warn("Invalid access token in request from " + remoteAddress
|
LOG.warn("Invalid access token in request from " + remoteAddress
|
||||||
+ " for OP_COPY_BLOCK for block " + block + " : "
|
+ " for OP_COPY_BLOCK for block " + block + " : "
|
||||||
|
@ -1064,7 +1063,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
if (datanode.isBlockTokenEnabled) {
|
if (datanode.isBlockTokenEnabled) {
|
||||||
try {
|
try {
|
||||||
datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
|
datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
|
||||||
BlockTokenSecretManager.AccessMode.REPLACE);
|
BlockTokenIdentifier.AccessMode.REPLACE);
|
||||||
} catch (InvalidToken e) {
|
} catch (InvalidToken e) {
|
||||||
LOG.warn("Invalid access token in request from " + remoteAddress
|
LOG.warn("Invalid access token in request from " + remoteAddress
|
||||||
+ " for OP_REPLACE_BLOCK for block " + block + " : "
|
+ " for OP_REPLACE_BLOCK for block " + block + " : "
|
||||||
|
@ -1251,7 +1250,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
final ExtendedBlock blk,
|
final ExtendedBlock blk,
|
||||||
final Token<BlockTokenIdentifier> t,
|
final Token<BlockTokenIdentifier> t,
|
||||||
final Op op,
|
final Op op,
|
||||||
final BlockTokenSecretManager.AccessMode mode) throws IOException {
|
final BlockTokenIdentifier.AccessMode mode) throws IOException {
|
||||||
if (datanode.isBlockTokenEnabled) {
|
if (datanode.isBlockTokenEnabled) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Checking block access token for block '" + blk.getBlockId()
|
LOG.debug("Checking block access token for block '" + blk.getBlockId()
|
||||||
|
@ -1264,7 +1263,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
if (reply) {
|
if (reply) {
|
||||||
BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder()
|
BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder()
|
||||||
.setStatus(ERROR_ACCESS_TOKEN);
|
.setStatus(ERROR_ACCESS_TOKEN);
|
||||||
if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
|
if (mode == BlockTokenIdentifier.AccessMode.WRITE) {
|
||||||
DatanodeRegistration dnR =
|
DatanodeRegistration dnR =
|
||||||
datanode.getDNRegistrationForBP(blk.getBlockPoolId());
|
datanode.getDNRegistrationForBP(blk.getBlockPoolId());
|
||||||
// NB: Unconditionally using the xfer addr w/o hostname
|
// NB: Unconditionally using the xfer addr w/o hostname
|
||||||
|
|
|
@ -197,8 +197,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
|
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager.SecretManagerState;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager.SecretManagerState;
|
||||||
|
@ -3288,7 +3287,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
LocatedBlock lBlk = new LocatedBlock(
|
LocatedBlock lBlk = new LocatedBlock(
|
||||||
getExtendedBlock(blk), locs, offset, false);
|
getExtendedBlock(blk), locs, offset, false);
|
||||||
getBlockManager().setBlockToken(
|
getBlockManager().setBlockToken(
|
||||||
lBlk, BlockTokenSecretManager.AccessMode.WRITE);
|
lBlk, BlockTokenIdentifier.AccessMode.WRITE);
|
||||||
return lBlk;
|
return lBlk;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3350,7 +3349,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
src, numAdditionalNodes, clientnode, chosen,
|
src, numAdditionalNodes, clientnode, chosen,
|
||||||
excludes, preferredblocksize, storagePolicyID);
|
excludes, preferredblocksize, storagePolicyID);
|
||||||
final LocatedBlock lb = new LocatedBlock(blk, targets, -1, false);
|
final LocatedBlock lb = new LocatedBlock(blk, targets, -1, false);
|
||||||
blockManager.setBlockToken(lb, AccessMode.COPY);
|
blockManager.setBlockToken(lb, BlockTokenIdentifier.AccessMode.COPY);
|
||||||
return lb;
|
return lb;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6272,7 +6271,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
// get a new generation stamp and an access token
|
// get a new generation stamp and an access token
|
||||||
block.setGenerationStamp(nextGenerationStamp(blockIdManager.isLegacyBlock(block.getLocalBlock())));
|
block.setGenerationStamp(nextGenerationStamp(blockIdManager.isLegacyBlock(block.getLocalBlock())));
|
||||||
locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
|
locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
|
||||||
blockManager.setBlockToken(locatedBlock, AccessMode.WRITE);
|
blockManager.setBlockToken(locatedBlock, BlockTokenIdentifier.AccessMode.WRITE);
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock();
|
writeUnlock();
|
||||||
}
|
}
|
||||||
|
|
|
@ -139,7 +139,7 @@ public class TestBlockToken {
|
||||||
LOG.info("Got: " + id.toString());
|
LOG.info("Got: " + id.toString());
|
||||||
assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id));
|
assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id));
|
||||||
sm.checkAccess(id, null, PBHelper.convert(req.getBlock()),
|
sm.checkAccess(id, null, PBHelper.convert(req.getBlock()),
|
||||||
BlockTokenSecretManager.AccessMode.WRITE);
|
BlockTokenIdentifier.AccessMode.WRITE);
|
||||||
result = id.getBlockId();
|
result = id.getBlockId();
|
||||||
}
|
}
|
||||||
return GetReplicaVisibleLengthResponseProto.newBuilder()
|
return GetReplicaVisibleLengthResponseProto.newBuilder()
|
||||||
|
@ -149,7 +149,7 @@ public class TestBlockToken {
|
||||||
|
|
||||||
private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm,
|
private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm,
|
||||||
ExtendedBlock block,
|
ExtendedBlock block,
|
||||||
EnumSet<BlockTokenSecretManager.AccessMode> accessModes)
|
EnumSet<BlockTokenIdentifier.AccessMode> accessModes)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes);
|
Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes);
|
||||||
BlockTokenIdentifier id = sm.createIdentifier();
|
BlockTokenIdentifier id = sm.createIdentifier();
|
||||||
|
@ -164,17 +164,17 @@ public class TestBlockToken {
|
||||||
BlockTokenSecretManager sm = new BlockTokenSecretManager(
|
BlockTokenSecretManager sm = new BlockTokenSecretManager(
|
||||||
blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
|
blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
|
||||||
TestWritable.testWritable(generateTokenId(sm, block1,
|
TestWritable.testWritable(generateTokenId(sm, block1,
|
||||||
EnumSet.allOf(BlockTokenSecretManager.AccessMode.class)));
|
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class)));
|
||||||
TestWritable.testWritable(generateTokenId(sm, block2,
|
TestWritable.testWritable(generateTokenId(sm, block2,
|
||||||
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
|
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE)));
|
||||||
TestWritable.testWritable(generateTokenId(sm, block3,
|
TestWritable.testWritable(generateTokenId(sm, block3,
|
||||||
EnumSet.noneOf(BlockTokenSecretManager.AccessMode.class)));
|
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class)));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void tokenGenerationAndVerification(BlockTokenSecretManager master,
|
private void tokenGenerationAndVerification(BlockTokenSecretManager master,
|
||||||
BlockTokenSecretManager slave) throws Exception {
|
BlockTokenSecretManager slave) throws Exception {
|
||||||
// single-mode tokens
|
// single-mode tokens
|
||||||
for (BlockTokenSecretManager.AccessMode mode : BlockTokenSecretManager.AccessMode
|
for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode
|
||||||
.values()) {
|
.values()) {
|
||||||
// generated by master
|
// generated by master
|
||||||
Token<BlockTokenIdentifier> token1 = master.generateToken(block1,
|
Token<BlockTokenIdentifier> token1 = master.generateToken(block1,
|
||||||
|
@ -189,8 +189,8 @@ public class TestBlockToken {
|
||||||
}
|
}
|
||||||
// multi-mode tokens
|
// multi-mode tokens
|
||||||
Token<BlockTokenIdentifier> mtoken = master.generateToken(block3,
|
Token<BlockTokenIdentifier> mtoken = master.generateToken(block3,
|
||||||
EnumSet.allOf(BlockTokenSecretManager.AccessMode.class));
|
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
|
||||||
for (BlockTokenSecretManager.AccessMode mode : BlockTokenSecretManager.AccessMode
|
for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode
|
||||||
.values()) {
|
.values()) {
|
||||||
master.checkAccess(mtoken, null, block3, mode);
|
master.checkAccess(mtoken, null, block3, mode);
|
||||||
slave.checkAccess(mtoken, null, block3, mode);
|
slave.checkAccess(mtoken, null, block3, mode);
|
||||||
|
@ -246,7 +246,7 @@ public class TestBlockToken {
|
||||||
BlockTokenSecretManager sm = new BlockTokenSecretManager(
|
BlockTokenSecretManager sm = new BlockTokenSecretManager(
|
||||||
blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
|
blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
|
||||||
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
|
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
|
||||||
EnumSet.allOf(BlockTokenSecretManager.AccessMode.class));
|
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
|
||||||
|
|
||||||
final Server server = createMockDatanode(sm, token, conf);
|
final Server server = createMockDatanode(sm, token, conf);
|
||||||
|
|
||||||
|
@ -285,7 +285,7 @@ public class TestBlockToken {
|
||||||
BlockTokenSecretManager sm = new BlockTokenSecretManager(
|
BlockTokenSecretManager sm = new BlockTokenSecretManager(
|
||||||
blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
|
blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
|
||||||
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
|
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
|
||||||
EnumSet.allOf(BlockTokenSecretManager.AccessMode.class));
|
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
|
||||||
|
|
||||||
final Server server = createMockDatanode(sm, token, conf);
|
final Server server = createMockDatanode(sm, token, conf);
|
||||||
server.start();
|
server.start();
|
||||||
|
|
|
@ -413,21 +413,21 @@ public class TestBlockTokenWithDFS {
|
||||||
tryRead(conf, lblock, false);
|
tryRead(conf, lblock, false);
|
||||||
// use a valid new token
|
// use a valid new token
|
||||||
lblock.setBlockToken(sm.generateToken(lblock.getBlock(),
|
lblock.setBlockToken(sm.generateToken(lblock.getBlock(),
|
||||||
EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
|
EnumSet.of(BlockTokenIdentifier.AccessMode.READ)));
|
||||||
// read should succeed
|
// read should succeed
|
||||||
tryRead(conf, lblock, true);
|
tryRead(conf, lblock, true);
|
||||||
// use a token with wrong blockID
|
// use a token with wrong blockID
|
||||||
ExtendedBlock wrongBlock = new ExtendedBlock(lblock.getBlock()
|
ExtendedBlock wrongBlock = new ExtendedBlock(lblock.getBlock()
|
||||||
.getBlockPoolId(), lblock.getBlock().getBlockId() + 1);
|
.getBlockPoolId(), lblock.getBlock().getBlockId() + 1);
|
||||||
lblock.setBlockToken(sm.generateToken(wrongBlock,
|
lblock.setBlockToken(sm.generateToken(wrongBlock,
|
||||||
EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
|
EnumSet.of(BlockTokenIdentifier.AccessMode.READ)));
|
||||||
// read should fail
|
// read should fail
|
||||||
tryRead(conf, lblock, false);
|
tryRead(conf, lblock, false);
|
||||||
// use a token with wrong access modes
|
// use a token with wrong access modes
|
||||||
lblock.setBlockToken(sm.generateToken(lblock.getBlock(),
|
lblock.setBlockToken(sm.generateToken(lblock.getBlock(),
|
||||||
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE,
|
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE,
|
||||||
BlockTokenSecretManager.AccessMode.COPY,
|
BlockTokenIdentifier.AccessMode.COPY,
|
||||||
BlockTokenSecretManager.AccessMode.REPLACE)));
|
BlockTokenIdentifier.AccessMode.REPLACE)));
|
||||||
// read should fail
|
// read should fail
|
||||||
tryRead(conf, lblock, false);
|
tryRead(conf, lblock, false);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue