HDFS-11026. Convert BlockTokenIdentifier to use Protobuf. Contributed by Ewan Higgs.
This commit is contained in:
parent
646c6d6509
commit
4ed33e9ca3
|
@ -121,9 +121,11 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmI
|
|||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AccessModeProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTypeProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenSecretProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CryptoProtocolVersionProto;
|
||||
|
@ -584,6 +586,55 @@ public class PBHelperClient {
|
|||
return blockTokens;
|
||||
}
|
||||
|
||||
public static AccessModeProto convert(BlockTokenIdentifier.AccessMode aMode) {
|
||||
switch (aMode) {
|
||||
case READ: return AccessModeProto.READ;
|
||||
case WRITE: return AccessModeProto.WRITE;
|
||||
case COPY: return AccessModeProto.COPY;
|
||||
case REPLACE: return AccessModeProto.REPLACE;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unexpected AccessMode: " + aMode);
|
||||
}
|
||||
}
|
||||
|
||||
public static BlockTokenIdentifier.AccessMode convert(
|
||||
AccessModeProto accessModeProto) {
|
||||
switch (accessModeProto) {
|
||||
case READ: return BlockTokenIdentifier.AccessMode.READ;
|
||||
case WRITE: return BlockTokenIdentifier.AccessMode.WRITE;
|
||||
case COPY: return BlockTokenIdentifier.AccessMode.COPY;
|
||||
case REPLACE: return BlockTokenIdentifier.AccessMode.REPLACE;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unexpected AccessModeProto: " +
|
||||
accessModeProto);
|
||||
}
|
||||
}
|
||||
|
||||
public static BlockTokenSecretProto convert(
|
||||
BlockTokenIdentifier blockTokenSecret) {
|
||||
BlockTokenSecretProto.Builder builder =
|
||||
BlockTokenSecretProto.newBuilder();
|
||||
builder.setExpiryDate(blockTokenSecret.getExpiryDate());
|
||||
builder.setKeyId(blockTokenSecret.getKeyId());
|
||||
String userId = blockTokenSecret.getUserId();
|
||||
if (userId != null) {
|
||||
builder.setUserId(userId);
|
||||
}
|
||||
|
||||
String blockPoolId = blockTokenSecret.getBlockPoolId();
|
||||
if (blockPoolId != null) {
|
||||
builder.setBlockPoolId(blockPoolId);
|
||||
}
|
||||
|
||||
builder.setBlockId(blockTokenSecret.getBlockId());
|
||||
|
||||
for (BlockTokenIdentifier.AccessMode aMode :
|
||||
blockTokenSecret.getAccessModes()) {
|
||||
builder.addModes(convert(aMode));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
static public DatanodeInfo convert(DatanodeInfoProto di) {
|
||||
if (di == null) {
|
||||
return null;
|
||||
|
|
|
@ -19,11 +19,16 @@
|
|||
package org.apache.hadoop.hdfs.security.token.block;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AccessModeProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenSecretProto;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -44,20 +49,22 @@ public class BlockTokenIdentifier extends TokenIdentifier {
|
|||
private String blockPoolId;
|
||||
private long blockId;
|
||||
private final EnumSet<AccessMode> modes;
|
||||
private boolean useProto;
|
||||
|
||||
private byte [] cache;
|
||||
|
||||
public BlockTokenIdentifier() {
|
||||
this(null, null, 0, EnumSet.noneOf(AccessMode.class));
|
||||
this(null, null, 0, EnumSet.noneOf(AccessMode.class), false);
|
||||
}
|
||||
|
||||
public BlockTokenIdentifier(String userId, String bpid, long blockId,
|
||||
EnumSet<AccessMode> modes) {
|
||||
EnumSet<AccessMode> modes, boolean useProto) {
|
||||
this.cache = null;
|
||||
this.userId = userId;
|
||||
this.blockPoolId = bpid;
|
||||
this.blockId = blockId;
|
||||
this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
|
||||
this.useProto = useProto;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -144,9 +151,45 @@ public class BlockTokenIdentifier extends TokenIdentifier {
|
|||
^ (blockPoolId == null ? 0 : blockPoolId.hashCode());
|
||||
}
|
||||
|
||||
/**
|
||||
* readFields peeks at the first byte of the DataInput and determines if it
|
||||
* was written using WritableUtils ("Legacy") or Protobuf. We can do this
|
||||
* because we know the first field is the Expiry date.
|
||||
*
|
||||
* In the case of the legacy buffer, the expiry date is a VInt, so the size
|
||||
* (which should always be >1) is encoded in the first byte - which is
|
||||
* always negative due to this encoding. However, there are sometimes null
|
||||
* BlockTokenIdentifier written so we also need to handle the case there
|
||||
* the first byte is also 0.
|
||||
*
|
||||
* In the case of protobuf, the first byte is a type tag for the expiry date
|
||||
* which is written as <code>(field_number << 3 | wire_type</code>.
|
||||
* So as long as the field_number is less than 16, but also positive, then
|
||||
* we know we have a Protobuf.
|
||||
*
|
||||
* @param in <code>DataInput</code> to deserialize this object from.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
this.cache = null;
|
||||
|
||||
final DataInputStream dis = (DataInputStream)in;
|
||||
if (!dis.markSupported()) {
|
||||
throw new IOException("Could not peek first byte.");
|
||||
}
|
||||
dis.mark(1);
|
||||
final byte firstByte = dis.readByte();
|
||||
dis.reset();
|
||||
if (firstByte <= 0) {
|
||||
readFieldsLegacy(dis);
|
||||
} else {
|
||||
readFieldsProtobuf(dis);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void readFieldsLegacy(DataInput in) throws IOException {
|
||||
expiryDate = WritableUtils.readVLong(in);
|
||||
keyId = WritableUtils.readVInt(in);
|
||||
userId = WritableUtils.readString(in);
|
||||
|
@ -157,10 +200,44 @@ public class BlockTokenIdentifier extends TokenIdentifier {
|
|||
for (int i = 0; i < length; i++) {
|
||||
modes.add(WritableUtils.readEnum(in, AccessMode.class));
|
||||
}
|
||||
useProto = false;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void readFieldsProtobuf(DataInput in) throws IOException {
|
||||
BlockTokenSecretProto blockTokenSecretProto =
|
||||
BlockTokenSecretProto.parseFrom((DataInputStream)in);
|
||||
expiryDate = blockTokenSecretProto.getExpiryDate();
|
||||
keyId = blockTokenSecretProto.getKeyId();
|
||||
if (blockTokenSecretProto.hasUserId()) {
|
||||
userId = blockTokenSecretProto.getUserId();
|
||||
} else {
|
||||
userId = null;
|
||||
}
|
||||
if (blockTokenSecretProto.hasBlockPoolId()) {
|
||||
blockPoolId = blockTokenSecretProto.getBlockPoolId();
|
||||
} else {
|
||||
blockPoolId = null;
|
||||
}
|
||||
blockId = blockTokenSecretProto.getBlockId();
|
||||
for (int i = 0; i < blockTokenSecretProto.getModesCount(); i++) {
|
||||
AccessModeProto accessModeProto = blockTokenSecretProto.getModes(i);
|
||||
modes.add(PBHelperClient.convert(accessModeProto));
|
||||
}
|
||||
useProto = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
if (useProto) {
|
||||
writeProtobuf(out);
|
||||
} else {
|
||||
writeLegacy(out);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void writeLegacy(DataOutput out) throws IOException {
|
||||
WritableUtils.writeVLong(out, expiryDate);
|
||||
WritableUtils.writeVInt(out, keyId);
|
||||
WritableUtils.writeString(out, userId);
|
||||
|
@ -172,6 +249,12 @@ public class BlockTokenIdentifier extends TokenIdentifier {
|
|||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void writeProtobuf(DataOutput out) throws IOException {
|
||||
BlockTokenSecretProto secret = PBHelperClient.convert(this);
|
||||
out.write(secret.toByteArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getBytes() {
|
||||
if(cache == null) cache = super.getBytes();
|
||||
|
@ -186,4 +269,4 @@ public class BlockTokenIdentifier extends TokenIdentifier {
|
|||
return KIND_NAME;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -514,3 +514,36 @@ message RollingUpgradeStatusProto {
|
|||
message StorageUuidsProto {
|
||||
repeated string storageUuids = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* File access permissions mode.
|
||||
*/
|
||||
enum AccessModeProto {
|
||||
READ = 1;
|
||||
WRITE = 2;
|
||||
COPY = 3;
|
||||
REPLACE = 4;
|
||||
}
|
||||
|
||||
/**
|
||||
* Secret information for the BlockKeyProto. This is not sent on the wire as
|
||||
* such but is used to pack a byte array and encrypted and put in
|
||||
* BlockKeyProto.bytes
|
||||
* When adding further fields, make sure they are optional as they would
|
||||
* otherwise not be backwards compatible.
|
||||
*
|
||||
* Note: As part of the migration from WritableUtils based tokens (aka "legacy")
|
||||
* to Protocol Buffers, we use the first byte to determine the type. If the
|
||||
* first byte is <=0 then it is a legacy token. This means that when using
|
||||
* protobuf tokens, the the first field sent must have a `field_number` less
|
||||
* than 16 to make sure that the first byte is positive. Otherwise it could be
|
||||
* parsed as a legacy token. See HDFS-11026 for more discussion.
|
||||
*/
|
||||
message BlockTokenSecretProto {
|
||||
optional uint64 expiryDate = 1;
|
||||
optional uint32 keyId = 2;
|
||||
optional string userId = 3;
|
||||
optional string blockPoolId = 4;
|
||||
optional uint64 blockId = 5;
|
||||
repeated AccessModeProto modes = 6;
|
||||
}
|
||||
|
|
|
@ -641,6 +641,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final long DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT = 600L;
|
||||
public static final String DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY = "dfs.block.access.token.lifetime";
|
||||
public static final long DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT = 600L;
|
||||
public static final String DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE = "dfs.block.access.token.protobuf.enable";
|
||||
public static final boolean DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE_DEFAULT = false;
|
||||
|
||||
public static final String DFS_BLOCK_REPLICATOR_CLASSNAME_KEY = "dfs.block.replicator.classname";
|
||||
public static final Class<BlockPlacementPolicyDefault> DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class;
|
||||
|
|
|
@ -75,6 +75,7 @@ public class BlockTokenSecretManager extends
|
|||
|
||||
private final int intRange;
|
||||
private final int nnRangeStart;
|
||||
private final boolean useProto;
|
||||
|
||||
private final SecureRandom nonceGenerator = new SecureRandom();
|
||||
|
||||
|
@ -83,11 +84,13 @@ public class BlockTokenSecretManager extends
|
|||
*
|
||||
* @param keyUpdateInterval how often a new key will be generated
|
||||
* @param tokenLifetime how long an individual token is valid
|
||||
* @param useProto should we use new protobuf style tokens
|
||||
*/
|
||||
public BlockTokenSecretManager(long keyUpdateInterval,
|
||||
long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
|
||||
long tokenLifetime, String blockPoolId, String encryptionAlgorithm,
|
||||
boolean useProto) {
|
||||
this(false, keyUpdateInterval, tokenLifetime, blockPoolId,
|
||||
encryptionAlgorithm, 0, 1);
|
||||
encryptionAlgorithm, 0, 1, useProto);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -102,8 +105,9 @@ public class BlockTokenSecretManager extends
|
|||
*/
|
||||
public BlockTokenSecretManager(long keyUpdateInterval,
|
||||
long tokenLifetime, int nnIndex, int numNNs, String blockPoolId,
|
||||
String encryptionAlgorithm) {
|
||||
this(true, keyUpdateInterval, tokenLifetime, blockPoolId, encryptionAlgorithm, nnIndex, numNNs);
|
||||
String encryptionAlgorithm, boolean useProto) {
|
||||
this(true, keyUpdateInterval, tokenLifetime, blockPoolId,
|
||||
encryptionAlgorithm, nnIndex, numNNs, useProto);
|
||||
Preconditions.checkArgument(nnIndex >= 0);
|
||||
Preconditions.checkArgument(numNNs > 0);
|
||||
setSerialNo(new SecureRandom().nextInt());
|
||||
|
@ -111,7 +115,8 @@ public class BlockTokenSecretManager extends
|
|||
}
|
||||
|
||||
private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
|
||||
long tokenLifetime, String blockPoolId, String encryptionAlgorithm, int nnIndex, int numNNs) {
|
||||
long tokenLifetime, String blockPoolId, String encryptionAlgorithm,
|
||||
int nnIndex, int numNNs, boolean useProto) {
|
||||
this.intRange = Integer.MAX_VALUE / numNNs;
|
||||
this.nnRangeStart = intRange * nnIndex;
|
||||
this.isMaster = isMaster;
|
||||
|
@ -120,6 +125,7 @@ public class BlockTokenSecretManager extends
|
|||
this.allKeys = new HashMap<Integer, BlockKey>();
|
||||
this.blockPoolId = blockPoolId;
|
||||
this.encryptionAlgorithm = encryptionAlgorithm;
|
||||
this.useProto = useProto;
|
||||
generateKeys();
|
||||
}
|
||||
|
||||
|
@ -246,7 +252,7 @@ public class BlockTokenSecretManager extends
|
|||
public Token<BlockTokenIdentifier> generateToken(String userId,
|
||||
ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> modes) throws IOException {
|
||||
BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
|
||||
.getBlockPoolId(), block.getBlockId(), modes);
|
||||
.getBlockPoolId(), block.getBlockId(), modes, useProto);
|
||||
return new Token<BlockTokenIdentifier>(id, this);
|
||||
}
|
||||
|
||||
|
|
|
@ -69,8 +69,12 @@ public class KeyManager implements Closeable, DataEncryptionKeyFactory {
|
|||
+ ", token lifetime=" + StringUtils.formatTime(tokenLifetime));
|
||||
String encryptionAlgorithm = conf.get(
|
||||
DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
|
||||
final boolean enableProtobuf = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE,
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE_DEFAULT);
|
||||
this.blockTokenSecretManager = new BlockTokenSecretManager(
|
||||
updateInterval, tokenLifetime, blockpoolID, encryptionAlgorithm);
|
||||
updateInterval, tokenLifetime, blockpoolID, encryptionAlgorithm,
|
||||
enableProtobuf);
|
||||
this.blockTokenSecretManager.addKeys(keys);
|
||||
|
||||
// sync block keys with NN more frequently than NN updates its block keys
|
||||
|
|
|
@ -542,6 +542,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
|
||||
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
|
||||
boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId);
|
||||
boolean shouldWriteProtobufToken = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE,
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE_DEFAULT);
|
||||
|
||||
if (isHaEnabled) {
|
||||
// figure out which index we are of the nns
|
||||
|
@ -555,10 +558,12 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
nnIndex++;
|
||||
}
|
||||
return new BlockTokenSecretManager(updateMin * 60 * 1000L,
|
||||
lifetimeMin * 60 * 1000L, nnIndex, nnIds.size(), null, encryptionAlgorithm);
|
||||
lifetimeMin * 60 * 1000L, nnIndex, nnIds.size(), null,
|
||||
encryptionAlgorithm, shouldWriteProtobufToken);
|
||||
} else {
|
||||
return new BlockTokenSecretManager(updateMin*60*1000L,
|
||||
lifetimeMin*60*1000L, 0, 1, null, encryptionAlgorithm);
|
||||
lifetimeMin*60*1000L, 0, 1, null, encryptionAlgorithm,
|
||||
shouldWriteProtobufToken);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1534,9 +1534,12 @@ public class DataNode extends ReconfigurableBase
|
|||
+ blockKeyUpdateInterval / (60 * 1000)
|
||||
+ " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
|
||||
+ " min(s)");
|
||||
final boolean enableProtobuf = getConf().getBoolean(
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE,
|
||||
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE_DEFAULT);
|
||||
final BlockTokenSecretManager secretMgr =
|
||||
new BlockTokenSecretManager(0, blockTokenLifetime, blockPoolId,
|
||||
dnConf.encryptionAlgorithm);
|
||||
dnConf.encryptionAlgorithm, enableProtobuf);
|
||||
blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -502,6 +502,15 @@
|
|||
<description>The lifetime of access tokens in minutes.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.block.access.token.protobuf.enable</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
If "true", block tokens are written using Protocol Buffers.
|
||||
If "false", block tokens are written using Legacy format.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.datanode.data.dir</name>
|
||||
<value>file://${hadoop.tmp.dir}/dfs/data</value>
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.security.token.block;
|
|||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
|
@ -31,7 +32,10 @@ import java.io.DataInputStream;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Arrays;
|
||||
import java.util.Calendar;
|
||||
import java.util.EnumSet;
|
||||
import java.util.GregorianCalendar;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -57,6 +61,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetRep
|
|||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.io.TestWritable;
|
||||
import org.apache.hadoop.ipc.Client;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
|
@ -104,7 +110,7 @@ public class TestBlockToken {
|
|||
final ExtendedBlock block1 = new ExtendedBlock("0", 0L);
|
||||
final ExtendedBlock block2 = new ExtendedBlock("10", 10L);
|
||||
final ExtendedBlock block3 = new ExtendedBlock("-10", -108L);
|
||||
|
||||
|
||||
@Before
|
||||
public void disableKerberos() {
|
||||
Configuration conf = new Configuration();
|
||||
|
@ -128,7 +134,7 @@ public class TestBlockToken {
|
|||
InvocationOnMock invocation) throws IOException {
|
||||
Object args[] = invocation.getArguments();
|
||||
assertEquals(2, args.length);
|
||||
GetReplicaVisibleLengthRequestProto req =
|
||||
GetReplicaVisibleLengthRequestProto req =
|
||||
(GetReplicaVisibleLengthRequestProto) args[1];
|
||||
Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
|
||||
.getTokenIdentifiers();
|
||||
|
@ -158,11 +164,11 @@ public class TestBlockToken {
|
|||
return id;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWritable() throws Exception {
|
||||
private void testWritable(boolean enableProtobuf) throws Exception {
|
||||
TestWritable.testWritable(new BlockTokenIdentifier());
|
||||
BlockTokenSecretManager sm = new BlockTokenSecretManager(
|
||||
blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
|
||||
blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
|
||||
enableProtobuf);
|
||||
TestWritable.testWritable(generateTokenId(sm, block1,
|
||||
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class)));
|
||||
TestWritable.testWritable(generateTokenId(sm, block2,
|
||||
|
@ -171,6 +177,16 @@ public class TestBlockToken {
|
|||
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWritableLegacy() throws Exception {
|
||||
testWritable(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWritableProtobuf() throws Exception {
|
||||
testWritable(true);
|
||||
}
|
||||
|
||||
private void tokenGenerationAndVerification(BlockTokenSecretManager master,
|
||||
BlockTokenSecretManager slave) throws Exception {
|
||||
// single-mode tokens
|
||||
|
@ -198,12 +214,14 @@ public class TestBlockToken {
|
|||
}
|
||||
|
||||
/** test block key and token handling */
|
||||
@Test
|
||||
public void testBlockTokenSecretManager() throws Exception {
|
||||
private void testBlockTokenSecretManager(boolean enableProtobuf)
|
||||
throws Exception {
|
||||
BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
|
||||
blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
|
||||
blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
|
||||
enableProtobuf);
|
||||
BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
|
||||
blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null);
|
||||
blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null,
|
||||
enableProtobuf);
|
||||
ExportedBlockKeys keys = masterHandler.exportKeys();
|
||||
slaveHandler.addKeys(keys);
|
||||
tokenGenerationAndVerification(masterHandler, slaveHandler);
|
||||
|
@ -215,6 +233,16 @@ public class TestBlockToken {
|
|||
tokenGenerationAndVerification(masterHandler, slaveHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockTokenSecretManagerLegacy() throws Exception {
|
||||
testBlockTokenSecretManager(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockTokenSecretManagerProtobuf() throws Exception {
|
||||
testBlockTokenSecretManager(true);
|
||||
}
|
||||
|
||||
private static Server createMockDatanode(BlockTokenSecretManager sm,
|
||||
Token<BlockTokenIdentifier> token, Configuration conf)
|
||||
throws IOException, ServiceException {
|
||||
|
@ -223,7 +251,7 @@ public class TestBlockToken {
|
|||
BlockTokenIdentifier id = sm.createIdentifier();
|
||||
id.readFields(new DataInputStream(new ByteArrayInputStream(token
|
||||
.getIdentifier())));
|
||||
|
||||
|
||||
doAnswer(new GetLengthAnswer(sm, id)).when(mockDN)
|
||||
.getReplicaVisibleLength(any(RpcController.class),
|
||||
any(GetReplicaVisibleLengthRequestProto.class));
|
||||
|
@ -237,14 +265,14 @@ public class TestBlockToken {
|
|||
.setNumHandlers(5).setVerbose(true).setSecretManager(sm).build();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockTokenRpc() throws Exception {
|
||||
private void testBlockTokenRpc(boolean enableProtobuf) throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
|
||||
|
||||
BlockTokenSecretManager sm = new BlockTokenSecretManager(
|
||||
blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
|
||||
blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
|
||||
enableProtobuf);
|
||||
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
|
||||
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
|
||||
|
||||
|
@ -270,20 +298,30 @@ public class TestBlockToken {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockTokenRpcLegacy() throws Exception {
|
||||
testBlockTokenRpc(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockTokenRpcProtobuf() throws Exception {
|
||||
testBlockTokenRpc(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that fast repeated invocations of createClientDatanodeProtocolProxy
|
||||
* will not end up using up thousands of sockets. This is a regression test
|
||||
* for HDFS-1965.
|
||||
*/
|
||||
@Test
|
||||
public void testBlockTokenRpcLeak() throws Exception {
|
||||
private void testBlockTokenRpcLeak(boolean enableProtobuf) throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
|
||||
|
||||
Assume.assumeTrue(FD_DIR.exists());
|
||||
BlockTokenSecretManager sm = new BlockTokenSecretManager(
|
||||
blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
|
||||
blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
|
||||
enableProtobuf);
|
||||
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
|
||||
EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
|
||||
|
||||
|
@ -334,6 +372,16 @@ public class TestBlockToken {
|
|||
RPC.stopProxy(proxyToNoWhere);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockTokenRpcLeakLegacy() throws Exception {
|
||||
testBlockTokenRpcLeak(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockTokenRpcLeakProtobuf() throws Exception {
|
||||
testBlockTokenRpcLeak(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the current number of file descriptors open by this process.
|
||||
*/
|
||||
|
@ -344,17 +392,19 @@ public class TestBlockToken {
|
|||
/**
|
||||
* Test {@link BlockPoolTokenSecretManager}
|
||||
*/
|
||||
@Test
|
||||
public void testBlockPoolTokenSecretManager() throws Exception {
|
||||
private void testBlockPoolTokenSecretManager(boolean enableProtobuf)
|
||||
throws Exception {
|
||||
BlockPoolTokenSecretManager bpMgr = new BlockPoolTokenSecretManager();
|
||||
|
||||
// Test BlockPoolSecretManager with upto 10 block pools
|
||||
for (int i = 0; i < 10; i++) {
|
||||
String bpid = Integer.toString(i);
|
||||
BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
|
||||
blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
|
||||
blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
|
||||
enableProtobuf);
|
||||
BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
|
||||
blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null);
|
||||
blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null,
|
||||
enableProtobuf);
|
||||
bpMgr.addBlockPool(bpid, slaveHandler);
|
||||
|
||||
ExportedBlockKeys keys = masterHandler.exportKeys();
|
||||
|
@ -370,20 +420,31 @@ public class TestBlockToken {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockPoolTokenSecretManagerLegacy() throws Exception {
|
||||
testBlockPoolTokenSecretManager(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockPoolTokenSecretManagerProtobuf() throws Exception {
|
||||
testBlockPoolTokenSecretManager(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test writes a file and gets the block locations without closing the
|
||||
* file, and tests the block token in the last block. Block token is verified
|
||||
* by ensuring it is of correct kind.
|
||||
*
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test
|
||||
public void testBlockTokenInLastLocatedBlock() throws IOException,
|
||||
InterruptedException {
|
||||
private void testBlockTokenInLastLocatedBlock(boolean enableProtobuf)
|
||||
throws IOException, InterruptedException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
||||
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE,
|
||||
enableProtobuf);
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
|
@ -411,4 +472,188 @@ public class TestBlockToken {
|
|||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockTokenInLastLocatedBlockLegacy() throws IOException,
|
||||
InterruptedException {
|
||||
testBlockTokenInLastLocatedBlock(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockTokenInLastLocatedBlockProtobuf() throws IOException,
|
||||
InterruptedException {
|
||||
testBlockTokenInLastLocatedBlock(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLegacyBlockTokenBytesIsLegacy() throws IOException {
|
||||
final boolean useProto = false;
|
||||
BlockTokenSecretManager sm = new BlockTokenSecretManager(
|
||||
blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
|
||||
useProto);
|
||||
Token<BlockTokenIdentifier> token = sm.generateToken(block1,
|
||||
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class));
|
||||
final byte[] tokenBytes = token.getIdentifier();
|
||||
BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
|
||||
BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
|
||||
BlockTokenIdentifier readToken = new BlockTokenIdentifier();
|
||||
|
||||
DataInputBuffer dib = new DataInputBuffer();
|
||||
|
||||
dib.reset(tokenBytes, tokenBytes.length);
|
||||
legacyToken.readFieldsLegacy(dib);
|
||||
|
||||
boolean invalidProtobufMessage = false;
|
||||
try {
|
||||
dib.reset(tokenBytes, tokenBytes.length);
|
||||
protobufToken.readFieldsProtobuf(dib);
|
||||
} catch (IOException e) {
|
||||
invalidProtobufMessage = true;
|
||||
}
|
||||
assertTrue(invalidProtobufMessage);
|
||||
|
||||
dib.reset(tokenBytes, tokenBytes.length);
|
||||
readToken.readFields(dib);
|
||||
|
||||
// Using legacy, the token parses as a legacy block token and not a protobuf
|
||||
assertEquals(legacyToken, readToken);
|
||||
assertNotEquals(protobufToken, readToken);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmptyLegacyBlockTokenBytesIsLegacy() throws IOException {
|
||||
BlockTokenIdentifier emptyIdent = new BlockTokenIdentifier();
|
||||
DataOutputBuffer dob = new DataOutputBuffer(4096);
|
||||
DataInputBuffer dib = new DataInputBuffer();
|
||||
|
||||
emptyIdent.writeLegacy(dob);
|
||||
byte[] emptyIdentBytes = Arrays.copyOf(dob.getData(), dob.getLength());
|
||||
|
||||
BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
|
||||
BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
|
||||
BlockTokenIdentifier readToken = new BlockTokenIdentifier();
|
||||
|
||||
dib.reset(emptyIdentBytes, emptyIdentBytes.length);
|
||||
legacyToken.readFieldsLegacy(dib);
|
||||
|
||||
boolean invalidProtobufMessage = false;
|
||||
try {
|
||||
dib.reset(emptyIdentBytes, emptyIdentBytes.length);
|
||||
protobufToken.readFieldsProtobuf(dib);
|
||||
} catch (IOException e) {
|
||||
invalidProtobufMessage = true;
|
||||
}
|
||||
assertTrue(invalidProtobufMessage);
|
||||
|
||||
dib.reset(emptyIdentBytes, emptyIdentBytes.length);
|
||||
readToken.readFields(dib);
|
||||
assertTrue(invalidProtobufMessage);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProtobufBlockTokenBytesIsProtobuf() throws IOException {
|
||||
final boolean useProto = true;
|
||||
BlockTokenSecretManager sm = new BlockTokenSecretManager(
|
||||
blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
|
||||
useProto);
|
||||
Token<BlockTokenIdentifier> token = sm.generateToken(block1,
|
||||
EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class));
|
||||
final byte[] tokenBytes = token.getIdentifier();
|
||||
BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
|
||||
BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
|
||||
BlockTokenIdentifier readToken = new BlockTokenIdentifier();
|
||||
|
||||
DataInputBuffer dib = new DataInputBuffer();
|
||||
|
||||
/* We receive NegativeArraySizeException because we didn't call
|
||||
* readFields and instead try to parse this directly as a legacy
|
||||
* BlockTokenIdentifier.
|
||||
*
|
||||
* Note: because the parsing depends on the expiryDate which is based on
|
||||
* `Time.now()` it can sometimes fail with IOException and sometimes with
|
||||
* NegativeArraySizeException.
|
||||
*/
|
||||
boolean invalidLegacyMessage = false;
|
||||
try {
|
||||
dib.reset(tokenBytes, tokenBytes.length);
|
||||
legacyToken.readFieldsLegacy(dib);
|
||||
} catch (IOException | NegativeArraySizeException e) {
|
||||
invalidLegacyMessage = true;
|
||||
}
|
||||
assertTrue(invalidLegacyMessage);
|
||||
|
||||
dib.reset(tokenBytes, tokenBytes.length);
|
||||
protobufToken.readFieldsProtobuf(dib);
|
||||
|
||||
dib.reset(tokenBytes, tokenBytes.length);
|
||||
readToken.readFields(dib);
|
||||
|
||||
// Using protobuf, the token parses as a protobuf and not a legacy block
|
||||
// token
|
||||
assertNotEquals(legacyToken, readToken);
|
||||
assertEquals(protobufToken, readToken);
|
||||
}
|
||||
|
||||
public void testCraftedProtobufBlockTokenIdentifier(
|
||||
BlockTokenIdentifier identifier, boolean expectIOE,
|
||||
boolean expectRTE) throws IOException {
|
||||
DataOutputBuffer dob = new DataOutputBuffer(4096);
|
||||
DataInputBuffer dib = new DataInputBuffer();
|
||||
|
||||
identifier.writeProtobuf(dob);
|
||||
byte[] identBytes = Arrays.copyOf(dob.getData(), dob.getLength());
|
||||
|
||||
BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
|
||||
BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
|
||||
BlockTokenIdentifier readToken = new BlockTokenIdentifier();
|
||||
|
||||
boolean invalidLegacyMessage = false;
|
||||
try {
|
||||
dib.reset(identBytes, identBytes.length);
|
||||
legacyToken.readFieldsLegacy(dib);
|
||||
} catch (IOException e) {
|
||||
if (!expectIOE) {
|
||||
fail("Received IOException but it was not expected.");
|
||||
}
|
||||
invalidLegacyMessage = true;
|
||||
} catch (RuntimeException e) {
|
||||
if (!expectRTE) {
|
||||
fail("Received RuntimeException but it was not expected.");
|
||||
}
|
||||
invalidLegacyMessage = true;
|
||||
}
|
||||
|
||||
assertTrue(invalidLegacyMessage);
|
||||
|
||||
dib.reset(identBytes, identBytes.length);
|
||||
protobufToken.readFieldsProtobuf(dib);
|
||||
|
||||
dib.reset(identBytes, identBytes.length);
|
||||
readToken.readFieldsProtobuf(dib);
|
||||
assertEquals(protobufToken, readToken);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCraftedProtobufBlockTokenBytesIsProtobuf() throws
|
||||
IOException {
|
||||
// Empty BlockTokenIdentifiers throw IOException
|
||||
BlockTokenIdentifier identifier = new BlockTokenIdentifier();
|
||||
testCraftedProtobufBlockTokenIdentifier(identifier, true, false);
|
||||
|
||||
/* Parsing BlockTokenIdentifier with expiryDate
|
||||
* 2017-02-09 00:12:35,072+0100 will throw IOException.
|
||||
* However, expiryDate of
|
||||
* 2017-02-09 00:12:35,071+0100 will throw NegativeArraySizeException.
|
||||
*/
|
||||
Calendar cal = new GregorianCalendar();
|
||||
cal.set(2017, 1, 9, 0, 12, 35);
|
||||
long datetime = cal.getTimeInMillis();
|
||||
datetime = ((datetime / 1000) * 1000); // strip milliseconds.
|
||||
datetime = datetime + 71; // 2017-02-09 00:12:35,071+0100
|
||||
identifier.setExpiryDate(datetime);
|
||||
testCraftedProtobufBlockTokenIdentifier(identifier, false, true);
|
||||
datetime += 1; // 2017-02-09 00:12:35,072+0100
|
||||
identifier.setExpiryDate(datetime);
|
||||
testCraftedProtobufBlockTokenIdentifier(identifier, true, false);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue