Fix issue with NN/DN re-registration.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1358348 13f79535-47bb-0310-9956-ffa450edef68
Author: Aaron Myers
Date:   2012-07-06 19:00:06 +00:00
Parent: 2e6f49f4e4
Commit: fbb1760459

14 changed files with 390 additions and 75 deletions

DFSClient.java

@@ -140,6 +140,7 @@ import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.net.InetAddresses;
@@ -871,6 +872,16 @@ public class DFSClient implements java.io.Closeable {
   public short getDefaultReplication() {
     return dfsClientConf.defaultReplication;
   }
+
+  /*
+   * This is just a wrapper around callGetBlockLocations, but non-static so that
+   * we can stub it out for tests.
+   */
+  @VisibleForTesting
+  public LocatedBlocks getLocatedBlocks(String src, long start, long length)
+      throws IOException {
+    return callGetBlockLocations(namenode, src, start, length);
+  }
 
   /**
    * @see ClientProtocol#getBlockLocations(String, long, long)
@@ -918,7 +929,7 @@ public class DFSClient implements java.io.Closeable {
    */
   public BlockLocation[] getBlockLocations(String src, long start,
       long length) throws IOException, UnresolvedLinkException {
-    LocatedBlocks blocks = callGetBlockLocations(namenode, src, start, length);
+    LocatedBlocks blocks = getLocatedBlocks(src, start, length);
     return DFSUtil.locatedBlocks2Locations(blocks);
   }
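The reason getLocatedBlocks is an instance method shows up in the new test at the bottom of this commit: a Mockito spy can now intercept every block-location lookup the client makes. A minimal sketch of that pattern, assuming a test that already has a DFSClient in hand (variable names hypothetical; imports as in TestFailoverWithBlockTokensEnabled below):

    // Wrap the real client in a spy and divert getLocatedBlocks().
    DFSClient spy = Mockito.spy(dfsClient);
    Mockito.doAnswer(new Answer<LocatedBlocks>() {
      @Override
      public LocatedBlocks answer(InvocationOnMock invocation) throws Throwable {
        // Observe or tamper with the real result before handing it back.
        return (LocatedBlocks) invocation.callRealMethod();
      }
    }).when(spy).getLocatedBlocks(Mockito.anyString(),
        Mockito.anyLong(), Mockito.anyLong());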

DFSInputStream.java

@@ -151,7 +151,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
   }
 
   private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
-    LocatedBlocks newInfo = DFSClient.callGetBlockLocations(dfsClient.namenode, src, 0, prefetchSize);
+    LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0, prefetchSize);
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("newInfo = " + newInfo);
     }
@@ -298,7 +298,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
       targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
       // fetch more blocks
       LocatedBlocks newBlocks;
-      newBlocks = DFSClient.callGetBlockLocations(dfsClient.namenode, src, offset, prefetchSize);
+      newBlocks = dfsClient.getLocatedBlocks(src, offset, prefetchSize);
       assert (newBlocks != null) : "Could not find target position " + offset;
       locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
     }
@@ -322,7 +322,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
     }
     // fetch blocks
     LocatedBlocks newBlocks;
-    newBlocks = DFSClient.callGetBlockLocations(dfsClient.namenode, src, offset, prefetchSize);
+    newBlocks = dfsClient.getLocatedBlocks(src, offset, prefetchSize);
     if (newBlocks == null) {
       throw new IOException("Could not find target position " + offset);
     }
@@ -391,7 +391,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
       blk = locatedBlocks.get(blockIdx);
       if (blk == null || curOff < blk.getStartOffset()) {
         LocatedBlocks newBlocks;
-        newBlocks = DFSClient.callGetBlockLocations(dfsClient.namenode, src, curOff, remaining);
+        newBlocks = dfsClient.getLocatedBlocks(src, curOff, remaining);
         locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
         continue;
       }

HAUtil.java

@@ -127,25 +127,21 @@ public class HAUtil {
     }
     return null;
   }
 
   /**
-   * Given the configuration for this node, return a Configuration object for
-   * the other node in an HA setup.
+   * Get the NN ID of the other node in an HA setup.
    *
-   * @param myConf the configuration of this node
-   * @return the configuration of the other node in an HA setup
+   * @param conf the configuration of this node
+   * @return the NN ID of the other node in this nameservice
    */
-  public static Configuration getConfForOtherNode(
-      Configuration myConf) {
-    String nsId = DFSUtil.getNamenodeNameServiceId(myConf);
+  public static String getNameNodeIdOfOtherNode(Configuration conf, String nsId) {
     Preconditions.checkArgument(nsId != null,
         "Could not determine namespace id. Please ensure that this " +
         "machine is one of the machines listed as a NN RPC address, " +
         "or configure " + DFSConfigKeys.DFS_NAMESERVICE_ID);
-    Collection<String> nnIds = DFSUtil.getNameNodeIds(myConf, nsId);
-    String myNNId = myConf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY);
+    Collection<String> nnIds = DFSUtil.getNameNodeIds(conf, nsId);
+    String myNNId = conf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY);
     Preconditions.checkArgument(nnIds != null,
         "Could not determine namenode ids in namespace '%s'. " +
         "Please configure " +
@@ -165,11 +161,25 @@ public class HAUtil {
     ArrayList<String> nnSet = Lists.newArrayList(nnIds);
     nnSet.remove(myNNId);
     assert nnSet.size() == 1;
-    String activeNN = nnSet.get(0);
+    return nnSet.get(0);
+  }
+
+  /**
+   * Given the configuration for this node, return a Configuration object for
+   * the other node in an HA setup.
+   *
+   * @param myConf the configuration of this node
+   * @return the configuration of the other node in an HA setup
+   */
+  public static Configuration getConfForOtherNode(
+      Configuration myConf) {
+    String nsId = DFSUtil.getNamenodeNameServiceId(myConf);
+    String otherNn = getNameNodeIdOfOtherNode(myConf, nsId);
+
     // Look up the address of the active NN.
     Configuration confForOtherNode = new Configuration(myConf);
-    NameNode.initializeGenericKeys(confForOtherNode, nsId, activeNN);
+    NameNode.initializeGenericKeys(confForOtherNode, nsId, otherNn);
     return confForOtherNode;
   }
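The new helper relies purely on the HA configuration keys: it lists the NN IDs for the nameservice and drops its own, leaving exactly one peer. A minimal sketch of a configuration it would resolve against (nameservice "ns1" and NN IDs "nn1"/"nn2" are hypothetical):

    Configuration conf = new Configuration();
    // All NN IDs in the nameservice, per dfs.ha.namenodes.<nameservice>:
    conf.set("dfs.ha.namenodes.ns1", "nn1,nn2");
    // This node's own NN ID (DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY):
    conf.set("dfs.ha.namenode.id", "nn1");
    String other = HAUtil.getNameNodeIdOfOtherNode(conf, "ns1");
    // other == "nn2" -- the single entry left after removing our own ID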

BlockPoolTokenSecretManager.java

@@ -27,6 +27,8 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.Acces
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Manages a {@link BlockTokenSecretManager} per block pool. Routes the requests
  * given a block pool Id to corresponding {@link BlockTokenSecretManager}
@@ -96,11 +98,11 @@ public class BlockPoolTokenSecretManager extends
   }
 
   /**
-   * See {@link BlockTokenSecretManager#setKeys(ExportedBlockKeys)}
+   * See {@link BlockTokenSecretManager#addKeys(ExportedBlockKeys)}
    */
-  public void setKeys(String bpid, ExportedBlockKeys exportedKeys)
+  public void addKeys(String bpid, ExportedBlockKeys exportedKeys)
       throws IOException {
-    get(bpid).setKeys(exportedKeys);
+    get(bpid).addKeys(exportedKeys);
   }
 
   /**
@@ -110,4 +112,11 @@ public class BlockPoolTokenSecretManager extends
       EnumSet<AccessMode> of) throws IOException {
     return get(b.getBlockPoolId()).generateToken(b, of);
   }
+
+  @VisibleForTesting
+  public void clearAllKeysForTesting() {
+    for (BlockTokenSecretManager btsm : map.values()) {
+      btsm.clearAllKeysForTesting();
+    }
+  }
 }

BlockTokenSecretManager.java

@@ -37,6 +37,9 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 /**
  * BlockTokenSecretManager can be instantiated in 2 modes, master mode and slave
  * mode. Master can generate new block keys and export block keys to slaves,
@@ -49,17 +52,24 @@ public class BlockTokenSecretManager extends
     SecretManager<BlockTokenIdentifier> {
   public static final Log LOG = LogFactory
       .getLog(BlockTokenSecretManager.class);
+
+  // We use these in an HA setup to ensure that the pair of NNs produce block
+  // token serial numbers that are in different ranges.
+  private static final int LOW_MASK = ~(1 << 31);
+
   public static final Token<BlockTokenIdentifier> DUMMY_TOKEN = new Token<BlockTokenIdentifier>();
 
   private final boolean isMaster;
+  private int nnIndex;
+
   /**
    * keyUpdateInterval is the interval that NN updates its block keys. It should
    * be set long enough so that all live DN's and Balancer should have sync'ed
    * their block keys with NN at least once during each interval.
    */
-  private final long keyUpdateInterval;
+  private long keyUpdateInterval;
   private volatile long tokenLifetime;
-  private int serialNo = new SecureRandom().nextInt();
+  private int serialNo;
   private BlockKey currentKey;
   private BlockKey nextKey;
   private Map<Integer, BlockKey> allKeys;
@@ -67,22 +77,47 @@ public class BlockTokenSecretManager extends
   public static enum AccessMode {
     READ, WRITE, COPY, REPLACE
   };
 
   /**
-   * Constructor
+   * Constructor for slaves.
    *
-   * @param isMaster
-   * @param keyUpdateInterval
-   * @param tokenLifetime
-   * @throws IOException
+   * @param keyUpdateInterval how often a new key will be generated
+   * @param tokenLifetime how long an individual token is valid
    */
-  public BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
-      long tokenLifetime) throws IOException {
+  public BlockTokenSecretManager(long keyUpdateInterval,
+      long tokenLifetime) {
+    this(false, keyUpdateInterval, tokenLifetime);
+  }
+
+  /**
+   * Constructor for masters.
+   *
+   * @param keyUpdateInterval how often a new key will be generated
+   * @param tokenLifetime how long an individual token is valid
+   * @param nnIndex the index (0 or 1) of this NN among the HA NNs
+   */
+  public BlockTokenSecretManager(long keyUpdateInterval,
+      long tokenLifetime, int nnIndex) {
+    this(true, keyUpdateInterval, tokenLifetime);
+    Preconditions.checkArgument(nnIndex == 0 || nnIndex == 1);
+    this.nnIndex = nnIndex;
+    setSerialNo(new SecureRandom().nextInt());
+    generateKeys();
+  }
+
+  private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
+      long tokenLifetime) {
     this.isMaster = isMaster;
     this.keyUpdateInterval = keyUpdateInterval;
     this.tokenLifetime = tokenLifetime;
     this.allKeys = new HashMap<Integer, BlockKey>();
-    generateKeys();
+  }
+
+  @VisibleForTesting
+  public void setSerialNo(int serialNo) {
+    this.serialNo = (serialNo & LOW_MASK) | (nnIndex << 31);
   }
 
   /** Initialize block keys */
@@ -101,10 +136,10 @@ public class BlockTokenSecretManager extends
    * Similarly, the estimated expiry date for nextKey is one keyUpdateInterval
    * more.
    */
-    serialNo++;
+    setSerialNo(serialNo + 1);
    currentKey = new BlockKey(serialNo, System.currentTimeMillis() + 2
        * keyUpdateInterval + tokenLifetime, generateSecret());
-    serialNo++;
+    setSerialNo(serialNo + 1);
    nextKey = new BlockKey(serialNo, System.currentTimeMillis() + 3
        * keyUpdateInterval + tokenLifetime, generateSecret());
    allKeys.put(currentKey.getKeyId(), currentKey);
@@ -135,7 +170,7 @@ public class BlockTokenSecretManager extends
   /**
    * Set block keys, only to be used in slave mode
    */
-  public synchronized void setKeys(ExportedBlockKeys exportedKeys)
+  public synchronized void addKeys(ExportedBlockKeys exportedKeys)
       throws IOException {
     if (isMaster || exportedKeys == null)
       return;
@@ -179,7 +214,7 @@ public class BlockTokenSecretManager extends
         + 2 * keyUpdateInterval + tokenLifetime, nextKey.getKey());
     allKeys.put(currentKey.getKeyId(), currentKey);
     // generate a new nextKey
-    serialNo++;
+    setSerialNo(serialNo + 1);
     nextKey = new BlockKey(serialNo, System.currentTimeMillis() + 3
         * keyUpdateInterval + tokenLifetime, generateSecret());
     allKeys.put(nextKey.getKeyId(), nextKey);
@@ -334,4 +369,20 @@ public class BlockTokenSecretManager extends
     }
     return createPassword(identifier.getBytes(), key.getKey());
   }
+
+  @VisibleForTesting
+  public void setKeyUpdateIntervalForTesting(long millis) {
+    this.keyUpdateInterval = millis;
+  }
+
+  @VisibleForTesting
+  public void clearAllKeysForTesting() {
+    allKeys.clear();
+  }
+
+  @VisibleForTesting
+  public int getSerialNoForTesting() {
+    return serialNo;
+  }
 }
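The net effect of setSerialNo is that bit 31 of every key serial number records which NN minted it: nnIndex 0 always produces non-negative serial numbers, nnIndex 1 always negative ones, so the two NNs can never hand out colliding key IDs regardless of what their SecureRandom seeds produce. A standalone sketch of the arithmetic (not part of the commit):

    final int LOW_MASK = ~(1 << 31);  // 0x7fffffff: every bit except the sign bit
    int serialNo = 5;
    int onNn0 = (serialNo & LOW_MASK) | (0 << 31);  // 5           (0x00000005)
    int onNn1 = (serialNo & LOW_MASK) | (1 << 31);  // -2147483643 (0x80000005)
    // Same low 31 bits, different sign bit: the key IDs can never be equal.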

NameNodeConnector.java

@@ -88,9 +88,9 @@ class NameNodeConnector {
     LOG.info("Block token params received from NN: keyUpdateInterval="
         + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
         + blockTokenLifetime / (60 * 1000) + " min(s)");
-    this.blockTokenSecretManager = new BlockTokenSecretManager(false,
+    this.blockTokenSecretManager = new BlockTokenSecretManager(
         blockKeyUpdateInterval, blockTokenLifetime);
-    this.blockTokenSecretManager.setKeys(keys);
+    this.blockTokenSecretManager.addKeys(keys);
     /*
      * Balancer should sync its block keys with NN more frequently than NN
      * updates its block keys
@@ -193,7 +193,7 @@ class NameNodeConnector {
     try {
       while (shouldRun) {
         try {
-          blockTokenSecretManager.setKeys(namenode.getBlockKeys());
+          blockTokenSecretManager.addKeys(namenode.getBlockKeys());
         } catch (IOException e) {
           LOG.error("Failed to set keys", e);
         }

BlockManager.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
@@ -302,12 +303,24 @@ public class BlockManager {
         + "=" + updateMin + " min(s), "
         + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY
         + "=" + lifetimeMin + " min(s)");
-    return new BlockTokenSecretManager(true,
-        updateMin*60*1000L, lifetimeMin*60*1000L);
+
+    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
+    boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId);
+
+    if (isHaEnabled) {
+      String thisNnId = HAUtil.getNameNodeId(conf, nsId);
+      String otherNnId = HAUtil.getNameNodeIdOfOtherNode(conf, nsId);
+      return new BlockTokenSecretManager(updateMin*60*1000L,
+          lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1);
+    } else {
+      return new BlockTokenSecretManager(updateMin*60*1000L,
+          lifetimeMin*60*1000L, 0);
+    }
   }
 
   /** get the BlockTokenSecretManager */
-  BlockTokenSecretManager getBlockTokenSecretManager() {
+  @VisibleForTesting
+  public BlockTokenSecretManager getBlockTokenSecretManager() {
     return blockTokenSecretManager;
   }
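Both NNs run this same code, so each derives its index deterministically by comparing its own NN ID against its peer's: the lexicographically smaller ID takes index 0 and the larger takes index 1. A quick illustration with hypothetical IDs "nn1" and "nn2":

    // On the NN whose ID is "nn1":
    int index = "nn1".compareTo("nn2") < 0 ? 0 : 1;      // 0
    // On the NN whose ID is "nn2", the operands are swapped:
    int peerIndex = "nn2".compareTo("nn1") < 0 ? 0 : 1;  // 1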

BPOfferService.java

@@ -334,6 +334,11 @@ class BPOfferService {
     }
 
     dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
+
+    // Add the initial block token secret keys to the DN's secret manager.
+    if (dn.isBlockTokenEnabled) {
+      dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
+          reg.getExportedKeys());
+    }
   }
 
   /**
@@ -598,7 +603,7 @@ class BPOfferService {
     case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
       LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
       if (dn.isBlockTokenEnabled) {
-        dn.blockPoolTokenSecretManager.setKeys(
+        dn.blockPoolTokenSecretManager.addKeys(
             getBlockPoolId(),
             ((KeyUpdateCommand) cmd).getExportedKeys());
       }
@@ -626,17 +631,24 @@ class BPOfferService {
     switch(cmd.getAction()) {
     case DatanodeProtocol.DNA_REGISTER:
       // namenode requested a registration - at start or if NN lost contact
-      LOG.info("DatanodeCommand action: DNA_REGISTER");
+      LOG.info("DatanodeCommand action from standby: DNA_REGISTER");
       actor.reRegister();
-      return true;
+      break;
+    case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
+      LOG.info("DatanodeCommand action from standby: DNA_ACCESSKEYUPDATE");
+      if (dn.isBlockTokenEnabled) {
+        dn.blockPoolTokenSecretManager.addKeys(
+            getBlockPoolId(),
+            ((KeyUpdateCommand) cmd).getExportedKeys());
+      }
+      break;
     case DatanodeProtocol.DNA_TRANSFER:
     case DatanodeProtocol.DNA_INVALIDATE:
     case DatanodeProtocol.DNA_SHUTDOWN:
     case DatanodeProtocol.DNA_RECOVERBLOCK:
-    case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
     case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
       LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
-      return true;
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }

DataNode.java

@@ -255,6 +255,7 @@ public class DataNode extends Configured
   boolean isBlockTokenEnabled;
   BlockPoolTokenSecretManager blockPoolTokenSecretManager;
+  private boolean hasAnyBlockPoolRegistered = false;
 
   volatile DataBlockScanner blockScanner = null;
   private DirectoryScanner directoryScanner = null;
@@ -719,10 +720,19 @@ public class DataNode extends Configured
    * @param blockPoolId
    * @throws IOException
    */
-  private void registerBlockPoolWithSecretManager(DatanodeRegistration bpRegistration,
-      String blockPoolId) throws IOException {
+  private synchronized void registerBlockPoolWithSecretManager(
+      DatanodeRegistration bpRegistration, String blockPoolId) throws IOException {
     ExportedBlockKeys keys = bpRegistration.getExportedKeys();
-    isBlockTokenEnabled = keys.isBlockTokenEnabled();
+    if (!hasAnyBlockPoolRegistered) {
+      hasAnyBlockPoolRegistered = true;
+      isBlockTokenEnabled = keys.isBlockTokenEnabled();
+    } else {
+      if (isBlockTokenEnabled != keys.isBlockTokenEnabled()) {
+        throw new RuntimeException("Inconsistent configuration of block access"
+            + " tokens. Either all block pools must be configured to use block"
+            + " tokens, or none may be.");
+      }
+    }
     // TODO should we check that all federated nns are either enabled or
     // disabled?
     if (!isBlockTokenEnabled) return;
@@ -736,13 +746,9 @@ public class DataNode extends Configured
           + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
           + " min(s)");
       final BlockTokenSecretManager secretMgr =
-          new BlockTokenSecretManager(false, 0, blockTokenLifetime);
+          new BlockTokenSecretManager(0, blockTokenLifetime);
       blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
     }
-
-    blockPoolTokenSecretManager.setKeys(blockPoolId,
-        bpRegistration.getExportedKeys());
-    bpRegistration.setExportedKeys(ExportedBlockKeys.DUMMY_KEYS);
   }
 
   /**
@@ -2204,6 +2210,11 @@ public class DataNode extends Configured
   public DatanodeID getDatanodeId() {
     return id;
   }
+
+  @VisibleForTesting
+  public void clearAllBlockSecretKeys() {
+    blockPoolTokenSecretManager.clearAllKeysForTesting();
+  }
 
   /**
    * Get current value of the max balancer bandwidth in bytes per second.

DataXceiver.java

@@ -830,6 +830,10 @@ class DataXceiver extends Receiver implements Runnable {
       final Op op,
       final BlockTokenSecretManager.AccessMode mode) throws IOException {
     if (datanode.isBlockTokenEnabled) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Checking block access token for block '" + blk.getBlockId()
+            + "' with mode '" + mode + "'");
+      }
       try {
         datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode);
       } catch(InvalidToken e) {

DFSClientAdapter.java

@@ -27,6 +27,10 @@ public class DFSClientAdapter {
     return dfs.dfs;
   }
 
+  public static void setDFSClient(DistributedFileSystem dfs, DFSClient client) {
+    dfs.dfs = client;
+  }
+
   public static void stopLeaseRenewer(DistributedFileSystem dfs) throws IOException {
     try {
       dfs.dfs.leaserenewer.interruptAndJoin();

TestLeaseRecovery2.java

@@ -285,8 +285,7 @@ public class TestLeaseRecovery2 {
     LocatedBlocks locatedBlocks;
     do {
       Thread.sleep(SHORT_LEASE_PERIOD);
-      locatedBlocks = DFSClient.callGetBlockLocations(dfs.dfs.namenode,
-          filestr, 0L, size);
+      locatedBlocks = dfs.dfs.getLocatedBlocks(filestr, 0L, size);
     } while (locatedBlocks.isUnderConstruction());
     assertEquals(size, locatedBlocks.getFileLength());
@@ -498,8 +497,7 @@ public class TestLeaseRecovery2 {
     LocatedBlocks locatedBlocks;
     do {
       Thread.sleep(SHORT_LEASE_PERIOD);
-      locatedBlocks = DFSClient.callGetBlockLocations(dfs.dfs.namenode,
-          fileStr, 0L, size);
+      locatedBlocks = dfs.dfs.getLocatedBlocks(fileStr, 0L, size);
     } while (locatedBlocks.isUnderConstruction());
     assertEquals(size, locatedBlocks.getFileLength());

TestBlockToken.java

@@ -160,8 +160,8 @@ public class TestBlockToken {
   @Test
   public void testWritable() throws Exception {
     TestWritable.testWritable(new BlockTokenIdentifier());
-    BlockTokenSecretManager sm = new BlockTokenSecretManager(true,
-        blockKeyUpdateInterval, blockTokenLifetime);
+    BlockTokenSecretManager sm = new BlockTokenSecretManager(
+        blockKeyUpdateInterval, blockTokenLifetime, 0);
     TestWritable.testWritable(generateTokenId(sm, block1,
         EnumSet.allOf(BlockTokenSecretManager.AccessMode.class)));
     TestWritable.testWritable(generateTokenId(sm, block2,
@@ -199,18 +199,18 @@ public class TestBlockToken {
   /** test block key and token handling */
   @Test
   public void testBlockTokenSecretManager() throws Exception {
-    BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(true,
-        blockKeyUpdateInterval, blockTokenLifetime);
-    BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(false,
+    BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
+        blockKeyUpdateInterval, blockTokenLifetime, 0);
+    BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
         blockKeyUpdateInterval, blockTokenLifetime);
     ExportedBlockKeys keys = masterHandler.exportKeys();
-    slaveHandler.setKeys(keys);
+    slaveHandler.addKeys(keys);
     tokenGenerationAndVerification(masterHandler, slaveHandler);
     // key updating
     masterHandler.updateKeys();
     tokenGenerationAndVerification(masterHandler, slaveHandler);
     keys = masterHandler.exportKeys();
-    slaveHandler.setKeys(keys);
+    slaveHandler.addKeys(keys);
     tokenGenerationAndVerification(masterHandler, slaveHandler);
   }
@@ -236,8 +236,8 @@ public class TestBlockToken {
   @Test
   public void testBlockTokenRpc() throws Exception {
-    BlockTokenSecretManager sm = new BlockTokenSecretManager(true,
-        blockKeyUpdateInterval, blockTokenLifetime);
+    BlockTokenSecretManager sm = new BlockTokenSecretManager(
+        blockKeyUpdateInterval, blockTokenLifetime, 0);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
         EnumSet.allOf(BlockTokenSecretManager.AccessMode.class));
@@ -271,8 +271,8 @@ public class TestBlockToken {
   @Test
   public void testBlockTokenRpcLeak() throws Exception {
     Assume.assumeTrue(FD_DIR.exists());
-    BlockTokenSecretManager sm = new BlockTokenSecretManager(true,
-        blockKeyUpdateInterval, blockTokenLifetime);
+    BlockTokenSecretManager sm = new BlockTokenSecretManager(
+        blockKeyUpdateInterval, blockTokenLifetime, 0);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
         EnumSet.allOf(BlockTokenSecretManager.AccessMode.class));
@@ -340,21 +340,21 @@ public class TestBlockToken {
     // Test BlockPoolSecretManager with upto 10 block pools
     for (int i = 0; i < 10; i++) {
       String bpid = Integer.toString(i);
-      BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(true,
-          blockKeyUpdateInterval, blockTokenLifetime);
-      BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(false,
+      BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
+          blockKeyUpdateInterval, blockTokenLifetime, 0);
+      BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
          blockKeyUpdateInterval, blockTokenLifetime);
       bpMgr.addBlockPool(bpid, slaveHandler);
       ExportedBlockKeys keys = masterHandler.exportKeys();
-      bpMgr.setKeys(bpid, keys);
+      bpMgr.addKeys(bpid, keys);
       tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid));
       // Test key updating
       masterHandler.updateKeys();
       tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid));
       keys = masterHandler.exportKeys();
-      bpMgr.setKeys(bpid, keys);
+      bpMgr.addKeys(bpid, keys);
       tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid));
     }
   }

TestFailoverWithBlockTokensEnabled.java (new file)

@@ -0,0 +1,192 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClientAdapter;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestFailoverWithBlockTokensEnabled {

  private static final Path TEST_PATH = new Path("/test-path");
  private static final String TEST_DATA = "very important text";

  private Configuration conf;
  private MiniDFSCluster cluster;

  @Before
  public void startCluster() throws IOException {
    conf = new Configuration();
    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
    cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(MiniDFSNNTopology.simpleHATopology())
        .numDataNodes(1)
        .build();
  }

  @After
  public void shutDownCluster() {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  @Test
  public void ensureSerialNumbersNeverOverlap() {
    BlockTokenSecretManager btsm1 = cluster.getNamesystem(0).getBlockManager()
        .getBlockTokenSecretManager();
    BlockTokenSecretManager btsm2 = cluster.getNamesystem(1).getBlockManager()
        .getBlockTokenSecretManager();

    btsm1.setSerialNo(0);
    btsm2.setSerialNo(0);
    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());

    btsm1.setSerialNo(Integer.MAX_VALUE);
    btsm2.setSerialNo(Integer.MAX_VALUE);
    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());

    btsm1.setSerialNo(Integer.MIN_VALUE);
    btsm2.setSerialNo(Integer.MIN_VALUE);
    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());

    btsm1.setSerialNo(Integer.MAX_VALUE / 2);
    btsm2.setSerialNo(Integer.MAX_VALUE / 2);
    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());

    btsm1.setSerialNo(Integer.MIN_VALUE / 2);
    btsm2.setSerialNo(Integer.MIN_VALUE / 2);
    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
  }

  @Test
  public void ensureInvalidBlockTokensAreRejected() throws IOException,
      URISyntaxException {
    cluster.transitionToActive(0);
    FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);

    DFSTestUtil.writeFile(fs, TEST_PATH, TEST_DATA);
    assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH));

    DFSClient dfsClient = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs);
    DFSClient spyDfsClient = Mockito.spy(dfsClient);
    Mockito.doAnswer(
        new Answer<LocatedBlocks>() {
          @Override
          public LocatedBlocks answer(InvocationOnMock arg0) throws Throwable {
            LocatedBlocks locatedBlocks = (LocatedBlocks)arg0.callRealMethod();
            for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
              Token<BlockTokenIdentifier> token = lb.getBlockToken();
              BlockTokenIdentifier id = lb.getBlockToken().decodeIdentifier();
              // This will make the token invalid, since the password
              // won't match anymore
              id.setExpiryDate(System.currentTimeMillis() + 10);
              Token<BlockTokenIdentifier> newToken =
                  new Token<BlockTokenIdentifier>(id.getBytes(),
                      token.getPassword(), token.getKind(), token.getService());
              lb.setBlockToken(newToken);
            }
            return locatedBlocks;
          }
        }).when(spyDfsClient).getLocatedBlocks(Mockito.anyString(),
            Mockito.anyLong(), Mockito.anyLong());
    DFSClientAdapter.setDFSClient((DistributedFileSystem)fs, spyDfsClient);
    try {
      assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH));
      fail("Shouldn't have been able to read a file with invalid block tokens");
    } catch (IOException ioe) {
      GenericTestUtils.assertExceptionContains("Could not obtain block", ioe);
    }
  }

  @Test
  public void testFailoverAfterRegistration() throws IOException,
      URISyntaxException {
    writeUsingBothNameNodes();
  }

  @Test
  public void testFailoverAfterAccessKeyUpdate() throws IOException,
      URISyntaxException, InterruptedException {
    lowerKeyUpdateIntervalAndClearKeys(cluster);
    // Sleep 10s to guarantee DNs heartbeat and get new keys.
    Thread.sleep(10 * 1000);
    writeUsingBothNameNodes();
  }

  private void writeUsingBothNameNodes() throws ServiceFailedException,
      IOException, URISyntaxException {
    cluster.transitionToActive(0);

    FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
    DFSTestUtil.writeFile(fs, TEST_PATH, TEST_DATA);

    cluster.transitionToStandby(0);
    cluster.transitionToActive(1);

    fs.delete(TEST_PATH, false);
    DFSTestUtil.writeFile(fs, TEST_PATH, TEST_DATA);
  }

  private static void lowerKeyUpdateIntervalAndClearKeys(MiniDFSCluster cluster) {
    lowerKeyUpdateIntervalAndClearKeys(cluster.getNamesystem(0));
    lowerKeyUpdateIntervalAndClearKeys(cluster.getNamesystem(1));
    for (DataNode dn : cluster.getDataNodes()) {
      dn.clearAllBlockSecretKeys();
    }
  }

  private static void lowerKeyUpdateIntervalAndClearKeys(FSNamesystem namesystem) {
    BlockTokenSecretManager btsm = namesystem.getBlockManager()
        .getBlockTokenSecretManager();
    btsm.setKeyUpdateIntervalForTesting(2 * 1000);
    btsm.setTokenLifetime(2 * 1000);
    btsm.clearAllKeysForTesting();
  }
}