HDFS-11956. Do not require a storage ID or target storage IDs when writing a block. Contributed by Ewan Higgs.

This commit is contained in:
Andrew Wang 2017-06-26 11:20:07 -07:00
parent 06c8ca3bb3
commit 2c367b464c
3 changed files with 48 additions and 5 deletions

View File

@ -112,6 +112,7 @@ public class BlockTokenSecretManager extends
* @param blockPoolId block pool ID * @param blockPoolId block pool ID
* @param encryptionAlgorithm encryption algorithm to use * @param encryptionAlgorithm encryption algorithm to use
* @param numNNs number of namenodes possible * @param numNNs number of namenodes possible
* @param useProto should we use new protobuf style tokens
*/ */
public BlockTokenSecretManager(long keyUpdateInterval, public BlockTokenSecretManager(long keyUpdateInterval,
long tokenLifetime, int nnIndex, int numNNs, String blockPoolId, long tokenLifetime, int nnIndex, int numNNs, String blockPoolId,

View File

@ -695,12 +695,19 @@ class DataXceiver extends Receiver implements Runnable {
if (targetStorageTypes.length > 0) { if (targetStorageTypes.length > 0) {
System.arraycopy(targetStorageTypes, 0, storageTypes, 1, nst); System.arraycopy(targetStorageTypes, 0, storageTypes, 1, nst);
} }
int nsi = targetStorageIds.length;
String[] storageIds = new String[nsi + 1]; // To support older clients, we don't pass in empty storageIds
final int nsi = targetStorageIds.length;
final String[] storageIds;
if (nsi > 0) {
storageIds = new String[nsi + 1];
storageIds[0] = storageId; storageIds[0] = storageId;
if (targetStorageTypes.length > 0) { if (targetStorageTypes.length > 0) {
System.arraycopy(targetStorageIds, 0, storageIds, 1, nsi); System.arraycopy(targetStorageIds, 0, storageIds, 1, nsi);
} }
} else {
storageIds = new String[0];
}
checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK, checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK,
BlockTokenIdentifier.AccessMode.WRITE, BlockTokenIdentifier.AccessMode.WRITE,
storageTypes, storageIds); storageTypes, storageIds);

View File

@ -774,4 +774,39 @@ public class TestBlockToken {
testBlockTokenSerialization(false); testBlockTokenSerialization(false);
testBlockTokenSerialization(true); testBlockTokenSerialization(true);
} }
private void testBadStorageIDCheckAccess(boolean enableProtobuf)
throws IOException {
BlockTokenSecretManager sm = new BlockTokenSecretManager(
blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
enableProtobuf);
StorageType[] storageTypes = new StorageType[] {StorageType.DISK};
String[] storageIds = new String[] {"fake-storage-id"};
String[] badStorageIds = new String[] {"BAD-STORAGE-ID"};
String[] emptyStorageIds = new String[] {};
BlockTokenIdentifier.AccessMode mode = BlockTokenIdentifier.AccessMode.READ;
BlockTokenIdentifier id = generateTokenId(sm, block3,
EnumSet.of(mode), storageTypes, storageIds);
sm.checkAccess(id, null, block3, mode, storageTypes, storageIds);
try {
sm.checkAccess(id, null, block3, mode, storageTypes, badStorageIds);
fail("Expected strict BlockTokenSecretManager to fail");
} catch(SecretManager.InvalidToken e) {
}
// We allow empty storageId tokens for backwards compatibility. i.e. old
// clients may not have known to pass the storageId parameter to the
// writeBlock api.
sm.checkAccess(id, null, block3, mode, storageTypes,
emptyStorageIds);
sm.checkAccess(id, null, block3, mode, storageTypes,
null);
}
@Test
public void testBadStorageIDCheckAccess() throws IOException {
testBadStorageIDCheckAccess(false);
testBadStorageIDCheckAccess(true);
}
} }