diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 653e9034d6e..6303754d170 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -140,6 +140,7 @@ import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.net.InetAddresses; @@ -871,6 +872,16 @@ public class DFSClient implements java.io.Closeable { public short getDefaultReplication() { return dfsClientConf.defaultReplication; } + + /* + * This is just a wrapper around callGetBlockLocations, but non-static so that + * we can stub it out for tests. + */ + @VisibleForTesting + public LocatedBlocks getLocatedBlocks(String src, long start, long length) + throws IOException { + return callGetBlockLocations(namenode, src, start, length); + } /** * @see ClientProtocol#getBlockLocations(String, long, long) @@ -918,7 +929,7 @@ public class DFSClient implements java.io.Closeable { */ public BlockLocation[] getBlockLocations(String src, long start, long length) throws IOException, UnresolvedLinkException { - LocatedBlocks blocks = callGetBlockLocations(namenode, src, start, length); + LocatedBlocks blocks = getLocatedBlocks(src, start, length); return DFSUtil.locatedBlocks2Locations(blocks); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 5888365d2b9..a8d069f77e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -151,7 +151,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable } private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException { - LocatedBlocks newInfo = DFSClient.callGetBlockLocations(dfsClient.namenode, src, 0, prefetchSize); + LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0, prefetchSize); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("newInfo = " + newInfo); } @@ -298,7 +298,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx); // fetch more blocks LocatedBlocks newBlocks; - newBlocks = DFSClient.callGetBlockLocations(dfsClient.namenode, src, offset, prefetchSize); + newBlocks = dfsClient.getLocatedBlocks(src, offset, prefetchSize); assert (newBlocks != null) : "Could not find target position " + offset; locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks()); } @@ -322,7 +322,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable } // fetch blocks LocatedBlocks newBlocks; - newBlocks = DFSClient.callGetBlockLocations(dfsClient.namenode, src, offset, prefetchSize); + newBlocks = dfsClient.getLocatedBlocks(src, offset, prefetchSize); if (newBlocks == null) { throw new IOException("Could not find target position " + offset); } @@ -391,7 +391,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable blk = locatedBlocks.get(blockIdx); if (blk == null || curOff < blk.getStartOffset()) { LocatedBlocks newBlocks; - newBlocks = DFSClient.callGetBlockLocations(dfsClient.namenode, src, curOff, remaining); + newBlocks = dfsClient.getLocatedBlocks(src, curOff, remaining); locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks()); continue; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java index 77f0597bb27..9674b6d6f7b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java @@ -127,25 +127,21 @@ public class HAUtil { } return null; } - + /** - * Given the configuration for this node, return a Configuration object for - * the other node in an HA setup. + * Get the NN ID of the other node in an HA setup. * - * @param myConf the configuration of this node - * @return the configuration of the other node in an HA setup + * @param conf the configuration of this node + * @return the NN ID of the other node in this nameservice */ - public static Configuration getConfForOtherNode( - Configuration myConf) { - - String nsId = DFSUtil.getNamenodeNameServiceId(myConf); + public static String getNameNodeIdOfOtherNode(Configuration conf, String nsId) { Preconditions.checkArgument(nsId != null, "Could not determine namespace id. Please ensure that this " + "machine is one of the machines listed as a NN RPC address, " + "or configure " + DFSConfigKeys.DFS_NAMESERVICE_ID); - Collection nnIds = DFSUtil.getNameNodeIds(myConf, nsId); - String myNNId = myConf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY); + Collection nnIds = DFSUtil.getNameNodeIds(conf, nsId); + String myNNId = conf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY); Preconditions.checkArgument(nnIds != null, "Could not determine namenode ids in namespace '%s'. " + "Please configure " + @@ -165,11 +161,25 @@ public class HAUtil { ArrayList nnSet = Lists.newArrayList(nnIds); nnSet.remove(myNNId); assert nnSet.size() == 1; - String activeNN = nnSet.get(0); + return nnSet.get(0); + } + + /** + * Given the configuration for this node, return a Configuration object for + * the other node in an HA setup. + * + * @param myConf the configuration of this node + * @return the configuration of the other node in an HA setup + */ + public static Configuration getConfForOtherNode( + Configuration myConf) { + + String nsId = DFSUtil.getNamenodeNameServiceId(myConf); + String otherNn = getNameNodeIdOfOtherNode(myConf, nsId); // Look up the address of the active NN. Configuration confForOtherNode = new Configuration(myConf); - NameNode.initializeGenericKeys(confForOtherNode, nsId, activeNN); + NameNode.initializeGenericKeys(confForOtherNode, nsId, otherNn); return confForOtherNode; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java index 05fba79fe63..b65d073f60b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java @@ -27,6 +27,8 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.Acces import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; +import com.google.common.annotations.VisibleForTesting; + /** * Manages a {@link BlockTokenSecretManager} per block pool. Routes the requests * given a block pool Id to corresponding {@link BlockTokenSecretManager} @@ -96,11 +98,11 @@ public class BlockPoolTokenSecretManager extends } /** - * See {@link BlockTokenSecretManager#setKeys(ExportedBlockKeys)} + * See {@link BlockTokenSecretManager#addKeys(ExportedBlockKeys)} */ - public void setKeys(String bpid, ExportedBlockKeys exportedKeys) + public void addKeys(String bpid, ExportedBlockKeys exportedKeys) throws IOException { - get(bpid).setKeys(exportedKeys); + get(bpid).addKeys(exportedKeys); } /** @@ -110,4 +112,11 @@ public class BlockPoolTokenSecretManager extends EnumSet of) throws IOException { return get(b.getBlockPoolId()).generateToken(b, of); } + + @VisibleForTesting + public void clearAllKeysForTesting() { + for (BlockTokenSecretManager btsm : map.values()) { + btsm.clearAllKeysForTesting(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java index e9fa5785ba2..832b0e75141 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java @@ -37,6 +37,9 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + /** * BlockTokenSecretManager can be instantiated in 2 modes, master mode and slave * mode. Master can generate new block keys and export block keys to slaves, @@ -49,17 +52,24 @@ public class BlockTokenSecretManager extends SecretManager { public static final Log LOG = LogFactory .getLog(BlockTokenSecretManager.class); + + // We use these in an HA setup to ensure that the pair of NNs produce block + // token serial numbers that are in different ranges. + private static final int LOW_MASK = ~(1 << 31); + public static final Token DUMMY_TOKEN = new Token(); private final boolean isMaster; + private int nnIndex; + /** * keyUpdateInterval is the interval that NN updates its block keys. It should * be set long enough so that all live DN's and Balancer should have sync'ed * their block keys with NN at least once during each interval. */ - private final long keyUpdateInterval; + private long keyUpdateInterval; private volatile long tokenLifetime; - private int serialNo = new SecureRandom().nextInt(); + private int serialNo; private BlockKey currentKey; private BlockKey nextKey; private Map allKeys; @@ -67,22 +77,47 @@ public class BlockTokenSecretManager extends public static enum AccessMode { READ, WRITE, COPY, REPLACE }; - + /** - * Constructor + * Constructor for slaves. * - * @param isMaster - * @param keyUpdateInterval - * @param tokenLifetime - * @throws IOException + * @param keyUpdateInterval how often a new key will be generated + * @param tokenLifetime how long an individual token is valid */ - public BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval, - long tokenLifetime) throws IOException { + public BlockTokenSecretManager(long keyUpdateInterval, + long tokenLifetime) { + this(false, keyUpdateInterval, tokenLifetime); + } + + /** + * Constructor for masters. + * + * @param keyUpdateInterval how often a new key will be generated + * @param tokenLifetime how long an individual token is valid + * @param isHaEnabled whether or not HA is enabled + * @param thisNnId the NN ID of this NN in an HA setup + * @param otherNnId the NN ID of the other NN in an HA setup + */ + public BlockTokenSecretManager(long keyUpdateInterval, + long tokenLifetime, int nnIndex) { + this(true, keyUpdateInterval, tokenLifetime); + Preconditions.checkArgument(nnIndex == 0 || nnIndex == 1); + this.nnIndex = nnIndex; + setSerialNo(new SecureRandom().nextInt()); + generateKeys(); + } + + private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval, + long tokenLifetime) { this.isMaster = isMaster; this.keyUpdateInterval = keyUpdateInterval; this.tokenLifetime = tokenLifetime; this.allKeys = new HashMap(); - generateKeys(); + } + + @VisibleForTesting + public void setSerialNo(int serialNo) { + this.serialNo = (serialNo & LOW_MASK) | (nnIndex << 31); } /** Initialize block keys */ @@ -101,10 +136,10 @@ public class BlockTokenSecretManager extends * Similarly, the estimated expiry date for nextKey is one keyUpdateInterval * more. */ - serialNo++; + setSerialNo(serialNo + 1); currentKey = new BlockKey(serialNo, System.currentTimeMillis() + 2 * keyUpdateInterval + tokenLifetime, generateSecret()); - serialNo++; + setSerialNo(serialNo + 1); nextKey = new BlockKey(serialNo, System.currentTimeMillis() + 3 * keyUpdateInterval + tokenLifetime, generateSecret()); allKeys.put(currentKey.getKeyId(), currentKey); @@ -135,7 +170,7 @@ public class BlockTokenSecretManager extends /** * Set block keys, only to be used in slave mode */ - public synchronized void setKeys(ExportedBlockKeys exportedKeys) + public synchronized void addKeys(ExportedBlockKeys exportedKeys) throws IOException { if (isMaster || exportedKeys == null) return; @@ -179,7 +214,7 @@ public class BlockTokenSecretManager extends + 2 * keyUpdateInterval + tokenLifetime, nextKey.getKey()); allKeys.put(currentKey.getKeyId(), currentKey); // generate a new nextKey - serialNo++; + setSerialNo(serialNo + 1); nextKey = new BlockKey(serialNo, System.currentTimeMillis() + 3 * keyUpdateInterval + tokenLifetime, generateSecret()); allKeys.put(nextKey.getKeyId(), nextKey); @@ -334,4 +369,20 @@ public class BlockTokenSecretManager extends } return createPassword(identifier.getBytes(), key.getKey()); } + + @VisibleForTesting + public void setKeyUpdateIntervalForTesting(long millis) { + this.keyUpdateInterval = millis; + } + + @VisibleForTesting + public void clearAllKeysForTesting() { + allKeys.clear(); + } + + @VisibleForTesting + public int getSerialNoForTesting() { + return serialNo; + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 352b77ba4ac..e154b1cb85f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -88,9 +88,9 @@ class NameNodeConnector { LOG.info("Block token params received from NN: keyUpdateInterval=" + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000) + " min(s)"); - this.blockTokenSecretManager = new BlockTokenSecretManager(false, + this.blockTokenSecretManager = new BlockTokenSecretManager( blockKeyUpdateInterval, blockTokenLifetime); - this.blockTokenSecretManager.setKeys(keys); + this.blockTokenSecretManager.addKeys(keys); /* * Balancer should sync its block keys with NN more frequently than NN * updates its block keys @@ -193,7 +193,7 @@ class NameNodeConnector { try { while (shouldRun) { try { - blockTokenSecretManager.setKeys(namenode.getBlockKeys()); + blockTokenSecretManager.addKeys(namenode.getBlockKeys()); } catch (IOException e) { LOG.error("Failed to set keys", e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 068537bd3d1..002ec503ca8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -40,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator; @@ -302,12 +303,24 @@ public class BlockManager { + "=" + updateMin + " min(s), " + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY + "=" + lifetimeMin + " min(s)"); - return new BlockTokenSecretManager(true, - updateMin*60*1000L, lifetimeMin*60*1000L); + + String nsId = DFSUtil.getNamenodeNameServiceId(conf); + boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId); + + if (isHaEnabled) { + String thisNnId = HAUtil.getNameNodeId(conf, nsId); + String otherNnId = HAUtil.getNameNodeIdOfOtherNode(conf, nsId); + return new BlockTokenSecretManager(updateMin*60*1000L, + lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1); + } else { + return new BlockTokenSecretManager(updateMin*60*1000L, + lifetimeMin*60*1000L, 0); + } } /** get the BlockTokenSecretManager */ - BlockTokenSecretManager getBlockTokenSecretManager() { + @VisibleForTesting + public BlockTokenSecretManager getBlockTokenSecretManager() { return blockTokenSecretManager; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 41f8c657614..69efdd0a3bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -334,6 +334,11 @@ class BPOfferService { } dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId()); + // Add the initial block token secret keys to the DN's secret manager. + if (dn.isBlockTokenEnabled) { + dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(), + reg.getExportedKeys()); + } } /** @@ -598,7 +603,7 @@ class BPOfferService { case DatanodeProtocol.DNA_ACCESSKEYUPDATE: LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE"); if (dn.isBlockTokenEnabled) { - dn.blockPoolTokenSecretManager.setKeys( + dn.blockPoolTokenSecretManager.addKeys( getBlockPoolId(), ((KeyUpdateCommand) cmd).getExportedKeys()); } @@ -626,17 +631,24 @@ class BPOfferService { switch(cmd.getAction()) { case DatanodeProtocol.DNA_REGISTER: // namenode requested a registration - at start or if NN lost contact - LOG.info("DatanodeCommand action: DNA_REGISTER"); + LOG.info("DatanodeCommand action from standby: DNA_REGISTER"); actor.reRegister(); - return true; + break; + case DatanodeProtocol.DNA_ACCESSKEYUPDATE: + LOG.info("DatanodeCommand action from standby: DNA_ACCESSKEYUPDATE"); + if (dn.isBlockTokenEnabled) { + dn.blockPoolTokenSecretManager.addKeys( + getBlockPoolId(), + ((KeyUpdateCommand) cmd).getExportedKeys()); + } + break; case DatanodeProtocol.DNA_TRANSFER: case DatanodeProtocol.DNA_INVALIDATE: case DatanodeProtocol.DNA_SHUTDOWN: case DatanodeProtocol.DNA_RECOVERBLOCK: - case DatanodeProtocol.DNA_ACCESSKEYUPDATE: case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE: LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction()); - return true; + break; default: LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 94da81c4210..67e088c719b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -255,6 +255,7 @@ public class DataNode extends Configured boolean isBlockTokenEnabled; BlockPoolTokenSecretManager blockPoolTokenSecretManager; + private boolean hasAnyBlockPoolRegistered = false; volatile DataBlockScanner blockScanner = null; private DirectoryScanner directoryScanner = null; @@ -719,10 +720,19 @@ public class DataNode extends Configured * @param blockPoolId * @throws IOException */ - private void registerBlockPoolWithSecretManager(DatanodeRegistration bpRegistration, - String blockPoolId) throws IOException { + private synchronized void registerBlockPoolWithSecretManager( + DatanodeRegistration bpRegistration, String blockPoolId) throws IOException { ExportedBlockKeys keys = bpRegistration.getExportedKeys(); - isBlockTokenEnabled = keys.isBlockTokenEnabled(); + if (!hasAnyBlockPoolRegistered) { + hasAnyBlockPoolRegistered = true; + isBlockTokenEnabled = keys.isBlockTokenEnabled(); + } else { + if (isBlockTokenEnabled != keys.isBlockTokenEnabled()) { + throw new RuntimeException("Inconsistent configuration of block access" + + " tokens. Either all block pools must be configured to use block" + + " tokens, or none may be."); + } + } // TODO should we check that all federated nns are either enabled or // disabled? if (!isBlockTokenEnabled) return; @@ -736,13 +746,9 @@ public class DataNode extends Configured + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000) + " min(s)"); final BlockTokenSecretManager secretMgr = - new BlockTokenSecretManager(false, 0, blockTokenLifetime); + new BlockTokenSecretManager(0, blockTokenLifetime); blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr); } - - blockPoolTokenSecretManager.setKeys(blockPoolId, - bpRegistration.getExportedKeys()); - bpRegistration.setExportedKeys(ExportedBlockKeys.DUMMY_KEYS); } /** @@ -2204,6 +2210,11 @@ public class DataNode extends Configured public DatanodeID getDatanodeId() { return id; } + + @VisibleForTesting + public void clearAllBlockSecretKeys() { + blockPoolTokenSecretManager.clearAllKeysForTesting(); + } /** * Get current value of the max balancer bandwidth in bytes per second. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 6c280d8767d..e9bf17fec49 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -830,6 +830,10 @@ class DataXceiver extends Receiver implements Runnable { final Op op, final BlockTokenSecretManager.AccessMode mode) throws IOException { if (datanode.isBlockTokenEnabled) { + if (LOG.isDebugEnabled()) { + LOG.debug("Checking block access token for block '" + blk.getBlockId() + + "' with mode '" + mode + "'"); + } try { datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode); } catch(InvalidToken e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSClientAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSClientAdapter.java index 20302217c12..3a2e542e0d3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSClientAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSClientAdapter.java @@ -27,6 +27,10 @@ public class DFSClientAdapter { return dfs.dfs; } + public static void setDFSClient(DistributedFileSystem dfs, DFSClient client) { + dfs.dfs = client; + } + public static void stopLeaseRenewer(DistributedFileSystem dfs) throws IOException { try { dfs.dfs.leaserenewer.interruptAndJoin(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery2.java index 0c056e933df..d0ddea01ff8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery2.java @@ -285,8 +285,7 @@ public class TestLeaseRecovery2 { LocatedBlocks locatedBlocks; do { Thread.sleep(SHORT_LEASE_PERIOD); - locatedBlocks = DFSClient.callGetBlockLocations(dfs.dfs.namenode, - filestr, 0L, size); + locatedBlocks = dfs.dfs.getLocatedBlocks(filestr, 0L, size); } while (locatedBlocks.isUnderConstruction()); assertEquals(size, locatedBlocks.getFileLength()); @@ -498,8 +497,7 @@ public class TestLeaseRecovery2 { LocatedBlocks locatedBlocks; do { Thread.sleep(SHORT_LEASE_PERIOD); - locatedBlocks = DFSClient.callGetBlockLocations(dfs.dfs.namenode, - fileStr, 0L, size); + locatedBlocks = dfs.dfs.getLocatedBlocks(fileStr, 0L, size); } while (locatedBlocks.isUnderConstruction()); assertEquals(size, locatedBlocks.getFileLength()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java index bf2c33815bf..34676cc89b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java @@ -160,8 +160,8 @@ public class TestBlockToken { @Test public void testWritable() throws Exception { TestWritable.testWritable(new BlockTokenIdentifier()); - BlockTokenSecretManager sm = new BlockTokenSecretManager(true, - blockKeyUpdateInterval, blockTokenLifetime); + BlockTokenSecretManager sm = new BlockTokenSecretManager( + blockKeyUpdateInterval, blockTokenLifetime, 0); TestWritable.testWritable(generateTokenId(sm, block1, EnumSet.allOf(BlockTokenSecretManager.AccessMode.class))); TestWritable.testWritable(generateTokenId(sm, block2, @@ -199,18 +199,18 @@ public class TestBlockToken { /** test block key and token handling */ @Test public void testBlockTokenSecretManager() throws Exception { - BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(true, - blockKeyUpdateInterval, blockTokenLifetime); - BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(false, + BlockTokenSecretManager masterHandler = new BlockTokenSecretManager( + blockKeyUpdateInterval, blockTokenLifetime, 0); + BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager( blockKeyUpdateInterval, blockTokenLifetime); ExportedBlockKeys keys = masterHandler.exportKeys(); - slaveHandler.setKeys(keys); + slaveHandler.addKeys(keys); tokenGenerationAndVerification(masterHandler, slaveHandler); // key updating masterHandler.updateKeys(); tokenGenerationAndVerification(masterHandler, slaveHandler); keys = masterHandler.exportKeys(); - slaveHandler.setKeys(keys); + slaveHandler.addKeys(keys); tokenGenerationAndVerification(masterHandler, slaveHandler); } @@ -236,8 +236,8 @@ public class TestBlockToken { @Test public void testBlockTokenRpc() throws Exception { - BlockTokenSecretManager sm = new BlockTokenSecretManager(true, - blockKeyUpdateInterval, blockTokenLifetime); + BlockTokenSecretManager sm = new BlockTokenSecretManager( + blockKeyUpdateInterval, blockTokenLifetime, 0); Token token = sm.generateToken(block3, EnumSet.allOf(BlockTokenSecretManager.AccessMode.class)); @@ -271,8 +271,8 @@ public class TestBlockToken { @Test public void testBlockTokenRpcLeak() throws Exception { Assume.assumeTrue(FD_DIR.exists()); - BlockTokenSecretManager sm = new BlockTokenSecretManager(true, - blockKeyUpdateInterval, blockTokenLifetime); + BlockTokenSecretManager sm = new BlockTokenSecretManager( + blockKeyUpdateInterval, blockTokenLifetime, 0); Token token = sm.generateToken(block3, EnumSet.allOf(BlockTokenSecretManager.AccessMode.class)); @@ -340,21 +340,21 @@ public class TestBlockToken { // Test BlockPoolSecretManager with upto 10 block pools for (int i = 0; i < 10; i++) { String bpid = Integer.toString(i); - BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(true, - blockKeyUpdateInterval, blockTokenLifetime); - BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(false, + BlockTokenSecretManager masterHandler = new BlockTokenSecretManager( + blockKeyUpdateInterval, blockTokenLifetime, 0); + BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager( blockKeyUpdateInterval, blockTokenLifetime); bpMgr.addBlockPool(bpid, slaveHandler); ExportedBlockKeys keys = masterHandler.exportKeys(); - bpMgr.setKeys(bpid, keys); + bpMgr.addKeys(bpid, keys); tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid)); // Test key updating masterHandler.updateKeys(); tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid)); keys = masterHandler.exportKeys(); - bpMgr.setKeys(bpid, keys); + bpMgr.addKeys(bpid, keys); tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid)); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java new file mode 100644 index 00000000000..30133b01ec8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.net.URISyntaxException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSClientAdapter; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class TestFailoverWithBlockTokensEnabled { + + private static final Path TEST_PATH = new Path("/test-path"); + private static final String TEST_DATA = "very important text"; + + private Configuration conf; + private MiniDFSCluster cluster; + + @Before + public void startCluster() throws IOException { + conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(1) + .build(); + } + + @After + public void shutDownCluster() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void ensureSerialNumbersNeverOverlap() { + BlockTokenSecretManager btsm1 = cluster.getNamesystem(0).getBlockManager() + .getBlockTokenSecretManager(); + BlockTokenSecretManager btsm2 = cluster.getNamesystem(1).getBlockManager() + .getBlockTokenSecretManager(); + + btsm1.setSerialNo(0); + btsm2.setSerialNo(0); + assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting()); + + btsm1.setSerialNo(Integer.MAX_VALUE); + btsm2.setSerialNo(Integer.MAX_VALUE); + assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting()); + + btsm1.setSerialNo(Integer.MIN_VALUE); + btsm2.setSerialNo(Integer.MIN_VALUE); + assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting()); + + btsm1.setSerialNo(Integer.MAX_VALUE / 2); + btsm2.setSerialNo(Integer.MAX_VALUE / 2); + assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting()); + + btsm1.setSerialNo(Integer.MIN_VALUE / 2); + btsm2.setSerialNo(Integer.MIN_VALUE / 2); + assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting()); + } + + @Test + public void ensureInvalidBlockTokensAreRejected() throws IOException, + URISyntaxException { + cluster.transitionToActive(0); + FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); + + DFSTestUtil.writeFile(fs, TEST_PATH, TEST_DATA); + assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH)); + + DFSClient dfsClient = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs); + DFSClient spyDfsClient = Mockito.spy(dfsClient); + Mockito.doAnswer( + new Answer() { + @Override + public LocatedBlocks answer(InvocationOnMock arg0) throws Throwable { + LocatedBlocks locatedBlocks = (LocatedBlocks)arg0.callRealMethod(); + for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { + Token token = lb.getBlockToken(); + BlockTokenIdentifier id = lb.getBlockToken().decodeIdentifier(); + // This will make the token invalid, since the password + // won't match anymore + id.setExpiryDate(System.currentTimeMillis() + 10); + Token newToken = + new Token(id.getBytes(), + token.getPassword(), token.getKind(), token.getService()); + lb.setBlockToken(newToken); + } + return locatedBlocks; + } + }).when(spyDfsClient).getLocatedBlocks(Mockito.anyString(), + Mockito.anyLong(), Mockito.anyLong()); + DFSClientAdapter.setDFSClient((DistributedFileSystem)fs, spyDfsClient); + + try { + assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH)); + fail("Shouldn't have been able to read a file with invalid block tokens"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("Could not obtain block", ioe); + } + } + + @Test + public void testFailoverAfterRegistration() throws IOException, + URISyntaxException { + writeUsingBothNameNodes(); + } + + @Test + public void TestFailoverAfterAccessKeyUpdate() throws IOException, + URISyntaxException, InterruptedException { + lowerKeyUpdateIntervalAndClearKeys(cluster); + // Sleep 10s to guarantee DNs heartbeat and get new keys. + Thread.sleep(10 * 1000); + writeUsingBothNameNodes(); + } + + private void writeUsingBothNameNodes() throws ServiceFailedException, + IOException, URISyntaxException { + cluster.transitionToActive(0); + + FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); + DFSTestUtil.writeFile(fs, TEST_PATH, TEST_DATA); + + cluster.transitionToStandby(0); + cluster.transitionToActive(1); + + fs.delete(TEST_PATH, false); + DFSTestUtil.writeFile(fs, TEST_PATH, TEST_DATA); + } + + private static void lowerKeyUpdateIntervalAndClearKeys(MiniDFSCluster cluster) { + lowerKeyUpdateIntervalAndClearKeys(cluster.getNamesystem(0)); + lowerKeyUpdateIntervalAndClearKeys(cluster.getNamesystem(1)); + for (DataNode dn : cluster.getDataNodes()) { + dn.clearAllBlockSecretKeys(); + } + } + + private static void lowerKeyUpdateIntervalAndClearKeys(FSNamesystem namesystem) { + BlockTokenSecretManager btsm = namesystem.getBlockManager() + .getBlockTokenSecretManager(); + btsm.setKeyUpdateIntervalForTesting(2 * 1000); + btsm.setTokenLifetime(2 * 1000); + btsm.clearAllKeysForTesting(); + } + +}